#!/usr/bin/python3
#####################################
## This is the console-only version
## It is only a TMW Vault Wrapper
##
## For lore, you need to use the app
#####################################
## (C) The Mana World Team, 2021
## Published under the MIT License
#####################################
import requests, json, traceback, subprocess, hmac, struct, time, os, getpass
# Shorthand for spawning an external program and waiting for its exit code.
execute = subprocess.call
# Server entries accepted by validate_server(); filled by update_serverlist().
serverlist = []
#####################################
# Alternative endpoints pointing at localhost (disabled):
#VAULT_HOST = "https://localhost:13370"
#VAULT_CERT = "http://localhost/launcher/cert.pem"
#SERVERLIST = "http://localhost/launcher"
# Base URL of the Vault API.
VAULT_HOST = "https://api.themanaworld.org:13370"
# URL of a PEM certificate to download for Vault; a falsy value skips it.
VAULT_CERT = False  # "https://tmw2.org/launcher/cert.pem"
# Base URL under which server_list.json is published.
SERVERLIST = "https://tmw2.org/launcher"
# Shared HTTP session used for every Vault API request.
vault = requests.Session()
if VAULT_CERT:
    # Point both certificate settings at the PEM file that the download
    # loop further down writes to the working directory.
    # NOTE(review): requests documents Session.cert as the *client-side*
    # certificate and Session.verify as the CA bundle -- confirm that
    # setting both to the same file is intended.
    vault.cert = vault.verify = "cert.pem"
#####################################
def stdout(message, bd=False):
    """Print ``message`` to standard output.

    When ``bd`` is true the text is wrapped in the ANSI "bold" escape
    sequence; otherwise it is printed unchanged.
    """
    text = "\033[1m%s\033[0m" % message if bd else message
    print(text)
def sdelay(delta=0.02):
    """Pause the calling thread for ``delta`` seconds (0.02 by default).

    Always returns None.
    """
    time.sleep(delta)
# IF Then Else (IFTE)
def ifte(ifs, then, elses):
    """Return ``then`` when ``ifs`` is truthy, otherwise ``elses``.

    Both alternatives are ordinary arguments, so they are already
    evaluated by the time this function is called.
    """
    return then if ifs else elses
# Single characters deleted by san(): shell control operators, redirections,
# grouping parentheses and the newline.
_SAN_DELETE = str.maketrans("", "", ";|><&()\n")


# Sanitize a command (strip some flow control chars)
def san(cmd):
    """Return ``cmd`` with a fixed set of shell metacharacters removed.

    Deleted: ``;``, ``|``, ``>``, ``<``, ``&``, ``(``, ``)`` and newlines,
    followed by any ``[[`` and ``]]`` sequences that remain afterwards.

    This is a best-effort filter, not a full shell escaper: reserved words
    are not handled, and characters such as backticks, ``$`` and quotes are
    left untouched. It also relies on this client not being compromised.
    """
    stripped = cmd.translate(_SAN_DELETE)
    # The bracket pairs are removed last, after the single characters.
    return stripped.replace("[[", "").replace("]]", "")
def now():
    """Return the whole number of seconds elapsed since the UNIX epoch."""
    seconds = time.time()
    # int() drops the fractional part.
    return int(seconds)
def dl_search(array, key, search):
    """Find the first dict in ``array`` whose ``key`` entry equals ``search``.

    Parameters:
        array: iterable of dict-like items.
        key: key looked up in every item.
        search: value the entry must compare equal to.

    Returns the matching item, or None when nothing matches or the lookup
    itself fails (for example an item without ``key``). A diagnostic line
    is printed whenever None is returned.
    """
    try:
        # The built-in next() with a default replaces the Python 2-only
        # generator method .next(), which does not exist on Python 3 and
        # made every lookup fail.
        r = next((item for item in array if item[key] == search), None)
    except Exception:
        # Best-effort lookup: malformed input counts as "not found".
        r = None
    if r is None:
        stdout("dlsearch: r is None")
    return r
def dl_search_idx(array, key, search):
    """Find the position of the first dict in ``array`` with ``key == search``.

    Parameters:
        array: iterable of dict-like items.
        key: key looked up in every item.
        search: value the entry must compare equal to.

    Returns the zero-based index of the first match, or -1 when there is no
    match. If the lookup raises (for example an item lacks ``key``), the
    traceback is printed and -1 is returned.
    """
    try:
        for position, entry in enumerate(array):
            if entry[key] == search:
                return position
    except Exception:
        # Only ordinary errors are handled here, so Ctrl-C and SystemExit
        # still propagate to the caller.
        traceback.print_exc()
    return -1
def nullp(arg):
    """Accept one argument, ignore it and return None.

    Acts as a silent sink: the argument expression is still evaluated by
    the caller, but nothing is printed.
    """
    return None
