summaryrefslogtreecommitdiff
path: root/library/xray.py
diff options
context:
space:
mode:
authorrepliqa <sarzilhossain@proton.me>2025-07-23 14:06:15 +0600
committerrepliqa <sarzilhossain@proton.me>2025-07-23 14:06:15 +0600
commit69acb7a82a68eeb439e55b994281056df52c81b1 (patch)
tree7c6a53694e11511a3014470c213255a503f9c95e /library/xray.py
v0.0.1alphaHEADmain
Diffstat (limited to 'library/xray.py')
-rw-r--r--library/xray.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/library/xray.py b/library/xray.py
new file mode 100644
index 00000000..ff2d0357
--- /dev/null
+++ b/library/xray.py
@@ -0,0 +1,104 @@
+#!/usr/local/bin/python3
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+from ansible.module_utils.basic import AnsibleModule
+import json, shlex, os
+from datetime import datetime
+
+XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json"
+VISION_PUBKEY_FILE = "/var/reactance/xray/public_key"
+
+def exec_shell(cmd, module):
+ rc, stdout, stderr = module.run_command(cmd, environ_update={'TERM': 'dumb'})
+ if rc != 0:
+ module.fail_json(stderr)
+ return stdout.rstrip()
+def xray_gen_password(protocol, module):
+ login_method = {'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]
+ return login_method, exec_shell({'trojan': 'openssl rand -hex 32', 'vless': '/var/reactance/xray/bin/xray uuid', 'vmess': '/var/reactance/xray/bin/xray uuid'}[protocol], module)
+
+def xray_get_users(protocol):
+ with open(XRAY_CONFIG_PATH, "r") as f:
+ xray_config_dict = json.loads(f.read())
+ inbounds = xray_config_dict["inbounds"]
+ protos_users = {}
+ for inbound in inbounds:
+ if inbound['protocol'] == protocol:
+ protos_users[inbound['protocol']] = [j['email'] for j in inbound['settings']['clients']]
+ return protos_users, xray_config_dict
+
+def xray_user_control(update_password, protocol, address, service_port, public_key, module):
+ previous_users, xray_config_dict = xray_get_users(protocol)
+ user_pass_list = []
+ all_users_dict = {}
+ new_users_dict = {}
+
+ # search through all inbound protocools
+ for i, inbound in enumerate(xray_config_dict['inbounds']):
+ if inbound['protocol'] == protocol:
+ previous_users_dict = inbound['settings']['clients']
+ previous_users_list = [i['email'] for i in previous_users_dict]
+ selected_users = set(update_password.keys())
+
+ # keep old passwords
+ for user in previous_users_dict:
+ if user['email'] in selected_users and not update_password[user['email']]:
+ user_pass_list.append(user)
+ all_users_dict[user['email']] = user[{'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]]
+
+ # generate new passwords
+ for user in selected_users:
+ if user not in all_users_dict.keys():
+ login_method, xray_password = xray_gen_password(protocol, module)
+ new_user = { 'email': user, login_method: xray_password }
+ xray_url = f"{protocol}://{ xray_password }@{ address }:{ service_port }?security=reality&sni=behindthename.com&fp=chrome&pbk={ public_key }"
+ all_users_dict[user] = {protocol: xray_password}
+ if protocol in ["vless", "vmess"]:
+ new_user["flow"] = "xtls-rprx-vision"
+ xray_url += "&flow=xtls-rprx-vision"
+ xray_url += f"#{protocol}_{user}"
+ user_pass_list.append(new_user)
+ new_users_dict[user] = {protocol: xray_url}
+
+ xray_config_dict['inbounds'][i]['settings']['clients'] = user_pass_list
+
+ with open(XRAY_CONFIG_PATH, "w") as f:
+ f.write(json.dumps(xray_config_dict, indent=1))
+
+ return new_users_dict
+
+def run_module():
+ module = AnsibleModule(
+ argument_spec=dict(
+ users = dict(type='list', required=True),
+ protocol = dict(type='str', required=True),
+ address = dict(type='str', required=True),
+ service_port = dict(type='int', required=True),
+ public_key = dict(type='str', required=True)
+ ),
+ supports_check_mode=True
+ )
+
+ users = module.params["users"]
+ protocol = module.params["protocol"]
+ address = module.params["address"]
+ service_port = module.params["service_port"]
+ public_key = module.params["public_key"]
+ update_password = {}
+
+ for user in users:
+ if 'regen' in user.keys() and user['regen']:
+ update_password[user['user']] = True
+ else:
+ update_password[user['user']] = False
+
+ new_users_dict = xray_user_control(update_password, protocol, address, service_port, public_key, module)
+ module.exit_json(changed=True, msg=new_users_dict)
+
+def main():
+ run_module()
+
+if __name__ == "__main__":
+ main()