diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 7d6968b137c..354fff76b1e 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -405,9 +405,10 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" for sig, res in sat_xtrig.items(): - self.db_inserts_map[self.TABLE_XTRIGGERS].append({ - "signature": sig, - "results": json.dumps(res)}) + if not sig.startswith('wall_clock('): + self.db_inserts_map[self.TABLE_XTRIGGERS].append({ + "signature": sig, + "results": json.dumps(res)}) def put_update_task_state(self, itask): """Update task_states table for current state of itask. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f2f24b09ab5..bd69bbaeed9 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,11 +20,11 @@ from pathlib import Path import pytest from shutil import rmtree +from time import time from typing import List, TYPE_CHECKING, Set, Tuple, Union from cylc.flow.config import WorkflowConfig from cylc.flow.option_parsers import Options -from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.pathutil import get_cylc_run_dir from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.scripts.validate import ValidateOptions @@ -34,6 +34,7 @@ ) from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id +from cylc.flow.workflow_status import StopMode from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -47,6 +48,7 @@ if TYPE_CHECKING: from cylc.flow.scheduler import Scheduler from cylc.flow.task_proxy import TaskProxy + from cylc.flow.network.client import WorkflowRuntimeClient InstallOpts = Options(install_gop()) @@ -323,7 +325,7 @@ def _inner( def gql_query(): """Execute a GraphQL query given a workflow runtime client.""" async def _gql_query( - client: WorkflowRuntimeClient, query_str: str + client: 'WorkflowRuntimeClient', query_str: str ) -> object: ret = await client.async_request( 'graphql', { @@ -473,3 +475,83 @@ def _inner(source, **kwargs): workflow_id = infer_latest_run_from_id(workflow_id) return workflow_id yield _inner + + +@pytest.fixture +def complete(): + """Wait for the workflow, or tasks within it to complete. + + Args: + schd: + The scheduler to await. + tokens_list: + If specified, this will wait for the tasks represented by these + tokens to be marked as completed by the task pool. + stop_mode: + If tokens_list is not provided, this will wait for the scheduler + to be shutdown with the specified mode (default = AUTO, i.e. + workflow completed normally). + timeout: + Max time to wait for the condition to be met. + + Note, if you need to increase this, you might want to rethink your + test. + + Note, use this timeout rather than wrapping the complete call with + async_timeout (handles shutdown logic more cleanly). + + """ + async def _complete( + schd, + *tokens_list, + stop_mode=StopMode.AUTO, + timeout=60, + ): + start_time = time() + tokens_list = [tokens.task for tokens in tokens_list] + + # capture task completion + remove_if_complete = schd.pool.remove_if_complete + + def _remove_if_complete(itask): + ret = remove_if_complete(itask) + if ret and itask.tokens.task in tokens_list: + tokens_list.remove(itask.tokens.task) + return ret + + schd.pool.remove_if_complete = _remove_if_complete + + # capture workflow shutdown + set_stop = schd._set_stop + has_shutdown = False + + def _set_stop(mode=None): + nonlocal has_shutdown, stop_mode + if mode == stop_mode: + has_shutdown = True + return set_stop(mode) + else: + set_stop(mode) + raise Exception(f'Workflow bailed with stop mode = {mode}') + + schd._set_stop = _set_stop + + # determine the completion condition + if tokens_list: + condition = lambda: bool(tokens_list) + else: + condition = lambda: bool(not has_shutdown) + + # wait for the condition to be met + while condition(): + # allow the main loop to advance + await asyncio.sleep(0) + if time() - start_time > timeout: + raise Exception( + f'Timeout waiting for {", ".join(map(str, tokens_list))}' + ) + + # restore regular shutdown logic + schd._set_stop = set_stop + + return _complete diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index fa15c9165b0..cf4fca7e064 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -14,10 +14,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio import pytest import sqlite3 +from typing import TYPE_CHECKING -from cylc.flow.scheduler import Scheduler +from cylc.flow.cycling.iso8601 import ISO8601Point + +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler async def test_restart_number( @@ -29,7 +34,7 @@ async def test_restart_number( async def test(expected_restart_num: int, do_reload: bool = False): """(Re)start the workflow and check the restart number is as expected. """ - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) async with start(schd) as log: if do_reload: schd.command_reload_workflow() @@ -52,7 +57,7 @@ async def test(expected_restart_num: int, do_reload: bool = False): await test(expected_restart_num=3) -def db_remove_column(schd: Scheduler, table: str, column: str) -> None: +def db_remove_column(schd: 'Scheduler', table: str, column: str) -> None: """Remove a column from a scheduler DB table. ALTER TABLE DROP COLUMN is not supported by sqlite yet, so we have to copy @@ -82,7 +87,7 @@ async def test_db_upgrade_pre_803( id_ = flow(one_conf) # Run a scheduler to create a DB. - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) async with start(schd): assert ('n_restart', '0') in db_select(schd, False, 'workflow_params') @@ -90,7 +95,7 @@ async def test_db_upgrade_pre_803( db_remove_column(schd, "task_states", "is_manual_submit") db_remove_column(schd, "task_jobs", "flow_nums") - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) # Restart should fail due to the missing column. with pytest.raises(sqlite3.OperationalError): @@ -98,7 +103,7 @@ async def test_db_upgrade_pre_803( pass assert ('n_restart', '1') in db_select(schd, False, 'workflow_params') - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) # Run the DB upgrader for version 8.0.2 # (8.0.2 requires upgrade) @@ -117,7 +122,7 @@ async def test_workflow_param_rapid_toggle( https://github.com/cylc/cylc-flow/issues/5593 """ - schd: Scheduler = scheduler(flow(one_conf), paused_start=False) + schd: 'Scheduler' = scheduler(flow(one_conf), paused_start=False) async with run(schd): assert schd.is_paused is False schd.pause_workflow() @@ -127,3 +132,50 @@ async def test_workflow_param_rapid_toggle( w_params = dict(schd.workflow_db_mgr.pri_dao.select_workflow_params()) assert w_params['is_paused'] == '0' + + +async def test_record_only_non_clock_triggers( + flow, run, scheduler, complete, db_select +): + """Database does not record wall_clock xtriggers. + + https://github.com/cylc/cylc-flow/issues/5911 + + Includes: + - Not in DB: A normal wall clock xtrigger (wall_clock). + - In DB: An xrandom mis-labelled as wall_clock trigger DB). + - Not in DB: An execution retry xtrigger. + + @TODO: Refactor to use simulation mode to speedup after Simulation + mode upgrade bugfixes: This should speed this test up considerably. + """ + rawpoint = '1348' + id_ = flow({ + "scheduler": { + 'cycle point format': '%Y', + 'allow implicit tasks': True + }, + "scheduling": { + "initial cycle point": rawpoint, + "xtriggers": { + "another": "xrandom(100)", + "wall_clock": "xrandom(100, _=Not a real wall clock trigger)", + "real_wall_clock": "wall_clock()" + }, + "graph": { + "R1": """ + @another & @wall_clock & @real_wall_clock => foo + @real_wall_clock => bar + """ + } + }, + }) + schd = scheduler(id_, paused_start=False, run_mode='simulation') + + async with run(schd): + await complete(schd, timeout=20) + + # Assert that (only) the real clock trigger is not in the db: + assert db_select(schd, False, 'xtriggers', 'signature') == [ + ('xrandom(100)',), + ('xrandom(100, _=Not a real wall clock trigger)',)]