Skip to content

Commit

Permalink
Improve documentation of scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
saraedum committed Dec 9, 2024
1 parent 46dda88 commit 3be93a1
Showing 1 changed file with 71 additions and 16 deletions.
87 changes: 71 additions & 16 deletions flatsurvey/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
r"""
Runs a survey with dask on the local machine or in cluster.
TODO: Give full examples.
This implemnts a (somewhat unnecessary) wrapper for the dask API.
EXAMPLES:
We compute the orbit closure of the (1,1,1) and the (1,1,2) triangles::
>>> from flatsurvey.surfaces import Ngons
>>> ngons = Ngons.click.callback(3, 'e-antic', min=0, limit=None, count=2, literature='include', family=None, filter=None)
>>> from flatsurvey.jobs import OrbitClosure
>>> scheduler = Scheduler(generators=[ngons], goals=[OrbitClosure], bindings=[], reporters=[])
>>> await scheduler.start()
"""
# *********************************************************************
# This file is part of flatsurvey.
Expand Down Expand Up @@ -30,63 +43,102 @@ class Scheduler:
A scheduler that splits a survey into commands that are sent out to workers
via the dask protocol.
>>> Scheduler(generators=[], bindings=[], goals=[], reporters=[])
INPUT::
- ``generators`` -- a list of generators of surfaces, e.g., a list of lists
of surfaces
- ``goals`` -- a list of goals that should be resolved for each surface
such as :class:`OrbitClosure`.
- ``reporters`` -- a list of reporters that should be used to report the
``goals`` (default: the empty list which means the reporting will be done
to stdout)
- ``bindings`` -- additional bindings that are not covered by
``generators``, ``goals``, and ``reporters`` that should be taken into
account by the dependency injection (default: the empty list.)
- ``scheduler`` -- a dask scheduler file to connect to; if not given (the
default) then a dask scheduler is started by this process
- ``queue`` -- the number of processes to initially submit into the dask
scheduler before we wait for workers to finish (default: thrice the
number of workers threads)
- ``debug`` -- whether to attach a debbugger when the scheduler crashes
(default: ``False``.)
EXAMPLES::
>>> Scheduler(generators=[], goals=[])
Scheduler
"""

def __init__(
self,
generators,
bindings,
goals,
reporters,
reporters=(),
bindings=(),
scheduler=None,
queue=16,
dry_run=False,
quiet=False,
queue=None,
debug=False,
):
self._generators = generators
self._bindings = bindings
self._generators = list(generators)
self._bindings = list(bindings)
self._goals = goals
self._reporters = reporters
self._scheduler = scheduler
self._queue_limit = queue
self._dry_run = dry_run
self._quiet = quiet
self._debug = debug

self._enable_shared_bindings()

def __repr__(self):
return "Scheduler"
return "Scheduler(…)"

async def _create_pool(self):
r"""
Return a new dask pool to schedule jobs.
"""
import dask.config

# We do not spawn workers as daemons so that they can have child
# processes, see worker/dask.py (only has an effect if no
# scheduler_file is set.)
dask.config.set({"distributed.worker.daemon": False})

import dask.distributed

from multiprocessing import cpu_count

return await dask.distributed.Client(
scheduler_file=self._scheduler,
direct_to_workers=True,
# Connections are not very expensive but this number should be big
# enough so that we can communicate with all workers in large
# clusters.
connection_limit=2**16,
# We want to use dask through its modern asynchronous API.
asynchronous=True,
n_workers=8,
# Start a worker for each execution thread on the CPU. (Only
# relevant if we are not using an external scheduler.)
n_workers=cpu_count(),
# We run each worker single-threaded, see worker/dask.py.
nthreads=1,
# We preload worker/dask.py unless scheduler_file is set, see
# documentation there.
preload="flatsurvey.worker.dask",
# Disable the dask nanny, see module documentation of worker/dask.py
processes=False,
)

async def start(self):
r"""
Run the scheduler until it has run out of jobs to schedule.
Run the scheduler until all jobs have been scheduled and all jobs
terminated.
>>> import asyncio
>>> scheduler = Scheduler(generators=[], bindings=[], goals=[], reporters=[])
Expand Down Expand Up @@ -122,7 +174,7 @@ async def consume_one():
return completed

# Fill the job queue with a base line of queue_limit many jobs.
for _ in range(self._queue_limit):
for _ in range(self._queue_limit or 3 * sum((await pool.nthreads()).values())):
if not await schedule_one():
break

Expand Down Expand Up @@ -155,7 +207,10 @@ async def consume_one():

raise
finally:
await pool.close(0)
# Terminate all workers immediately if we crash out of this code
# block. (If we terminated normally, then there's nothing we have
# to wait for.
pool.close(0)

def _enable_shared_bindings(self):
shared = [binding for binding in self._bindings if binding.scope == "SHARED"]
Expand Down

0 comments on commit 3be93a1

Please sign in to comment.