Skip to content

Commit c3ee5de

Browse files
committed
expose details about worker start timeout in the exception message
so that calling code can have more precise logic about how to handle the error
1 parent fea2284 commit c3ee5de

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

distributed/client.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
unpack_remotedata,
128128
)
129129
from distributed.worker import get_client, get_worker, secede
130+
from distributed.exceptions import WorkerStartTimeoutError
130131

131132
if TYPE_CHECKING:
132133
from typing_extensions import TypeAlias
@@ -1651,10 +1652,7 @@ def running_workers(info):
16511652

16521653
while running_workers(info) < n_workers:
16531654
if deadline and time() > deadline:
1654-
raise TimeoutError(
1655-
"Only %d/%d workers arrived after %s"
1656-
% (running_workers(info), n_workers, timeout)
1657-
)
1655+
raise WorkerStartTimeoutError(running_workers(info), n_workers, timeout)
16581656
await asyncio.sleep(0.1)
16591657
info = await self.scheduler.identity()
16601658
self._scheduler_identity = SchedulerInfo(info)

distributed/deploy/cluster.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
format_dashboard_link,
3030
log_errors,
3131
)
32+
from distributed.exceptions import WorkerStartTimeoutError
3233

3334
logger = logging.getLogger(__name__)
3435

@@ -610,10 +611,7 @@ def running_workers(info):
610611

611612
while n_workers and running_workers(self.scheduler_info) < n_workers:
612613
if deadline and time() > deadline:
613-
raise TimeoutError(
614-
"Only %d/%d workers arrived after %s"
615-
% (running_workers(self.scheduler_info), n_workers, timeout)
616-
)
614+
raise WorkerStartTimeoutError(running_workers(self.scheduler_info), n_workers, timeout)
617615
await asyncio.sleep(0.1)
618616

619617
self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())

distributed/exceptions.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
from asyncio import TimeoutError
4+
35

46
class Reschedule(Exception):
57
"""Reschedule this task
@@ -13,3 +15,32 @@ class Reschedule(Exception):
1315
load across the cluster has significantly changed since first scheduling
1416
the task.
1517
"""
18+
19+
20+
class WorkerStartTimeoutError(TimeoutError):
    """Raised when the expected number of workers does not start within the timeout period.

    The three constructor arguments are forwarded unchanged to the base
    exception, so they are stored positionally in ``args`` and can be read
    back through the properties of the same names.

    Parameters
    ----------
    available_workers
        Number of workers that were running when the deadline passed.
    expected_workers
        Number of workers that were being waited for.
    timeout
        Timeout period that elapsed.
    """

    def __init__(
        self, available_workers: int, expected_workers: int, timeout: float
    ) -> None:
        # Store the raw values in ``args`` instead of a pre-formatted message
        # so that calling code can inspect them; the message is built lazily
        # in ``__str__``.
        super().__init__(available_workers, expected_workers, timeout)

    @property
    def available_workers(self) -> int:
        """Number of workers that are available."""
        return self.args[0]

    @property
    def expected_workers(self) -> int:
        """Number of workers that were expected to be available."""
        return self.args[1]

    @property
    def timeout(self) -> float:
        """Timeout period in seconds."""
        return self.args[2]

    def __str__(self) -> str:
        # Same wording as the plain ``TimeoutError`` message this exception
        # replaces at the call sites, so existing log output is unchanged.
        return "Only %d/%d workers arrived after %s" % (
            self.available_workers,
            self.expected_workers,
            self.timeout,
        )

0 commit comments

Comments
 (0)