From a3a87df323b4060f2fbc4f821ac89d633f85e8f0 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 27 Jun 2024 15:55:52 +0100 Subject: [PATCH] workflow_state: reject invalid configurations * Do not allow users to poll for transient statuses. * Reject invalid task states. * Reject polling waiting and preparing tasks (not reliably pollable). * Closes #6157 --- changes.d/6175.fix.md | 1 + cylc/flow/dbstatecheck.py | 28 ++++++++++++ cylc/flow/scripts/workflow_state.py | 39 ++++++++++------ cylc/flow/task_state.py | 28 ++++++++---- cylc/flow/xtriggers/workflow_state.py | 17 ++++++- .../xtriggers/01-workflow_state/flow.cylc | 2 +- .../workflow-state/11-multi/flow.cylc | 2 +- tests/unit/test_dbstatecheck.py | 44 +++++++++++++++++++ tests/unit/xtriggers/test_workflow_state.py | 32 ++++++++++++++ 9 files changed, 167 insertions(+), 26 deletions(-) create mode 100644 changes.d/6175.fix.md create mode 100644 tests/unit/test_dbstatecheck.py diff --git a/changes.d/6175.fix.md b/changes.d/6175.fix.md new file mode 100644 index 00000000000..b135207930e --- /dev/null +++ b/changes.d/6175.fix.md @@ -0,0 +1 @@ +The workflow-state command and xtrigger will now reject invalid polling arguments. 
diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index b38c394ccdb..2de4bd7b3b5 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -36,6 +36,10 @@ TASK_OUTPUT_FAILED, TASK_OUTPUT_FINISHED, ) +from cylc.flow.task_state import ( + TASK_STATE_MAP, + TASK_STATUSES_FINAL, +) from cylc.flow.util import deserialise_set from metomi.isodatetime.parsers import TimePointParser from metomi.isodatetime.exceptions import ISO8601SyntaxError @@ -244,6 +248,8 @@ def workflow_state_query( stmt_args = [] stmt_wheres = [] + check_polling_config(selector, is_trigger, is_message) + if is_trigger or is_message: target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS mask = "name, cycle, outputs" @@ -363,3 +369,25 @@ def _selector_in_outputs(selector: str, outputs: Iterable[str]) -> bool: or TASK_OUTPUT_FAILED in outputs ) ) + + +def check_polling_config(selector, is_trigger, is_message): + """Check for invalid or unreliable polling configurations.""" + if selector and not (is_trigger or is_message): + # we are using task status polling + try: + trigger = TASK_STATE_MAP[selector] + except KeyError: + raise InputError(f'No such task state "{selector}"') + else: + if trigger is None: + raise InputError( + f'Cannot poll for the "{selector}" task state' + ) + + if selector not in TASK_STATUSES_FINAL: + raise InputError( + f'Polling for the "{selector}" task status is not' + ' reliable as it is a transient state.' + f'\nPoll for the "{trigger}" trigger instead.' + ) diff --git a/cylc/flow/scripts/workflow_state.py b/cylc/flow/scripts/workflow_state.py index f6350110223..eed189a7113 100755 --- a/cylc/flow/scripts/workflow_state.py +++ b/cylc/flow/scripts/workflow_state.py @@ -33,8 +33,8 @@ so you can start checking before the target workflow is started. 
Legacy (pre-8.3.0) options are supported, but deprecated, for existing scripts: - cylc workflow-state --task=NAME --point=CYCLE --status=STATUS - --output=MESSAGE --message=MESSAGE --task-point WORKFLOW + cylc workflow-state --task=NAME --point=CYCLE --status=STATUS + --output=MESSAGE --message=MESSAGE --task-point WORKFLOW (Note from 8.0 until 8.3.0 --output and --message both match task messages). In "cycle/task:selector" the selector will match task statuses, unless: @@ -55,24 +55,23 @@ Flow numbers are only printed for flow numbers > 1. -USE IN TASK SCRIPTING: +Use in task scripting: - To poll a task at the same cycle point in another workflow, just use $CYLC_TASK_CYCLE_POINT in the ID. - To poll a task at an offset cycle point, use the --offset option to have Cylc do the datetime arithmetic for you. - However, see also the workflow_state xtrigger for this use case. -WARNINGS: - - Typos in the workflow or task ID will result in fruitless polling. - - To avoid missing transient states ("submitted", "running") poll for the - corresponding output trigger instead ("submitted", "started"). - - Cycle points are auto-converted to the DB point format (and UTC mode). - - Task outputs manually completed by "cylc set" have "(force-completed)" - recorded as the task message in the DB, so it is best to query trigger - names, not messages, unless specifically interested in forced outputs. +Warnings: + - Typos in the workflow or task ID will result in fruitless polling. + - To avoid missing transient states ("submitted", "running") poll for the + corresponding output trigger instead ("submitted", "started"). + - Cycle points are auto-converted to the DB point format (and UTC mode). + - Task outputs manually completed by "cylc set" have "(force-completed)" + recorded as the task message in the DB, so it is best to query trigger + names, not messages, unless specifically interested in forced outputs. 
Examples: - # Print the status of all tasks in WORKFLOW: $ cylc workflow-state WORKFLOW @@ -115,7 +114,11 @@ from cylc.flow.dbstatecheck import CylcWorkflowDBChecker from cylc.flow.terminal import cli_function from cylc.flow.workflow_files import infer_latest_run_from_id -from cylc.flow.task_state import TASK_STATUSES_ORDERED +from cylc.flow.task_state import ( + TASK_STATUSES_ORDERED, + TASK_STATUSES_FINAL, + TASK_STATUSES_ALL, +) if TYPE_CHECKING: from optparse import Values @@ -363,7 +366,6 @@ def get_option_parser() -> COP: @cli_function(get_option_parser, remove_opts=["--db"]) def main(parser: COP, options: 'Values', *ids: str) -> None: - # Note it would be cleaner to use 'id_cli.parse_ids()' here to get the # workflow ID and tokens, but that function infers run number and fails # if the workflow is not installed yet. We want to be able to start polling @@ -427,6 +429,15 @@ def main(parser: COP, options: 'Values', *ids: str) -> None: msg += id_ else: msg += id_.replace(options.depr_point, "$CYLC_TASK_CYCLE_POINT") + + if ( + options.depr_status + and options.depr_status in TASK_STATUSES_ALL + and options.depr_status not in TASK_STATUSES_FINAL + ): + # polling for non-final task statuses is flaky + msg += ' and the --triggers option' + LOG.warning(msg) poller = WorkflowPoller( diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index ebb3dbc985b..5b77aedf79f 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -19,7 +19,15 @@ from typing import List, Iterable, Set, TYPE_CHECKING from cylc.flow.prerequisite import Prerequisite -from cylc.flow.task_outputs import TaskOutputs +from cylc.flow.task_outputs import ( + TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_STARTED, + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_SUBMIT_FAILED, + TASK_OUTPUT_SUCCEEDED, + TaskOutputs, +) from cylc.flow.wallclock import get_current_time_string if TYPE_CHECKING: @@ -144,13 +152,17 @@ TASK_STATUS_RUNNING, } -# Task statuses that can be manually 
triggered. -TASK_STATUSES_TRIGGERABLE = { - TASK_STATUS_WAITING, - TASK_STATUS_EXPIRED, - TASK_STATUS_SUBMIT_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_FAILED, +# Mapping between task outputs and their corresponding states +TASK_STATE_MAP = { + # status: trigger + TASK_STATUS_WAITING: None, + TASK_STATUS_EXPIRED: TASK_OUTPUT_EXPIRED, + TASK_STATUS_PREPARING: None, + TASK_STATUS_SUBMIT_FAILED: TASK_OUTPUT_SUBMIT_FAILED, + TASK_STATUS_SUBMITTED: TASK_OUTPUT_SUBMITTED, + TASK_STATUS_RUNNING: TASK_OUTPUT_STARTED, + TASK_STATUS_FAILED: TASK_OUTPUT_FAILED, + TASK_STATUS_SUCCEEDED: TASK_OUTPUT_SUCCEEDED, } diff --git a/cylc/flow/xtriggers/workflow_state.py b/cylc/flow/xtriggers/workflow_state.py index e6025a561f8..14948a43390 100644 --- a/cylc/flow/xtriggers/workflow_state.py +++ b/cylc/flow/xtriggers/workflow_state.py @@ -20,8 +20,12 @@ from cylc.flow.scripts.workflow_state import WorkflowPoller from cylc.flow.id import tokenise -from cylc.flow.exceptions import WorkflowConfigError +from cylc.flow.exceptions import WorkflowConfigError, InputError from cylc.flow.task_state import TASK_STATUS_SUCCEEDED +from cylc.flow.dbstatecheck import check_polling_config + + +DEFAULT_STATUS = TASK_STATUS_SUCCEEDED def workflow_state( @@ -84,7 +88,7 @@ def workflow_state( offset, flow_num, alt_cylc_run_dir, - TASK_STATUS_SUCCEEDED, + DEFAULT_STATUS, is_trigger, is_message, old_format=False, condition=workflow_task_id, @@ -151,6 +155,15 @@ def validate(args: Dict[str, Any]): ): raise WorkflowConfigError("flow_num must be an integer if given.") + try: + check_polling_config( + tokens['cycle_sel'] or tokens['task_sel'] or DEFAULT_STATUS, + args['is_trigger'], + args['is_message'], + ) + except InputError as exc: + raise WorkflowConfigError(str(exc)) from None + # BACK COMPAT: workflow_state_backcompat # from: 8.0.0 diff --git a/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc b/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc index 609b0be3a05..76b4efd2813 100644 
--- a/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc +++ b/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc @@ -8,7 +8,7 @@ initial cycle point = 2011 final cycle point = 2016 [[xtriggers]] - upstream = workflow_state("{{UPSTREAM}}//%(point)s/foo:data_ready"):PT1S + upstream = workflow_state("{{UPSTREAM}}//%(point)s/foo:data_ready", is_trigger=True):PT1S [[graph]] P1Y = """ foo diff --git a/tests/functional/workflow-state/11-multi/flow.cylc b/tests/functional/workflow-state/11-multi/flow.cylc index a0ab61e9312..fc8928646ab 100644 --- a/tests/functional/workflow-state/11-multi/flow.cylc +++ b/tests/functional/workflow-state/11-multi/flow.cylc @@ -23,7 +23,7 @@ # Cylc 8 new (from 8.3.0) c1 = workflow_state(c8b//1/foo, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S c2 = workflow_state(c8b//1/foo:succeeded, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S - c3 = workflow_state(c8b//1/foo:x, offset=P0, alt_cylc_run_dir={{ALT}}):PT1S + c3 = workflow_state(c8b//1/foo:x, offset=P0, alt_cylc_run_dir={{ALT}}, is_trigger=True):PT1S c4 = workflow_state(c8b//1/foo:"the quick brown", offset=P0, is_message=True, alt_cylc_run_dir={{ALT}}):PT1S [[graph]] diff --git a/tests/unit/test_dbstatecheck.py b/tests/unit/test_dbstatecheck.py new file mode 100644 index 00000000000..7fd1de94edd --- /dev/null +++ b/tests/unit/test_dbstatecheck.py @@ -0,0 +1,44 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from cylc.flow.dbstatecheck import check_polling_config +from cylc.flow.exceptions import InputError + +import pytest + + +def test_check_polling_config(): + """It should reject invalid or unreliable polling configurations. 
+ + See https://github.com/cylc/cylc-flow/issues/6157 + """ + with pytest.raises(WorkflowConfigError, match='No such task state'): + validate({ + 'workflow_task_id': 'foo//1/bar:elephant', + 'is_trigger': False, + 'is_message': False, + 'flow_num': 44, + }) + + with pytest.raises(WorkflowConfigError, match='Cannot poll for'): + validate({ + 'workflow_task_id': 'foo//1/bar:waiting', + 'is_trigger': False, + 'is_message': False, + 'flow_num': 44, + }) + + with pytest.raises(WorkflowConfigError, match='is not reliable'): + validate({ + 'workflow_task_id': 'foo//1/bar:submitted', + 'is_trigger': False, + 'is_message': False, + 'flow_num': 44, + })