From bac3c99c1b23487e38d965a79173ce9519e19c75 Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Mon, 18 Nov 2024 09:38:13 +0100 Subject: [PATCH 1/3] Use exponential backoff for connection retries Calls to socket.connect() are non-blocking, hence all subsequent calls to socket.sendall() will fail if the target KDC service is temporarily or indefinitely unreachable. Since the kdcproxy task uses busy-looping, it results in the journal to be flooded with warning logs. This commit introduces a per-socket reactivation delay which increases exponentially as the number of reties is incremented, until timeout is reached (i.e. 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, ...). Signed-off-by: Julien Rische --- kdcproxy/__init__.py | 63 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/kdcproxy/__init__.py b/kdcproxy/__init__.py index 1493b30..d0ca43e 100644 --- a/kdcproxy/__init__.py +++ b/kdcproxy/__init__.py @@ -61,6 +61,13 @@ def __str__(self): return "%d %s" % (self.code, httplib.responses[self.code]) +class SocketException(Exception): + + def __init__(self, message, sock): + super(Exception, self).__init__(message) + self.sockfno = sock.fileno() + + class Application: MAX_LENGTH = 128 * 1024 SOCKTYPES = { @@ -68,10 +75,23 @@ class Application: "udp": socket.SOCK_DGRAM, } + def addr2socktypename(self, addr): + ret = None + for name in self.SOCKTYPES: + if self.SOCKTYPES[name] == addr[1]: + ret = name + break + return ret + def __init__(self): self.__resolver = MetaResolver() def __await_reply(self, pr, rsocks, wsocks, timeout): + starting_time = time.time() + send_error = None + recv_error = None + failing_sock = None + reactivations = {} extra = 0 read_buffers = {} while (timeout + extra) > time.time(): @@ -92,6 +112,12 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): pass for sock in w: + # Fetch reactivation tuple: + # 1st element: reactivation index (-1 = first activation) + # 2nd element: planned reactivation time (0.0 = now) + (rn, rt) = reactivations.get(sock, (-1, 0.0)) + if rt > time.time(): + continue try: if self.sock_type(sock) == socket.SOCK_DGRAM: # If we proxy over UDP, remove the 4-byte length @@ -101,8 +127,13 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): sock.sendall(pr.request) extra = 10 # New connections get 10 extra seconds except Exception as e: - logging.warning("Conection broken while writing (%s)", e) + send_error = e + failing_sock = sock + reactivations[sock] = (rn + 1, + time.time() + 2.0**(rn + 1) / 10) continue + if sock in reactivations: + del reactivations[sock] rsocks.append(sock) wsocks.remove(sock) @@ -110,7 +141,8 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): try: reply = self.__handle_recv(sock, read_buffers) except Exception as e: - logging.warning("Connection broken while reading (%s)", e) + recv_error = e + failing_sock = sock if self.sock_type(sock) == socket.SOCK_STREAM: # Remove broken TCP socket from readers rsocks.remove(sock) @@ -118,6 +150,21 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): if reply is not None: return reply + if reactivations: + raise SocketException("Timeout while sending packets after %.2fs " + "and %d tries: %s" % ( + (timeout + extra) - starting_time, + sum(map(lambda r: r[0], + reactivations.values())), + send_error), + failing_sock) + elif recv_error is not None: + raise SocketException("Timeout while receiving packets after " + "%.2fs: %s" % ( + (timeout + extra) - starting_time, + recv_error), + failing_sock) + return None def __handle_recv(self, sock, read_buffers): @@ -215,6 +262,7 @@ def __call__(self, env, start_response): reply = None wsocks = [] rsocks = [] + sockfno2addr = {} for server in map(urlparse.urlparse, servers): # Enforce valid, supported URIs scheme = server.scheme.lower().split("+", 1) @@ -261,6 +309,7 @@ def __call__(self, env, start_response): continue except io.BlockingIOError: pass + sockfno2addr[sock.fileno()] = addr wsocks.append(sock) # Resend packets to UDP servers @@ -271,7 +320,15 @@ def __call__(self, env, start_response): # Call select() timeout = time.time() + (15 if addr is None else 2) - reply = self.__await_reply(pr, rsocks, wsocks, timeout) + try: + reply = self.__await_reply(pr, rsocks, wsocks, timeout) + except SocketException as e: + fail_addr = sockfno2addr[e.sockfno] + fail_socktype = self.addr2socktypename(fail_addr) + fail_ip = fail_addr[4][0] + fail_port = fail_addr[4][1] + logging.warning("Exchange with %s:[%s]:%d failed: %s", + fail_socktype, fail_ip, fail_port, e) if reply is not None: break From c8a69dbc0777579ba3bf3d156baed0966327ebc2 Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Mon, 18 Nov 2024 10:01:16 +0100 Subject: [PATCH 2/3] Use dedicated "kdcproxy" logger Signed-off-by: Julien Rische --- kdcproxy/__init__.py | 7 +++++-- kdcproxy/config/__init__.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/kdcproxy/__init__.py b/kdcproxy/__init__.py index d0ca43e..ce96a0c 100644 --- a/kdcproxy/__init__.py +++ b/kdcproxy/__init__.py @@ -38,6 +38,9 @@ import httplib import urlparse +logging.basicConfig() +logger = logging.getLogger('kdcproxy') + class HTTPException(Exception): @@ -327,8 +330,8 @@ def __call__(self, env, start_response): fail_socktype = self.addr2socktypename(fail_addr) fail_ip = fail_addr[4][0] fail_port = fail_addr[4][1] - logging.warning("Exchange with %s:[%s]:%d failed: %s", - fail_socktype, fail_ip, fail_port, e) + logger.warning("Exchange with %s:[%s]:%d failed: %s", + fail_socktype, fail_ip, fail_port, e) if reply is not None: break diff --git a/kdcproxy/config/__init__.py b/kdcproxy/config/__init__.py index a1435b7..8e17c5b 100644 --- a/kdcproxy/config/__init__.py +++ b/kdcproxy/config/__init__.py @@ -32,6 +32,9 @@ import dns.rdatatype import dns.resolver +logging.basicConfig() +logger = logging.getLogger('kdcproxy') + class IResolver(object): @@ -60,14 +63,14 @@ def __init__(self, filenames=None): try: self.__cp.read(filenames) except configparser.Error: - logging.error("Unable to read config file(s): %s", filenames) + logger.error("Unable to read config file(s): %s", filenames) try: mod = self.__cp.get(self.GLOBAL, "configs") try: importlib.import_module("kdcproxy.config." + mod) except ImportError as e: - logging.log(logging.ERROR, "Error reading config: %s" % e) + logger.log(logging.ERROR, "Error reading config: %s" % e) except configparser.Error: pass From cde2416ca0370dd0f0bfb2c9fc9a2a6fac1d6397 Mon Sep 17 00:00:00 2001 From: Julien Rische Date: Fri, 22 Nov 2024 11:44:52 +0100 Subject: [PATCH 3/3] Fix "doc" tox task Signed-off-by: Julien Rische --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index b2cf0b9..9a640f1 100644 --- a/tox.ini +++ b/tox.ini @@ -28,11 +28,12 @@ deps = doc8 docutils markdown + rst2html basepython = python3 commands = doc8 --allow-long-titles README python setup.py check --restructuredtext --metadata --strict - rst2html.py README {toxworkdir}/README.html + rst2html README {toxworkdir}/README.html markdown_py README.md -f {toxworkdir}/README.md.html [pytest]