Skip to content

Commit

Permalink
Simplify usage of queues in nanny (#6655)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Aug 10, 2023
1 parent 1f8a11c commit df3214b
Showing 1 changed file with 11 additions and 27 deletions.
38 changes: 11 additions & 27 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from collections.abc import Callable, Collection
from inspect import isawaitable
from queue import Empty
from time import sleep as sync_sleep
from typing import TYPE_CHECKING, ClassVar, Literal

from toolz import merge
Expand Down Expand Up @@ -594,11 +593,9 @@ async def close(
await asyncio.gather(*(td for td in teardowns if isawaitable(td)))

self.stop()
try:
if self.process is not None:
await self.kill(timeout=timeout, reason=reason)
except Exception:
logger.exception("Error in Nanny killing Worker subprocess")
if self.process is not None:
await self.kill(timeout=timeout, reason=reason)

self.process = None
await self.rpc.close()
self.status = Status.closed
Expand Down Expand Up @@ -689,9 +686,9 @@ async def start(self) -> Status:
if self.status == Status.starting:
await self.running.wait()
return self.status

self.init_result_q = init_q = get_mp_context().Queue()
self.child_stop_q = get_mp_context().Queue()
mp_ctx = get_mp_context()
self.init_result_q = mp_ctx.Queue()
self.child_stop_q = mp_ctx.Queue()
uid = uuid.uuid4().hex

self.process = AsyncProcess(
Expand Down Expand Up @@ -740,8 +737,6 @@ async def start(self) -> Status:
self.status = Status.running
self.running.set()

init_q.close()

return self.status

def _on_exit(self, proc):
Expand Down Expand Up @@ -825,22 +820,17 @@ async def kill(
logger.info("Nanny asking worker to close. Reason: %s", reason)

process = self.process
assert process
queue = self.child_stop_q
assert queue
wait_timeout = timeout * 0.8
queue.put(
self.child_stop_q.put(
{
"op": "stop",
"timeout": wait_timeout,
"executor_wait": executor_wait,
"reason": reason,
}
)
await asyncio.sleep(0) # otherwise we get broken pipe errors
queue.close()
del queue

self.child_stop_q.close()
assert process is not None
try:
try:
await process.join(wait_timeout)
Expand Down Expand Up @@ -963,14 +953,8 @@ async def run() -> None:
logger.exception(f"Failed to {failure_type} worker")
init_result_q.put({"uid": uid, "exception": e})
init_result_q.close()
# If we hit an exception here we need to wait for a least
# one interval for the outside to pick up this message.
# Otherwise we arrive in a race condition where the process
# cleanup wipes the queue before the exception can be
# properly handled. See also
# WorkerProcess._wait_until_connected (the 3 is for good
# measure)
sync_sleep(cls._init_msg_interval * 3)
finally:
init_result_q.join_thread()

with contextlib.ExitStack() as stack:

Expand Down

0 comments on commit df3214b

Please sign in to comment.