1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
#!/usr/local/bin/python3
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.module_utils.basic import AnsibleModule
import json, shlex, os
from datetime import datetime
XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
VISION_PUBKEY_FILE = "/var/reactance/xray/public_key"
def exec_shell(cmd, module):
rc, stdout, stderr = module.run_command(cmd, environ_update={'TERM': 'dumb'})
if rc != 0:
module.fail_json(stderr)
return stdout.rstrip()
def xray_gen_password(protocol, module):
login_method = {'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]
return login_method, exec_shell({'trojan': 'openssl rand -hex 32', 'vless': '/var/reactance/xray/bin/xray uuid', 'vmess': '/var/reactance/xray/bin/xray uuid'}[protocol], module)
def xray_get_users(protocol):
with open(XRAY_CONFIG_PATH, "r") as f:
xray_config_dict = json.loads(f.read())
inbounds = xray_config_dict["inbounds"]
protos_users = {}
for inbound in inbounds:
if inbound['protocol'] == protocol:
protos_users[inbound['protocol']] = [j['email'] for j in inbound['settings']['clients']]
return protos_users, xray_config_dict
def xray_user_control(update_password, protocol, address, service_port, public_key, module):
previous_users, xray_config_dict = xray_get_users(protocol)
user_pass_list = []
all_users_dict = {}
new_users_dict = {}
# search through all inbound protocools
for i, inbound in enumerate(xray_config_dict['inbounds']):
if inbound['protocol'] == protocol:
previous_users_dict = inbound['settings']['clients']
previous_users_list = [i['email'] for i in previous_users_dict]
selected_users = set(update_password.keys())
# keep old passwords
for user in previous_users_dict:
if user['email'] in selected_users and not update_password[user['email']]:
user_pass_list.append(user)
all_users_dict[user['email']] = user[{'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]]
# generate new passwords
for user in selected_users:
if user not in all_users_dict.keys():
login_method, xray_password = xray_gen_password(protocol, module)
new_user = { 'email': user, login_method: xray_password }
xray_url = f"{protocol}://{ xray_password }@{ address }:{ service_port }?security=reality&sni=behindthename.com&fp=chrome&pbk={ public_key }"
all_users_dict[user] = {protocol: xray_password}
if protocol in ["vless", "vmess"]:
new_user["flow"] = "xtls-rprx-vision"
xray_url += "&flow=xtls-rprx-vision"
xray_url += f"#{protocol}_{user}"
user_pass_list.append(new_user)
new_users_dict[user] = {protocol: xray_url}
xray_config_dict['inbounds'][i]['settings']['clients'] = user_pass_list
with open(XRAY_CONFIG_PATH, "w") as f:
f.write(json.dumps(xray_config_dict, indent=1))
return new_users_dict
def run_module():
module = AnsibleModule(
argument_spec=dict(
users = dict(type='list', required=True),
protocol = dict(type='str', required=True),
address = dict(type='str', required=True),
service_port = dict(type='int', required=True),
public_key = dict(type='str', required=True)
),
supports_check_mode=True
)
users = module.params["users"]
protocol = module.params["protocol"]
address = module.params["address"]
service_port = module.params["service_port"]
public_key = module.params["public_key"]
update_password = {}
for user in users:
if 'regen' in user.keys() and user['regen']:
update_password[user['user']] = True
else:
update_password[user['user']] = False
new_users_dict = xray_user_control(update_password, protocol, address, service_port, public_key, module)
module.exit_json(changed=True, msg=new_users_dict)
def main():
run_module()
if __name__ == "__main__":
main()
|