def validate_server(srv):
    """Check that a server-list entry has every field this client needs.

    Each required field is read and formatted (``Port`` with ``%04d``, so
    it must be numeric); a missing key or a formatting error fails the
    validation. When the entry has no ``"Type"`` key it is set to
    ``"evol2"`` in place.

    Parameters:
        srv: one entry of the downloaded server list (a dict).

    Returns True when the entry is usable, False otherwise. The outcome is
    reported through stdout(); on failure the traceback is printed too.
    """
    # (format template, key) pairs, checked in this order.
    checks = (
        ("Host: %s", "Host"),
        ("Port: %04d", "Port"),
        ("Desc: %s", "Desc"),
        ("Link: %s", "Link"),
        ("News: %s", "News"),
        ("Back: %s", "Back"),
        ("UUID: %s", "UUID"),
        ("Help: %s", "Help"),
        ("Online List: %s", "Online"),
        ("Policy: %s", "Policy"),
    )
    name = "Unknown Server"
    try:
        name = srv["Name"]
        for template, field in checks:
            # nullp() discards the text; only the lookup and formatting
            # side of the expression matters here.
            nullp(template % srv[field])
        if "Type" not in srv:
            srv["Type"] = "evol2"
        stdout("Server \"%s\" loaded." % name)
        return True
    except Exception:
        traceback.print_exc()
        stdout("Validation for server \"%s\" FAILED!" % name)
        return False
def update_serverlist(hosti):
    """Download ``server_list.json`` from a mirror and load its entries.

    Every entry accepted by validate_server() is appended to the global
    ``serverlist``. On success the global ``host`` is set to ``hosti``.

    Parameters:
        hosti: base URL of the mirror (without the file name).

    Returns True when the list was fetched and processed, False on any
    error (HTTP status other than 200, network failure, invalid JSON, ...);
    the traceback is printed in that case.
    """
    global serverlist, host
    try:
        r = requests.get("%s/server_list.json" % hosti, timeout=10.0)
        if r.status_code != 200:
            raise AssertionError("Mirror %s seems to be down!\nReturned error %03d\n" % (hosti.replace("https://", "").replace("http://", ""), r.status_code))
        j = json.loads(r.text)
        ## If we reached here, then everything is likely fine
        ## We can now build the persistent data
        print("Fetching server list and assets...")
        serverlist.extend(server for server in j if validate_server(server))
        ## If we were truly successful, save this host as our mirror
        host = hosti
        return True
    except Exception:
        # Ctrl-C is deliberately not caught here, so the user can abort a
        # slow download.
        traceback.print_exc()
        return False
def launch_game(idx):
    """Request world credentials from Vault and run the game client.

    Parameters:
        idx: index into the global ``serverlist``.

    Returns the exit code of the client process, or -1 when the client
    could not be started (unsupported server type, Vault refused or failed,
    client binary missing). The console loop treats exit code 7 as a
    "Mirror Lake" request.
    """
    server = serverlist[idx]

    ## Handle server type
    if server["Type"].lower() != "evol2":
        # Only ManaPlus/evol2 is supported by this console wrapper.
        stdout("ERROR: Unsupported server type \"%s\"." % server["Type"])
        return -1

    ## Get credentials
    auth = {"vaultId": vaultId,
            "token": vaultToken,
            "world": server["UUID"]}
    try:
        r = vault.post(VAULT_HOST + "/world_pass", json=auth, timeout=15.0)
        ## We were refused by Vault
        if r.status_code == 403:
            stdout("Warning: Connection was refused (Vault logout?)")
            return -1
        ## We are rate-limited, try again
        if r.status_code == 429:
            stdout("Rate limited!")
            return -1
        ## Internal error, maybe?
        if r.status_code != 200:
            stdout("Get World Auth - Returned error code %d" % r.status_code)
            return -1
        ## We got the access credentials
        creds = r.json()
        user, password = creds["user"], creds["pass"]
    except Exception:
        traceback.print_exc()
        return -1

    ## Execute the app
    # The values come from the network (server list and Vault reply), so
    # they are passed as separate argv entries without a shell. Nothing can
    # be interpreted as shell syntax, and passwords containing characters
    # such as "&" or ";" reach the client unmodified.
    argv = ["manaplus",
            "-s", str(server["Host"]),
            "-y", "evol2",
            "-p", str(server["Port"]),
            "-S",
            "-U", str(user),
            "-P", str(password)]
    try:
        return execute(argv)
    except OSError:
        # Raised when the client binary is missing or not executable.
        traceback.print_exc()
        stdout("ERROR: Could not start ManaPlus. Is it installed?")
        return -1
