Skip to content

Commit

Permalink
Report delayed errors (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfan5 authored Oct 14, 2024
1 parent dcc8d5e commit 3d08cd4
Showing 1 changed file with 71 additions and 19 deletions.
90 changes: 71 additions & 19 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,28 @@ def announce():
server["total_clients"] = server["clients"]
server["pop_v"] = server["total_clients"] / server["updates"]

old_err = errorTracker.get(getErrorPK(server))

finishRequestAsync(server)

if old_err:
return ("Request has been filed, "
"but the previous request encountered the following error:\n" +
old_err, 409)
return "Request has been filed.", 202

# Utilities

# returns a primary key suitable for saving and replaying an error unique to a
# server that was announced.
def getErrorPK(server):
# some failures depend on the client IP too
return(server["ip"], server["address"], server["port"])

# check if something is a domain name (approximate)
def isDomain(s):
return "." in s and s.rpartition(".")[2][0].isalpha()

# Returns ping time in seconds (up), False (down), or None (error).
def serverUp(info):
sock = None
Expand All @@ -211,7 +227,7 @@ def serverUp(info):
# [7] u8 type (PACKET_TYPE_ORIGINAL)
buf = b"\x4f\x45\x74\x03\x00\x00\x00\x01"
sock.send(buf)
start = time.time()
start = time.monotonic()
# receive reliable packet of type CONTROL, subtype SET_PEER_ID,
# with our assigned peer id as data
# [0] u32 protocol_id (PROTOCOL_ID)
Expand All @@ -223,7 +239,7 @@ def serverUp(info):
# [11] u8 controltype (CONTROLTYPE_SET_PEER_ID)
# [12] session_t peer_id_new
data = sock.recv(1024)
end = time.time()
end = time.monotonic()
if not data:
return False
peer_id = data[12:14]
Expand All @@ -248,10 +264,6 @@ def serverUp(info):


def checkRequestAddress(server):
# will fall back to IP of requester, can't possibly be wrong
if "address" not in server or not server["address"]:
return

name = server["address"].lower()

# example value from minetest.conf
Expand Down Expand Up @@ -393,6 +405,10 @@ def checkRequest(server):
s = s.replace(c, "")
server[field] = s

# default value
if "address" not in server or not server["address"]:
server["address"] = server["ip"]

return True


Expand All @@ -404,26 +420,27 @@ def finishRequestAsync(server):


def asyncFinishThread(server):
checkAddress = False
if not "address" in server or not server["address"]:
server["address"] = server["ip"]
else:
checkAddress = True
checkAddress = server["ip"] != server["address"]

try:
info = socket.getaddrinfo(server["address"],
server["port"],
type=socket.SOCK_DGRAM,
proto=socket.SOL_UDP)
except socket.gaierror:
app.logger.warning("Unable to get address info for %s." % (server["address"],))
err = "Unable to get address info for %s" % server["address"]
app.logger.warning(err)
errorTracker.put(getErrorPK(server), err)
return

if checkAddress:
addresses = set(data[4][0] for data in info)
if not server["ip"] in addresses:
app.logger.warning("Invalid IP %s for address %s (address valid for %s)."
% (server["ip"], server["address"], addresses))
err = "Requester IP %s does not match host %s" % (server["ip"], server["address"])
if isDomain(server["address"]):
err += " (valid: %s)" % " ".join(addresses)
app.logger.warning(err)
errorTracker.put(getErrorPK(server), err)
return

geo = geoip_lookup_continent(info[-1][4][0])
Expand All @@ -432,12 +449,16 @@ def asyncFinishThread(server):

server["ping"] = serverUp(info[0])
if not server["ping"]:
app.logger.warning("Server %s:%d has no ping."
% (server["address"], server["port"]))
err = "Server %s port %d did not respond to ping" % (server["address"], server["port"])
if isDomain(server["address"]):
err += " (tried %s)" % info[0][4][0]
app.logger.warning(err)
errorTracker.put(getErrorPK(server), err)
return

# success!
errorTracker.remove(getErrorPK(server))
del server["action"]

serverList.update(server)


Expand Down Expand Up @@ -561,19 +582,50 @@ def update(self, server):
self.save()


class ErrorTracker:
VALIDITY_TIME = 600

def __init__(self):
self.table = {}
self.lock = RLock()

def put(self, k, info):
with self.lock:
self.table[k] = (time.monotonic() + ErrorTracker.VALIDITY_TIME, info)

def remove(self, k):
with self.lock:
self.table.pop(k, None)

def get(self, k):
with self.lock:
e = self.table.get(k)
if e and e[0] >= time.monotonic():
return e[1]

def cleanup(self):
with self.lock:
now = time.monotonic()
table = {k: e for k, e in self.table.items() if e[0] >= now}
self.table = table


class PurgeThread(Thread):
def __init__(self):
Thread.__init__(self)
self.daemon = True
Thread.__init__(self, daemon=True)
def run(self):
while True:
time.sleep(60)
serverList.purgeOld()
errorTracker.cleanup()


# Globals / Startup

serverList = ServerList()

errorTracker = ErrorTracker()

PurgeThread().start()

if __name__ == "__main__":
Expand Down

0 comments on commit 3d08cd4

Please sign in to comment.