More post-merge tweaks.
hjoliver committed Feb 13, 2025
1 parent 3bed101 commit 85dea2a
Showing 5 changed files with 126 additions and 88 deletions.
3 changes: 2 additions & 1 deletion changes.d/5090.feat.md
@@ -1 +1,2 @@
Implement initial and final graphs, distinct from the main cycling graph.
Distinct initial and final graphs, separated from the main cycling graph,
to make it easier to configure special behaviour at startup and shutdown.
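For orientation, a minimal sketch of the kind of graph configuration this enables, written as the Python dict that config.py's graphdict would hold. The "alpha"/"omega" section names are inferred from the diff below, and the graph strings are purely illustrative, not from this commit:

graphdict = {
    "alpha": "install_cold => prep",        # hypothetical start-up graph
    "P1D": "prep[-P1D] => model => post",   # main cycling graph
    "omega": "archive => tidy",             # hypothetical shutdown graph
}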
31 changes: 26 additions & 5 deletions cylc/flow/config.py
@@ -610,6 +610,20 @@ def _warn_if_queues_have_implicit_tasks(
def prelim_process_graph(self) -> None:
"""Ensure graph is not empty; set integer cycling mode and icp/fcp = 1
for simplest "R1 = foo" type graphs.
Set integer cycling mode and initial cycle point (ICP) = 1 for acyclic graphs.
Cycling mode and ICP can be omitted in acyclic graphs, where the value
of the single cycle point doesn't matter and the need for it is merely
a function of Cylc internals.
No need to add or adjust final cycle point in acyclic graphs - they
halt at the end of the acyclic graph.
Note:
- The default cycling mode, gregorian, requires an ICP.
- We can't currently distinguish between not-set and gregorian.
"""
graphdict = self.cfg['scheduling']['graph']
if not any(graphdict.values()):
@@ -619,15 +633,21 @@ def prelim_process_graph(self) -> None:
'cycling mode' not in self.cfg['scheduling'] and
self.cfg['scheduling'].get('initial cycle point', '1') == '1' and
all(
item in [
'graph', '1', 'R1',
seq in [
'R1',
str(NOCYCLE_SEQ_ALPHA),
str(NOCYCLE_SEQ_OMEGA)
str(NOCYCLE_SEQ_OMEGA),
'graph', # Cylc 7 back-compat
'1' # Cylc 7 back-compat?
]
for item in graphdict
for seq in graphdict
)
):
# Non-cycling graph, assume integer cycling mode with '1' cycle
# Pure acyclic graph, assume integer cycling mode with '1' cycle
# Note: typos in "alpha", "omega", or "R1" will appear as cyclic here,
# but they will be fatal later anyway, during proper recurrence
# checking after cycling initialization.

self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
for key in ('initial cycle point', 'final cycle point'):
if key not in self.cfg['scheduling']:
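A standalone reading of the acyclic-graph check above, as a hedged sketch: plain dicts replace the parsed config, and "alpha"/"omega" stand in for str(NOCYCLE_SEQ_ALPHA) and str(NOCYCLE_SEQ_OMEGA).

NOCYCLE_NAMES = {"alpha", "omega"}  # assumed string forms of the no-cycle sequences

def is_pure_acyclic(scheduling: dict) -> bool:
    """True if no cycling mode/ICP is configured and every graph section
    is R1, a no-cycle section, or a Cylc 7 back-compat heading."""
    return (
        "cycling mode" not in scheduling
        and scheduling.get("initial cycle point", "1") == "1"
        and all(
            seq in {"R1", "graph", "1"} | NOCYCLE_NAMES
            for seq in scheduling["graph"]
        )
    )

# If so, integer cycling with ICP = FCP = '1' is assumed:
scheduling = {"graph": {"R1": "a => b"}}
if is_pure_acyclic(scheduling):
    scheduling["cycling mode"] = "integer"
    for key in ("initial cycle point", "final cycle point"):
        scheduling.setdefault(key, "1")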
@@ -2260,6 +2280,7 @@ def load_graph(self):
seq = get_sequence(section, icp, fcp)
except (AttributeError, TypeError, ValueError, CylcError) as exc:
try:

# is it an alpha or omega graph?
seq = NocycleSequence(section)
except ValueError:

if cylc.flow.flags.verbosity > 1:
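The fallback added in load_graph above amounts to the following helper, sketched with the names used in the diff (imports omitted; the real method carries more error handling around this):

def resolve_graph_section(section, icp, fcp):
    """Return a normal cycling sequence, or a no-cycle (alpha/omega) one."""
    try:
        return get_sequence(section, icp, fcp)
    except (AttributeError, TypeError, ValueError, CylcError):
        # Not a recognised recurrence - is it an alpha or omega section?
        # NocycleSequence raises ValueError if not, which is fatal upstream.
        return NocycleSequence(section)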
162 changes: 89 additions & 73 deletions cylc/flow/scheduler.py
@@ -460,7 +460,7 @@ async def configure(self, params):
"""Configure the scheduler.
* Load the flow configuration.
* Load/write workflow parameters from the DB.
* Load/write workflow parameters from/to the DB.
* Get the data store rolling.
"""
@@ -487,8 +487,10 @@ async def configure(self, params):

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
cfg = self.load_flow_file()
self.apply_new_config(cfg, is_reload=False)
self.apply_new_config(
self.load_flow_file(),
is_reload=False
)
except ParsecError as exc:
# Mark this exc as expected (see docstring for .schd_expected):
exc.schd_expected = True
@@ -559,17 +561,6 @@ async def configure(self, params):
timer.reset()
self.timers[event] = timer

if self.is_restart and not self.pool.get_tasks():
# This workflow completed before restart; wait for intervention.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
LOG.warning(
"This workflow already ran to completion."
"\nTo make it continue, trigger new tasks"
" before the restart timeout."
)

# Main loop plugins
self.main_loop_plugins = main_loop.load(
self.cylc_config.get('main loop', {}),
@@ -643,6 +634,11 @@ def log_start(self) -> None:
f'Run mode: {self.get_run_mode().value}',
extra=RotatingLogFileHandler.header_extra
)
LOG.info(
"Cycling mode:"
f" {self.config.cfg['scheduling']['cycling mode']}",
extra=RotatingLogFileHandler.header_extra
)
LOG.info(
f'Initial point: {self.config.initial_point}',
extra=RotatingLogFileHandler.header_extra
@@ -662,15 +658,21 @@ def log_start(self) -> None:
extra=RotatingLogFileHandler.header_extra
)

def _get_graph_loaders(self) -> None:
"""Get next graphs base on current pool content."""
# Check pool points in case this is a restart.
# TODO REALLY NEED TO CHECK DB FOR SECTIONS THAT RAN ALREADY.
async def _get_graph_loaders(self) -> None:
"""Tee up one or more graphs to load the task pool.
In a restart the pool is already loaded - examine it to see which
graphs to run next. Otherwise, tee up all the configured graphs.
Note a "graph loader" load the task pool with initial parentless
tasks defined by the graph.
TODO: post-restart examination of the task pool is fallible, we
really need to record graphs finished in the DB.
"""
points = [p.value for p in self.pool.get_points()]
if self.is_restart and not points:
# Restart with empty pool: only unfinished event handlers.
# No graph to load.
return

if (
@@ -683,8 +685,7 @@ def _get_graph_loaders(self) -> None:
)

if (
self.config.sequences
and (
self.config.sequences and (
not points
or (
not any(p not in NOCYCLE_POINTS for p in points)
@@ -705,66 +706,33 @@ def _get_graph_loaders(self) -> None:
and not self.is_restart
):
# Alpha section exists and hasn't started yet.
# (Never in a restart).
# (Not in a restart - the pool is already loaded from the DB!).
self.graph_loaders.append(

partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA)
)
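In this scheme a "graph loader" is just a zero-argument callable, typically a functools.partial like the one above, that seeds the task pool with one graph's initial parentless tasks; run_graphs below pops and calls them one at a time, running the main loop to completion in between. A rough sketch of that pattern - the cycling-graph loader name is a placeholder, and the ordering simply follows from list.pop() taking the last element:

from functools import partial

async def run_graphs_sketch(schd):
    """Seed and run each configured graph in turn (placeholder names)."""
    # Consumed with .pop(), so the graph to run first is appended last.
    schd.graph_loaders = [
        partial(schd.pool.load_nocycle_graph, NOCYCLE_SEQ_OMEGA),  # shutdown
        schd.load_main_cycling_graph,                              # hypothetical
        partial(schd.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA),  # start-up
    ]
    while schd.graph_loaders:
        schd.graph_loaders.pop()()       # seed the pool from the next graph
        while await schd._main_loop():   # run that graph to completion
            pass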

async def run_graphs(self):
self.graph_loaders = []
if self.is_restart:
# Restart from DB.
self.task_job_mgr.task_remote_mgr.is_restart = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
self._load_pool_from_db()
all_tasks = self.pool.get_tasks()
if all_tasks:
# (If we're not restarting a finished workflow)
self.restart_remote_init()
# Poll all pollable tasks
await commands.run_cmd(commands.poll_tasks(self, ['*/*']))

# If we shut down with manually triggered waiting tasks,
# submit them to run now.
# NOTE: this will run tasks that were triggered with
# the trigger "--on-resume" option, even if the workflow
# is restarted as paused. Option to be removed at 8.5.0.
pre_prep_tasks = []
for itask in all_tasks:
if (
itask.is_manual_submit
and itask.state(TASK_STATUS_WAITING)
):
itask.waiting_on_job_prep = True
pre_prep_tasks.append(itask)

self.start_job_submission(pre_prep_tasks)

# TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING?
await commands.run_cmd(
commands.poll_tasks(self, [f"{NOCYCLE_PT_ALPHA}/*"]))
await commands.run_cmd(
commands.poll_tasks(self, [f"{NOCYCLE_PT_OMEGA}/*"]))

self._get_graph_loaders()

if self.pool.active_tasks:
# pool already loaded for integration test!
while await self._main_loop():
pass
# next graphs depends on content of restart pool
while self.graph_loaders:
(self.graph_loaders.pop())()
while await self._main_loop():
pass
elif self.pool.active_tasks:
# pool loaded for integration test!
return


if self.is_restart:
await self._load_pool_from_db()
self.pool.compute_runahead(force=True)
self.pool.release_runahead_tasks()

await self._get_graph_loaders()

while await self._main_loop():
pass
while self.graph_loaders:
(self.graph_loaders.pop())()
while await self._main_loop():
pass

else:
self._get_graph_loaders()
while self.graph_loaders:
(self.graph_loaders.pop())()
while await self._main_loop():
pass

async def run_scheduler(self) -> None:
"""Start the scheduler main loop."""
@@ -940,9 +908,14 @@ def _load_pool_from_point(self):
LOG.info(msg)
self.pool.load_from_point()

def _load_pool_from_db(self):
async def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
LOG.info("LOADING DB FOR RESTART")

self.task_job_mgr.task_remote_mgr.is_restart = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())

self.workflow_db_mgr.pri_dao.select_broadcast_states(
self.broadcast_mgr.load_db_broadcast_states)
self.broadcast_mgr.post_load_db_coerce()
@@ -965,6 +938,48 @@ def _load_pool_from_db(self):
if self.restored_stop_task_id is not None:
self.pool.set_stop_task(self.restored_stop_task_id)


self.log_start()

all_tasks = self.pool.get_tasks()
if not all_tasks:
# Restart with empty pool: only unfinished event handlers.
# This workflow completed before restart; wait for intervention.
# TODO - WHAT IF IT SHUT DOWN BETWEEN GRAPHS
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
LOG.warning(
"This workflow already ran to completion."
"\nTo make it continue, trigger new tasks"
" before the restart timeout."
)
return

self.restart_remote_init()
# Poll all pollable tasks
await commands.run_cmd(commands.poll_tasks(self, ['*/*']))
# TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING?
await commands.run_cmd(
commands.poll_tasks(self, [f"{NOCYCLE_PT_ALPHA}/*"]))
await commands.run_cmd(
commands.poll_tasks(self, [f"{NOCYCLE_PT_OMEGA}/*"]))

# If we shut down with manually triggered waiting tasks,
# submit them to run now.
# NOTE: this will run tasks that were triggered with
# the trigger "--on-resume" option, even if the workflow
# is restarted as paused. Option to be removed at 8.5.0.
pre_prep_tasks = []
for itask in all_tasks:
if (
itask.is_manual_submit
and itask.state(TASK_STATUS_WAITING)
):
itask.waiting_on_job_prep = True
pre_prep_tasks.append(itask)


self.start_job_submission(pre_prep_tasks)

def restart_remote_init(self):
"""Remote init for all submitted/running tasks in the pool."""
self.task_job_mgr.task_remote_mgr.is_restart = True
@@ -1676,6 +1691,7 @@ def timeout_check(self):

async def workflow_shutdown(self):
"""Determines if the workflow can be shutdown yet."""
# from pudb import set_trace; set_trace()
if self.pool.check_abort_on_task_fails():
self._set_stop(StopMode.AUTO_ON_TASK_FAILURE)

10 changes: 5 additions & 5 deletions cylc/flow/task_pool.py
@@ -208,7 +208,7 @@ def _swap_out(self, itask):
self.active_tasks_changed = True

def load_nocycle_graph(self, seq):
"""blah """
"""Load task pool from a no-cycle graph."""
LOG.info(f"LOADING {seq} GRAPH")
flow_num = self.flow_mgr.get_flow_num(meta=f"original {seq} flow")
self.runahead_limit_point = None

@@ -284,7 +284,7 @@ def release_runahead_tasks(self):
Return True if any tasks are released, else False.
Call when RH limit changes.
"""
if not self.active_tasks or not self.runahead_limit_point:
if not self.active_tasks:
# (At start-up task pool might not exist yet)
return False

@@ -297,10 +297,10 @@ def release_runahead_tasks(self):
for point, itask_id_map in self.active_tasks.items()
for itask in itask_id_map.values()
if (
self.runahead_limit_point and
point <= self.runahead_limit_point
or
str(point) in NOCYCLE_POINTS
or
(self.runahead_limit_point and
point <= self.runahead_limit_point)
)
if itask.state.is_runahead
]
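The reordered condition above boils down to this predicate, which is why the runahead_limit_point guard could be dropped from the early return: tasks on no-cycle (start-up/shutdown) points must be releasable even before any runahead limit has been computed. A sketch, treating points abstractly:

def is_releasable(point, runahead_limit_point, nocycle_points) -> bool:
    """A runahead task is released if it belongs to a no-cycle graph,
    or the runahead limit is set and the task is at or below it."""
    return (
        str(point) in nocycle_points
        or (
            runahead_limit_point is not None
            and point <= runahead_limit_point
        )
    )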
8 changes: 4 additions & 4 deletions tests/integration/utils/flow_tools.py
@@ -119,10 +119,10 @@ def _make_flow(
return workflow_id


def _load_graph(sched):
async def _load_graph(sched):
"""Get scheduler to load the main graph."""
if sched.is_restart:
sched._load_pool_from_db()
await sched._load_pool_from_db()
elif sched.options.starttask:
sched._load_pool_from_tasks()
else:
@@ -170,7 +170,7 @@ async def _start_flow(
# exception occurs in Scheduler
try:
await schd.start()
_load_graph(schd)
await _load_graph(schd)
finally:
# After this `yield`, the `with` block of the context manager
# is executed:
@@ -202,7 +202,7 @@ async def _run_flow(
# exception occurs in Scheduler
try:
await schd.start()
_load_graph(schd)
await _load_graph(schd)
# Do not await as we need to yield control to the main loop:
task = asyncio.create_task(schd.run_scheduler())
finally:
