1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
#!/usr/local/bin/python3
import json, os, subprocess, shutil
from datetime import datetime
EXPIRE_USER_JSON_PATH = "/var/reactance/.user_expiration.json"
EXPIRE_WEB_JSON_PATH = "/var/reactance/.web_expiration.json"
OCSERV_CONFIG_PATH = "/var/reactance/ocserv/etc/ocserv.passwd"
HYSTERIA_CONFIG_FILE = "/var/reactance/hysteria/etc/config.json"
XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
SSH_ROOT = "/var/reactance/sshvpn/.ssh"
AUTHORIZED_KEYS = os.path.join(SSH_ROOT, "authorized_keys")
def ocserv_get_users():
ocserv_config_dict = {}
if os.path.isfile(OCSERV_CONFIG_PATH):
with open(OCSERV_CONFIG_PATH, "r") as f:
ocserv_content = f.read()
ocserv_config_dict = dict(map(lambda x: x.split(':*:'), list(filter(lambda x: x != '', ocserv_content.split("\n")))))
return ocserv_config_dict
def xray_get_users():
with open(XRAY_CONFIG_PATH, "r") as f:
xray_config_dict = json.loads(f.read())
return xray_config_dict
def hysteria_get_users():
with open(HYSTERIA_CONFIG_FILE, "r") as f:
hysteria_config_dict = json.loads(f.read())
return hysteria_config_dict
def sshvpn_get_users():
previous_users = [".".join(i.split('.')[:-1]) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
return previous_users
def ocserv_user_purge(users_to_remove):
for user in users_to_remove:
subprocess.run(f"ocpasswd -d {user} -c {OCSERV_CONFIG_PATH}", shell=True)
def xray_user_purge(users_to_remove):
xray_config_dict = xray_get_users()
for i, inbound in enumerate(xray_config_dict['inbounds']):
previous_users_list = inbound['settings']['clients']
new_users_list = previous_users_list.copy()
for user in previous_users_list:
if user['email'] in users_to_remove:
new_users_list.remove(user)
xray_config_dict['inbounds'][i]['settings']['clients'] = new_users_list
with open(XRAY_CONFIG_PATH, "w") as f:
f.write(json.dumps(xray_config_dict, indent=1))
def hysteria_user_purge(users_to_remove):
hysteria_config_dict = hysteria_get_users()
previous_users_dict = hysteria_config_dict["auth"]["userpass"]
new_users_dict = {}
for user in previous_users_dict.keys():
if user not in users_to_remove:
new_users_dict[user] = previous_users_dict[user]
hysteria_config_dict["auth"]["userpass"] = new_users_dict
with open(HYSTERIA_CONFIG_FILE, "w") as f:
f.write(json.dumps(hysteria_config_dict, indent=1))
def sshvpn_user_purge(users_to_remove):
previous_users_list = sshvpn_get_users()
for user in previous_users_list:
if user in users_to_remove:
os.remove(f"{SSH_ROOT}/{user}.pub")
os.remove(f"{SSH_ROOT}/{user}")
# Overwrite existing authorized_key file
users_pubkey_files = [os.path.join(SSH_ROOT, i) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
with open(AUTHORIZED_KEYS, "w") as f:
for pubkey_file in users_pubkey_files:
with open(pubkey_file, "r") as pkey:
f.write(pkey.read())
def main():
current_unix_time = datetime.now().timestamp()
with open(EXPIRE_USER_JSON_PATH, "r") as f:
expire_user_dict = json.loads(f.read())
for exp in expire_user_dict.keys():
if float(exp) <= current_unix_time:
users = expire_user_dict[exp]
xray_user_purge(users)
sshvpn_user_purge(users)
ocserv_user_purge(users)
hysteria_user_purge(users)
with open(EXPIRE_USER_JSON_PATH, "w") as f:
f.write(json.dumps(expire_user_dict, indent=1))
with open(EXPIRE_WEB_JSON_PATH, "r") as f:
expire_web_dict = json.loads(f.read())
for exp in expire_web_dict.keys():
shutil.rmtree(f"/var/www/reactance/{exp}")
if __name__ == "__main__":
main()
|