From 6f7dd50a05dad9c3d8034eb44c9b16771c49d1fa Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Thu, 30 Nov 2023 01:23:03 +1300 Subject: [PATCH] Review fixes 2 Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/cfgspec/workflow.py | 9 +++++---- cylc/flow/task_pool.py | 14 ++++++-------- cylc/flow/task_proxy.py | 11 +++-------- cylc/flow/xtrigger_mgr.py | 16 +++++++++++----- tests/functional/xtriggers/04-sequential.t | 14 +++++++------- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 0af93a7e629..0ef8affebd1 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -803,11 +803,12 @@ def get_script_common_text(this: str, example: Optional[str] = None): ''') Conf('sequential xtriggers default', VDR.V_BOOLEAN, False, desc=''' - Set to ``True``, this allows for sequential spawning of associated - parentless tasks on xtrigger satisfaction. - Instead of out to the runahead limit (default: ``False``). + When set to ``True``, parentless tasks that trigger off xtriggers + will only spawn sequentially, i.e. on the satisfaction of the + xtriggers in order. Otherwise, these tasks will all spawn at the + same time up to the runahead limit. - This workflow wide default can be overridden by a reserved + This workflow-wide default can be overridden by a reserved keyword argument in the xtrigger function declaration and/or function (``sequential=True/False``). diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index d462ef2fba8..98a58ac7ff2 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -754,9 +754,10 @@ def _get_spawned_or_merged_task( def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit. - Sequentially checked xtriggers with spawn corresponding task out to the - next task with any xtrigger with the same behaviour, or to the runahead - limit (whichever occurs first). + Sequentially checked xtriggers will spawn the next occurrence of their + corresponding tasks. These tasks will keep spawning until they depend + on any unsatisfied xtrigger of the same sequential behavior, are no + longer parentless, and/or hit the runahead limit. """ if not flow_nums or point is None: @@ -806,10 +807,7 @@ def remove(self, itask, reason=""): msg += f" ({reason})" if itask.is_xtrigger_sequential: - with suppress(ValueError): - self.xtrigger_mgr.sequential_spawn_next.remove( - itask.identity - ) + self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity) try: del self.hidden_pool[itask.point][itask.identity] @@ -1723,7 +1721,7 @@ def remove_tasks(self, items): """Remove tasks from the pool.""" itasks, _, bad_items = self.filter_task_proxies(items) for itask in itasks: - # Spawn next occurance of xtrigger sequential task. + # Spawn next occurrence of xtrigger sequential task. if itask.is_xtrigger_sequential: self.spawn_to_rh_limit( itask.tdef, diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index d9dd7481aa1..6bfd0a05ea2 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -263,16 +263,11 @@ def __init__( self.state = TaskState(tdef, self.point, status, is_held) # Set xtrigger checking type, which effects parentless spawning. - if ( + self.is_xtrigger_sequential = bool( sequential_xtrigger_labels and self.tdef.is_parentless(start_point) - and set(self.state.xtriggers.keys()).intersection( - sequential_xtrigger_labels - ) - ): - self.is_xtrigger_sequential = True - else: - self.is_xtrigger_sequential = False + and sequential_xtrigger_labels.intersection(self.state.xtriggers) + ) # Determine graph children of this task (for spawning). if data_mode: diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index a60eba83800..d27054323e9 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -21,7 +21,14 @@ import re from copy import deepcopy from time import time -from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING +from typing import ( + Any, + Dict, + Optional, + Set, + Tuple, + TYPE_CHECKING +) from cylc.flow import LOG from cylc.flow.exceptions import XtriggerConfigError @@ -244,15 +251,15 @@ def __init__( self.active: list = [] # Clock labels, to avoid repeated string comparisons - self.wall_clock_labels: set = set() + self.wall_clock_labels: Set[str] = set() # Workflow wide default, used when not specified in xtrigger kwargs. self.sequential_xtriggers_default = False # Labels whose xtriggers are sequentially checked. - self.sequential_xtrigger_labels: set = set() + self.sequential_xtrigger_labels: Set[str] = set() # Gather parentless tasks whose xtrigger(s) have been satisfied # (these will be used to spawn the next occurance). - self.sequential_spawn_next: set = set() + self.sequential_spawn_next: Set[str] = set() self.workflow_run_dir = workflow_run_dir @@ -300,7 +307,6 @@ def check_xtrigger( * If the function module was not found. * If the function was not found in the xtrigger module. * If the function is not callable. - * If the function is not callable. * If any string template in the function context arguments are not present in the expected template values. * If the arguments do not match the function signature. diff --git a/tests/functional/xtriggers/04-sequential.t b/tests/functional/xtriggers/04-sequential.t index 6e101611eaa..3b55e234892 100644 --- a/tests/functional/xtriggers/04-sequential.t +++ b/tests/functional/xtriggers/04-sequential.t @@ -43,14 +43,14 @@ init_workflow "${TEST_NAME_BASE}" << '__FLOW_CONFIG__' ):PT1S [[graph]] R1 = """ -@clock_1 => a -b -""" + @clock_1 => a + b + """ +P1Y/P1Y = """ -@clock_2 => a -@clock_2 => b -@up_1 => c -""" + @clock_2 => a + @clock_2 => b + @up_1 => c + """ __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-val" cylc validate "${WORKFLOW_NAME}"