From c3dc43a7a023f1856534e43ca4dd9e9e4db74d00 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Mon, 15 Jan 2024 10:30:31 +0000
Subject: [PATCH 1/7] Don't record satisfied xtriggers in the DB Closes
https://github.com/cylc/cylc-flow/issues/5911
---
cylc/flow/workflow_db_mgr.py | 7 +++--
tests/integration/test_workflow_db_mgr.py | 37 +++++++++++++++++++++++
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py
index 7d6968b137c..1e58ca6c188 100644
--- a/cylc/flow/workflow_db_mgr.py
+++ b/cylc/flow/workflow_db_mgr.py
@@ -405,9 +405,10 @@ def put_task_event_timers(self, task_events_mgr):
def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
for sig, res in sat_xtrig.items():
- self.db_inserts_map[self.TABLE_XTRIGGERS].append({
- "signature": sig,
- "results": json.dumps(res)})
+ if not sig.startswith('wall_clock(trigger_time='):
+ self.db_inserts_map[self.TABLE_XTRIGGERS].append({
+ "signature": sig,
+ "results": json.dumps(res)})
def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index fa15c9165b0..9e9722caff3 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import asyncio
import pytest
import sqlite3
@@ -127,3 +128,39 @@ async def test_workflow_param_rapid_toggle(
w_params = dict(schd.workflow_db_mgr.pri_dao.select_workflow_params())
assert w_params['is_paused'] == '0'
+
+
+async def test_record_only_non_clock_triggers(flow, run, scheduler):
+ """Database does not record wall_clock xtriggers.
+
+ https://github.com/cylc/cylc-flow/issues/5911
+ """
+ id_ = flow({
+ "scheduler": {
+ "allow implicit tasks": True,
+ 'cycle point format': '%Y'
+ },
+ "scheduling": {
+ "initial cycle point": "1348",
+ "xtriggers": {
+ "another": "xrandom(100)",
+ "wall_clock": "xrandom(100, _=Not a real wall clock trigger)",
+ "real_wall_clock": "wall_clock()"
+ },
+ "graph": {
+ "R1": "@another & @wall_clock & @real_wall_clock => foo"
+ }
+ },
+ })
+ # Run workflow unto completion:
+ schd = scheduler(id_, paused_start=False)
+ async with run(schd) as log:
+ while 'Workflow shutting down - AUTOMATIC' not in log.messages:
+ await asyncio.sleep(1)
+
+ # Get xtriggers db table:
+ info = schd.workflow_db_mgr.get_pri_dao().conn.execute(
+ 'SELECT * FROM xtriggers').fetchall()
+
+ # All xtriggers are xrandom: None are wall_clock:
+ assert all(i[0].startswith('xrandom') for i in info)
From 5a592f957104ad05e230c842d921d01912c61b6a Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Mon, 15 Jan 2024 11:17:54 +0000
Subject: [PATCH 2/7] Test that no wall_clock xtriggers are saved to the DB for
a retry.
---
tests/integration/test_workflow_db_mgr.py | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index 9e9722caff3..56a26d93fab 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -134,10 +134,17 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler):
"""Database does not record wall_clock xtriggers.
https://github.com/cylc/cylc-flow/issues/5911
+
+ Includes:
+ - Not in DB: A normal wall clock xtrigger (wall_clock).
+ - In DB: An xrandom mis-labelled as wall_clock trigger DB).
+ - Not in DB: An execution retry xtrigger.
+
+ @TODO: Refactor to use simulation mode to speedup after Simulation
+ mode upgrade bugfixes: This should speed this test up considerably.
"""
id_ = flow({
"scheduler": {
- "allow implicit tasks": True,
'cycle point format': '%Y'
},
"scheduling": {
@@ -151,12 +158,19 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler):
"R1": "@another & @wall_clock & @real_wall_clock => foo"
}
},
+ 'runtime': {
+ 'foo': {
+ 'execution retry delays': 'PT0S',
+ 'script': (
+ 'test $CYLC_TASK_SUBMIT_NUMBER == 1 && exit 1 || exit 0')
+ }
+ }
})
# Run workflow unto completion:
- schd = scheduler(id_, paused_start=False)
+ schd = scheduler(id_, paused_start=False, run_mode='live')
async with run(schd) as log:
while 'Workflow shutting down - AUTOMATIC' not in log.messages:
- await asyncio.sleep(1)
+ await asyncio.sleep(0.5)
# Get xtriggers db table:
info = schd.workflow_db_mgr.get_pri_dao().conn.execute(
From 56e65fc61dc8733f3834e1fd22dc41f98c35a651 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Mon, 15 Jan 2024 15:53:19 +0000
Subject: [PATCH 3/7] Apply suggestions from code review
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
tests/integration/test_workflow_db_mgr.py | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index 56a26d93fab..5b1df987302 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -161,8 +161,7 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler):
'runtime': {
'foo': {
'execution retry delays': 'PT0S',
- 'script': (
- 'test $CYLC_TASK_SUBMIT_NUMBER == 1 && exit 1 || exit 0')
+ 'script': 'test $CYLC_TASK_SUBMIT_NUMBER != 1'
}
}
})
@@ -172,9 +171,7 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler):
while 'Workflow shutting down - AUTOMATIC' not in log.messages:
await asyncio.sleep(0.5)
- # Get xtriggers db table:
- info = schd.workflow_db_mgr.get_pri_dao().conn.execute(
- 'SELECT * FROM xtriggers').fetchall()
-
- # All xtriggers are xrandom: None are wall_clock:
- assert all(i[0].startswith('xrandom') for i in info)
+ assert db_select(schd, False, 'xtriggers', 'signature') == [
+ ('xrandom(100)',),
+ ('xrandom(100, _=Not a real wall clock trigger)',)
+ ]
From d1e38aadc180244b224e9a2524dbcd2e6ec1e167 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 16 Jan 2024 14:21:20 +0000
Subject: [PATCH 4/7] Update cylc/flow/workflow_db_mgr.py
Co-authored-by: Oliver Sanders
---
cylc/flow/workflow_db_mgr.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py
index 1e58ca6c188..354fff76b1e 100644
--- a/cylc/flow/workflow_db_mgr.py
+++ b/cylc/flow/workflow_db_mgr.py
@@ -405,7 +405,7 @@ def put_task_event_timers(self, task_events_mgr):
def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
for sig, res in sat_xtrig.items():
- if not sig.startswith('wall_clock(trigger_time='):
+ if not sig.startswith('wall_clock('):
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
"signature": sig,
"results": json.dumps(res)})
From 65cc6120297d2302a3320892f0b5cd2325f78714 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 16 Jan 2024 15:54:11 +0000
Subject: [PATCH 5/7] wip
---
cylc/flow/task_pool.py | 4 ++
tests/integration/conftest.py | 88 ++++++++++++++++++++++-
tests/integration/test_workflow_db_mgr.py | 25 +++++--
3 files changed, 108 insertions(+), 9 deletions(-)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 2885bacba7b..4478653da96 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -1733,6 +1733,7 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
"""Simulation mode: simulate task run times and set states."""
if not self.config.run_mode('simulation'):
return False
+ breakpoint(header='start of sim_time_check')
sim_task_state_changed = False
now = time()
for itask in self.get_tasks():
@@ -1753,11 +1754,14 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
) and (
itask.get_try_num() == 1 or not conf['fail try 1 only']
):
+ breakpoint(header='fail')
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
)
else:
# Simulate message outputs.
+ breakpoint(header='success')
+
for msg in itask.tdef.rtconfig['outputs'].values():
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index f2f24b09ab5..b8ac755837e 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -20,11 +20,11 @@
from pathlib import Path
import pytest
from shutil import rmtree
+from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union
from cylc.flow.config import WorkflowConfig
from cylc.flow.option_parsers import Options
-from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.scripts.validate import ValidateOptions
@@ -34,6 +34,7 @@
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
+from cylc.flow.workflow_status import StopMode
from .utils import _rm_if_empty
from .utils.flow_tools import (
@@ -47,6 +48,7 @@
if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_proxy import TaskProxy
+ from cylc.flow.network.client import WorkflowRuntimeClient
InstallOpts = Options(install_gop())
@@ -323,7 +325,7 @@ def _inner(
def gql_query():
"""Execute a GraphQL query given a workflow runtime client."""
async def _gql_query(
- client: WorkflowRuntimeClient, query_str: str
+ client: 'WorkflowRuntimeClient', query_str: str
) -> object:
ret = await client.async_request(
'graphql', {
@@ -473,3 +475,85 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner
+
+
+@pytest.fixture
+def complete():
+ """Wait for the workflow, or tasks within it to complete.
+
+ Args:
+ schd:
+ The scheduler to await.
+ tokens_list:
+ If specified, this will wait for the tasks represented by these
+ tokens to be marked as completed by the task pool.
+ stop_mode:
+ If tokens_list is not provided, this will wait for the scheduler
+ to be shutdown with the specified mode (default = AUTO, i.e.
+ workflow completed normally).
+ timeout:
+ Max time to wait for the condition to be met.
+
+ Note, if you need to increase this, you might want to rethink your
+ test.
+
+ Note, use this timeout rather than wrapping the complete call with
+ async_timeout (handles shutdown logic more cleanly).
+
+ """
+ async def _complete(
+ schd,
+ *tokens_list,
+ stop_mode=StopMode.AUTO,
+ timeout=60,
+ ):
+ start_time = time()
+ tokens_list = [tokens.task for tokens in tokens_list]
+
+ # capture task completion
+ remove_if_complete = schd.pool.remove_if_complete
+
+ def _remove_if_complete(itask):
+ ret = remove_if_complete(itask)
+ if ret and itask.tokens.task in tokens_list:
+ tokens_list.remove(itask.tokens.task)
+ return ret
+
+ schd.pool.remove_if_complete = _remove_if_complete
+
+ # capture workflow shutdown
+ set_stop = schd._set_stop
+ has_shutdown = False
+
+ def _set_stop(mode=None):
+ nonlocal has_shutdown, stop_mode
+ if mode == stop_mode:
+ has_shutdown = True
+ return set_stop(mode)
+ else:
+ set_stop(mode)
+ raise Exception(f'Workflow bailed with stop mode = {mode}')
+
+ schd._set_stop = _set_stop
+
+ # determine the completion condition
+ if tokens_list:
+ condition = lambda: bool(tokens_list)
+ else:
+ condition = lambda: bool(not has_shutdown)
+
+ # wait for the condition to be met
+ while condition():
+ # allow the main loop to advance
+ await asyncio.sleep(0)
+ if time() - start_time > timeout:
+ raise Exception(
+ f'Timeout waiting for {", ".join(map(str, tokens_list))}'
+ )
+
+ # restore regular shutdown logic
+ schd._set_stop = set_stop
+
+ return _complete
+
+
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index 5b1df987302..52b7245e746 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -130,7 +130,9 @@ async def test_workflow_param_rapid_toggle(
assert w_params['is_paused'] == '0'
-async def test_record_only_non_clock_triggers(flow, run, scheduler):
+async def test_record_only_non_clock_triggers(
+ flow, run, scheduler, complete, db_select
+):
"""Database does not record wall_clock xtriggers.
https://github.com/cylc/cylc-flow/issues/5911
@@ -155,21 +157,30 @@ async def test_record_only_non_clock_triggers(flow, run, scheduler):
"real_wall_clock": "wall_clock()"
},
"graph": {
- "R1": "@another & @wall_clock & @real_wall_clock => foo"
+ "R1": """
+ @another & @wall_clock & @real_wall_clock => foo
+ @real_wall_clock => bar
+ """
}
},
'runtime': {
'foo': {
- 'execution retry delays': 'PT0S',
- 'script': 'test $CYLC_TASK_SUBMIT_NUMBER != 1'
+ 'execution retry delays': 'PT1S',
+ 'submission retry delays': 'PT0S',
+ 'simulation': {
+ 'default run length': 'PT0S',
+ 'fail cycle points': '1348'
+ }
}
}
})
# Run workflow unto completion:
- schd = scheduler(id_, paused_start=False, run_mode='live')
+ schd = scheduler(id_, paused_start=False, run_mode='simulation')
+ import asyncio
async with run(schd) as log:
- while 'Workflow shutting down - AUTOMATIC' not in log.messages:
- await asyncio.sleep(0.5)
+ #schd.pool.get_tasks()[0].state.status = 'running'
+ await complete(schd, timeout=10)
+
assert db_select(schd, False, 'xtriggers', 'signature') == [
('xrandom(100)',),
From e51cfe987b3f559df469661cf269075d20590d3f Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Fri, 26 Jan 2024 10:58:10 +0000
Subject: [PATCH 6/7] Response to review: Remove real script from test
workflow.
---
cylc/flow/task_pool.py | 4 --
tests/integration/test_workflow_db_mgr.py | 49 +++++++++++------------
2 files changed, 23 insertions(+), 30 deletions(-)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 4478653da96..2885bacba7b 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -1733,7 +1733,6 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
"""Simulation mode: simulate task run times and set states."""
if not self.config.run_mode('simulation'):
return False
- breakpoint(header='start of sim_time_check')
sim_task_state_changed = False
now = time()
for itask in self.get_tasks():
@@ -1754,14 +1753,11 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
) and (
itask.get_try_num() == 1 or not conf['fail try 1 only']
):
- breakpoint(header='fail')
message_queue.put(
TaskMsg(job_d, now_str, 'CRITICAL', TASK_STATUS_FAILED)
)
else:
# Simulate message outputs.
- breakpoint(header='success')
-
for msg in itask.tdef.rtconfig['outputs'].values():
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index 52b7245e746..70ec8d03daf 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -17,8 +17,12 @@
import asyncio
import pytest
import sqlite3
+from typing import TYPE_CHECKING
-from cylc.flow.scheduler import Scheduler
+from cylc.flow.cycling.iso8601 import ISO8601Point
+
+if TYPE_CHECKING:
+ from cylc.flow.scheduler import Scheduler
async def test_restart_number(
@@ -30,7 +34,7 @@ async def test_restart_number(
async def test(expected_restart_num: int, do_reload: bool = False):
"""(Re)start the workflow and check the restart number is as expected.
"""
- schd: Scheduler = scheduler(id_, paused_start=True)
+ schd: 'Scheduler' = scheduler(id_, paused_start=True)
async with start(schd) as log:
if do_reload:
schd.command_reload_workflow()
@@ -53,7 +57,7 @@ async def test(expected_restart_num: int, do_reload: bool = False):
await test(expected_restart_num=3)
-def db_remove_column(schd: Scheduler, table: str, column: str) -> None:
+def db_remove_column(schd: 'Scheduler', table: str, column: str) -> None:
"""Remove a column from a scheduler DB table.
ALTER TABLE DROP COLUMN is not supported by sqlite yet, so we have to copy
@@ -83,7 +87,7 @@ async def test_db_upgrade_pre_803(
id_ = flow(one_conf)
# Run a scheduler to create a DB.
- schd: Scheduler = scheduler(id_, paused_start=True)
+ schd: 'Scheduler' = scheduler(id_, paused_start=True)
async with start(schd):
assert ('n_restart', '0') in db_select(schd, False, 'workflow_params')
@@ -91,7 +95,7 @@ async def test_db_upgrade_pre_803(
db_remove_column(schd, "task_states", "is_manual_submit")
db_remove_column(schd, "task_jobs", "flow_nums")
- schd: Scheduler = scheduler(id_, paused_start=True)
+ schd: 'Scheduler' = scheduler(id_, paused_start=True)
# Restart should fail due to the missing column.
with pytest.raises(sqlite3.OperationalError):
@@ -99,7 +103,7 @@ async def test_db_upgrade_pre_803(
pass
assert ('n_restart', '1') in db_select(schd, False, 'workflow_params')
- schd: Scheduler = scheduler(id_, paused_start=True)
+ schd: 'Scheduler' = scheduler(id_, paused_start=True)
# Run the DB upgrader for version 8.0.2
# (8.0.2 requires upgrade)
@@ -118,7 +122,7 @@ async def test_workflow_param_rapid_toggle(
https://github.com/cylc/cylc-flow/issues/5593
"""
- schd: Scheduler = scheduler(flow(one_conf), paused_start=False)
+ schd: 'Scheduler' = scheduler(flow(one_conf), paused_start=False)
async with run(schd):
assert schd.is_paused is False
schd.pause_workflow()
@@ -145,12 +149,14 @@ async def test_record_only_non_clock_triggers(
@TODO: Refactor to use simulation mode to speedup after Simulation
mode upgrade bugfixes: This should speed this test up considerably.
"""
+ rawpoint = '1348'
id_ = flow({
"scheduler": {
- 'cycle point format': '%Y'
+ 'cycle point format': '%Y',
+ 'allow implicit tasks': True
},
"scheduling": {
- "initial cycle point": "1348",
+ "initial cycle point": rawpoint,
"xtriggers": {
"another": "xrandom(100)",
"wall_clock": "xrandom(100, _=Not a real wall clock trigger)",
@@ -163,26 +169,17 @@ async def test_record_only_non_clock_triggers(
"""
}
},
- 'runtime': {
- 'foo': {
- 'execution retry delays': 'PT1S',
- 'submission retry delays': 'PT0S',
- 'simulation': {
- 'default run length': 'PT0S',
- 'fail cycle points': '1348'
- }
- }
- }
})
- # Run workflow unto completion:
schd = scheduler(id_, paused_start=False, run_mode='simulation')
- import asyncio
- async with run(schd) as log:
- #schd.pool.get_tasks()[0].state.status = 'running'
- await complete(schd, timeout=10)
+ async with run(schd):
+ foo = schd.pool.get_task(ISO8601Point(rawpoint), 'foo')
+
+ # Wait until all xtriggers for foo are satisfied:
+ while not all(foo.state.xtriggers.values()):
+ await asyncio.sleep(1)
+ # Assert that (only) the real clock trigger is not in the db:
assert db_select(schd, False, 'xtriggers', 'signature') == [
('xrandom(100)',),
- ('xrandom(100, _=Not a real wall clock trigger)',)
- ]
+ ('xrandom(100, _=Not a real wall clock trigger)',)]
From 94f3a769360bc78c639f45d9ec25442244c73e5e Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Thu, 1 Feb 2024 13:26:36 +0000
Subject: [PATCH 7/7] use copy of master branch complete fixture
---
tests/integration/conftest.py | 2 --
tests/integration/test_workflow_db_mgr.py | 6 +-----
2 files changed, 1 insertion(+), 7 deletions(-)
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index b8ac755837e..bd69bbaeed9 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -555,5 +555,3 @@ def _set_stop(mode=None):
schd._set_stop = set_stop
return _complete
-
-
diff --git a/tests/integration/test_workflow_db_mgr.py b/tests/integration/test_workflow_db_mgr.py
index 70ec8d03daf..cf4fca7e064 100644
--- a/tests/integration/test_workflow_db_mgr.py
+++ b/tests/integration/test_workflow_db_mgr.py
@@ -173,11 +173,7 @@ async def test_record_only_non_clock_triggers(
schd = scheduler(id_, paused_start=False, run_mode='simulation')
async with run(schd):
- foo = schd.pool.get_task(ISO8601Point(rawpoint), 'foo')
-
- # Wait until all xtriggers for foo are satisfied:
- while not all(foo.state.xtriggers.values()):
- await asyncio.sleep(1)
+ await complete(schd, timeout=20)
# Assert that (only) the real clock trigger is not in the db:
assert db_select(schd, False, 'xtriggers', 'signature') == [