Skip to content

Commit

Permalink
Merge pull request #6138 from MetRonnie/xtrig-sequential
Browse files Browse the repository at this point in the history
Fix xtrigger `sequential` arg validation & improve docs
  • Loading branch information
MetRonnie authored Jun 17, 2024
2 parents f426bc4 + 0393a03 commit 978d0f6
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 38 deletions.
11 changes: 10 additions & 1 deletion cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self, rund, workflow, db_path=None):
if not os.path.exists(db_path):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)

self.conn = sqlite3.connect(db_path, timeout=10.0)
self.conn: sqlite3.Connection = sqlite3.connect(db_path, timeout=10.0)

# Get workflow point format.
try:
Expand All @@ -84,8 +84,17 @@ def __init__(self, rund, workflow, db_path=None):
self.db_point_fmt = self._get_db_point_format_compat()
self.c7_back_compat_mode = True
except sqlite3.OperationalError:
with suppress(Exception):
self.conn.close()
raise exc # original error

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Close DB connection when leaving context manager."""
self.conn.close()

def adjust_point_to_db(self, cycle, offset):
"""Adjust a cycle point (with offset) to the DB point format.
Expand Down
10 changes: 3 additions & 7 deletions cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,10 @@ def get_completed_outputs(self) -> Dict[str, str]:
Replace message with "forced" if the output was forced.
"""
def _get_msg(message):
if message in self._forced:
return FORCED_COMPLETION_MSG
else:
return message

return {
self._message_to_trigger[message]: _get_msg(message)
self._message_to_trigger[message]: (
FORCED_COMPLETION_MSG if message in self._forced else message
)
for message, is_completed in self._completed.items()
if is_completed
}
Expand Down
15 changes: 10 additions & 5 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,16 @@ def _handle_sequential_kwarg(
)
fctx.func_kwargs.setdefault('sequential', sequential_param.default)

elif 'sequential' in fctx.func_kwargs:
# xtrig marked as sequential, so add 'sequential' arg to signature
sig = add_kwarg_to_sig(
sig, 'sequential', fctx.func_kwargs['sequential']
)
if 'sequential' in fctx.func_kwargs:
# xtrig marked as sequential in function call
value = fctx.func_kwargs['sequential']
if not isinstance(value, bool):
raise XtriggerConfigError(
label, fctx.func_name,
f"invalid argument 'sequential={value}' - must be boolean"
)
if not sequential_param:
sig = add_kwarg_to_sig(sig, 'sequential', value)
return sig

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/xtriggers/wall_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def wall_clock(offset: str = 'PT0S', sequential: bool = True):
Wall-clock xtriggers are run sequentially by default.
See :ref:`Sequential Xtriggers` for more details.
.. versionchanged:: 8.3.0
The ``sequential`` argument was added.
"""
# NOTE: This is just a placeholder for the actual implementation.
# This is only used for validating the signature and for autodocs.
Expand Down
9 changes: 8 additions & 1 deletion cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def workflow_state(
If the status or output has been achieved, return {True, result}.
Arg:
Args:
workflow_task_id:
ID (workflow//point/task:selector) of the target task.
offset:
Expand All @@ -62,6 +62,13 @@ def workflow_state(
Dict of workflow, task, point, offset,
status, message, trigger, flow_num, run_dir
.. versionchanged:: 8.3.0
The ``workflow_task_id`` argument was introduced to replace the
separate ``workflow``, ``point``, ``task``, ``status``, and ``message``
arguments (which are still supported for backwards compatibility).
The ``flow_num`` argument was added. The ``cylc_run_dir`` argument
was renamed to ``alt_cylc_run_dir``.
"""
poller = WorkflowPoller(
workflow_task_id,
Expand Down
22 changes: 9 additions & 13 deletions tests/integration/test_dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@
from asyncio import sleep
import pytest
from textwrap import dedent
from typing import TYPE_CHECKING

from cylc.flow.dbstatecheck import CylcWorkflowDBChecker as Checker


if TYPE_CHECKING:
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.scheduler import Scheduler


@pytest.fixture(scope='module')
async def checker(
mod_flow, mod_scheduler, mod_run, mod_complete
) -> 'CylcWorkflowDBChecker':
):
"""Make a real world database.
We could just write the database manually but this is a better
Expand All @@ -53,17 +49,17 @@ async def checker(
'output': {'outputs': {'trigger': 'message'}}
}
})
schd = mod_scheduler(wid, paused_start=False)
schd: Scheduler = mod_scheduler(wid, paused_start=False)
async with mod_run(schd):
await mod_complete(schd)
schd.pool.force_trigger_tasks(['1000/good'], [2])
schd.pool.force_trigger_tasks(['1000/good'], ['2'])
# Allow a cycle of the main loop to pass so that flow 2 can be
# added to db
await sleep(1)
yield Checker(
'somestring', 'utterbunkum',
schd.workflow_db_mgr.pub_path
)
with CylcWorkflowDBChecker(
'somestring', 'utterbunkum', schd.workflow_db_mgr.pub_path
) as _checker:
yield _checker


def test_basic(checker):
Expand Down
27 changes: 23 additions & 4 deletions tests/integration/test_sequential_xtriggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,8 @@ async def test_sequential_arg_ok(
assert len(list_cycles(schd)) == expected_num_cycles


def test_sequential_arg_bad(
flow, validate
):
"""Test validation of 'sequential' arg for custom xtriggers"""
def test_sequential_arg_bad(flow, validate):
"""Test validation of 'sequential' arg for custom xtrigger function def"""
wid = flow({
'scheduling': {
'xtriggers': {
Expand Down Expand Up @@ -194,6 +192,27 @@ def xtrig2(x, sequential='True'):
) in str(excinfo.value)


def test_sequential_arg_bad2(flow, validate):
"""Test validation of 'sequential' arg for xtrigger calls"""
wid = flow({
'scheduling': {
'initial cycle point': '2000',
'xtriggers': {
'clock': 'wall_clock(sequential=3)',
},
'graph': {
'R1': '@clock => foo',
},
},
})

with pytest.raises(XtriggerConfigError) as excinfo:
validate(wid)
assert (
"invalid argument 'sequential=3' - must be boolean"
) in str(excinfo.value)


@pytest.mark.parametrize('is_sequential', [True, False])
async def test_any_sequential(flow, scheduler, start, is_sequential: bool):
"""Test that a task is marked as sequential if any of its xtriggers are."""
Expand Down
13 changes: 6 additions & 7 deletions tests/unit/test_db_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,13 @@ def test_cylc_7_db_wflow_params_table(_setup_db):
rf'("cycle_point_format", "{ptformat}")'
)
db_file_name = _setup_db([create, insert])
checker = CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name)
with CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name) as checker:
with pytest.raises(
sqlite3.OperationalError, match="no such table: workflow_params"
):
checker._get_db_point_format()

with pytest.raises(
sqlite3.OperationalError, match="no such table: workflow_params"
):
checker._get_db_point_format()

assert checker.db_point_fmt == ptformat
assert checker.db_point_fmt == ptformat


def test_pre_830_task_action_timers(_setup_db):
Expand Down

0 comments on commit 978d0f6

Please sign in to comment.