## Only meaningful on Linux
## Set for this process; programs started from here inherit os.environ.
os.environ["APPIMAGELAUNCHER_DISABLE"] = "1"
#################################################################################
## Download the Vault SSL certificate to "cert.pem", retrying every five
## seconds until it succeeds. Skipped entirely while VAULT_CERT is falsy.
while VAULT_CERT:
    try:
        ######################################
        # Fetch a new PEM Certificate
        stdout("Fetching SSL Certificate from %s" % VAULT_CERT)
        # The timeout keeps a dead mirror from blocking start-up forever.
        r = requests.get(VAULT_CERT, timeout=10.0)
        if r.status_code != 200:
            raise Exception("\nFailed to fetch Vault SSL Certificate.\n\n Returned HTTP error %d." % r.status_code)
        # Save it
        with open("cert.pem", "wb") as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)
        # Presumably all good, so do not continue and break the loop
        stdout("PEM Download OK")
        break
    except Exception:
        # Only ordinary errors trigger a retry; Ctrl-C propagates, so the
        # user can leave the retry loop at any point.
        traceback.print_exc()
        stdout("Automatically retry in 5 seconds...")
        time.sleep(5.0)
#################################################################################
## Construct session data
# Load the server list, then ask for the Vault credentials interactively.
update_serverlist(SERVERLIST)
email = input("Vault Email: ")
passw = getpass.getpass(prompt="Vault Password: ")
totps = input("Vault 2FA Code: ")
data = {
    "mail": email,
    "pass": passw,
    "totp": totps[:6],  # only the first six characters are sent
}
# The timeout keeps an unreachable Vault from hanging the login forever.
r = vault.post(VAULT_HOST + "/user_auth", json=data, timeout=15.0)
if r.status_code != 200:
    print("Vault returned error %d\n\nPlease try again later." % r.status_code)
    # SystemExit works even where the site-provided exit() helper is absent.
    raise SystemExit(1)
# Session identifiers used by launch_game() and the console loop.
auth2 = r.json()
vaultId = auth2["vaultId"]
vaultToken = auth2["token"]
print("Connected to vault!")
## Interactive console: read a command, run it, repeat until "exit".
try:
    while True:
        command = input("> ").strip()
        # No command inserted? Do nothing
        if not command:
            continue
        # We have a command, prepare it for processing
        stdout("CONSOLE: %s" % command)
        # First word is the verb, the remainder is its argument.
        cmd, _, com = command.partition(" ")
        cmd = cmd.lower()
        com = com.strip()
        # Parse the command
        # TODO: grant gems to an user
        if cmd in ("help", "h"):
            print("help - Shows this help text")
            print("list - List servers")
            print("exit - Closes the program")
            print("connect <world> - Connects to <world>")
            print("")
            print("You need ManaPlus installed for Evol2 worlds.")
        elif cmd in ("list", "servers"):
            for w in serverlist:
                print("\033[1m%s\033[0m" % w["Name"])
                print("Type: %s" % w["Type"])
                print("Help: %s" % w["Help"])
        elif cmd in ("exit", "quit", "close"):
            break
        elif cmd in ("connect", "run", "world"):
            idx = dl_search_idx(serverlist, "Name", com)
            print("Target \"%s\" - %s" % (com, str(idx)))
            # dl_search_idx() reports "not found" as -1, which is a valid
            # (last-element) list index, so it must be rejected explicitly.
            if idx < 0:
                stdout("ERROR: Unknown world.")
                continue
            app = launch_game(idx)
            ## Handle MLP
            # Exit code 7 means the client asked to hop to another world.
            while app == 7:
                stdout("[CLIENT] Mirror Lake triggered.")
                # NOTE(review): this mirrors the payload launch_game() sends
                # to /world_pass; confirm the fields /getlake expects.
                lake_auth = {"vaultId": vaultId,
                             "token": vaultToken,
                             "world": serverlist[idx]["UUID"]}
                r = vault.post(VAULT_HOST + "/getlake", json=lake_auth, timeout=15.0)
                if r.status_code != 200:
                    # Give up instead of hammering Vault in a tight loop.
                    stdout("Get Mirror Lake - Returned error code %d" % r.status_code)
                    break
                goto = r.json()["world"]
                stdout("MLP Target: %s" % str(goto))
                if goto == "" or goto.lower() == "vault":
                    stdout("Mirror Lake False Positive")
                    break
                idx = dl_search_idx(serverlist, "UUID", goto)
                if idx < 0:
                    stdout("ERROR: Unknown world.")
                    break
                try:
                    app = launch_game(idx)
                except Exception:
                    traceback.print_exc()
                    break
        else:
            stdout("ERROR: Unknown command. Try \"help\".")
except BaseException:
    # Top-level boundary: report anything unexpected, including Ctrl-C or
    # end-of-input at the prompt, and let the script end.
    stdout("Abrupt error: Terminating!")
    traceback.print_exc()