#!/usr/local/bin/python3
"""Purge expired users from the reactance services.

Reads the user expiration file, removes every user whose expiration
timestamp has passed from xray, sshvpn, ocserv and hysteria, and then
deletes the web directories listed in the web expiration file.
"""
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime

EXPIRE_USER_JSON_PATH = "/var/reactance/.user_expiration.json"
EXPIRE_WEB_JSON_PATH = "/var/reactance/.web_expiration.json"
OCSERV_CONFIG_PATH = "/var/reactance/ocserv/etc/ocserv.passwd"
HYSTERIA_CONFIG_FILE = "/var/reactance/hysteria/etc/config.json"
XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
SSH_ROOT = "/var/reactance/sshvpn/.ssh"
AUTHORIZED_KEYS = os.path.join(SSH_ROOT, "authorized_keys")
WEB_ROOT = "/var/www/reactance"

_PUBKEY_SUFFIX = ".pub"


def _read_json(path):
    """Return the parsed JSON content of *path*."""
    with open(path, "r") as f:
        return json.load(f)


def _write_json(path, data):
    """Serialise *data* with ``indent=1`` and overwrite *path* with it."""
    # Serialise before opening: opening with "w" truncates the file, so a
    # serialisation error must not be able to leave an empty config behind.
    payload = json.dumps(data, indent=1)
    with open(path, "w") as f:
        f.write(payload)


def ocserv_get_users():
    """Return the entries of the ocserv password file as a dict.

    Every non-empty line of OCSERV_CONFIG_PATH is split on its first
    ``:*:``; the left part becomes the key and the rest the value
    (presumably user name -> password hash; confirm against ocpasswd).

    Returns:
        The parsed dict, or an empty dict when the file does not exist.

    Raises:
        ValueError: if a non-empty line contains no ``:*:`` separator.
    """
    if not os.path.isfile(OCSERV_CONFIG_PATH):
        return {}
    with open(OCSERV_CONFIG_PATH, "r") as f:
        lines = f.read().split("\n")
    # maxsplit=1 keeps a value that itself contains ':*:' in one piece.
    return dict(line.split(":*:", 1) for line in lines if line)


def xray_get_users():
    """Return the parsed JSON content of XRAY_CONFIG_PATH."""
    return _read_json(XRAY_CONFIG_PATH)


def hysteria_get_users():
    """Return the parsed JSON content of HYSTERIA_CONFIG_FILE."""
    return _read_json(HYSTERIA_CONFIG_FILE)


def sshvpn_get_users():
    """Return the names of all ``*.pub`` files in SSH_ROOT, suffix removed."""
    return [
        name[:-len(_PUBKEY_SUFFIX)]
        for name in os.listdir(SSH_ROOT)
        if name.endswith(_PUBKEY_SUFFIX)
    ]


def ocserv_user_purge(users_to_remove):
    """Run ``ocpasswd -d <user> -c OCSERV_CONFIG_PATH`` for each user.

    The exit status of ``ocpasswd`` is ignored.  If the program cannot be
    started at all, a message is printed to stderr and the remaining users
    are skipped.

    Args:
        users_to_remove: iterable of user names.
    """
    for user in users_to_remove:
        try:
            # An argument list (no shell) keeps a user name containing
            # shell metacharacters from being interpreted as a command.
            subprocess.run(["ocpasswd", "-d", user, "-c", OCSERV_CONFIG_PATH])
        except OSError as err:
            # Without a shell a missing/unrunnable binary raises instead
            # of yielding a non-zero status; stay non-fatal so the caller
            # can go on purging the other services.
            print(f"ocpasswd could not be executed: {err}", file=sys.stderr)
            return


