Skip to content

Commit fcb7223

Browse files
committed
Use default scheduler port for LocalCluster
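1. Unify the multiple hard-coded occurrences of 8786 into a single constant, `DEFAULT_SCHEDULER_PORT`. 2. Change the `LocalCluster` default for `scheduler_port` from `0` (random port) to `None`, so that the scheduler first tries `DEFAULT_SCHEDULER_PORT`. 3. Introduce a fallback address (random port) that the scheduler listens on when no port was given explicitly and binding the default address fails.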
1 parent 8564dc7 commit fcb7223

File tree

6 files changed

+48
-15
lines changed

6 files changed

+48
-15
lines changed

distributed/cli/dask_scheduler.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
enable_proctitle_on_children,
2121
enable_proctitle_on_current,
2222
)
23+
from distributed.scheduler import DEFAULT_SCHEDULER_PORT
2324

2425
logger = logging.getLogger("distributed.scheduler")
2526

@@ -165,9 +166,9 @@ def main(
165166

166167
if port is None and (not host or not re.search(r":\d", host)):
167168
if isinstance(protocol, list):
168-
port = [8786] + [0] * (len(protocol) - 1)
169+
port = [DEFAULT_SCHEDULER_PORT] + [0] * (len(protocol) - 1)
169170
else:
170-
port = 8786
171+
port = DEFAULT_SCHEDULER_PORT
171172

172173
if isinstance(protocol, list) or isinstance(port, list):
173174
if (not isinstance(protocol, list) or not isinstance(port, list)) or len(

distributed/cli/dask_ssh.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import click
99

1010
from distributed.deploy.old_ssh import SSHCluster
11+
from distributed.scheduler import DEFAULT_SCHEDULER_PORT
1112

1213
logger = logging.getLogger("distributed.dask_ssh")
1314

@@ -30,7 +31,7 @@
3031
)
3132
@click.option(
3233
"--scheduler-port",
33-
default=8786,
34+
default=DEFAULT_SCHEDULER_PORT,
3435
show_default=True,
3536
type=int,
3637
help="Specify scheduler port number.",

distributed/core.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -859,13 +859,30 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs):
859859
else:
860860
addr = port_or_addr
861861
assert isinstance(addr, str)
862-
listener = await listen(
863-
addr,
864-
self.handle_comm,
865-
deserialize=self.deserialize,
866-
allow_offload=allow_offload,
867-
**kwargs,
868-
)
862+
try:
863+
listener = await listen(
864+
addr,
865+
self.handle_comm,
866+
deserialize=self.deserialize,
867+
allow_offload=allow_offload,
868+
**kwargs,
869+
)
870+
except OSError:
871+
fallback_port_or_addr = kwargs.get("fallback_port_or_addr", None)
872+
if not fallback_port_or_addr:
873+
raise
874+
warnings.warn(
875+
f"Address {addr} is already in use.\n"
876+
f"Falling back to {fallback_port_or_addr} instead"
877+
)
878+
listener = await listen(
879+
fallback_port_or_addr,
880+
self.handle_comm,
881+
deserialize=self.deserialize,
882+
allow_offload=allow_offload,
883+
**kwargs,
884+
)
885+
869886
self.listeners.append(listener)
870887

871888
def handle_comm(self, comm: Comm) -> NoOpAwaitable:

distributed/deploy/local.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def __init__(
120120
start=None,
121121
host=None,
122122
ip=None,
123-
scheduler_port=0,
123+
scheduler_port=None,
124124
silence_logs=logging.WARN,
125125
dashboard_address=":8787",
126126
worker_dashboard_address=None,

distributed/scheduler.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@
195195
"stealing": WorkStealing,
196196
}
197197

198+
DEFAULT_SCHEDULER_PORT = 8786
199+
198200

199201
class ClientState:
200202
"""A simple object holding information about a client."""
@@ -3574,7 +3576,6 @@ class Scheduler(SchedulerState, ServerNode):
35743576
Time we expect certain functions to take, e.g. ``{'sum': 0.25}``
35753577
"""
35763578

3577-
default_port = 8786
35783579
_instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()
35793580

35803581
worker_ttl: float | None
@@ -3686,8 +3687,18 @@ def __init__(
36863687
interface=interface,
36873688
protocol=protocol,
36883689
security=security,
3689-
default_port=self.default_port,
3690+
default_port=DEFAULT_SCHEDULER_PORT,
36903691
)
3692+
if port is None:
3693+
self._fallback_start_addresses = addresses_from_user_args(
3694+
host=host,
3695+
port=0,
3696+
interface=interface,
3697+
protocol=protocol,
3698+
security=security,
3699+
)
3700+
else:
3701+
self._fallback_start_addresses = []
36913702

36923703
http_server_modules = dask.config.get("distributed.scheduler.http.routes")
36933704
show_dashboard = dashboard or (dashboard is None and dashboard_address)
@@ -4094,11 +4105,14 @@ async def start_unsafe(self) -> Self:
40944105

40954106
self._clear_task_state()
40964107

4097-
for addr in self._start_address:
4108+
for addr, fallback_addr in itertools.zip_longest(
4109+
self._start_address, self._fallback_start_addresses
4110+
):
40984111
await self.listen(
40994112
addr,
41004113
allow_offload=False,
41014114
handshake_overrides={"pickle-protocol": 4, "compression": None},
4115+
fallback_port_or_addr=fallback_addr,
41024116
**self.security.get_listen_args("scheduler"),
41034117
)
41044118
self.ip = get_address_host(self.listen_address)

distributed/utils_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2530,7 +2530,7 @@ def _bind_port(port):
25302530
s.listen(1)
25312531
yield s
25322532

2533-
default_ports = [8786]
2533+
default_ports = [Scheduler.DEFAULT_SCHEDULER_PORT]
25342534

25352535
while time() - start < _TEST_TIMEOUT:
25362536
try:

0 commit comments

Comments
 (0)