#!/usr/bin/python3
#####################################
## This is the console-only version
## It is only a TMW Vault Wrapper
##
## For lore, you need to use the app
#####################################
## (C) The Mana World Team, 2021
## Published under the MIT License
#####################################
import getpass
import hmac
import json
import os
import struct
import subprocess
import time
import traceback

import requests

# Shorthand used to spawn the game client.
execute = subprocess.call
# Server entries that passed validation (filled by update_serverlist).
serverlist = []

#####################################
# Local development endpoints:
#VAULT_HOST = "https://localhost:13370"
#VAULT_CERT = "http://localhost/launcher/cert.pem"
#SERVERLIST = "http://localhost/launcher"
VAULT_HOST = "https://api.themanaworld.org:13370"
VAULT_CERT = False #"https://tmw2.org/launcher/cert.pem"
SERVERLIST = "https://updates.tmw2.org/launcher"

# Vault SSL wrapper: one shared HTTP session for every Vault request.
# A downloaded "cert.pem" is only wired in when VAULT_CERT is set.
vault = requests.Session()
if VAULT_CERT:
    vault.cert = "cert.pem"
    vault.verify = "cert.pem"


#####################################
def stdout(message, bd=False):
    """Print *message*, wrapped in ANSI bold escapes when *bd* is true."""
    text = ("\033[1m%s\033[0m" % message) if bd else message
    print(text)


def sdelay(delta=0.02):
    """Sleep for *delta* seconds (0.02 by default)."""
    time.sleep(delta)


# IF Then Else (IFTE)
def ifte(ifs, then, elses):
    """Return *then* when *ifs* is truthy, otherwise *elses*.

    Both alternatives are ordinary arguments, so the caller has already
    evaluated them before this function runs.
    """
    return then if ifs else elses


# Sanitize a command (strip some flow control chars)
# While it covers all control operators and most metacharacters,
# it does not cover the shell's reserved words well.
# ...Of course, it relies on this client not being compromised.
# Single characters removed by san(): shell control operators and
# redirections, plus the newline.
_SAN_SINGLE_CHARS = str.maketrans("", "", ";|><&()\n")

# Keys every server entry must carry to be accepted by validate_server().
_REQUIRED_SERVER_KEYS = (
    "Host", "Port", "Desc", "Link", "News", "Back",
    "UUID", "Help", "Online", "Policy",
)

# Executable name of the game client.
_CLIENT_BINARY = "manaplus"

# Client exit status that triggers the Mirror Lake handling below.
_MIRROR_LAKE_EXIT = 7


def san(cmd):
    """Strip some shell flow-control characters from *cmd*.

    Removes ``; | > < & ( )`` and newlines, then the ``[[`` and ``]]``
    tokens. Quotes, ``$`` and backticks are NOT removed, so the result is
    not safe to hand to a shell on its own.
    """
    return cmd.translate(_SAN_SINGLE_CHARS).replace("[[", "").replace("]]", "")


def now():
    """Return the number of whole seconds since the UNIX epoch."""
    return int(time.time())


def dl_search(array, key, search):
    """Return the first dict in *array* whose ``[key]`` equals *search*.

    Returns None (after printing a notice) when nothing matches or when an
    entry cannot be indexed by *key*.
    """
    try:
        # The builtin next() replaces the Python 2 only generator.next(),
        # which made this lookup fail unconditionally on Python 3.
        r = next((item for item in array if item[key] == search), None)
    except (KeyError, IndexError, TypeError):
        r = None
    if r is None:
        stdout("dlsearch: r is None")
    return r


def dl_search_idx(array, key, search):
    """Return the index of the first dict in *array* with ``[key] == search``.

    Returns -1 when nothing matches. A malformed entry prints a traceback
    and also yields -1.
    """
    try:
        return next(
            (i for i, item in enumerate(array) if item[key] == search), -1
        )
    except (KeyError, IndexError, TypeError):
        traceback.print_exc()
        return -1


def nullp(arg):
    """Discard *arg*; a no-op stand-in for a debug print."""
    return


