diff options
| author | repliqa <sarzilhossain@proton.me> | 2025-07-23 14:06:15 +0600 |
|---|---|---|
| committer | repliqa <sarzilhossain@proton.me> | 2025-07-23 14:06:15 +0600 |
| commit | 69acb7a82a68eeb439e55b994281056df52c81b1 (patch) | |
| tree | 7c6a53694e11511a3014470c213255a503f9c95e /library/xray.py | |
Diffstat (limited to 'library/xray.py')
| -rw-r--r-- | library/xray.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/library/xray.py b/library/xray.py new file mode 100644 index 00000000..ff2d0357 --- /dev/null +++ b/library/xray.py @@ -0,0 +1,104 @@ +#!/usr/local/bin/python3 + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible.module_utils.basic import AnsibleModule +import json, shlex, os +from datetime import datetime + +XRAY_CONFIG_PATH = "/var/reactance/xray/etc/config.json" +VISION_PUBKEY_FILE = "/var/reactance/xray/public_key" + +def exec_shell(cmd, module): + rc, stdout, stderr = module.run_command(cmd, environ_update={'TERM': 'dumb'}) + if rc != 0: + module.fail_json(stderr) + return stdout.rstrip() +def xray_gen_password(protocol, module): + login_method = {'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol] + return login_method, exec_shell({'trojan': 'openssl rand -hex 32', 'vless': '/var/reactance/xray/bin/xray uuid', 'vmess': '/var/reactance/xray/bin/xray uuid'}[protocol], module) + +def xray_get_users(protocol): + with open(XRAY_CONFIG_PATH, "r") as f: + xray_config_dict = json.loads(f.read()) + inbounds = xray_config_dict["inbounds"] + protos_users = {} + for inbound in inbounds: + if inbound['protocol'] == protocol: + protos_users[inbound['protocol']] = [j['email'] for j in inbound['settings']['clients']] + return protos_users, xray_config_dict + +def xray_user_control(update_password, protocol, address, service_port, public_key, module): + previous_users, xray_config_dict = xray_get_users(protocol) + user_pass_list = [] + all_users_dict = {} + new_users_dict = {} + + # search through all inbound protocools + for i, inbound in enumerate(xray_config_dict['inbounds']): + if inbound['protocol'] == protocol: + previous_users_dict = inbound['settings']['clients'] + previous_users_list = [i['email'] for i in previous_users_dict] + selected_users = set(update_password.keys()) + + # keep old passwords + for user in previous_users_dict: + if user['email'] in selected_users and not update_password[user['email']]: + user_pass_list.append(user) + all_users_dict[user['email']] = user[{'vmess': 'id', 'vless': 'id', 'trojan': 'password'}[protocol]] + + # generate new passwords + for user in selected_users: + if user not in all_users_dict.keys(): + login_method, xray_password = xray_gen_password(protocol, module) + new_user = { 'email': user, login_method: xray_password } + xray_url = f"{protocol}://{ xray_password }@{ address }:{ service_port }?security=reality&sni=behindthename.com&fp=chrome&pbk={ public_key }" + all_users_dict[user] = {protocol: xray_password} + if protocol in ["vless", "vmess"]: + new_user["flow"] = "xtls-rprx-vision" + xray_url += "&flow=xtls-rprx-vision" + xray_url += f"#{protocol}_{user}" + user_pass_list.append(new_user) + new_users_dict[user] = {protocol: xray_url} + + xray_config_dict['inbounds'][i]['settings']['clients'] = user_pass_list + + with open(XRAY_CONFIG_PATH, "w") as f: + f.write(json.dumps(xray_config_dict, indent=1)) + + return new_users_dict + +def run_module(): + module = AnsibleModule( + argument_spec=dict( + users = dict(type='list', required=True), + protocol = dict(type='str', required=True), + address = dict(type='str', required=True), + service_port = dict(type='int', required=True), + public_key = dict(type='str', required=True) + ), + supports_check_mode=True + ) + + users = module.params["users"] + protocol = module.params["protocol"] + address = module.params["address"] + service_port = module.params["service_port"] + public_key = module.params["public_key"] + update_password = {} + + for user in users: + if 'regen' in user.keys() and user['regen']: + update_password[user['user']] = True + else: + update_password[user['user']] = False + + new_users_dict = xray_user_control(update_password, protocol, address, service_port, public_key, module) + module.exit_json(changed=True, msg=new_users_dict) + +def main(): + run_module() + +if __name__ == "__main__": + main() |
