From c3dc43a7a023f1856534e43ca4dd9e9e4db74d00 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 15 Jan 2024 10:30:31 +0000 Subject: [PATCH 1/7] Don't record satisfied xtriggers in the DB Closes https://github.com/cylc/cylc-flow/issues/5911 --- cylc/flow/workflow_db_mgr.py | 7 +++-- tests/integration/test_workflow_db_mgr.py | 37 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 7d6968b137c..1e58ca6c188 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -405,9 +405,10 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" for sig, res in sat_xtrig.items(): - self.db_inserts_map[self.TABLE_XTRIGGERS].append({ - "signature": sig, - "results": json.dumps(res)}) + if not sig.startswith('wall_clock(trigger_time='): + self.db_inserts_map[self.TABLE_XTRIGGERS].append({ + "signature": sig, + "results": json.dumps(res)}) def put_update_task_state(self, itask): """Update task_states table for current state of itask. diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index fa15c9165b0..9e9722caff3 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio import pytest import sqlite3 @@ -127,3 +128,39 @@ async def test_workflow_param_rapid_toggle( w_params = dict(schd.workflow_db_mgr.pri_dao.select_workflow_params()) assert w_params['is_paused'] == '0' + + +async def test_record_only_non_clock_triggers(flow, run, scheduler): + """Database does not record wall_clock xtriggers. + + https://github.com/cylc/cylc-flow/issues/5911 + """ + id_ = flow({ + "scheduler": { + "allow implicit tasks": True, + 'cycle point format': '%Y' + }, + "scheduling": { + "initial cycle point": "1348", + "xtriggers": { + "another": "xrandom(100)", + "wall_clock": "xrandom(100, _=Not a real wall clock trigger)", + "real_wall_clock": "wall_clock()" + }, + "graph": { + "R1": "@another & @wall_clock & @real_wall_clock => foo" + } + }, + }) + # Run workflow unto completion: + schd = scheduler(id_, paused_start=False) + async with run(schd) as log: + while 'Workflow shutting down - AUTOMATIC' not in log.messages: + await asyncio.sleep(1) + + # Get xtriggers db table: + info = schd.workflow_db_mgr.get_pri_dao().conn.execute( + 'SELECT * FROM xtriggers').fetchall() + + # All xtriggers are xrandom: None are wall_clock: + assert all(i[0].startswith('xrandom') for i in info) From 5a592f957104ad05e230c842d921d01912c61b6a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 15 Jan 2024 11:17:54 +0000 Subject: [PATCH 2/7] Test that no wall_clock xtriggers are saved to the DB for a retry. --- tests/integration/test_workflow_db_mgr.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index 9e9722caff3..56a26d93fab 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -134,10 +134,17 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler): """Database does not record wall_clock xtriggers. https://github.com/cylc/cylc-flow/issues/5911 + + Includes: + - Not in DB: A normal wall clock xtrigger (wall_clock). + - In DB: An xrandom mis-labelled as wall_clock trigger DB). + - Not in DB: An execution retry xtrigger. + + @TODO: Refactor to use simulation mode to speedup after Simulation + mode upgrade bugfixes: This should speed this test up considerably. """ id_ = flow({ "scheduler": { - "allow implicit tasks": True, 'cycle point format': '%Y' }, "scheduling": { @@ -151,12 +158,19 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler): "R1": "@another & @wall_clock & @real_wall_clock => foo" } }, + 'runtime': { + 'foo': { + 'execution retry delays': 'PT0S', + 'script': ( + 'test $CYLC_TASK_SUBMIT_NUMBER == 1 && exit 1 || exit 0') + } + } }) # Run workflow unto completion: - schd = scheduler(id_, paused_start=False) + schd = scheduler(id_, paused_start=False, run_mode='live') async with run(schd) as log: while 'Workflow shutting down - AUTOMATIC' not in log.messages: - await asyncio.sleep(1) + await asyncio.sleep(0.5) # Get xtriggers db table: info = schd.workflow_db_mgr.get_pri_dao().conn.execute( From 56e65fc61dc8733f3834e1fd22dc41f98c35a651 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 15 Jan 2024 15:53:19 +0000 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- tests/integration/test_workflow_db_mgr.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index 56a26d93fab..5b1df987302 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -161,8 +161,7 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler): 'runtime': { 'foo': { 'execution retry delays': 'PT0S', - 'script': ( - 'test $CYLC_TASK_SUBMIT_NUMBER == 1 && exit 1 || exit 0') + 'script': 'test $CYLC_TASK_SUBMIT_NUMBER != 1' } } }) @@ -172,9 +171,7 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler): while 'Workflow shutting down - AUTOMATIC' not in log.messages: await asyncio.sleep(0.5) - # Get xtriggers db table: - info = schd.workflow_db_mgr.get_pri_dao().conn.execute( - 'SELECT * FROM xtriggers').fetchall() - - # All xtriggers are xrandom: None are wall_clock: - assert all(i[0].startswith('xrandom') for i in info) + assert db_select(schd, False, 'xtriggers', 'signature') == [ + ('xrandom(100)',), + ('xrandom(100, _=Not a real wall clock trigger)',) + ] From d1e38aadc180244b224e9a2524dbcd2e6ec1e167 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 16 Jan 2024 14:21:20 +0000 Subject: [PATCH 4/7] Update cylc/flow/workflow_db_mgr.py Co-authored-by: Oliver Sanders --- cylc/flow/workflow_db_mgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 1e58ca6c188..354fff76b1e 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -405,7 +405,7 @@ def put_task_event_timers(self, task_events_mgr): def put_xtriggers(self, sat_xtrig): """Put statements to update external triggers table.""" for sig, res in sat_xtrig.items(): - if not sig.startswith('wall_clock(trigger_time='): + if not sig.startswith('wall_clock('): self.db_inserts_map[self.TABLE_XTRIGGERS].append({ "signature": sig, "results": json.dumps(res)}) From 65cc6120297d2302a3320892f0b5cd2325f78714 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:54:11 +0000 Subject: [PATCH 5/7] wip --- cylc/flow/task_pool.py | 4 ++ tests/integration/conftest.py | 88 ++++++++++++++++++++++- tests/integration/test_workflow_db_mgr.py | 25 +++++-- 3 files changed, 108 insertions(+), 9 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2885bacba7b..4478653da96 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1733,6 +1733,7 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: """Simulation mode: simulate task run times and set states.""" if not self.config.run_mode('simulation'): return False + breakpoint(header='start of sim_time_check') sim_task_state_changed = False now = time() for itask in self.get_tasks(): @@ -1753,11 +1754,14 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: ) and ( itask.get_try_num() == 1 or not conf['fail try 1 only'] ): + breakpoint(header='fail') message_queue.put( TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED) ) else: # Simulate message outputs. + breakpoint(header='success') + for msg in itask.tdef.rtconfig['outputs'].values(): message_queue.put( TaskMsg(job_d, now_str, 'DEBUG', msg) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f2f24b09ab5..b8ac755837e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,11 +20,11 @@ from pathlib import Path import pytest from shutil import rmtree +from time import time from typing import List, TYPE_CHECKING, Set, Tuple, Union from cylc.flow.config import WorkflowConfig from cylc.flow.option_parsers import Options -from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.pathutil import get_cylc_run_dir from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.scripts.validate import ValidateOptions @@ -34,6 +34,7 @@ ) from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id +from cylc.flow.workflow_status import StopMode from .utils import _rm_if_empty from .utils.flow_tools import ( @@ -47,6 +48,7 @@ if TYPE_CHECKING: from cylc.flow.scheduler import Scheduler from cylc.flow.task_proxy import TaskProxy + from cylc.flow.network.client import WorkflowRuntimeClient InstallOpts = Options(install_gop()) @@ -323,7 +325,7 @@ def _inner( def gql_query(): """Execute a GraphQL query given a workflow runtime client.""" async def _gql_query( - client: WorkflowRuntimeClient, query_str: str + client: 'WorkflowRuntimeClient', query_str: str ) -> object: ret = await client.async_request( 'graphql', { @@ -473,3 +475,85 @@ def _inner(source, **kwargs): workflow_id = infer_latest_run_from_id(workflow_id) return workflow_id yield _inner + + +@pytest.fixture +def complete(): + """Wait for the workflow, or tasks within it to complete. + + Args: + schd: + The scheduler to await. + tokens_list: + If specified, this will wait for the tasks represented by these + tokens to be marked as completed by the task pool. + stop_mode: + If tokens_list is not provided, this will wait for the scheduler + to be shutdown with the specified mode (default = AUTO, i.e. + workflow completed normally). + timeout: + Max time to wait for the condition to be met. + + Note, if you need to increase this, you might want to rethink your + test. + + Note, use this timeout rather than wrapping the complete call with + async_timeout (handles shutdown logic more cleanly). + + """ + async def _complete( + schd, + *tokens_list, + stop_mode=StopMode.AUTO, + timeout=60, + ): + start_time = time() + tokens_list = [tokens.task for tokens in tokens_list] + + # capture task completion + remove_if_complete = schd.pool.remove_if_complete + + def _remove_if_complete(itask): + ret = remove_if_complete(itask) + if ret and itask.tokens.task in tokens_list: + tokens_list.remove(itask.tokens.task) + return ret + + schd.pool.remove_if_complete = _remove_if_complete + + # capture workflow shutdown + set_stop = schd._set_stop + has_shutdown = False + + def _set_stop(mode=None): + nonlocal has_shutdown, stop_mode + if mode == stop_mode: + has_shutdown = True + return set_stop(mode) + else: + set_stop(mode) + raise Exception(f'Workflow bailed with stop mode = {mode}') + + schd._set_stop = _set_stop + + # determine the completion condition + if tokens_list: + condition = lambda: bool(tokens_list) + else: + condition = lambda: bool(not has_shutdown) + + # wait for the condition to be met + while condition(): + # allow the main loop to advance + await asyncio.sleep(0) + if time() - start_time > timeout: + raise Exception( + f'Timeout waiting for {", ".join(map(str, tokens_list))}' + ) + + # restore regular shutdown logic + schd._set_stop = set_stop + + return _complete + + diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index 5b1df987302..52b7245e746 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -130,7 +130,9 @@ async def test_workflow_param_rapid_toggle( assert w_params['is_paused'] == '0' -async def test_record_only_non_clock_triggers(flow, run, scheduler): +async def test_record_only_non_clock_triggers( + flow, run, scheduler, complete, db_select +): """Database does not record wall_clock xtriggers. https://github.com/cylc/cylc-flow/issues/5911 @@ -155,21 +157,30 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler): "real_wall_clock": "wall_clock()" }, "graph": { - "R1": "@another & @wall_clock & @real_wall_clock => foo" + "R1": """ + @another & @wall_clock & @real_wall_clock => foo + @real_wall_clock => bar + """ } }, 'runtime': { 'foo': { - 'execution retry delays': 'PT0S', - 'script': 'test $CYLC_TASK_SUBMIT_NUMBER != 1' + 'execution retry delays': 'PT1S', + 'submission retry delays': 'PT0S', + 'simulation': { + 'default run length': 'PT0S', + 'fail cycle points': '1348' + } } } }) # Run workflow unto completion: - schd = scheduler(id_, paused_start=False, run_mode='live') + schd = scheduler(id_, paused_start=False, run_mode='simulation') + import asyncio async with run(schd) as log: - while 'Workflow shutting down - AUTOMATIC' not in log.messages: - await asyncio.sleep(0.5) + #schd.pool.get_tasks()[0].state.status = 'running' + await complete(schd, timeout=10) + assert db_select(schd, False, 'xtriggers', 'signature') == [ ('xrandom(100)',), From e51cfe987b3f559df469661cf269075d20590d3f Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 26 Jan 2024 10:58:10 +0000 Subject: [PATCH 6/7] Response to review: Remove real script from test workflow. --- cylc/flow/task_pool.py | 4 -- tests/integration/test_workflow_db_mgr.py | 49 +++++++++++------------ 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 4478653da96..2885bacba7b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1733,7 +1733,6 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: """Simulation mode: simulate task run times and set states.""" if not self.config.run_mode('simulation'): return False - breakpoint(header='start of sim_time_check') sim_task_state_changed = False now = time() for itask in self.get_tasks(): @@ -1754,14 +1753,11 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: ) and ( itask.get_try_num() == 1 or not conf['fail try 1 only'] ): - breakpoint(header='fail') message_queue.put( TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED) ) else: # Simulate message outputs. - breakpoint(header='success') - for msg in itask.tdef.rtconfig['outputs'].values(): message_queue.put( TaskMsg(job_d, now_str, 'DEBUG', msg) diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index 52b7245e746..70ec8d03daf 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -17,8 +17,12 @@ import asyncio import pytest import sqlite3 +from typing import TYPE_CHECKING -from cylc.flow.scheduler import Scheduler +from cylc.flow.cycling.iso8601 import ISO8601Point + +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler async def test_restart_number( @@ -30,7 +34,7 @@ async def test_restart_number( async def test(expected_restart_num: int, do_reload: bool = False): """(Re)start the workflow and check the restart number is as expected. """ - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) async with start(schd) as log: if do_reload: schd.command_reload_workflow() @@ -53,7 +57,7 @@ async def test(expected_restart_num: int, do_reload: bool = False): await test(expected_restart_num=3) -def db_remove_column(schd: Scheduler, table: str, column: str) -> None: +def db_remove_column(schd: 'Scheduler', table: str, column: str) -> None: """Remove a column from a scheduler DB table. ALTER TABLE DROP COLUMN is not supported by sqlite yet, so we have to copy @@ -83,7 +87,7 @@ async def test_db_upgrade_pre_803( id_ = flow(one_conf) # Run a scheduler to create a DB. - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) async with start(schd): assert ('n_restart', '0') in db_select(schd, False, 'workflow_params') @@ -91,7 +95,7 @@ async def test_db_upgrade_pre_803( db_remove_column(schd, "task_states", "is_manual_submit") db_remove_column(schd, "task_jobs", "flow_nums") - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) # Restart should fail due to the missing column. with pytest.raises(sqlite3.OperationalError): @@ -99,7 +103,7 @@ async def test_db_upgrade_pre_803( pass assert ('n_restart', '1') in db_select(schd, False, 'workflow_params') - schd: Scheduler = scheduler(id_, paused_start=True) + schd: 'Scheduler' = scheduler(id_, paused_start=True) # Run the DB upgrader for version 8.0.2 # (8.0.2 requires upgrade) @@ -118,7 +122,7 @@ async def test_workflow_param_rapid_toggle( https://github.com/cylc/cylc-flow/issues/5593 """ - schd: Scheduler = scheduler(flow(one_conf), paused_start=False) + schd: 'Scheduler' = scheduler(flow(one_conf), paused_start=False) async with run(schd): assert schd.is_paused is False schd.pause_workflow() @@ -145,12 +149,14 @@ async def test_record_only_non_clock_triggers( @TODO: Refactor to use simulation mode to speedup after Simulation mode upgrade bugfixes: This should speed this test up considerably. """ + rawpoint = '1348' id_ = flow({ "scheduler": { - 'cycle point format': '%Y' + 'cycle point format': '%Y', + 'allow implicit tasks': True }, "scheduling": { - "initial cycle point": "1348", + "initial cycle point": rawpoint, "xtriggers": { "another": "xrandom(100)", "wall_clock": "xrandom(100, _=Not a real wall clock trigger)", @@ -163,26 +169,17 @@ async def test_record_only_non_clock_triggers( """ } }, - 'runtime': { - 'foo': { - 'execution retry delays': 'PT1S', - 'submission retry delays': 'PT0S', - 'simulation': { - 'default run length': 'PT0S', - 'fail cycle points': '1348' - } - } - } }) - # Run workflow unto completion: schd = scheduler(id_, paused_start=False, run_mode='simulation') - import asyncio - async with run(schd) as log: - #schd.pool.get_tasks()[0].state.status = 'running' - await complete(schd, timeout=10) + async with run(schd): + foo = schd.pool.get_task(ISO8601Point(rawpoint), 'foo') + + # Wait until all xtriggers for foo are satisfied: + while not all(foo.state.xtriggers.values()): + await asyncio.sleep(1) + # Assert that (only) the real clock trigger is not in the db: assert db_select(schd, False, 'xtriggers', 'signature') == [ ('xrandom(100)',), - ('xrandom(100, _=Not a real wall clock trigger)',) - ] + ('xrandom(100, _=Not a real wall clock trigger)',)] From 94f3a769360bc78c639f45d9ec25442244c73e5e Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 1 Feb 2024 13:26:36 +0000 Subject: [PATCH 7/7] use copy of master branch complete fixture --- tests/integration/conftest.py | 2 -- tests/integration/test_workflow_db_mgr.py | 6 +----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b8ac755837e..bd69bbaeed9 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -555,5 +555,3 @@ def _set_stop(mode=None): schd._set_stop = set_stop return _complete - - diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py index 70ec8d03daf..cf4fca7e064 100644 --- a/tests/integration/test_workflow_db_mgr.py +++ b/tests/integration/test_workflow_db_mgr.py @@ -173,11 +173,7 @@ async def test_record_only_non_clock_triggers( schd = scheduler(id_, paused_start=False, run_mode='simulation') async with run(schd): - foo = schd.pool.get_task(ISO8601Point(rawpoint), 'foo') - - # Wait until all xtriggers for foo are satisfied: - while not all(foo.state.xtriggers.values()): - await asyncio.sleep(1) + await complete(schd, timeout=20) # Assert that (only) the real clock trigger is not in the db: assert db_select(schd, False, 'xtriggers', 'signature') == [