diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index 45534e9be80..cf70ffdab61 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -91,6 +91,14 @@ properties:
               Works in conjunction with idle-timeout.
 
+          no-clients-timeout:
+            type:
+            - string
+            - "null"
+            description: |
+              Shut down the scheduler after this duration if no client heartbeats
+              are received and there are no running or queued tasks.
+
           work-stealing:
             type: boolean
             description: |
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index a7ec037e27c..4d77990357e 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -19,6 +19,7 @@ distributed:
     events-cleanup-delay: 1h
     idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
     no-workers-timeout: null # Shut down if there are tasks but no workers to process them
+    no-clients-timeout: null # Shut down after this duration of client inactivity, similar to idle-timeout
     work-stealing: True # workers should steal tasks from each other
     work-stealing-interval: 100ms # Callback time for work stealing
     worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 0273d333da3..180a7aadd44 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3582,6 +3582,8 @@ class Scheduler(SchedulerState, ServerNode):
     idle_timeout: float | None
     _no_workers_since: float | None  # Note: not None iff there are pending tasks
     no_workers_timeout: float | None
+    _no_clients_since: float
+    no_clients_timeout: float | None
 
     def __init__(
         self,
@@ -3597,6 +3599,7 @@ def __init__(
         security=None,
         worker_ttl=None,
         idle_timeout=None,
+        no_clients_timeout=None,
         interface=None,
         host=None,
         port=0,
@@ -3656,6 +3659,11 @@ def __init__(
             dask.config.get("distributed.scheduler.no-workers-timeout")
         )
         self._no_workers_since = None
+        self.no_clients_timeout = parse_timedelta(
+            no_clients_timeout
+            or dask.config.get("distributed.scheduler.no-clients-timeout")
+        )
+        self._no_clients_since = time()
         self.time_started = self.idle_since  # compatibility for dask-gateway
 
         self._replica_lock = RLock()
@@ -3934,6 +3942,9 @@ async def post(self):
         pc = PeriodicCallback(self._check_no_workers, 250)
         self.periodic_callbacks["no-workers-timeout"] = pc
 
+        pc = PeriodicCallback(self._check_no_clients, 250)
+        self.periodic_callbacks["no-clients-timeout"] = pc
+
         if extensions is None:
             extensions = DEFAULT_EXTENSIONS.copy()
         if not dask.config.get("distributed.scheduler.work-stealing"):
@@ -8521,6 +8532,36 @@ def check_idle(self) -> float | None:
             self._ongoing_background_tasks.call_soon(self.close)
         return self.idle_since
 
+    def _check_no_clients(self) -> None:
+        """Shut down the scheduler if there are no running tasks, no tasks ready
+        to run, and there hasn't been any client heartbeat for
+        `distributed.scheduler.no-clients-timeout`."""
+        if self.status in (Status.closing, Status.closed):
+            return  # pragma: nocover
+
+        if (
+            self.queued
+            or self.unrunnable
+            or any(ws.processing for ws in self.workers.values())
+        ):
+            return
+
+        if self.jupyter:
+            self._no_clients_since = max(
+                self._no_clients_since,
+                self._jupyter_server_application.web_app.last_activity().timestamp(),
+            )
+        if self.clients:
+            clients_hb = max(c.last_seen for c in self.clients.values())
+            self._no_clients_since = max(self._no_clients_since, clients_hb)
+
+        if (
+            self.no_clients_timeout
+            and time() > self._no_clients_since + self.no_clients_timeout
+        ):
+            logger.info("No tasks and no client heartbeat; shutting scheduler down")
+            self._ongoing_background_tasks.call_soon(self.close)
+
     def _check_no_workers(self) -> None:
         """Shut down the scheduler if there have been tasks ready to run which
         have nowhere to run for `distributed.scheduler.no-workers-timeout`, and there
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 9aaea1288a4..305364021d4 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2541,6 +2541,36 @@ async def test_no_workers_timeout_processing(c, s, a, b):
         await asyncio.sleep(0.01)
 
 
+@pytest.mark.slow
+@gen_cluster(client=True)
+async def test_no_clients_timeout(c, s, a, b):
+    """Test that, with `no_clients_timeout` set, a submitted future still
+    completes, and that the cluster afterwards shuts down with the expected
+    log message."""
+    beginning = time()
+    future = c.submit(slowinc, 1)
+    while not s.tasks:
+        await asyncio.sleep(0.01)
+    s.no_clients_timeout = 1.0
+    pc = PeriodicCallback(s._check_no_clients, 10)
+    pc.start()
+    await future
+
+    with captured_logger("distributed.scheduler") as logs:
+        start = time()
+        while s.status != Status.closed:
+            await asyncio.sleep(0.01)
+            assert time() < start + 3
+
+        start = time()
+        while not (a.status == Status.closed and b.status == Status.closed):
+            await asyncio.sleep(0.01)
+            assert time() < start + 1
+
+    assert "No tasks and no client heartbeat" in logs.getvalue()
+    pc.stop()
+
+
 @gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
 async def test_bandwidth(c, s, a, b):
     start = s.bandwidth
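
For reviewers who want to try the change out, here is a minimal sketch (not part of the diff) of the new keyword in action. It assumes this patch is applied; `Scheduler`, `Worker`, and `Client` used as async context managers and `Scheduler.finished()` are pre-existing distributed APIs, while `no_clients_timeout="30s"` and the self-shutdown behaviour come from the changes above.

```python
import asyncio

from dask.distributed import Client, Scheduler, Worker


async def main() -> None:
    # `no_clients_timeout` is the keyword added by this patch; the value goes
    # through parse_timedelta, so "30s", "5 minutes", or a plain number of
    # seconds are all accepted.
    async with Scheduler(no_clients_timeout="30s", dashboard_address=":0") as scheduler:
        async with Worker(scheduler.address):
            async with Client(scheduler.address, asynchronous=True) as client:
                # Run one task so the cluster is not merely idle from the start.
                assert await client.submit(lambda x: x + 1, 1) == 2
            # The last client has now disconnected and nothing is queued or
            # running, so the periodic _check_no_clients callback should close
            # the scheduler roughly 30 seconds after the final client heartbeat.
            await scheduler.finished()


asyncio.run(main())
```

The same behaviour can be enabled without the keyword by setting the `distributed.scheduler.no-clients-timeout` config key added in distributed.yaml above, which `Scheduler.__init__` falls back to when `no_clients_timeout` is not passed.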