Skip to content

Commit

Permalink
Propagate CancelledError in gather_from_workers (#8089)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Aug 14, 2023
1 parent a1659fd commit 6ce34d8
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions distributed/utils_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ async def gather_from_workers(

return data, list(to_gather), failed_keys, list(missing_workers)

tasks = {
address: asyncio.create_task(
tasks = [
asyncio.create_task(
retry_operation(
partial(
get_data_from_worker,
Expand All @@ -92,23 +92,28 @@ async def gather_from_workers(
name=f"get-data-from-{address}",
)
for address, keys in d.items()
}
for address, task in tasks.items():
try:
r = await task
except OSError:
]

results = await asyncio.gather(*tasks, return_exceptions=True)
for address, r in zip(d, results):
if isinstance(r, OSError):
missing_workers.add(address)
except Exception:
elif isinstance(r, Exception):
# For example, deserialization error
logger.exception(
logger.error(
"Unexpected error while collecting tasks %s from %s",
d[address],
address,
exc_info=r,
)
for key in d[address]:
failed_keys.append(key)
del to_gather[key]
elif isinstance(r, BaseException): # pragma: nocover
# for example, asyncio.CancelledError
raise r
else:
assert isinstance(r, dict), r
if r["status"] == "busy":
busy_workers.add(address)
continue
Expand Down

0 comments on commit 6ce34d8

Please sign in to comment.