Skip to content

Commit

Permalink
Don't record satisfied wall clock triggers in the DB (#5923)
Browse files Browse the repository at this point in the history
Don't record satisfied wall clock xtriggers in the DB

* Closes #5911
* Test that no wall_clock xtriggers are saved to the DB for a retry.

---------

Co-authored-by: Ronnie Dutta <[email protected]>
Co-authored-by: Oliver Sanders <[email protected]>
  • Loading branch information
3 people authored Feb 5, 2024
1 parent f4e2853 commit 57fe4ae
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 12 deletions.
7 changes: 4 additions & 3 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,10 @@ def put_task_event_timers(self, task_events_mgr):
def put_xtriggers(self, sat_xtrig):
    """Put statements to update external triggers table.

    Queue one insert for each satisfied xtrigger, except those whose
    signature starts with ``wall_clock(``: satisfied wall clock triggers
    are not recorded in the DB.

    Args:
        sat_xtrig: Mapping of xtrigger signature to its results; the
            results are serialised with ``json.dumps`` before queueing.
    """
    for sig, res in sat_xtrig.items():
        if sig.startswith('wall_clock('):
            # Wall clock triggers are deliberately left out of the table.
            continue
        self.db_inserts_map[self.TABLE_XTRIGGERS].append({
            "signature": sig,
            "results": json.dumps(res)})

def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.
Expand Down
86 changes: 84 additions & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
from pathlib import Path
import pytest
from shutil import rmtree
from time import time
from typing import List, TYPE_CHECKING, Set, Tuple, Union

from cylc.flow.config import WorkflowConfig
from cylc.flow.option_parsers import Options
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.scripts.validate import ValidateOptions
Expand All @@ -34,6 +34,7 @@
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.workflow_status import StopMode

