summaryrefslogtreecommitdiff
path: root/roles/base/templates/user_expiration_control.py.j2
blob: 36551617804c11c4109dc7235895fcec7a4c5d60 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/local/bin/python3

import json, os, subprocess, shutil
from datetime import datetime

EXPIRE_USER_JSON_PATH = "/var/reactance/.user_expiration.json"
EXPIRE_WEB_JSON_PATH = "/var/reactance/.web_expiration.json"
OCSERV_CONFIG_PATH = "/var/reactance/ocserv/etc/ocserv.passwd"
HYSTERIA_CONFIG_FILE = "/var/reactance/hysteria/etc/config.json"
XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
SSH_ROOT = "/var/reactance/sshvpn/.ssh"
AUTHORIZED_KEYS = os.path.join(SSH_ROOT, "authorized_keys")

def ocserv_get_users():
    ocserv_config_dict = {}
    if os.path.isfile(OCSERV_CONFIG_PATH):
        with open(OCSERV_CONFIG_PATH, "r") as f:
            ocserv_content = f.read()
            ocserv_config_dict = dict(map(lambda x: x.split(':*:'), list(filter(lambda x: x != '', ocserv_content.split("\n")))))
    return ocserv_config_dict

def xray_get_users():
    with open(XRAY_CONFIG_PATH, "r") as f:
        xray_config_dict = json.loads(f.read())
    return xray_config_dict

def hysteria_get_users():
    with open(HYSTERIA_CONFIG_FILE, "r") as f:
        hysteria_config_dict = json.loads(f.read())
    return hysteria_config_dict

def sshvpn_get_users():
    previous_users = [".".join(i.split('.')[:-1]) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
    return previous_users

def ocserv_user_purge(users_to_remove):
    for user in users_to_remove:
        subprocess.run(f"ocpasswd -d {user} -c {OCSERV_CONFIG_PATH}", shell=True)
 
def xray_user_purge(users_to_remove):
    xray_config_dict = xray_get_users()
    for i, inbound in enumerate(xray_config_dict['inbounds']):
        previous_users_list = inbound['settings']['clients']
        new_users_list = previous_users_list.copy()
        for user in previous_users_list:
            if user['email'] in users_to_remove:
                new_users_list.remove(user)
        xray_config_dict['inbounds'][i]['settings']['clients'] = new_users_list
    with open(XRAY_CONFIG_PATH, "w") as f:
        f.write(json.dumps(xray_config_dict, indent=1))

def hysteria_user_purge(users_to_remove):
    hysteria_config_dict = hysteria_get_users()
    previous_users_dict = hysteria_config_dict["auth"]["userpass"]
    new_users_dict = {}
    for user in previous_users_dict.keys():
        if user not in users_to_remove:
            new_users_dict[user] = previous_users_dict[user]
    hysteria_config_dict["auth"]["userpass"] = new_users_dict
    with open(HYSTERIA_CONFIG_FILE, "w") as f:
        f.write(json.dumps(hysteria_config_dict, indent=1))

def sshvpn_user_purge(users_to_remove):
    previous_users_list = sshvpn_get_users()
    for user in previous_users_list:
        if user in users_to_remove:
            os.remove(f"{SSH_ROOT}/{user}.pub")
            os.remove(f"{SSH_ROOT}/{user}")

    # Overwrite existing authorized_key file
    users_pubkey_files = [os.path.join(SSH_ROOT, i) for i in os.listdir(SSH_ROOT) if i.endswith(".pub")]
    with open(AUTHORIZED_KEYS, "w") as f:
        for pubkey_file in users_pubkey_files:
            with open(pubkey_file, "r") as pkey:
                f.write(pkey.read())

def main():
    current_unix_time = datetime.now().timestamp()

    with open(EXPIRE_USER_JSON_PATH, "r") as f:
        expire_user_dict = json.loads(f.read())
    for exp in expire_user_dict.keys():
        if float(exp) <= current_unix_time:
            users = expire_user_dict[exp]

            xray_user_purge(users)
            sshvpn_user_purge(users)
            ocserv_user_purge(users)
            hysteria_user_purge(users)
    with open(EXPIRE_USER_JSON_PATH, "w") as f:
        f.write(json.dumps(expire_user_dict, indent=1))

    with open(EXPIRE_WEB_JSON_PATH, "r") as f:
        expire_web_dict = json.loads(f.read())
    for exp in expire_web_dict.keys():
        shutil.rmtree(f"/var/www/reactance/{exp}")

if __name__ == "__main__":
    main()