diff --git a/cylc/flow/subprocctx.py b/cylc/flow/subprocctx.py index 84f1c68e70b..d76e34ce9a4 100644 --- a/cylc/flow/subprocctx.py +++ b/cylc/flow/subprocctx.py @@ -18,11 +18,39 @@ Coerce more value type from string (to time point, duration, xtriggers, etc.). """ +from inspect import Parameter import json from shlex import quote +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from cylc.flow.wallclock import get_current_time_string +if TYPE_CHECKING: + from inspect import Signature + + +def add_kwarg_to_sig( + sig: 'Signature', arg_name: str, default: Any +) -> 'Signature': + """Return a new signature with a kwarg added.""" + # Note: added kwarg has to be before **kwargs ("variadic") in the signature + positional_or_keyword: List[Parameter] = [] + variadic: List[Parameter] = [] + for param in sig.parameters.values(): + if param.kind == Parameter.VAR_KEYWORD: + variadic.append(param) + else: + positional_or_keyword.append(param) + return sig.replace(parameters=[ + *positional_or_keyword, + Parameter( + arg_name, + kind=Parameter.KEYWORD_ONLY, + default=default, + ), + *variadic, + ]) + class SubProcContext: # noqa: SIM119 (not really relevant to this case) """Represent the context of an external command to run as a subprocess. @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext): Attributes: # See also parent class attributes. - .label (str): + .label: function label under [xtriggers] in flow.cylc - .func_name (str): + .func_name: function name - .func_args (list): + .func_args: function positional args - .func_kwargs (dict): + .func_kwargs: function keyword args - .intvl (float - seconds): - function call interval (how often to check the external trigger) - .ret_val (bool, dict) + .intvl: + function call interval in secs (how often to check the + external trigger) + .ret_val function return: (satisfied?, result to pass to trigger tasks) """ DEFAULT_INTVL = 10.0 - def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): + def __init__( + self, + label: str, + func_name: str, + func_args: List[Any], + func_kwargs: Dict[str, Any], + intvl: Union[float, str] = DEFAULT_INTVL + ): """Initialize a function context.""" self.label = label self.func_name = func_name @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None): self.intvl = float(intvl) except (TypeError, ValueError): self.intvl = self.DEFAULT_INTVL - self.ret_val = (False, None) # (satisfied, broadcast) + self.ret_val: Tuple[ + bool, Optional[dict] + ] = (False, None) # (satisfied, broadcast) super(SubFuncContext, self).__init__( - 'xtrigger-func', cmd=[], shell=False) + 'xtrigger-func', cmd=[], shell=False + ) def update_command(self, workflow_run_dir): """Update the function wrap command after changes.""" diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 970320f5e43..ed28ab2613a 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -884,7 +884,7 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': return point_itasks - def get_task(self, point, name) -> Optional[TaskProxy]: + def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]: """Retrieve a task from the pool.""" rel_id = f'{point}/{name}' for pool in (self.main_pool, self.hidden_pool): @@ -1747,7 +1747,7 @@ def remove_tasks(self, items): def force_trigger_tasks( self, items: Iterable[str], - flow: List[str], + flow: List[Union[str, int]], flow_wait: bool = False, flow_descr: Optional[str] = None ) -> int: diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 9df985744a5..ea4b546dae6 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -34,6 +34,7 @@ from cylc.flow.exceptions import XtriggerConfigError import cylc.flow.flags from cylc.flow.hostuserutil import get_user +from cylc.flow.subprocctx import add_kwarg_to_sig from cylc.flow.subprocpool import get_xtrig_func from cylc.flow.xtriggers.wall_clock import _wall_clock @@ -352,10 +353,32 @@ def check_xtrigger( x_argspec.args.index('sequential') ] - # Validate args and kwargs against the function signature + sig = signature(func) sig_str = fctx.get_signature() + + # Handle reserved 'sequential' kwarg: + sequential_param = sig.parameters.get('sequential', None) + if sequential_param: + if not isinstance(sequential_param.default, bool): + raise XtriggerConfigError( + label, + ( + f"xtrigger '{fname}' function definition contains " + "reserved argument 'sequential' that has no " + "boolean default" + ) + ) + fctx.func_kwargs.setdefault('sequential', sequential_param.default) + elif 'sequential' in fctx.func_kwargs: + # xtrig call marked as sequential; add 'sequential' arg to + # signature for validation + sig = add_kwarg_to_sig( + sig, 'sequential', fctx.func_kwargs['sequential'] + ) + + # Validate args and kwargs against the function signature try: - bound_args = signature(func).bind( + bound_args = sig.bind( *fctx.func_args, **fctx.func_kwargs ) except TypeError as exc: diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index 90db5b2bb11..72ec69eda16 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -90,9 +90,6 @@ def test_validate_implicit_task_name( are blacklisted get caught and raise errors. """ id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True' - }, 'scheduling': { 'graph': { 'R1': task_name @@ -189,9 +186,6 @@ def test_no_graph(flow, validate): def test_parse_special_tasks_invalid(flow, validate, section): """It should fail for invalid "special tasks".""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -211,9 +205,6 @@ def test_parse_special_tasks_invalid(flow, validate, section): def test_parse_special_tasks_interval(flow, validate): """It should fail for invalid durations in clock-triggers.""" id_ = flow({ - 'scheduler': { - 'allow implicit tasks': 'True', - }, 'scheduling': { 'initial cycle point': 'now', 'special tasks': { @@ -359,7 +350,6 @@ def test_xtrig_validation_wall_clock( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'wall_clock(offset=PT7MH)'}, @@ -378,7 +368,6 @@ def test_xtrig_implicit_wall_clock(flow: Fixture, validate: Fixture): xtrigger definition. """ wid = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'graph': {'R1': '@wall_clock => foo'}, @@ -396,7 +385,6 @@ def test_xtrig_validation_echo( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'echo()'}, 'graph': {'R1': '@myxt => foo'}, @@ -418,7 +406,6 @@ def test_xtrig_validation_xrandom( https://github.com/cylc/cylc-flow/issues/5448 """ id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'xtriggers': {'myxt': 'xrandom(200)'}, 'graph': {'R1': '@myxt => foo'}, @@ -459,7 +446,6 @@ def kustom_validate(args): ) id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '1012', 'xtriggers': {'myxt': 'kustom_xt(feature=42)'}, @@ -490,7 +476,6 @@ def test_xtrig_signature_validation( ): """Test automatic xtrigger function signature validation.""" id_ = flow({ - 'scheduler': {'allow implicit tasks': True}, 'scheduling': { 'initial cycle point': '2024', 'xtriggers': {'myxt': xtrig_call}, diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index 051227eabee..0d39fa5e95c 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -14,18 +14,28 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +# mypy: disable-error-code=union-attr + """Test interactions with sequential xtriggers.""" +from unittest.mock import patch import pytest +from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.exceptions import XtriggerConfigError +from cylc.flow.scheduler import Scheduler + + +def list_cycles(schd: Scheduler): + """List the task instance cycle points present in the pool.""" + return sorted(itask.tokens['cycle'] for itask in schd.pool.get_all_tasks()) @pytest.fixture() def sequential(flow, scheduler): id_ = flow({ 'scheduler': { - 'allow implicit tasks': 'True', 'cycle point format': 'CCYY', }, 'scheduling': { @@ -36,17 +46,7 @@ def sequential(flow, scheduler): } } }) - - sequential = scheduler(id_) - - def list_tasks(): - """List the task instance cycle points present in the pool.""" - nonlocal sequential - return sorted(itask.tokens['cycle'] for itask in sequential.pool.get_all_tasks()) - - sequential.list_tasks = list_tasks - - return sequential + return scheduler(id_) async def test_remove(sequential, start): @@ -57,7 +57,7 @@ async def test_remove(sequential, start): """ async with start(sequential): # the scheduler starts with one task in the pool - assert sequential.list_tasks() == ['2000'] + assert list_cycles(sequential) == ['2000'] # it sequentially spawns out to the runahead limit for year in range(2000, 2010): @@ -66,7 +66,7 @@ async def test_remove(sequential, start): break sequential.xtrigger_mgr.call_xtriggers_async(foo) sequential.pool.spawn_parentless_sequential_xtriggers() - assert sequential.list_tasks() == [ + assert list_cycles(sequential) == [ '2000', '2001', '2002', @@ -77,7 +77,7 @@ async def test_remove(sequential, start): sequential.pool.remove_tasks(['*']) # the next cycle should be automatically spawned - assert sequential.list_tasks() == ['2004'] + assert list_cycles(sequential) == ['2004'] # NOTE: You won't spot this issue in a functional test because the # re-spawned tasks are detected as completed and automatically removed. @@ -94,14 +94,14 @@ async def test_trigger(sequential, start): doesn't cancel it's future instances. """ async with start(sequential): - assert sequential.list_tasks() == ['2000'] + assert list_cycles(sequential) == ['2000'] foo = sequential.pool.get_task(ISO8601Point('2000'), 'foo') sequential.pool.force_trigger_tasks([foo.identity], {1}) foo.state_reset('succeeded') sequential.pool.spawn_on_output(foo, 'succeeded') - assert sequential.list_tasks() == ['2001'] + assert list_cycles(sequential) == ['2001'] async def test_reload(sequential, start): @@ -126,13 +126,125 @@ async def test_reload(sequential, start): assert post_reload.is_xtrigger_sequential is True -# TODO: test that a task is marked as sequential if any of its xtriggers are -# sequential (as opposed to all)? +@pytest.mark.parametrize('is_sequential', [True, False]) +@pytest.mark.parametrize('xtrig_def', [ + 'wall_clock(sequential={})', + 'wall_clock(PT1H, sequential={})', + 'xrandom(1, 1, sequential={})', +]) +async def test_sequential_arg_ok( + flow, scheduler, start, xtrig_def: str, is_sequential: bool +): + """Test passing the sequential argument to xtriggers.""" + wid = flow({ + 'scheduler': { + 'cycle point format': 'CCYY', + }, + 'scheduling': { + 'initial cycle point': '2000', + 'runahead limit': 'P1', + 'xtriggers': { + 'myxt': xtrig_def.format(is_sequential), + }, + 'graph': { + 'P1Y': '@myxt => foo', + } + } + }) + schd: Scheduler = scheduler(wid) + expected_num_cycles = 1 if is_sequential else 3 + async with start(schd): + itask = schd.pool.get_task(ISO8601Point('2000'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential + assert len(list_cycles(schd)) == expected_num_cycles + + +def test_sequential_arg_bad( + flow, validate +): + """Test validation of 'sequential' arg for custom xtriggers""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'myxt': 'custom_xt(42)' + }, + 'graph': { + 'R1': '@myxt => foo' + } + } + }) + + def xtrig1(x, sequential): + """This uses 'sequential' without a default value""" + return True + + def xtrig2(x, sequential='True'): + """This uses 'sequential' with a default of wrong type""" + return True + + for xtrig in (xtrig1, xtrig2): + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=xtrig + ): + with pytest.raises(XtriggerConfigError) as excinfo: + validate(wid) + assert ( + "reserved argument 'sequential' that has no boolean default" + ) in str(excinfo.value) + + +@pytest.mark.parametrize('is_sequential', [True, False]) +async def test_any_sequential(flow, scheduler, start, is_sequential: bool): + """Test that a task is marked as sequential if any of its xtriggers are.""" + wid = flow({ + 'scheduling': { + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': f'custom_xt(sequential={is_sequential})', + 'xt3': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': '@xt1 & @xt2 & @xt3 => foo', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + itask = schd.pool.get_task(IntegerPoint('1'), 'foo') + assert itask.is_xtrigger_sequential is is_sequential -# TODO: test setting the sequential argument in [scheduling][xtrigger] items -# changes the behaviour -# TODO: test the interaction between "spawn from xtriggers sequentially" and the -# sequential argument to [scheduling][xtrigger] -# * Should we be able to override the default by setting sequential=False? -# * Or should that result in a validation error? +async def test_override(flow, scheduler, start): + """Test that the 'sequential=False' arg can override a default of True.""" + wid = flow({ + 'scheduling': { + 'spawn from xtriggers sequentially': True, + 'xtriggers': { + 'xt1': 'custom_xt()', + 'xt2': 'custom_xt(sequential=False)', + }, + 'graph': { + 'R1': ''' + @xt1 => foo + @xt2 => bar + ''', + } + } + }) + + with patch( + 'cylc.flow.xtrigger_mgr.get_xtrig_func', + return_value=lambda *a, **k: True + ): + schd: Scheduler = scheduler(wid) + async with start(schd): + foo = schd.pool.get_task(IntegerPoint('1'), 'foo') + assert foo.is_xtrigger_sequential is True + bar = schd.pool.get_task(IntegerPoint('1'), 'bar') + assert bar.is_xtrigger_sequential is False diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index 78f8fe6d969..3cfee363d15 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -24,7 +24,7 @@ from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_proxy import TaskProxy from cylc.flow.taskdef import TaskDef -from cylc.flow.xtrigger_mgr import RE_STR_TMPL +from cylc.flow.xtrigger_mgr import RE_STR_TMPL, XtriggerManager def test_constructor(xtrigger_mgr): @@ -68,7 +68,7 @@ def test_add_xtrigger_with_params(xtrigger_mgr): assert xtrig == xtrigger_mgr.functx_map["xtrig"] -def test_check_xtrigger_with_unknown_params(xtrigger_mgr): +def test_check_xtrigger_with_unknown_params(): """Test for adding an xtrigger with an unknown parameter. The XTriggerManager contains a list of specific parameters that are @@ -90,10 +90,12 @@ def test_check_xtrigger_with_unknown_params(xtrigger_mgr): XtriggerConfigError, match="Illegal template in xtrigger: what_is_this" ): - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') -def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): +def test_check_xtrigger_with_deprecated_params( + caplog: pytest.LogCaptureFixture +): """It should flag deprecated template variables.""" xtrig = SubFuncContext( label="echo", @@ -102,7 +104,7 @@ def test_check_xtrigger_with_deprecated_params(xtrigger_mgr, caplog): func_kwargs={"succeed": True} ) caplog.set_level(logging.WARNING, CYLC_LOG) - xtrigger_mgr.check_xtrigger("xtrig", xtrig, 'fdir') + XtriggerManager.check_xtrigger("xtrig", xtrig, 'fdir') assert caplog.messages == [ 'Xtrigger "xtrig" uses deprecated template variables: suite_name' ]