summaryrefslogtreecommitdiff
path: root/roles/base/templates/user_expiration_control.py.j2
diff options
context:
space:
mode:
authorrepliqa <sarzilhossain@proton.me>2025-07-23 14:06:15 +0600
committerrepliqa <sarzilhossain@proton.me>2025-07-23 14:06:15 +0600
commit69acb7a82a68eeb439e55b994281056df52c81b1 (patch)
tree7c6a53694e11511a3014470c213255a503f9c95e /roles/base/templates/user_expiration_control.py.j2
v0.0.1alphaHEADmain
Diffstat (limited to 'roles/base/templates/user_expiration_control.py.j2')
-rw-r--r--roles/base/templates/user_expiration_control.py.j299
1 files changed, 99 insertions, 0 deletions
diff --git a/roles/base/templates/user_expiration_control.py.j2 b/roles/base/templates/user_expiration_control.py.j2
new file mode 100644
index 00000000..36551617
--- /dev/null
+++ b/roles/base/templates/user_expiration_control.py.j2
@@ -0,0 +1,99 @@
+#!/usr/local/bin/python3
+
+import json, os, subprocess, shutil
+from datetime import datetime
+
+EXPIRE_USER_JSON_PATH = "/var/reactance/.user_expiration.json"
+EXPIRE_WEB_JSON_PATH = "/var/reactance/.web_expiration.json"
+OCSERV_CONFIG_PATH = "/var/reactance/ocserv/etc/ocserv.passwd"
+HYSTERIA_CONFIG_FILE = "/var/reactance/hysteria/etc/config.json"
+XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
+SSH_ROOT = "/var/reactance/sshvpn/.ssh"
+AUTHORIZED_KEYS = os.path.join(SSH_ROOT, "authorized_keys")
+
+def ocserv_get_users():
+ ocserv_config_dict = {}
+ if os.path.isfile(OCSERV_CONFIG_PATH):
+ with open(OCSERV_CONFIG_PATH, "r") as f:
+ ocserv_content = f.read()
+ ocserv_config_dict = dict(map(lambda x: x.split(':*:'), list(filter(lambda x: x != '', ocserv_content.split("\n")))))
+ return ocserv_config_dict
+
+def xray_get_users():
+ with open(XRAY_CONFIG_PATH, "r") as f:
+ xray_config_dict = json.loads(f.read())
+ return xray_config_dict
+
+def hysteria_get_users():
+ with open(HYSTERIA_CONFIG_FILE, "r") as f:
+ hysteria_config_dict = json.loads(f.read())
+ return hysteria_config_dict
+
+def sshvpn_get_users():
+ previous_users = [".".join(i.split('.')[:-1]) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
+ return previous_users
+
+def ocserv_user_purge(users_to_remove):
+ for user in users_to_remove:
+ subprocess.run(f"ocpasswd -d {user} -c {OCSERV_CONFIG_PATH}", shell=True)
+
+def xray_user_purge(users_to_remove):
+ xray_config_dict = xray_get_users()
+ for i, inbound in enumerate(xray_config_dict['inbounds']):
+ previous_users_list = inbound['settings']['clients']
+ new_users_list = previous_users_list.copy()
+ for user in previous_users_list:
+ if user['email'] in users_to_remove:
+ new_users_list.remove(user)
+ xray_config_dict['inbounds'][i]['settings']['clients'] = new_users_list
+ with open(XRAY_CONFIG_PATH, "w") as f:
+ f.write(json.dumps(xray_config_dict, indent=1))
+
+def hysteria_user_purge(users_to_remove):
+ hysteria_config_dict = hysteria_get_users()
+ previous_users_dict = hysteria_config_dict["auth"]["userpass"]
+ new_users_dict = {}
+ for user in previous_users_dict.keys():
+ if user not in users_to_remove:
+ new_users_dict[user] = previous_users_dict[user]
+ hysteria_config_dict["auth"]["userpass"] = new_users_dict
+ with open(HYSTERIA_CONFIG_FILE, "w") as f:
+ f.write(json.dumps(hysteria_config_dict, indent=1))
+
+def sshvpn_user_purge(users_to_remove):
+ previous_users_list = sshvpn_get_users()
+ for user in previous_users_list:
+ if user in users_to_remove:
+ os.remove(f"{SSH_ROOT}/{user}.pub")
+ os.remove(f"{SSH_ROOT}/{user}")
+
+ # Overwrite existing authorized_key file
+ users_pubkey_files = [os.path.join(SSH_ROOT, i) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
+ with open(AUTHORIZED_KEYS, "w") as f:
+ for pubkey_file in users_pubkey_files:
+ with open(pubkey_file, "r") as pkey:
+ f.write(pkey.read())
+
+def main():
+ current_unix_time = datetime.now().timestamp()
+
+ with open(EXPIRE_USER_JSON_PATH, "r") as f:
+ expire_user_dict = json.loads(f.read())
+ for exp in expire_user_dict.keys():
+ if float(exp) <= current_unix_time:
+ users = expire_user_dict[exp]
+
+ xray_user_purge(users)
+ sshvpn_user_purge(users)
+ ocserv_user_purge(users)
+ hysteria_user_purge(users)
+ with open(EXPIRE_USER_JSON_PATH, "w") as f:
+ f.write(json.dumps(expire_user_dict, indent=1))
+
+ with open(EXPIRE_WEB_JSON_PATH, "r") as f:
+ expire_web_dict = json.loads(f.read())
+ for exp in expire_web_dict.keys():
+ shutil.rmtree(f"/var/www/reactance/{exp}")
+
+if __name__ == "__main__":
+ main()