def xray_user_purge(users_to_remove):
    """Drop xray clients whose ``email`` is in *users_to_remove*.

    Every inbound that has a ``settings.clients`` list gets that list
    filtered; inbounds without one are left untouched.  The configuration
    is written back to XRAY_CONFIG_PATH in all cases.

    Args:
        users_to_remove: container of user names, matched against the
            ``email`` field of each client.

    Raises:
        KeyError: if the configuration has no ``inbounds`` key.
    """
    config = xray_get_users()
    for inbound in config["inbounds"]:
        settings = inbound.get("settings")
        if not isinstance(settings, dict) or "clients" not in settings:
            continue  # this inbound carries no client list
        settings["clients"] = [
            client
            for client in settings["clients"]
            if client.get("email") not in users_to_remove
        ]
    _write_json(XRAY_CONFIG_PATH, config)


def hysteria_user_purge(users_to_remove):
    """Drop the given users from ``auth.userpass`` of the hysteria config.

    The configuration is written back to HYSTERIA_CONFIG_FILE.

    Args:
        users_to_remove: container of user names.

    Raises:
        KeyError: if the configuration lacks ``auth`` or ``auth.userpass``.
    """
    config = hysteria_get_users()
    userpass = config["auth"]["userpass"]
    config["auth"]["userpass"] = {
        user: secret
        for user, secret in userpass.items()
        if user not in users_to_remove
    }
    _write_json(HYSTERIA_CONFIG_FILE, config)


def sshvpn_user_purge(users_to_remove):
    """Delete the key files of the given users and rebuild AUTHORIZED_KEYS.

    For every ``<user>.pub`` in SSH_ROOT whose user is in
    *users_to_remove*, ``<user>.pub`` and ``<user>`` are removed.
    AUTHORIZED_KEYS is then overwritten with the content of all remaining
    ``*.pub`` files.

    Args:
        users_to_remove: container of user names.
    """
    for user in sshvpn_get_users():
        if user not in users_to_remove:
            continue
        os.remove(f"{SSH_ROOT}/{user}.pub")
        try:
            os.remove(f"{SSH_ROOT}/{user}")
        except FileNotFoundError:
            # A missing private key must not abort the purge: the public
            # key is already gone and AUTHORIZED_KEYS still has to be
            # regenerated below, otherwise the old key stays authorised.
            pass

    # Overwrite the existing authorized_keys file.
    pubkey_files = sorted(
        os.path.join(SSH_ROOT, name)
        for name in os.listdir(SSH_ROOT)
        if name.endswith(_PUBKEY_SUFFIX)
    )
    with open(AUTHORIZED_KEYS, "w") as out:
        for pubkey_file in pubkey_files:
            with open(pubkey_file, "r") as pkey:
                key = pkey.read()
            out.write(key)
            if key and not key.endswith("\n"):
                # Keep two keys from ending up on the same line.
                out.write("\n")


def main():
    """Purge all expired users, then delete the listed web directories.

    EXPIRE_USER_JSON_PATH maps a timestamp (as a string) to a list of
    users.  Every entry whose timestamp is not in the future is purged
    from xray, sshvpn, ocserv and hysteria and then dropped from the
    mapping, which is written back afterwards.
    """
    now = datetime.now().timestamp()

    expire_user_dict = _read_json(EXPIRE_USER_JSON_PATH)
    # Iterate over a snapshot of the keys: entries are deleted on the way.
    for exp in list(expire_user_dict):
        if float(exp) > now:
            continue
        users = expire_user_dict[exp]
        xray_user_purge(users)
        sshvpn_user_purge(users)
        ocserv_user_purge(users)
        hysteria_user_purge(users)
        # Forget the bucket once handled so it is not purged again on
        # every later run.
        del expire_user_dict[exp]
    _write_json(EXPIRE_USER_JSON_PATH, expire_user_dict)

    expire_web_dict = _read_json(EXPIRE_WEB_JSON_PATH)
    # NOTE(review): every key is removed without any time comparison and
    # the file is never rewritten, so a second run raises
    # FileNotFoundError unless something else refreshes the file --
    # confirm the intended schema before changing this.
    for exp in expire_web_dict:
        shutil.rmtree(f"{WEB_ROOT}/{exp}")


if __name__ == "__main__":
    main()