From 6ce34d890b023386faa7b191c1f16a5318054898 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 14 Aug 2023 12:55:07 +0100 Subject: [PATCH] Propagate CancelledError in gather_from_workers (#8089) --- distributed/utils_comm.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 1ba22e3dea..04266a97da 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -76,8 +76,8 @@ async def gather_from_workers( return data, list(to_gather), failed_keys, list(missing_workers) - tasks = { - address: asyncio.create_task( + tasks = [ + asyncio.create_task( retry_operation( partial( get_data_from_worker, @@ -92,23 +92,28 @@ async def gather_from_workers( name=f"get-data-from-{address}", ) for address, keys in d.items() - } - for address, task in tasks.items(): - try: - r = await task - except OSError: + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + for address, r in zip(d, results): + if isinstance(r, OSError): missing_workers.add(address) - except Exception: + elif isinstance(r, Exception): # For example, deserialization error - logger.exception( + logger.error( "Unexpected error while collecting tasks %s from %s", d[address], address, + exc_info=r, ) for key in d[address]: failed_keys.append(key) del to_gather[key] + elif isinstance(r, BaseException): # pragma: nocover + # for example, asyncio.CancelledError + raise r else: + assert isinstance(r, dict), r if r["status"] == "busy": busy_workers.add(address) continue