def validate_server(srv):
    """Check that *srv* carries every field the launcher needs.

    A missing "Type" is filled in with "evol2" (mutating *srv*). Prints
    the outcome and returns True when the entry is usable, else False.
    """
    name = "Unknown Server"
    try:
        name = srv["Name"]
        missing = [key for key in _REQUIRED_SERVER_KEYS if key not in srv]
        if missing:
            raise KeyError(missing[0])
        # %d formatting doubles as a check that the port is numeric.
        nullp("Port: %04d" % srv["Port"])
        if "Type" not in srv:
            srv["Type"] = "evol2"
    except Exception:
        # Any malformed entry is reported and skipped, never fatal.
        traceback.print_exc()
        stdout("Validation for server \"%s\" FAILED!" % name)
        return False
    stdout("Server \"%s\" loaded." % name)
    return True


def update_serverlist(hosti):
    """Download ``server_list.json`` from mirror *hosti*.

    Every entry that passes validate_server() is appended to the global
    ``serverlist``. On success the mirror is remembered in the global
    ``host`` and True is returned; any failure prints a traceback and
    returns False.
    """
    global serverlist, host
    try:
        r = requests.get("%s/server_list.json" % hosti, timeout=10.0)
        if r.status_code != 200:
            raise AssertionError("Mirror %s seems to be down!\nReturned error %03d\n" % (hosti.replace("https://", "").replace("http://", ""), r.status_code))
        j = json.loads(r.text)

        ## If we reached here, then everything is likely fine
        ## We can now build the persistent data
        print("Fetching server list and assets...")
        for server in j:
            if validate_server(server):
                serverlist.append(server)

        ## If we were truly successful, save this host as our mirror
        host = hosti
        return True
    except Exception:
        # Network, HTTP status and JSON errors all mean "mirror unusable".
        traceback.print_exc()
        return False


def client_argv(server, user, password):
    """Build the ManaPlus argument vector for *server* and the credentials.

    Each value becomes its own argv element, so the list can be executed
    without a shell and no value can be interpreted as shell syntax.
    """
    return [
        _CLIENT_BINARY,
        "-s", str(server["Host"]),
        "-y", str(server["Type"]),
        "-p", str(server["Port"]),
        "-S",
        "-U", str(user),
        "-P", str(password),
    ]


def launch_game(idx):
    """Fetch world credentials from the Vault and run the game client.

    *idx* is an index into the global ``serverlist``. Returns the client's
    exit status, or -1 when credentials could not be obtained or the
    client could not be started.
    """
    server = serverlist[idx]

    ## Get credentials
    auth = {"vaultId": vaultId, "token": vaultToken, "world": server["UUID"]}
    try:
        r = vault.post(VAULT_HOST + "/world_pass", json=auth, timeout=15.0)
        ## We were refused by Vault
        if r.status_code == 403:
            stdout("Warning: Connection was refused (Vault logout?)")
            return -1
        ## We are rate-limited, try again
        if r.status_code == 429:
            stdout("Rate limited!")
            return -1
        ## Internal error, maybe?
        if r.status_code != 200:
            stdout("Get World Auth - Returned error code %d" % r.status_code)
            return -1
        ## We got the access credentials
        auth2 = r.json()
        argv = client_argv(server, auth2["user"], auth2["pass"])
    except Exception:
        traceback.print_exc()
        return -1

    ## Execute the app
    # Run without a shell: server-list fields and the world password come
    # from the network and must never be parsed as shell syntax.
    try:
        return execute(argv, shell=False)
    except OSError:
        # Without a shell a missing binary raises instead of returning 127.
        traceback.print_exc()
        stdout("ERROR: Could not start the game client (is ManaPlus installed?)")
        return -1


## Only meaningful on Linux
os.environ["APPIMAGELAUNCHER_DISABLE"] = "1"


def _fetch_vault_certificate():
    """Download the Vault PEM certificate, retrying every 5 s until it works.

    Does nothing when VAULT_CERT is falsy.
    """
    while VAULT_CERT:
        try:
            # Fetch a new PEM Certificate
            stdout("Fetching SSL Certificate from %s" % VAULT_CERT)
            r = requests.get(VAULT_CERT, timeout=10.0)
            if r.status_code != 200:
                raise Exception("\nFailed to fetch Vault SSL Certificate.\n\n Returned HTTP error %d." % r.status_code)
            # Save it
            with open("cert.pem", "wb") as fd:
                for chunk in r.iter_content(chunk_size=128):
                    fd.write(chunk)
            # Presumably all good, so do not continue and break the loop
            stdout("PEM Download OK")
            break
        except Exception:
            # Not a bare except: Ctrl-C must be able to leave the retry loop.
            traceback.print_exc()
            stdout("Automatically retry in 5 seconds...")
            time.sleep(5.0)


