################################################################################# # This file is part of Spheres. # Copyright (C) 2019 Jesusalva # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################################# # Adds an extra layer of security to the server # Really basic stuff, still better than nothing, though import threading, time, traceback from utils import now, stdout, dl_search, ifte from consts import (BL_UPDATETIME, INT_MAX, BAN_TIME, BAN_AUTHED, BAN_UNAUTHED, PACKET_NACK, PACKET_ACK, PACKET_REGX) blacklist = [] ############################################################### ## Startup Methods # Import K-Line, G-Line and Z-Line try: f=open("Z-Line.txt", "r") for ip in f: blacklist.append([ip.replace("\n", ""), INT_MAX]) f.close() except: stdout("No Z-Line.txt config file found") pass try: f=open("G-Line.txt", "r") for ip in f: blacklist.append([ip.replace("\n", ""), INT_MAX]) f.close() except: stdout("No G-Line.txt config file found") pass try: f=open("K-Line.txt", "r") for ip in f: blacklist.append([ip.replace("\n", ""), INT_MAX]) f.close() except: stdout("No K-Line.txt config file found") pass # Inform what is current blacklist #stdout("Blacklist configuration:\n%s\n\n" % str(blacklist)) ############################################################################### ## Public Methods def is_banned(ip): global blacklist #print("Searching on blacklist") bl=dl_search(blacklist, 0, ip) print("Blacklist search result: %s" % str(bl)) if (bl != "ERROR"): return True else: return False def ban_ip(ip, until=INT_MAX): global blacklist blacklist.append([ip, until]) stdout("%s has been banned until %d." % (ip, until)) # TODO: kick users when they are banned or klined? return def unban_ip(ip): global blacklist try: blacklist.remove(dl_search(blacklist, 0, ip)) stdout("%s has been unbanned." % (ip)) except: stdout("%s is not banned." % (ip)) return def get_score(proto): if (proto == PACKET_REGX): return 2 elif proto == PACKET_NACK: return 5 elif proto == PACKET_ACK: return 0 else: stdout("\"%s\" is not a valid packet reply code" % str(proto)) return 0 def score(conn, score): #print("Score request: %d" % score) conn.MS_score += score limit = ifte(conn.MS_auth, BAN_AUTHED, BAN_UNAUTHED) if (conn.MS_score >= limit): stdout("Banning %s (%d/%d lame)" % (conn.address[0], conn.MS_score, limit)) ban_ip(conn.address[0], now()+BAN_TIME) time.sleep(0.1) try: conn.close(status=1000, reason="K-Lined") except: traceback.print_exc() print("Failed to close, retrying...") try: conn.close(conn, status=1000, reason="K-Lined") stdout("Connection closed!") except: traceback.print_exc() print("Still failed! Connection was kept alive.") return ############################################################################### ## Private Methods def blacklist_update(): for ban in blacklist: if ban[1] < now(): blacklist.remove(ban) stdout("Unbanning %s..." % ban[0]) bl_update=threading.Timer(BL_UPDATETIME, blacklist_update) bl_update.daemon=True bl_update.start() # Run, and keep running forever blacklist_update()