Skip to content

Commit

Permalink
Reimplement xtrigger sequential arg post-merge & add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie authored and dwsutherland committed Feb 29, 2024
1 parent 81cd15f commit 2d1bd6a
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 59 deletions.
59 changes: 49 additions & 10 deletions cylc/flow/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,39 @@
Coerce more value type from string (to time point, duration, xtriggers, etc.).
"""

from inspect import Parameter
import json
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
from inspect import Signature


def add_kwarg_to_sig(
sig: 'Signature', arg_name: str, default: Any
) -> 'Signature':
"""Return a new signature with a kwarg added."""
# Note: added kwarg has to be before **kwargs ("variadic") in the signature
positional_or_keyword: List[Parameter] = []
variadic: List[Parameter] = []
for param in sig.parameters.values():
if param.kind == Parameter.VAR_KEYWORD:
variadic.append(param)
else:
positional_or_keyword.append(param)
return sig.replace(parameters=[
*positional_or_keyword,
Parameter(
arg_name,
kind=Parameter.KEYWORD_ONLY,
default=default,
),
*variadic,
])


class SubProcContext: # noqa: SIM119 (not really relevant to this case)
"""Represent the context of an external command to run as a subprocess.
Expand Down Expand Up @@ -115,23 +143,31 @@ class SubFuncContext(SubProcContext):
Attributes:
# See also parent class attributes.
.label (str):
.label:
function label under [xtriggers] in flow.cylc
.func_name (str):
.func_name:
function name
.func_args (list):
.func_args:
function positional args
.func_kwargs (dict):
.func_kwargs:
function keyword args
.intvl (float - seconds):
function call interval (how often to check the external trigger)
.ret_val (bool, dict)
.intvl:
function call interval in secs (how often to check the
external trigger)
.ret_val
function return: (satisfied?, result to pass to trigger tasks)
"""

DEFAULT_INTVL = 10.0

def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
def __init__(
self,
label: str,
func_name: str,
func_args: List[Any],
func_kwargs: Dict[str, Any],
intvl: Union[float, str] = DEFAULT_INTVL
):
"""Initialize a function context."""
self.label = label
self.func_name = func_name
Expand All @@ -141,9 +177,12 @@ def __init__(self, label, func_name, func_args, func_kwargs, intvl=None):
self.intvl = float(intvl)
except (TypeError, ValueError):
self.intvl = self.DEFAULT_INTVL
self.ret_val = (False, None) # (satisfied, broadcast)
self.ret_val: Tuple[
bool, Optional[dict]
] = (False, None) # (satisfied, broadcast)
super(SubFuncContext, self).__init__(
'xtrigger-func', cmd=[], shell=False)
'xtrigger-func', cmd=[], shell=False
)

def update_command(self, workflow_run_dir):
"""Update the function wrap command after changes."""
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':

return point_itasks

def get_task(self, point, name) -> Optional[TaskProxy]:
def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]:
"""Retrieve a task from the pool."""
rel_id = f'{point}/{name}'
for pool in (self.main_pool, self.hidden_pool):
Expand Down Expand Up @@ -1747,7 +1747,7 @@ def remove_tasks(self, items):

def force_trigger_tasks(
self, items: Iterable[str],
flow: List[str],
flow: List[Union[str, int]],
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> int:
Expand Down
27 changes: 25 additions & 2 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocctx import add_kwarg_to_sig
from cylc.flow.subprocpool import get_xtrig_func
from cylc.flow.xtriggers.wall_clock import _wall_clock

Expand Down Expand Up @@ -352,10 +353,32 @@ def check_xtrigger(
x_argspec.args.index('sequential')
]

# Validate args and kwargs against the function signature
sig = signature(func)
sig_str = fctx.get_signature()

# Handle reserved 'sequential' kwarg:
sequential_param = sig.parameters.get('sequential', None)
if sequential_param:
if not isinstance(sequential_param.default, bool):
raise XtriggerConfigError(
label,
(
f"xtrigger '{fname}' function definition contains "
"reserved argument 'sequential' that has no "
"boolean default"
)
)
fctx.func_kwargs.setdefault('sequential', sequential_param.default)
elif 'sequential' in fctx.func_kwargs:
# xtrig call marked as sequential; add 'sequential' arg to
# signature for validation
sig = add_kwarg_to_sig(
sig, 'sequential', fctx.func_kwargs['sequential']
)

# Validate args and kwargs against the function signature
try:
bound_args = signature(func).bind(
bound_args = sig.bind(
*fctx.func_args, **fctx.func_kwargs
)
except TypeError as exc:
Expand Down
15 changes: 0 additions & 15 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ def test_validate_implicit_task_name(
are blacklisted get caught and raise errors.
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'graph': {
'R1': task_name
Expand Down Expand Up @@ -189,9 +186,6 @@ def test_no_graph(flow, validate):
def test_parse_special_tasks_invalid(flow, validate, section):
"""It should fail for invalid "special tasks"."""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': 'now',
'special tasks': {
Expand All @@ -211,9 +205,6 @@ def test_parse_special_tasks_invalid(flow, validate, section):
def test_parse_special_tasks_interval(flow, validate):
"""It should fail for invalid durations in clock-triggers."""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'True',
},
'scheduling': {
'initial cycle point': 'now',
'special tasks': {
Expand Down Expand Up @@ -359,7 +350,6 @@ def test_xtrig_validation_wall_clock(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '1012',
'xtriggers': {'myxt': 'wall_clock(offset=PT7MH)'},
Expand All @@ -378,7 +368,6 @@ def test_xtrig_implicit_wall_clock(flow: Fixture, validate: Fixture):
xtrigger definition.
"""
wid = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '2024',
'graph': {'R1': '@wall_clock => foo'},
Expand All @@ -396,7 +385,6 @@ def test_xtrig_validation_echo(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'xtriggers': {'myxt': 'echo()'},
'graph': {'R1': '@myxt => foo'},
Expand All @@ -418,7 +406,6 @@ def test_xtrig_validation_xrandom(
https://github.com/cylc/cylc-flow/issues/5448
"""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'xtriggers': {'myxt': 'xrandom(200)'},
'graph': {'R1': '@myxt => foo'},
Expand Down Expand Up @@ -459,7 +446,6 @@ def kustom_validate(args):
)

id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '1012',
'xtriggers': {'myxt': 'kustom_xt(feature=42)'},
Expand Down Expand Up @@ -490,7 +476,6 @@ def test_xtrig_signature_validation(
):
"""Test automatic xtrigger function signature validation."""
id_ = flow({
'scheduler': {'allow implicit tasks': True},
'scheduling': {
'initial cycle point': '2024',
'xtriggers': {'myxt': xtrig_call},
Expand Down
Loading

0 comments on commit 2d1bd6a

Please sign in to comment.