def _vault_login():
    """Prompt for credentials and log in to the Vault.

    Stores the session in the globals ``vaultId`` and ``vaultToken``,
    which launch_game() reads. Exits with status 1 on a non-200 reply.
    """
    global vaultId, vaultToken
    email = input("Vault Email: ")
    passw = getpass.getpass(prompt="Vault Password: ")
    totps = input("Vault 2FA Code: ")
    data = {"mail": email, "pass": passw, "totp": totps[:6]}
    r = vault.post(VAULT_HOST + "/user_auth", json=data, timeout=15.0)
    if r.status_code != 200:
        print("Vault returned error %d\n\nPlease try again later." % r.status_code)
        raise SystemExit(1)
    auth2 = r.json()
    vaultId = auth2["vaultId"]
    vaultToken = auth2["token"]
    print("Connected to vault!")


def _connect_world(com):
    """Launch the world named *com* and follow Mirror Lake redirections."""
    target = dl_search_idx(serverlist, "Name", com)
    print("Target \"%s\" - %s" % (com, str(target)))
    if target < 0:
        # -1 is a valid list index, so it must be rejected explicitly or
        # the last server in the list would be launched by mistake.
        stdout("ERROR: Unknown world.")
        return

    idx = target
    app = launch_game(idx)

    ## Handle MLP
    while app == _MIRROR_LAKE_EXIT:
        stdout("[CLIENT] Mirror Lake triggered.")
        # NOTE(review): payload mirrors the /world_pass request for the
        # world just left; confirm against the Vault API.
        lake_auth = {"vaultId": vaultId, "token": vaultToken, "world": serverlist[idx]["UUID"]}
        r = vault.post(VAULT_HOST + "/getlake", json=lake_auth, timeout=15.0)
        if r.status_code != 200:
            # Leave the loop instead of hammering the Vault forever.
            stdout("Mirror Lake - Returned error code %d" % r.status_code)
            break
        goto = r.json()["world"]
        stdout("MLP Target: %s" % str(goto))
        if not goto or str(goto).lower() == "vault":
            stdout("Mirror Lake False Positive")
            break
        try:
            nxt = dl_search_idx(serverlist, "UUID", goto)
            if nxt < 0:
                stdout("ERROR: Unknown world.")
                break
            idx = nxt
            app = launch_game(idx)
        except Exception:
            traceback.print_exc()
            break


def _console():
    """Run the interactive command prompt until the user exits."""
    while True:
        command = str(input("> "))
        # No command inserted? Do nothing
        if command == "":
            continue
        # We have a command, prepare it for processing
        stdout("CONSOLE: %s" % command)
        head, _, com = command.partition(' ')
        cmd = head.lower()
        # Parse the command
        # TODO: grant gems to a user
        if cmd in ["help", "h"]:
            print("help - Shows this help text")
            print("list - List servers")
            print("exit - Closes the program")
            print("connect <world> - Connects to <world>")
            print("")
            print("You need ManaPlus installed for Evol2 worlds.")
        elif cmd in ["list", "servers"]:
            for w in serverlist:
                print("\033[1m%s\033[0m" % w["Name"])
                print("Type: %s" % w["Type"])
                print("Help: %s" % w["Help"])
        elif cmd in ["exit", "quit", "close"]:
            break
        elif cmd in ["connect", "run", "world"]:
            _connect_world(com)
        else:
            stdout("ERROR: Unknown command. Try \"help\".")


def main():
    """Fetch the certificate and server list, log in, then run the console."""
    ## Build the Vault interface
    _fetch_vault_certificate()
    ## Construct session data
    update_serverlist(SERVERLIST)
    _vault_login()
    try:
        _console()
    except (Exception, KeyboardInterrupt):
        # Top-level boundary: report and end the program.
        stdout("Abrupt error: Terminating!")
        traceback.print_exc()


if __name__ == "__main__":
    main()