from .utils import _rm_if_empty
from .utils.flow_tools import (
Expand All @@ -47,6 +48,7 @@
if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.network.client import WorkflowRuntimeClient


InstallOpts = Options(install_gop())
Expand Down Expand Up @@ -323,7 +325,7 @@ def _inner(
def gql_query():
"""Execute a GraphQL query given a workflow runtime client."""
async def _gql_query(
client: WorkflowRuntimeClient, query_str: str
client: 'WorkflowRuntimeClient', query_str: str
) -> object:
ret = await client.async_request(
'graphql', {
Expand Down Expand Up @@ -473,3 +475,83 @@ def _inner(source, **kwargs):
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner


@pytest.fixture
def complete():
    """Wait for the workflow, or tasks within it to complete.

    Args:
        schd:
            The scheduler to await.
        tokens_list:
            If specified, this will wait for the tasks represented by these
            tokens to be marked as completed by the task pool.
        stop_mode:
            If tokens_list is not provided, this will wait for the scheduler
            to be shutdown with the specified mode (default = AUTO, i.e.
            workflow completed normally).
        timeout:
            Max time to wait for the condition to be met.

            Note, if you need to increase this, you might want to rethink your
            test.

            Note, use this timeout rather than wrapping the complete call with
            async_timeout (handles shutdown logic more cleanly).

    Raises:
        Exception:
            From the wait loop if the timeout elapses first. The patched
            shutdown hook also raises if the scheduler stops with a mode
            other than ``stop_mode``.

    """
    async def _complete(
        schd,
        *tokens_list,
        stop_mode=StopMode.AUTO,
        timeout=60,
    ):
        start_time = time()
        tokens_list = [tokens.task for tokens in tokens_list]
        # Decide up front what we are waiting for: the listed tasks (if any
        # were given), otherwise workflow shutdown.
        wait_for_tasks = bool(tokens_list)

        # Keep the original hooks so that they can be put back afterwards.
        remove_if_complete = schd.pool.remove_if_complete
        set_stop = schd._set_stop
        has_shutdown = False

        # capture task completion
        def _remove_if_complete(itask):
            ret = remove_if_complete(itask)
            if ret and itask.tokens.task in tokens_list:
                tokens_list.remove(itask.tokens.task)
            return ret

        # capture workflow shutdown
        def _set_stop(mode=None):
            nonlocal has_shutdown
            if mode == stop_mode:
                has_shutdown = True
                return set_stop(mode)
            else:
                set_stop(mode)
                raise Exception(f'Workflow bailed with stop mode = {mode}')

        # determine the completion condition
        def still_waiting():
            if wait_for_tasks:
                return bool(tokens_list)
            return not has_shutdown

        schd.pool.remove_if_complete = _remove_if_complete
        schd._set_stop = _set_stop
        try:
            # wait for the condition to be met
            while still_waiting():
                # allow the main loop to advance
                await asyncio.sleep(0)
                if time() - start_time > timeout:
                    if wait_for_tasks:
                        raise Exception(
                            'Timeout waiting for '
                            f'{", ".join(map(str, tokens_list))}'
                        )
                    raise Exception(
                        'Timeout waiting for workflow to shut down'
                    )
        finally:
            # Restore the regular task-removal and shutdown logic, also on
            # timeout, so the patched hooks do not outlive this call.
            schd.pool.remove_if_complete = remove_if_complete
            schd._set_stop = set_stop

    return _complete
66 changes: 59 additions & 7 deletions tests/integration/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
import pytest
import sqlite3
from typing import TYPE_CHECKING

from cylc.flow.scheduler import Scheduler
from cylc.flow.cycling.iso8601 import ISO8601Point

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


async def test_restart_number(
Expand All @@ -29,7 +34,7 @@ async def test_restart_number(
async def test(expected_restart_num: int, do_reload: bool = False):
"""(Re)start the workflow and check the restart number is as expected.
"""
schd: Scheduler = scheduler(id_, paused_start=True)
schd: 'Scheduler' = scheduler(id_, paused_start=True)
async with start(schd) as log:
if do_reload:
schd.command_reload_workflow()
Expand All @@ -52,7 +57,7 @@ async def test(expected_restart_num: int, do_reload: bool = False):
await test(expected_restart_num=3)


def db_remove_column(schd: Scheduler, table: str, column: str) -> None:
def db_remove_column(schd: 'Scheduler', table: str, column: str) -> None:
"""Remove a column from a scheduler DB table.
ALTER TABLE DROP COLUMN is not supported by sqlite yet, so we have to copy
Expand Down Expand Up @@ -82,23 +87,23 @@ async def test_db_upgrade_pre_803(
id_ = flow(one_conf)

# Run a scheduler to create a DB.
schd: Scheduler = scheduler(id_, paused_start=True)
schd: 'Scheduler' = scheduler(id_, paused_start=True)
async with start(schd):
assert ('n_restart', '0') in db_select(schd, False, 'workflow_params')

# Remove task_states:is_manual_submit to fake a pre-8.0.3 DB.
db_remove_column(schd, "task_states", "is_manual_submit")
db_remove_column(schd, "task_jobs", "flow_nums")

schd: Scheduler = scheduler(id_, paused_start=True)
schd: 'Scheduler' = scheduler(id_, paused_start=True)

# Restart should fail due to the missing column.
with pytest.raises(sqlite3.OperationalError):
async with start(schd):
pass
assert ('n_restart', '1') in db_select(schd, False, 'workflow_params')

schd: Scheduler = scheduler(id_, paused_start=True)
schd: 'Scheduler' = scheduler(id_, paused_start=True)

# Run the DB upgrader for version 8.0.2
# (8.0.2 requires upgrade)
Expand All @@ -117,7 +122,7 @@ async def test_workflow_param_rapid_toggle(
https://github.com/cylc/cylc-flow/issues/5593
"""
schd: Scheduler = scheduler(flow(one_conf), paused_start=False)
schd: 'Scheduler' = scheduler(flow(one_conf), paused_start=False)
async with run(schd):
assert schd.is_paused is False
schd.pause_workflow()
Expand All @@ -127,3 +132,50 @@ async def test_workflow_param_rapid_toggle(

w_params = dict(schd.workflow_db_mgr.pri_dao.select_workflow_params())
assert w_params['is_paused'] == '0'


async def test_record_only_non_clock_triggers(
    flow, run, scheduler, complete, db_select
):
    """Database does not record wall_clock xtriggers.

    https://github.com/cylc/cylc-flow/issues/5911

    Includes:
        - Not in DB: A normal wall clock xtrigger (wall_clock).
        - In DB: An xrandom xtrigger mis-labelled as "wall_clock".
        - Not in DB: An execution retry xtrigger.
          NOTE(review): no retry is configured in the workflow below;
          confirm this case is actually exercised.

    @TODO: Refactor to use simulation mode to speed this test up once the
        simulation mode upgrade bugfixes are in.
    """
    rawpoint = '1348'

    # Three xtriggers: two xrandom calls (one of them merely *labelled*
    # "wall_clock") and one genuine wall_clock call.
    xtrigger_defs = {
        "another": "xrandom(100)",
        "wall_clock": "xrandom(100, _=Not a real wall clock trigger)",
        "real_wall_clock": "wall_clock()"
    }
    graph_r1 = (
        "\n"
        "@another & @wall_clock & @real_wall_clock => foo\n"
        "@real_wall_clock => bar\n"
    )
    workflow_conf = {
        "scheduler": {
            'cycle point format': '%Y',
            'allow implicit tasks': True
        },
        "scheduling": {
            "initial cycle point": rawpoint,
            "xtriggers": xtrigger_defs,
            "graph": {
                "R1": graph_r1
            }
        },
    }
    id_ = flow(workflow_conf)
    schd = scheduler(id_, paused_start=False, run_mode='simulation')

    async with run(schd):
        await complete(schd, timeout=20)

    # Only the two xrandom signatures should have been stored; the real
    # clock trigger must be absent from the xtriggers table.
    expected_rows = [
        ('xrandom(100)',),
        ('xrandom(100, _=Not a real wall clock trigger)',),
    ]
    assert db_select(schd, False, 'xtriggers', 'signature') == expected_rows

0 comments on commit 57fe4ae

Please sign in to comment.