From fc6a563136a8f571d4b1f33b19e488e4059ce08d Mon Sep 17 00:00:00 2001 From: "Michael J. Sullivan" Date: Thu, 7 Mar 2024 13:24:45 -0800 Subject: [PATCH] Fix a ResourceWarning caused by SimpleAdaptivePool (#7009) This was a huge pain to track down and fix and probably not worth spending my morning on it. I ended up editing my installed `subprocess.py` to see what the command line was for the process that was leaking. I then spent a long time thinking there might be a uvloop bug, because I confused myself and thought we were using uvloop in the test suite (after I added a debug print that failed under uvloop---but the failure occured when running the real server). The confusing part to me was that I felt that a subprocess protocol shouldn't be getting notified that it has disconnected unless the child process has actually exited, but I had missed that the protocol we use to communicate with the workers is a normal socket protocol which *obviously* closes when the socket does. Anyway the fix is to arrange to wait for the children to exit, which like everything involving protocols is annoying because the callback isn't async. --- edb/server/compiler_pool/pool.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/edb/server/compiler_pool/pool.py b/edb/server/compiler_pool/pool.py index a3040ee4284..e5407ceb7cb 100644 --- a/edb/server/compiler_pool/pool.py +++ b/edb/server/compiler_pool/pool.py @@ -885,6 +885,7 @@ def __init__(self, *, pool_size, **kwargs): self._expected_num_workers = 0 self._scale_down_handle = None self._max_num_workers = pool_size + self._cleanups = {} async def _start(self): async with asyncio.TaskGroup() as g: @@ -897,6 +898,8 @@ async def _stop(self): for transport in transports.values(): await transport._wait() transport.close() + for cleanup in list(self._cleanups.values()): + await cleanup async def _acquire_worker( self, *, condition=None, weighter=None, **compiler_args @@ -940,12 +943,21 @@ def _release_worker(self, worker, *, put_in_front: bool = True): self._scale_down, ) + async def _wait_on_dying(self, pid, trans): + await trans._wait() + self._cleanups.pop(pid) + def worker_disconnected(self, pid): num_workers_before = len(self._workers) super().worker_disconnected(pid) trans = self._worker_transports.pop(pid, None) if trans: trans.close() + # amsg.Server notifies us when the *pipe* to the worker closes, + # so we need to fire off a task to make sure that we wait for + # the worker to exit, in order to avoid a warning. + self._cleanups[pid] = ( + self._loop.create_task(self._wait_on_dying(pid, trans))) if not self._running: return if len(self._workers) < self._pool_size: