Skip to content

Add no-clients-timeout option #8757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ properties:

Works in conjunction with idle-timeout.

no-clients-timeout:
type:
- string
- "null"
description: |
Shut down the scheduler after this duration if no client heartbeats received,
and there are no running or queued tasks.

work-stealing:
type: boolean
description: |
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ distributed:
events-cleanup-delay: 1h
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
no-workers-timeout: null # Shut down if there are tasks but no workers to process them
no-clients-timeout: null # Shut down after this duration of client inactivity, similar to idle-timeout
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
Expand Down
41 changes: 41 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3582,6 +3582,8 @@
idle_timeout: float | None
_no_workers_since: float | None # Note: not None iff there are pending tasks
no_workers_timeout: float | None
_no_clients_since: float
no_clients_timeout: float | None

def __init__(
self,
Expand All @@ -3597,6 +3599,7 @@
security=None,
worker_ttl=None,
idle_timeout=None,
no_clients_timeout=None,
interface=None,
host=None,
port=0,
Expand Down Expand Up @@ -3656,6 +3659,11 @@
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None
self.no_clients_timeout = parse_timedelta(
no_clients_timeout
or dask.config.get("distributed.scheduler.no-clients-timeout")
)
self._no_clients_since = time()

self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
Expand Down Expand Up @@ -3934,6 +3942,9 @@
pc = PeriodicCallback(self._check_no_workers, 250)
self.periodic_callbacks["no-workers-timeout"] = pc

pc = PeriodicCallback(self._check_no_clients, 250)
self.periodic_callbacks["no-clients-timeout"] = pc

if extensions is None:
extensions = DEFAULT_EXTENSIONS.copy()
if not dask.config.get("distributed.scheduler.work-stealing"):
Expand Down Expand Up @@ -8521,6 +8532,36 @@
self._ongoing_background_tasks.call_soon(self.close)
return self.idle_since

def _check_no_clients(self) -> None:
"""Shut down the schedule if there are no running tasks / no tasks ready to
run, and there hasn't been any client connection for
`distributed.scheduler.no-workers-timeout`."""
if self.status in (Status.closing, Status.closed):
return # pragma: nocover

if (
self.queued
or self.unrunnable
or any(ws.processing for ws in self.workers.values())
):
return

if self.jupyter:
self._no_clients_since = max(

Check warning on line 8550 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L8550

Added line #L8550 was not covered by tests
self._no_clients_since,
self._jupyter_server_application.web_app.last_activity().timestamp(),
)
if self.clients:
clients_hb = max(c.last_seen for c in self.clients.values())
self._no_clients_since = max(self._no_clients_since, clients_hb)

if (
self.no_clients_timeout
and time() > self._no_clients_since + self.no_clients_timeout
):
logger.info("No tasks and no client heartbeat; shutting scheduler down")
self._ongoing_background_tasks.call_soon(self.close)

def _check_no_workers(self) -> None:
"""Shut down the scheduler if there have been tasks ready to run which have
nowhere to run for `distributed.scheduler.no-workers-timeout`, and there
Expand Down
30 changes: 30 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2541,6 +2541,36 @@ async def test_no_workers_timeout_processing(c, s, a, b):
await asyncio.sleep(0.01)


@pytest.mark.slow
@gen_cluster(client=True)
async def test_no_clients_timeout(c, s, a, b):
"""Tests that with setting `no_clients_timeout` value, a submitted future
does complete, but afterwards cluster shuts down with the expected log message."""

beginning = time()
future = c.submit(slowinc, 1)
while not s.tasks:
await asyncio.sleep(0.01)
s.no_clients_timeout = 1.0
pc = PeriodicCallback(s._check_no_clients, 10)
pc.start()
await future

with captured_logger("distributed.scheduler") as logs:
start = time()
while s.status != Status.closed:
await asyncio.sleep(0.01)
assert time() < start + 3

start = time()
while not (a.status == Status.closed and b.status == Status.closed):
await asyncio.sleep(0.01)
assert time() < start + 1

assert "No tasks and no client heartbeat" in logs.getvalue()
pc.stop()


@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
async def test_bandwidth(c, s, a, b):
start = s.bandwidth
Expand Down
Loading