diff --git a/changes.d/5738.feat.md b/changes.d/5738.feat.md new file mode 100644 index 00000000000..09ff84bec93 --- /dev/null +++ b/changes.d/5738.feat.md @@ -0,0 +1 @@ +Optionally spawn parentless xtriggered tasks sequentially - i.e., one at a time, after the previous instance's xtrigger is satisfied, instead of all at once out to the runahead limit. The `wall_clock` xtrigger is now sequential by default. diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 96288808e59..8ad315a7b2c 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -802,9 +802,21 @@ def get_script_common_text(this: str, example: Optional[str] = None): :ref:`SequentialTasks`. ''') + Conf('sequential xtriggers', VDR.V_BOOLEAN, False, + desc=''' + If ``True``, tasks that only depend on xtriggers will not spawn + until the xtrigger of the previous (cycle point) instance is satisfied. + Otherwise, they will all spawn at once out to the runahead limit. + + This setting can be overridden by the reserved keyword argument + ``sequential`` in individual xtrigger declarations. + + A single sequential xtrigger on a parentless task with multiple + xtriggers is enough to cause sequential spawning. + ''') with Conf('xtriggers', desc=''' - This section is for *External Trigger* function declarations - - see :ref:`Section External Triggers`. + This section is for *External Trigger* function declarations - + see :ref:`Section External Triggers`. '''): Conf('', VDR.V_XTRIGGER, desc=''' Any user-defined event trigger function declarations and diff --git a/cylc/flow/config.py b/cylc/flow/config.py index f871531adeb..ad87315c4c1 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1705,16 +1705,21 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, self.taskdefs[right].add_dependency(dependency, seq) validator = XtriggerNameValidator.validate - for label in self.cfg['scheduling']['xtriggers']: + xtrigs = self.cfg['scheduling']['xtriggers'] + for label in xtrigs: valid, msg = validator(label) if not valid: raise WorkflowConfigError( f'Invalid xtrigger name "{label}" - {msg}' ) + if self.xtrigger_mgr is not None: + self.xtrigger_mgr.sequential_xtriggers_default = ( + self.cfg['scheduling']['sequential xtriggers'] + ) for label in xtrig_labels: try: - xtrig = self.cfg['scheduling']['xtriggers'][label] + xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': raise WorkflowConfigError(f"xtrigger not defined: {label}") diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 7a59e76d7a4..da5119bb707 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -1192,7 +1192,10 @@ def generate_ghost_task( point, flow_nums, submit_num=0, - data_mode=True + data_mode=True, + sequential_xtrigger_labels=( + self.schd.xtrigger_mgr.sequential_xtrigger_labels + ), ) is_orphan = False diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index d630889a9d4..a07f6744cea 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -481,6 +481,7 @@ async def configure(self, params): self.config, self.workflow_db_mgr, self.task_events_mgr, + self.xtrigger_mgr, self.data_store_mgr, self.flow_mgr ) @@ -1748,6 +1749,9 @@ async def main_loop(self) -> None: if all(itask.is_ready_to_run()): self.pool.queue_task(itask) + if self.xtrigger_mgr.sequential_spawn_next: + self.pool.spawn_parentless_sequential_xtriggers() + if self.xtrigger_mgr.do_housekeeping: self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/subprocctx.py
b/cylc/flow/subprocctx.py index 84f1c68e70b..d76e34ce9a4 100644 --- a/cylc/flow/subprocctx.py +++ b/cylc/flow/subprocctx.py @@ -18,11 +18,39 @@ Coerce more value type from string (to time point, duration, xtriggers, etc.). """ +from inspect import Parameter import json from shlex import quote +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from cylc.flow.wallclock import get_current_time_string +if TYPE_CHECKING: + from inspect import Signature + + +def add_kwarg_to_sig( + sig: 'Signature', arg_name: str, default: Any +) -> 'Signature': + """Return a new signature with a kwarg added.""" + # Note: added kwarg has to be before **kwargs ("variadic") in the signature + positional_or_keyword: List[Parameter] = [] + variadic: List[Parameter] = [] + for param in sig.parameters.values(): + if param.kind == Parameter.VAR_KEYWORD: + variadic.append(param) + else: + positional_or_keyword.append(param) + return sig.replace(parameters=[ + *positional_or_keyword, + Parameter( + arg_name, + kind=Parameter.KEYWORD_ONLY, + default=default, + ), + *variadic, + ]) + class SubProcContext: # noqa: SIM119 (not really relevant to this case) """Represent the context of an external command to run as a subprocess. @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext): Attributes: # See also parent class attributes. - .label (str): + .label: function label under [xtriggers] in flow.cylc - .func_name (str): + .func_name: function name - .func_args (list): + .func_args: function positional args - .func_kwargs (dict): + .func_kwargs: function keyword args - .intvl (float - seconds): - function call interval (how often to check the external trigger) - .ret_val (bool, dict) + .intvl: + function call interval in secs (how often to check the + external trigger) + .ret_val function return: (satisfied?, result to pass to trigger tasks) """ DEFAULT_INTVL = 10.0 - def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): + def __init__( + self, + label: str, + func_name: str, + func_args: List[Any], + func_kwargs: Dict[str, Any], + intvl: Union[float, str] = DEFAULT_INTVL + ): """Initialize a function context.""" self.label = label self.func_name = func_name @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): self.intvl = float(intvl) except (TypeError, ValueError): self.intvl = self.DEFAULT_INTVL - self.ret_val = (False, None) # (satisfied, broadcast) + self.ret_val: Tuple[ + bool, Optional[dict] + ] = (False, None) # (satisfied, broadcast) super(SubFuncContext, self).__init__( - 'xtrigger-func', cmd=[], shell=False) + 'xtrigger-func', cmd=[], shell=False + ) def update_command(self, workflow_run_dir): """Update the function wrap command after changes.""" diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 1e97d3c3034..2b214d70943 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -90,6 +90,7 @@ from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager + from cylc.flow.xtrigger_mgr import XtriggerManager from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.flow_mgr import FlowMgr, FlowNums @@ -110,6 +111,7 @@ def __init__( config: 'WorkflowConfig', workflow_db_mgr: 'WorkflowDatabaseManager', task_events_mgr: 'TaskEventsManager', + xtrigger_mgr: 'XtriggerManager', data_store_mgr: 'DataStoreMgr', flow_mgr: 'FlowMgr' ) -> None: @@ -119,6 +121,7 @@ def __init__( self.workflow_db_mgr: 
'WorkflowDatabaseManager' = workflow_db_mgr self.task_events_mgr: 'TaskEventsManager' = task_events_mgr self.task_events_mgr.spawn_func = self.spawn_on_output + self.xtrigger_mgr: 'XtriggerManager' = xtrigger_mgr self.data_store_mgr: 'DataStoreMgr' = data_store_mgr self.flow_mgr: 'FlowMgr' = flow_mgr @@ -267,7 +270,7 @@ def release_runahead_tasks(self): for itask in release_me: self.rh_release_and_queue(itask) - if itask.flow_nums: + if itask.flow_nums and not itask.is_xtrigger_sequential: self.spawn_to_rh_limit( itask.tdef, itask.tdef.next_point(itask.point), @@ -477,7 +480,10 @@ def load_db_task_pool_for_restart(self, row_idx, row): submit_num=submit_num, is_late=bool(is_late), flow_wait=bool(flow_wait), - is_manual_submit=bool(is_manual_submit) + is_manual_submit=bool(is_manual_submit), + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) except WorkflowConfigError: @@ -694,53 +700,99 @@ def rh_release_and_queue(self, itask) -> None: def get_or_spawn_task( self, point: 'PointBase', - name: str, + tdef: 'TaskDef', flow_nums: 'FlowNums', flow_wait: bool = False - ) -> Optional[TaskProxy]: + ) -> 'Tuple[Optional[TaskProxy], bool, bool]': """Return new or existing task point/name with merged flow_nums. + Returns: + tuple - (itask, is_in_pool, is_xtrig_sequential) + + itask: + The requested task proxy, or None if task does not + exist or cannot spawn. + is_in_pool: + Was the task found in the pool. + is_xtrig_sequential: + Is the next task occurrence spawned on xtrigger satisfaction, + or do all occurrences spawn out to the runahead limit. + This method does not add a spawned task proxy to the pool. """ ntask = self._get_task_by_id( - Tokens(cycle=str(point), task=name).relative_id + Tokens(cycle=str(point), task=tdef.name).relative_id ) + is_in_pool = False + is_xtrig_sequential = False if ntask is None: # ntask does not exist: spawn it in the flow. - ntask = self.spawn_task(name, point, flow_nums, flow_wait) + ntask = self.spawn_task(tdef.name, point, flow_nums, flow_wait) + # If the task was spawned, take its sequential xtrigger setting; + # otherwise determine it from the task definition (the task may + # not be able to spawn for whatever reason). + if ntask is not None: + is_xtrig_sequential = ntask.is_xtrigger_sequential + elif any( + xtrig_label in self.xtrigger_mgr.sequential_xtrigger_labels + for sequence, xtrig_labels in tdef.xtrig_labels.items() + for xtrig_label in xtrig_labels + if sequence.is_valid(point) + ): + is_xtrig_sequential = True else: # ntask already exists (n=0): merge flows. + is_in_pool = True self.merge_flows(ntask, flow_nums) - return ntask # may be None + is_xtrig_sequential = ntask.is_xtrigger_sequential + # ntask may still be None + return ntask, is_in_pool, is_xtrig_sequential def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: - """Spawn parentless task instances from point to runahead limit.""" + """Spawn parentless task instances from point to runahead limit. + + Sequentially checked xtriggers will spawn the next occurrence of their + corresponding tasks. These tasks will keep spawning until they depend + on an unsatisfied sequential xtrigger, are no longer parentless, + and/or hit the runahead limit. + + """ if not flow_nums or point is None: # Force-triggered no-flow task. # Or called with an invalid next_point.
return if self.runahead_limit_point is None: self.compute_runahead() + + is_xtrig_sequential = False while point is not None and (point <= self.runahead_limit_point): if tdef.is_parentless(point): - ntask = self.get_or_spawn_task( - point, tdef.name, flow_nums + ntask, is_in_pool, is_xtrig_sequential = ( + self.get_or_spawn_task( + point, + tdef, + flow_nums + ) ) if ntask is not None: - self.add_to_pool(ntask) + if not is_in_pool: + self.add_to_pool(ntask) self.rh_release_and_queue(ntask) + if is_xtrig_sequential: + break point = tdef.next_point(point) # Once more for the runahead-limited task (don't release it). - self.spawn_if_parentless(tdef, point, flow_nums) + if not is_xtrig_sequential: + self.spawn_if_parentless(tdef, point, flow_nums) def spawn_if_parentless(self, tdef, point, flow_nums): """Spawn a task if parentless, regardless of runahead limit.""" if flow_nums and point is not None and tdef.is_parentless(point): - ntask = self.get_or_spawn_task( - point, tdef.name, flow_nums + ntask, is_in_pool, _ = self.get_or_spawn_task( + point, tdef, flow_nums ) - if ntask is not None: + if ntask is not None and not is_in_pool: self.add_to_pool(ntask) def remove(self, itask, reason=None): @@ -759,6 +811,13 @@ def remove(self, itask, reason=None): msg = "task completed" else: msg = f"removed ({reason})" + + if itask.is_xtrigger_sequential: + self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity) + self.xtrigger_mgr.sequential_has_spawned_next.discard( + itask.identity + ) + try: del self.active_tasks[itask.point][itask.identity] except KeyError: @@ -799,7 +858,7 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': point_itasks[point] = list(itask_id_map.values()) return point_itasks - def get_task(self, point, name) -> Optional[TaskProxy]: + def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]: """Retrieve a task from the pool.""" rel_id = f'{point}/{name}' tasks = self.active_tasks.get(point) @@ -953,6 +1012,9 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None: itask.point, itask.flow_nums, itask.state.status, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) itask.copy_to_reload_successor( new_task, @@ -1665,7 +1727,10 @@ def _get_task_proxy_db_outputs( flow_wait=flow_wait, submit_num=submit_num, transient=transient, - is_manual_submit=is_manual_submit + is_manual_submit=is_manual_submit, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) if itask is None: return None @@ -1913,6 +1978,22 @@ def remove_tasks(self, items): """Remove tasks from the pool (forced by command).""" itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: + # Spawn next occurrence of xtrigger sequential task. + if ( + itask.is_xtrigger_sequential + and ( + itask.identity not in + self.xtrigger_mgr.sequential_has_spawned_next + ) + ): + self.xtrigger_mgr.sequential_has_spawned_next.add( + itask.identity + ) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) self.remove(itask, 'request') if self.compute_runahead(): self.release_runahead_tasks() @@ -1969,6 +2050,23 @@ def _force_trigger(self, itask): itask.tdef.next_point(itask.point), itask.flow_nums ) + # Task may be set running before xtrigger is satisfied, + # if so check/spawn if xtrigger sequential. 
+ elif ( + itask.is_xtrigger_sequential + and ( + itask.identity not in + self.xtrigger_mgr.sequential_has_spawned_next + ) + ): + self.xtrigger_mgr.sequential_has_spawned_next.add( + itask.identity + ) + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) else: # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) @@ -2029,6 +2127,9 @@ def force_trigger_tasks( flow_nums, flow_wait=flow_wait, submit_num=submit_num, + sequential_xtrigger_labels=( + self.xtrigger_mgr.sequential_xtrigger_labels + ), ) if itask is None: continue @@ -2043,6 +2144,20 @@ def force_trigger_tasks( self.add_to_pool(itask) self._force_trigger(itask) + def spawn_parentless_sequential_xtriggers(self): + """Spawn successor(s) of parentless tasks with satisfied sequential xtriggers.""" + while self.xtrigger_mgr.sequential_spawn_next: + taskid = self.xtrigger_mgr.sequential_spawn_next.pop() + self.xtrigger_mgr.sequential_has_spawned_next.add(taskid) + itask = self._get_task_by_id(taskid) + # Will spawn out to the RH limit, or stop at the next unsatisfied + # sequential xtrigger or when the task is no longer parentless. + self.spawn_to_rh_limit( + itask.tdef, + itask.tdef.next_point(itask.point), + itask.flow_nums + ) + def clock_expire_tasks(self): """Expire any tasks past their clock-expiry time.""" for itask in self.get_tasks(): diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 43fa210592f..898017c8da1 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -164,6 +164,9 @@ class TaskProxy: .transient: This is a transient proxy - not to be added to the task pool, but used e.g. to spawn children, or to get task-specific information. + .is_xtrigger_sequential: + A flag used to determine whether this task must wait for xtrigger + satisfaction before spawning its next instance. Args: tdef: The definition object of this task. @@ -206,7 +209,8 @@ class TaskProxy: 'try_timers', 'waiting_on_job_prep', 'mode_settings', - 'transient' + 'transient', + 'is_xtrigger_sequential', ] def __init__( @@ -222,7 +226,8 @@ def __init__( is_manual_submit: bool = False, flow_wait: bool = False, data_mode: bool = False, - transient: bool = False + transient: bool = False, + sequential_xtrigger_labels: Optional[Set[str]] = None, ) -> None: self.tdef = tdef @@ -284,6 +289,13 @@ def __init__( self.state = TaskState(tdef, self.point, status, is_held) + # Set xtrigger checking type, which affects parentless spawning. + self.is_xtrigger_sequential = bool( + sequential_xtrigger_labels + and self.tdef.is_parentless(start_point) + and sequential_xtrigger_labels.intersection(self.state.xtriggers) + ) + # Determine graph children of this task (for spawning). if data_mode: self.graph_children = {} diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index b891165dbd3..7f53b061463 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -21,12 +21,20 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING +from typing import ( + Any, + Dict, + Optional, + Set, + Tuple, + TYPE_CHECKING +) from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocctx import add_kwarg_to_sig from cylc.flow.subprocpool import get_xtrig_func from cylc.flow.xtriggers.wall_clock import _wall_clock @@ -186,6 +194,30 @@ class XtriggerManager: managed uniquely - i.e.
many tasks depending on the same clock trigger (with same offset from cycle point) get satisfied by the same call. + Parentless tasks with xtrigger(s) are, by default, spawned out to the + runahead limit. This results in non-sequential, and potentially + unnecessary, checking out to this limit (and may introduce clutter to + user interfaces). Spawning can instead be made sequential, either by + changing the default for all xtriggers in a workflow, or by overriding + that default per xtrigger with the reserved keyword function argument + (i.e. "sequential=True/False"): + + # Example: + [scheduling] + sequential xtriggers = True + [[xtriggers]] + # "sequential=False" here overrides workflow and function default. + clock_0 = wall_clock(sequential=False) + workflow_x = workflow_state( + workflow=other, + point=%(task_cycle_point)s, + ):PT30S + [[graph]] + PT1H = ''' + @workflow_x => foo & bar # spawned on workflow_x satisfaction + @clock_0 => baz # baz spawned out to RH + ''' + Args: workflow: workflow name user: workflow owner @@ -217,6 +249,18 @@ def __init__( # Signatures of active functions (waiting on callback). self.active: list = [] + # Clock labels, to avoid repeated string comparisons + self.wall_clock_labels: Set[str] = set() + + # Workflow-wide default, used when not specified in xtrigger kwargs. + self.sequential_xtriggers_default = False + # Labels whose xtriggers are sequentially checked. + self.sequential_xtrigger_labels: Set[str] = set() + # Gather parentless tasks whose xtrigger(s) have been satisfied + # (these will be used to spawn the next occurrence). + self.sequential_spawn_next: Set[str] = set() + self.sequential_has_spawned_next: Set[str] = set() + self.workflow_run_dir = workflow_run_dir # For function arg templating. @@ -286,10 +330,32 @@ def check_xtrigger( label, f"'{fname}' not callable in xtrigger module '{fname}'", ) - # Validate args and kwargs against the function signature + sig = signature(func) sig_str = fctx.get_signature() + + # Handle reserved 'sequential' kwarg: + sequential_param = sig.parameters.get('sequential', None) + if sequential_param: + if not isinstance(sequential_param.default, bool): + raise XtriggerConfigError( + label, + ( + f"xtrigger '{fname}' function definition contains " + "reserved argument 'sequential' that has no " + "boolean default" + ) + ) + fctx.func_kwargs.setdefault('sequential', sequential_param.default) + elif 'sequential' in fctx.func_kwargs: + # xtrig call marked as sequential; add 'sequential' arg to + # signature for validation + sig = add_kwarg_to_sig( + sig, 'sequential', fctx.func_kwargs['sequential'] + ) + + # Validate args and kwargs against the function signature try: - bound_args = signature(func).bind( + bound_args = sig.bind( *fctx.func_args, **fctx.func_kwargs ) except TypeError as exc: @@ -365,6 +431,13 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """ self.functx_map[label] = fctx + if fctx.func_kwargs.pop( + 'sequential', + self.sequential_xtriggers_default + ): + self.sequential_xtrigger_labels.add(label) + if fctx.func_name == "wall_clock": + self.wall_clock_labels.add(label) def mutate_trig(self, label, kwargs): self.functx_map[label].func_kwargs.update(kwargs) @@ -433,7 +506,7 @@ def get_xtrig_ctx( args = [] kwargs = {} - if ctx.func_name == "wall_clock": + if label in self.wall_clock_labels: if "trigger_time" in ctx.func_kwargs: # noqa: SIM401 (readabilty) # Internal (retry timer): trigger_time already set.
kwargs["trigger_time"] = ctx.func_kwargs["trigger_time"] @@ -472,8 +545,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): itask: task proxy to check. """ for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True): - # Special case: quick synchronous clock check: - if sig.startswith("wall_clock"): + if label in self.wall_clock_labels: + # Special case: quick synchronous clock check. if sig in self.sat_xtrig: # Already satisfied, just update the task itask.state.xtriggers[label] = True @@ -484,6 +557,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): self.data_store_mgr.delta_task_xtrigger(sig, True) self.workflow_db_mgr.put_xtriggers({sig: {}}) LOG.info('xtrigger satisfied: %s = %s', label, sig) + if self.all_task_seq_xtriggers_satisfied(itask): + self.sequential_spawn_next.add(itask.identity) self.do_housekeeping = True continue # General case: potentially slow asynchronous function call. @@ -502,6 +577,8 @@ def call_xtriggers_async(self, itask: 'TaskProxy'): [itask.tdef.name], xtrigger_env ) + if self.all_task_seq_xtriggers_satisfied(itask): + self.sequential_spawn_next.add(itask.identity) continue # Call the function to check the unsatisfied xtrigger. @@ -533,6 +610,14 @@ def housekeep(self, itasks): del self.sat_xtrig[sig] self.do_housekeeping = False + def all_task_seq_xtriggers_satisfied(self, itask: 'TaskProxy') -> bool: + """Check if all sequential xtriggers are satisfied for a task.""" + return itask.is_xtrigger_sequential and all( + itask.state.xtriggers[label] + for label in itask.state.xtriggers + if label in self.sequential_xtrigger_labels + ) + def callback(self, ctx: 'SubFuncContext'): """Callback for asynchronous xtrigger functions. diff --git a/cylc/flow/xtriggers/wall_clock.py b/cylc/flow/xtriggers/wall_clock.py index d5d1a009154..91ba49a8961 100644 --- a/cylc/flow/xtriggers/wall_clock.py +++ b/cylc/flow/xtriggers/wall_clock.py @@ -22,7 +22,7 @@ from cylc.flow.exceptions import WorkflowConfigError -def wall_clock(offset: str = 'PT0S'): +def wall_clock(offset: str = 'PT0S', sequential: bool = True): """Trigger at a specific real "wall clock" time relative to the cycle point in the graph. @@ -48,6 +48,8 @@ def _wall_clock(trigger_time: int) -> bool: Args: trigger_time: Trigger time as seconds since Unix epoch. + sequential (bool): + Used by the workflow to flag corresponding xtriggers as sequential. """ return time() > trigger_time diff --git a/tests/functional/cylc-config/00-simple/section1.stdout b/tests/functional/cylc-config/00-simple/section1.stdout index 972b7a55606..f5ff23b77ae 100644 --- a/tests/functional/cylc-config/00-simple/section1.stdout +++ b/tests/functional/cylc-config/00-simple/section1.stdout @@ -6,6 +6,7 @@ hold after cycle point = stop after cycle point = cycling mode = integer runahead limit = P4 +sequential xtriggers = False [[queues]] [[[default]]] limit = 100 diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t new file mode 100644 index 00000000000..211aa47277f --- /dev/null +++ b/tests/functional/xtriggers/04-sequential.t @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Test xtrigger sequential spawning - +# + +. "$(dirname "$0")/test_header" + +set_test_number 7 + +# Test workflow uses built-in 'echo' xtrigger. +init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' +[scheduler] + cycle point format = %Y + allow implicit tasks = True +[scheduling] + initial cycle point = 3000 + runahead limit = P5 + sequential xtriggers = True + [[xtriggers]] + clock_1 = wall_clock(offset=P2Y, sequential=False) + clock_2 = wall_clock() + up_1 = workflow_state(\ + workflow=%(workflow)s, \ + task=b, \ + point=%(point)s, \ + offset=-P1Y, \ + sequential=False \ + ):PT1S + [[graph]] + R1 = """ + @clock_1 => a + b + """ + +P1Y/P1Y = """ + @clock_2 => a + @clock_2 => b + @up_1 => c + """ +__FLOW_CONFIG__ + +run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}" + +# Run workflow; it will stall waiting on the never-satisfied xtriggers. +cylc play "${WORKFLOW_NAME}" + +poll_grep_workflow_log -E '3001/c/.* => succeeded' + +cylc stop --max-polls=10 --interval=2 "${WORKFLOW_NAME}" + +cylc play "${WORKFLOW_NAME}" + +cylc show "${WORKFLOW_NAME}//3001/a" | grep -E 'state: ' > 3001.a.log +cylc show "${WORKFLOW_NAME}//3002/a" 2>&1 >/dev/null \ + | grep -E 'No matching' > 3002.a.log + +# 3001/a should be spawned at both 3000/3001. +cmp_ok 3001.a.log - <<__END__ +state: waiting +__END__ +# 3002/a should not exist. +cmp_ok 3002.a.log - <<__END__ +No matching active tasks found: 3002/a +__END__ + +cylc reload "${WORKFLOW_NAME}" + +cylc remove "${WORKFLOW_NAME}//3001/b" + +cylc show "${WORKFLOW_NAME}//3002/b" | grep -E 'state: ' > 3002.b.log +cylc show "${WORKFLOW_NAME}//3003/b" 2>&1 >/dev/null \ + | grep -E 'No matching' > 3003.b.log + +# 3002/b should be only at 3002. +cmp_ok 3002.b.log - <<__END__ +state: waiting +__END__ +cmp_ok 3003.b.log - <<__END__ +No matching active tasks found: 3003/b +__END__ + +cylc show "${WORKFLOW_NAME}//3002/c" | grep -E 'state: ' > 3002.c.log +cylc show "${WORKFLOW_NAME}//3005/c" | grep -E 'state: ' > 3005.c.log + +# c should be from 3002-3005. +cmp_ok 3002.c.log - <<__END__ +state: waiting +__END__ +cmp_ok 3005.c.log - <<__END__ +state: waiting +__END__ + + +cylc stop --now --max-polls=10 --interval=2 "${WORKFLOW_NAME}" +purge +exit diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index 581ec4d83fb..0a186aa41bd 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -90,9 +90,6 @@ def test_validate_implicit_task_name( are blacklisted get caught and raise errors. 
""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True' - }, 'scheduling': { 'graph': { 'R1': task_name @@ -189,9 +186,6 @@ def test_no_graph(flow, validate): def test_parse_special_tasks_invalid(flow, validate, section): """It should fail for invalid "special tasks".""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -211,9 +205,6 @@ def test_parse_special_tasks_invalid(flow, validate, section): def test_parse_special_tasks_interval(flow, validate): """It should fail for invalid durations in clock-triggers.""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -359,7 +350,6 @@ def test_xtrig_validation_wall_clock( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'wall_clock(offset=PT7MH)'}, @@ -378,7 +368,6 @@ def test_xtrig_implicit_wall_clock(flow: Fixture, validate: Fixture): xtrigger definition. """ wid = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'graph': {'R1': '@wall_clock => foo'}, @@ -396,7 +385,6 @@ def test_xtrig_validation_echo( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'echo()'}, 'graph': {'R1': '@myxt => foo'}, @@ -418,7 +406,6 @@ def test_xtrig_validation_xrandom( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'xrandom(200)'}, 'graph': {'R1': '@myxt => foo'}, @@ -459,7 +446,6 @@ def kustom_validate(args): ) id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'kustom_xt(feature=42)'}, @@ -490,7 +476,6 @@ def test_xtrig_signature_validation( ): """Test automatic xtrigger function signature validation.""" id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'xtriggers': {'myxt': xtrig_call}, diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py new file mode 100644 index 00000000000..cbe0051d084 --- /dev/null +++ b/tests/integration/test_sequential_xtriggers.py @@ -0,0 +1,250 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +# mypy: disable-error-code=union-attr + +"""Test interactions with sequential xtriggers.""" + +from unittest.mock import patch +import pytest +from cylc.flow.cycling.integer import IntegerPoint + +from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.exceptions import XtriggerConfigError +from cylc.flow.scheduler import Scheduler + + +def list_cycles(schd: Scheduler): + """List the task instance cycle points present in the pool.""" + return sorted(itask.tokens['cycle'] for itask in schd.pool.get_tasks()) + + +@pytest.fixture() +def sequential(flow, scheduler): + id_ = flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'runahead limit': 'P2', + 'initial cycle point': '2000', + 'graph': { + 'P1Y': '@wall_clock => foo', + } + } + }) + return scheduler(id_) + + +async def test_remove(sequential, start): + """It should spawn the next instance when a task is removed. + + Ensure that removing a task with a sequential xtrigger does not break the + chain, causing future instances to be removed from the workflow. + """ + async with start(sequential): + # the scheduler starts with one task in the pool + assert list_cycles(sequential) == ['2000'] + + # it sequentially spawns out to the runahead limit + for year in range(2000, 2010): + foo = sequential.pool.get_task(ISO8601Point(f'{year}'), 'foo') + if foo.state(is_runahead=True): + break + sequential.xtrigger_mgr.call_xtriggers_async(foo) + sequential.pool.spawn_parentless_sequential_xtriggers() + assert list_cycles(sequential) == [ + '2000', + '2001', + '2002', + '2003', + ] + + # remove all tasks in the pool + sequential.pool.remove_tasks(['*']) + + # the next cycle should be automatically spawned + assert list_cycles(sequential) == ['2004'] + + # NOTE: You won't spot this issue in a functional test because the + # re-spawned tasks are detected as completed and automatically removed. + # So ATM not dangerous, but potentially inefficient. + + +async def test_trigger(sequential, start): + """It should spawn its next instance if triggered ahead of time. + + If you manually trigger a sequentially spawned task before its xtriggers + have become satisfied, then the sequential spawning chain is broken. + + The task pool should defend against this to ensure that triggering a task + doesn't cancel its future instances. + """ + async with start(sequential): + assert list_cycles(sequential) == ['2000'] + + foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + sequential.pool.force_trigger_tasks([foo.identity], {1}) + foo.state_reset('succeeded') + sequential.pool.spawn_on_output(foo, 'succeeded') + + assert list_cycles(sequential) == ['2000', '2001'] + + +async def test_reload(sequential, start): + """It should set the is_xtrigger_sequential flag on reload.
+ + TODO: test that changes to the sequential status in the config get picked + up on reload + """ + async with start(sequential): + # the task should be marked as sequential + pre_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + assert pre_reload.is_xtrigger_sequential is True + + # reload the workflow + sequential.pool.reload_taskdefs(sequential.config) + + # the original task proxy should have been replaced + post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo') + assert id(pre_reload) != id(post_reload) + + # the new task should be marked as sequential + assert post_reload.is_xtrigger_sequential is True + + +@pytest.mark.parametrize('is_sequential', [True, False]) +@pytest.mark.parametrize('xtrig_def', [ + 'wall_clock(sequential={})', + 'wall_clock(PT1H, sequential={})', + 'xrandom(1, 1, sequential={})', +]) +async def test_sequential_arg_ok( + flow, scheduler, start, xtrig_def: str, is_sequential: bool +): + """Test passing the sequential argument to xtriggers.""" + wid = flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '2000', + 'runahead limit': 'P1', + 'xtriggers': { + 'myxt': xtrig_def.format(is_sequential), + }, + 'graph': { + 'P1Y': '@myxt => foo', + } + } + }) + schd: Scheduler = scheduler(wid) + expected_num_cycles = 1 if is_sequential else 3 + async with start(schd): + itask = schd.pool.get_task(ISO8601Point('2000'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential + assert len(list_cycles(schd)) == expected_num_cycles + + +def test_sequential_arg_bad( + flow, validate +): + """Test validation of 'sequential' arg for custom xtriggers""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'myxt': 'custom_xt(42)' + }, + 'graph': { + 'R1': '@myxt => foo' + } + } + }) + + def xtrig1(x, sequential): + """This uses 'sequential' without a default value""" + return True + + def xtrig2(x, sequential='True'): + """This uses 'sequential' with a default of wrong type""" + return True + + for xtrig in (xtrig1, xtrig2): + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=xtrig + ): + with pytest.raises(XtriggerConfigError) as excinfo: + validate(wid) + assert ( + "reserved argument 'sequential' that has no boolean default" + ) in str(excinfo.value) + + +@pytest.mark.parametrize('is_sequential', [True, False]) +async def test_any_sequential(flow, scheduler, start, is_sequential: bool): + """Test that a task is marked as sequential if any of its xtriggers are.""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': f'custom_xt(sequential={is_sequential})', + 'xt3': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': '@xt1 & @xt2 & @xt3 => foo', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + itask = schd.pool.get_task(IntegerPoint('1'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential + + +async def test_override(flow, scheduler, start): + """Test that the 'sequential=False' arg can override a default of True.""" + wid = flow({ + 'scheduling': { + 'sequential xtriggers': True, + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': ''' + @xt1 => foo + @xt2 => bar + ''', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + foo = 
schd.pool.get_task(IntegerPoint('1'), 'foo') + assert foo.is_xtrigger_sequential is True + bar = schd.pool.get_task(IntegerPoint('1'), 'bar') + assert bar.is_xtrigger_sequential is False diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 78f8fe6d969..3cfee363d15 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -24,7 +24,7 @@ from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_proxy import TaskProxy from cylc.flow.taskdef import TaskDef -from cylc.flow.xtrigger_mgr import RE_STR_TMPL +from cylc.flow.xtrigger_mgr import RE_STR_TMPL, XtriggerManager def test_constructor(xtrigger_mgr): @@ -68,7 +68,7 @@ def test_add_xtrigger_with_params(xtrigger_mgr): assert xtrig == xtrigger_mgr.functx_map["xtrig"] -def test_check_xtrigger_with_unknown_params(xtrigger_mgr): +def test_check_xtrigger_with_unknown_params(): """Test for adding an xtrigger with an unknown parameter. The XTriggerManager contains a list of specific parameters that are @@ -90,10 +90,12 @@ def test_check_xtrigger_with_unknown_params(xtrigger_mgr): XtriggerConfigError, match="Illegal template in xtrigger: what_is_this" ): - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') -def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): +def test_check_xtrigger_with_deprecated_params( + caplog: pytest.LogCaptureFixture +): """It should flag deprecated template variables.""" xtrig = SubFuncContext( label="echo", @@ -102,7 +104,7 @@ def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): func_kwargs={"succeed": True} ) caplog.set_level(logging.WARNING, CYLC_LOG) - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') assert caplog.messages == [ 'Xtrigger "xtrig" uses deprecated template variables: suite_name' ]
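Reviewer note: below is a minimal sketch (not part of the patch) of how the reserved `sequential` argument interacts with the validation added in `check_xtrigger` above. Only `add_kwarg_to_sig` and the boolean-default rule come from this diff; the `my_xtrigger`/`plain_xtrigger`/`threshold` names are illustrative assumptions.

from inspect import signature
from typing import Tuple

from cylc.flow.subprocctx import add_kwarg_to_sig


def my_xtrigger(threshold: int = 5, sequential: bool = True) -> Tuple[bool, dict]:
    """Hypothetical custom xtrigger that is sequential by default.

    If a function declares the reserved 'sequential' argument, it must have a
    boolean default, otherwise check_xtrigger() raises XtriggerConfigError.
    """
    satisfied = threshold > 0
    return satisfied, {'threshold': threshold}


def plain_xtrigger(threshold: int = 5) -> Tuple[bool, dict]:
    """Hypothetical custom xtrigger that does not declare 'sequential'."""
    return threshold > 0, {'threshold': threshold}


# If an [[xtriggers]] call passes sequential=..., validation injects a
# keyword-only 'sequential' parameter so Signature.bind() still succeeds:
sig = add_kwarg_to_sig(signature(plain_xtrigger), 'sequential', False)
sig.bind(threshold=3, sequential=False)  # binds cleanly, no TypeError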