summaryrefslogtreecommitdiff
path: root/library/xray.py
blob: ff2d0357f61baee3f0e81804c9633c5758aabeea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/local/bin/python3

from __future__ import absolute_import, division, print_function
__metaclass__ = type

from ansible.module_utils.basic import AnsibleModule
import json, shlex, os
from datetime import datetime

XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
VISION_PUBKEY_FILE = "/var/reactance/xray/public_key"

def exec_shell(cmd, module):
    rc, stdout, stderr = module.run_command(cmd, environ_update={'TERM': 'dumb'})
    if rc != 0:
        module.fail_json(stderr)
    return stdout.rstrip()
def xray_gen_password(protocol, module):
    login_method = {'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]
    return login_method, exec_shell({'trojan': 'openssl rand -hex 32', 'vless': '/var/reactance/xray/bin/xray uuid', 'vmess': '/var/reactance/xray/bin/xray uuid'}[protocol], module)

def xray_get_users(protocol):
    with open(XRAY_CONFIG_PATH, "r") as f:
        xray_config_dict = json.loads(f.read())
    inbounds = xray_config_dict["inbounds"]
    protos_users = {}
    for inbound in inbounds:
        if inbound['protocol'] == protocol:
            protos_users[inbound['protocol']] = [j['email'] for j in inbound['settings']['clients']]
    return protos_users, xray_config_dict

def xray_user_control(update_password, protocol, address, service_port, public_key, module):
    previous_users, xray_config_dict = xray_get_users(protocol)
    user_pass_list = []
    all_users_dict = {}
    new_users_dict = {}

    # search through all inbound protocools
    for i, inbound in enumerate(xray_config_dict['inbounds']):
        if inbound['protocol'] == protocol:
            previous_users_dict = inbound['settings']['clients']
            previous_users_list = [i['email'] for i in previous_users_dict]
            selected_users = set(update_password.keys())

            # keep old passwords
            for user in previous_users_dict:
                if user['email'] in selected_users and not update_password[user['email']]:
                    user_pass_list.append(user)
                    all_users_dict[user['email']] = user[{'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]]

            # generate new passwords
            for user in selected_users:
                if user not in all_users_dict.keys():
                    login_method, xray_password = xray_gen_password(protocol, module)
                    new_user = { 'email': user, login_method: xray_password }
                    xray_url = f"{protocol}://{ xray_password }@{ address }:{ service_port }?security=reality&sni=behindthename.com&fp=chrome&pbk={ public_key }"
                    all_users_dict[user] = {protocol: xray_password}
                    if protocol in ["vless", "vmess"]:
                        new_user["flow"] = "xtls-rprx-vision"
                        xray_url += "&flow=xtls-rprx-vision"
                    xray_url += f"#{protocol}_{user}"
                    user_pass_list.append(new_user)
                    new_users_dict[user] = {protocol: xray_url}
		
            xray_config_dict['inbounds'][i]['settings']['clients'] = user_pass_list

    with open(XRAY_CONFIG_PATH, "w") as f:
        f.write(json.dumps(xray_config_dict, indent=1))

    return new_users_dict

def run_module():
    module = AnsibleModule(
        argument_spec=dict(
            users = dict(type='list', required=True),
            protocol = dict(type='str', required=True),
	        address = dict(type='str', required=True),
            service_port = dict(type='int', required=True),
	        public_key = dict(type='str', required=True)
        ),
        supports_check_mode=True
    )

    users = module.params["users"]
    protocol = module.params["protocol"]
    address = module.params["address"]
    service_port = module.params["service_port"]
    public_key = module.params["public_key"]
    update_password = {}

    for user in users:
        if 'regen' in user.keys() and user['regen']:
            update_password[user['user']] = True
        else:
            update_password[user['user']] = False

    new_users_dict = xray_user_control(update_password, protocol, address, service_port, public_key, module)
    module.exit_json(changed=True, msg=new_users_dict)

def main():
    run_module()

if __name__ == "__main__":
    main()