Skip to content

Commit

Permalink
Fix a ResourceWarning caused by SimpleAdaptivePool (#7009)
Browse files Browse the repository at this point in the history
This was a huge pain to track down and fix and probably not worth
spending my morning on it. I ended up editing my installed
`subprocess.py` to see what the command line was for the process that
was leaking.

I then spent a long time thinking there might be a uvloop bug, because
I confused myself and thought we were using uvloop in the test suite
(after I added a debug print that failed under uvloop---but the
failure occured when running the real server).

The confusing part to me was that I felt that a subprocess protocol
shouldn't be getting notified that it has disconnected unless the
child process has actually exited, but I had missed that the protocol
we use to communicate with the workers is a normal socket protocol
which *obviously* closes when the socket does.

Anyway the fix is to arrange to wait for the children to exit, which
like everything involving protocols is annoying because the callback
isn't async.
  • Loading branch information
msullivan authored Mar 7, 2024
1 parent 47764a0 commit fc6a563
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions edb/server/compiler_pool/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ def __init__(self, *, pool_size, **kwargs):
self._expected_num_workers = 0
self._scale_down_handle = None
self._max_num_workers = pool_size
self._cleanups = {}

async def _start(self):
async with asyncio.TaskGroup() as g:
Expand All @@ -897,6 +898,8 @@ async def _stop(self):
for transport in transports.values():
await transport._wait()
transport.close()
for cleanup in list(self._cleanups.values()):
await cleanup

async def _acquire_worker(
self, *, condition=None, weighter=None, **compiler_args
Expand Down Expand Up @@ -940,12 +943,21 @@ def _release_worker(self, worker, *, put_in_front: bool = True):
self._scale_down,
)

async def _wait_on_dying(self, pid, trans):
await trans._wait()
self._cleanups.pop(pid)

def worker_disconnected(self, pid):
num_workers_before = len(self._workers)
super().worker_disconnected(pid)
trans = self._worker_transports.pop(pid, None)
if trans:
trans.close()
# amsg.Server notifies us when the *pipe* to the worker closes,
# so we need to fire off a task to make sure that we wait for
# the worker to exit, in order to avoid a warning.
self._cleanups[pid] = (
self._loop.create_task(self._wait_on_dying(pid, trans)))
if not self._running:
return
if len(self._workers) < self._pool_size:
Expand Down

0 comments on commit fc6a563

Please sign in to comment.