From 85dea2adb80ecaf76b055a8b894efc501e7a8d26 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 10 Feb 2025 09:59:06 +0000 Subject: [PATCH] More post-merge tweaks. --- changes.d/5090.feat.md | 3 +- cylc/flow/config.py | 31 ++++- cylc/flow/scheduler.py | 162 ++++++++++++++------------ cylc/flow/task_pool.py | 10 +- tests/integration/utils/flow_tools.py | 8 +- 5 files changed, 126 insertions(+), 88 deletions(-) diff --git a/changes.d/5090.feat.md b/changes.d/5090.feat.md index 76071afe652..a9f4e947f27 100644 --- a/changes.d/5090.feat.md +++ b/changes.d/5090.feat.md @@ -1 +1,2 @@ -Implement initial and final graphs, distinct from the main cycling graph. +Distinct initial and final graphs, separated from the main cycling graph, +to make it easier to configure special behaviour at startup and shutdown. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 325bcdc91c8..2d2ea5e126a 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -610,6 +610,20 @@ def _warn_if_queues_have_implicit_tasks( def prelim_process_graph(self) -> None: """Ensure graph is not empty; set integer cycling mode and icp/fcp = 1 for simplest "R1 = foo" type graphs. + + Set cycling mode and initial cycle point (ICP) 1 for acyclic graphs. + + Cycling mode and ICP can be omitted in acyclic graphs, where the value + of the single cycle point doesn't matter and the need for it is merely + a function of Cylc internals. + + No need to add or adjust final cycle point in acyclic graphs - they + halt at the end of the acyclic graph. + + Note: + - The default cycling mode, gregorian, requires an ICP. + - We can't currently distinguish between not-set and gregorian. 
+ """ graphdict = self.cfg['scheduling']['graph'] if not any(graphdict.values()): @@ -619,15 +633,21 @@ def prelim_process_graph(self) -> None: 'cycling mode' not in self.cfg['scheduling'] and self.cfg['scheduling'].get('initial cycle point', '1') == '1' and all( - item in [ - 'graph', '1', 'R1', + seq in [ + 'R1', str(NOCYCLE_SEQ_ALPHA), - str(NOCYCLE_SEQ_OMEGA) + str(NOCYCLE_SEQ_OMEGA), + 'graph', # Cylc 7 back-compat + '1' # Cylc 7 back-compat? ] - for item in graphdict + for seq in graphdict ) ): - # Non-cycling graph, assume integer cycling mode with '1' cycle + # Pure acyclic graph, assume integer cycling mode with '1' cycle + # Note typos in "alpha", "omega", or "R1" + # will appear as cyclic here, but they will be fatal later anyway + # during proper recurrence checking after cycling initialization. + self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE for key in ('initial cycle point', 'final cycle point'): if key not in self.cfg['scheduling']: @@ -2260,6 +2280,7 @@ def load_graph(self): seq = get_sequence(section, icp, fcp) except (AttributeError, TypeError, ValueError, CylcError) as exc: try: + # is it an alpha or omega graph? seq = NocycleSequence(section) except ValueError: if cylc.flow.flags.verbosity > 1: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 8bb8b2dadef..8546eb3ba88 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -460,7 +460,7 @@ async def configure(self, params): """Configure the scheduler. * Load the flow configuration. - * Load/write workflow parameters from the DB. + * Load/write workflow parameters from/to the DB. * Get the data store rolling. 
""" @@ -487,8 +487,10 @@ async def configure(self, params): self.profiler.log_memory("scheduler.py: before load_flow_file") try: - cfg = self.load_flow_file() - self.apply_new_config(cfg, is_reload=False) + self.apply_new_config( + self.load_flow_file(), + is_reload=False + ) except ParsecError as exc: # Mark this exc as expected (see docstring for .schd_expected): exc.schd_expected = True @@ -559,17 +561,6 @@ async def configure(self, params): timer.reset() self.timers[event] = timer - if self.is_restart and not self.pool.get_tasks(): - # This workflow completed before restart; wait for intervention. - with suppress(KeyError): - self.timers[self.EVENT_RESTART_TIMEOUT].reset() - self.is_restart_timeout_wait = True - LOG.warning( - "This workflow already ran to completion." - "\nTo make it continue, trigger new tasks" - " before the restart timeout." - ) - # Main loop plugins self.main_loop_plugins = main_loop.load( self.cylc_config.get('main loop', {}), @@ -643,6 +634,11 @@ def log_start(self) -> None: f'Run mode: {self.get_run_mode().value}', extra=RotatingLogFileHandler.header_extra ) + LOG.info( + "Cycling mode:" + f" {self.config.cfg['scheduling']['cycling mode']}", + extra=RotatingLogFileHandler.header_extra + ) LOG.info( f'Initial point: {self.config.initial_point}', extra=RotatingLogFileHandler.header_extra @@ -662,15 +658,21 @@ def log_start(self) -> None: extra=RotatingLogFileHandler.header_extra ) - def _get_graph_loaders(self) -> None: - """Get next graphs base on current pool content.""" - # Check pool points in case this is a restart. - # TODO REALLY NEED TO CHECK DB FOR SECTIONS THAT RAN ALREADY. + async def _get_graph_loaders(self) -> None: + """Tee up one or more graphs to load the task pool. + + In a restart the pool is already loaded - examine it to see which + graphs to run next. Otherwise, tee up all the configured graphs. + + Note a "graph loader" load the task pool with initial parentless + tasks defined by the graph. 
+ + TODO: post-restart examination of the task pool is fallible, we + really need to record graphs finished in the DB. + """ points = [p.value for p in self.pool.get_points()] if self.is_restart and not points: - # Restart with empty pool: only unfinished event handlers. - # No graph to load. return if ( @@ -683,8 +685,7 @@ def _get_graph_loaders(self) -> None: ) if ( - self.config.sequences - and ( + self.config.sequences and ( not points or ( not any(p not in NOCYCLE_POINTS for p in points) @@ -705,66 +706,33 @@ def _get_graph_loaders(self) -> None: and not self.is_restart ): # Alpha section exists and hasn't started yet. - # (Never in a restart). + # (Not in a restart - the pool already loaded from DB!). self.graph_loaders.append( partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA) ) async def run_graphs(self): self.graph_loaders = [] - if self.is_restart: - # Restart from DB. - self.task_job_mgr.task_remote_mgr.is_restart = True - self.task_job_mgr.task_remote_mgr.rsync_includes = ( - self.config.get_validated_rsync_includes()) - self._load_pool_from_db() - all_tasks = self.pool.get_tasks() - if all_tasks: - # (If we're not restarting a finished workflow) - self.restart_remote_init() - # Poll all pollable tasks - await commands.run_cmd(commands.poll_tasks(self, ['*/*'])) - - # If we shut down with manually triggered waiting tasks, - # submit them to run now. - # NOTE: this will run tasks that were triggered with - # the trigger "--on-resume" option, even if the workflow - # is restarted as paused. Option to be removed at 8.5.0. - pre_prep_tasks = [] - for itask in all_tasks: - if ( - itask.is_manual_submit - and itask.state(TASK_STATUS_WAITING) - ): - itask.waiting_on_job_prep = True - pre_prep_tasks.append(itask) - - self.start_job_submission(pre_prep_tasks) - - # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING? 
- await commands.run_cmd( - commands.poll_tasks(self, [f"{NOCYCLE_PT_ALPHA}/*"])) - await commands.run_cmd( - commands.poll_tasks(self, [f"{NOCYCLE_PT_OMEGA}/*"])) - - self._get_graph_loaders() + + if self.pool.active_tasks: + # pool already loaded for integration test! while await self._main_loop(): pass - # next graphs depends on content of restart pool - while self.graph_loaders: - (self.graph_loaders.pop())() - while await self._main_loop(): - pass - elif self.pool.active_tasks: - # pool loaded for integration test! + return + + if self.is_restart: + await self._load_pool_from_db() + self.pool.compute_runahead(force=True) + self.pool.release_runahead_tasks() + + await self._get_graph_loaders() + + while await self._main_loop(): + pass + while self.graph_loaders: + (self.graph_loaders.pop())() while await self._main_loop(): pass - else: - self._get_graph_loaders() - while self.graph_loaders: - (self.graph_loaders.pop())() - while await self._main_loop(): - pass async def run_scheduler(self) -> None: """Start the scheduler main loop.""" @@ -940,9 +908,14 @@ def _load_pool_from_point(self): LOG.info(msg) self.pool.load_from_point() - def _load_pool_from_db(self): + async def _load_pool_from_db(self): """Load task pool from DB, for a restart.""" LOG.info("LOADING DB FOR RESTART") + + self.task_job_mgr.task_remote_mgr.is_restart = True + self.task_job_mgr.task_remote_mgr.rsync_includes = ( + self.config.get_validated_rsync_includes()) + self.workflow_db_mgr.pri_dao.select_broadcast_states( self.broadcast_mgr.load_db_broadcast_states) self.broadcast_mgr.post_load_db_coerce() @@ -965,6 +938,48 @@ def _load_pool_from_db(self): if self.restored_stop_task_id is not None: self.pool.set_stop_task(self.restored_stop_task_id) + self.log_start() + + all_tasks = self.pool.get_tasks() + if not all_tasks: + # Restart with empty pool: only unfinished event handlers. + # This workflow completed before restart; wait for intervention. 
+ # TODO - WHAT IF IT SHUT DOWN BETWEEN GRAPHS + with suppress(KeyError): + self.timers[self.EVENT_RESTART_TIMEOUT].reset() + self.is_restart_timeout_wait = True + LOG.warning( + "This workflow already ran to completion." + "\nTo make it continue, trigger new tasks" + " before the restart timeout." + ) + return + + self.restart_remote_init() + # Poll all pollable tasks + await commands.run_cmd(commands.poll_tasks(self, ['*/*'])) + # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING? + await commands.run_cmd( + commands.poll_tasks(self, [f"{NOCYCLE_PT_ALPHA}/*"])) + await commands.run_cmd( + commands.poll_tasks(self, [f"{NOCYCLE_PT_OMEGA}/*"])) + + # If we shut down with manually triggered waiting tasks, + # submit them to run now. + # NOTE: this will run tasks that were triggered with + # the trigger "--on-resume" option, even if the workflow + # is restarted as paused. Option to be removed at 8.5.0. + pre_prep_tasks = [] + for itask in all_tasks: + if ( + itask.is_manual_submit + and itask.state(TASK_STATUS_WAITING) + ): + itask.waiting_on_job_prep = True + pre_prep_tasks.append(itask) + + self.start_job_submission(pre_prep_tasks) + def restart_remote_init(self): """Remote init for all submitted/running tasks in the pool.""" self.task_job_mgr.task_remote_mgr.is_restart = True @@ -1676,6 +1691,7 @@ def timeout_check(self): async def workflow_shutdown(self): """Determines if the workflow can be shutdown yet.""" + # from pudb import set_trace; set_trace() if self.pool.check_abort_on_task_fails(): self._set_stop(StopMode.AUTO_ON_TASK_FAILURE) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 17f1295c1c9..926475f8f1d 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -208,7 +208,7 @@ def _swap_out(self, itask): self.active_tasks_changed = True def load_nocycle_graph(self, seq): - """blah """ + """Load task pool from a no-cycle graph.""" LOG.info(f"LOADING {seq} GRAPH") flow_num = self.flow_mgr.get_flow_num(meta=f"original {seq} flow") 
self.runahead_limit_point = None @@ -284,7 +284,7 @@ def release_runahead_tasks(self): Return True if any tasks are released, else False. Call when RH limit changes. """ - if not self.active_tasks or not self.runahead_limit_point: + if not self.active_tasks: # (At start-up task pool might not exist yet) return False @@ -297,10 +297,10 @@ def release_runahead_tasks(self): for point, itask_id_map in self.active_tasks.items() for itask in itask_id_map.values() if ( - self.runahead_limit_point and - point <= self.runahead_limit_point - or str(point) in NOCYCLE_POINTS + or + (self.runahead_limit_point and + point <= self.runahead_limit_point) ) if itask.state.is_runahead ] diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 790c5c9963f..56cd481a3f9 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -119,10 +119,10 @@ def _make_flow( return workflow_id -def _load_graph(sched): +async def _load_graph(sched): """Get scheduler to load the main graph.""" if sched.is_restart: - sched._load_pool_from_db() + await sched._load_pool_from_db() elif sched.options.starttask: sched._load_pool_from_tasks() else: @@ -170,7 +170,7 @@ async def _start_flow( # exception occurs in Scheduler try: await schd.start() - _load_graph(schd) + await _load_graph(schd) finally: # After this `yield`, the `with` block of the context manager # is executed: @@ -202,7 +202,7 @@ async def _run_flow( # exception occurs in Scheduler try: await schd.start() - _load_graph(schd) + await _load_graph(schd) # Do not await as we need to yield control to the main loop: task = asyncio.create_task(schd.run_scheduler()) finally: