diff --git a/changes.d/6583.fix.md b/changes.d/6583.fix.md new file mode 100644 index 0000000000..d7a2ce4d8d --- /dev/null +++ b/changes.d/6583.fix.md @@ -0,0 +1 @@ +Fix bug where graph items with undefined outputs were missed at validation if the graph item was not an upstream dependency of another graph item. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index dbf9316218..48c71e1122 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1844,6 +1844,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, triggers = {} xtrig_labels = set() + for left in left_nodes: if left.startswith('@'): xtrig_labels.add(left[1:]) @@ -2265,6 +2266,17 @@ def load_graph(self): self.workflow_polling_tasks.update( parser.workflow_state_polling_tasks) self._proc_triggers(parser, seq, task_triggers) + self.check_outputs( + [ + task_output + for task_output in parser.task_output_opt + if task_output[0] + in [ + task_output.split(':')[0] + for task_output in parser.terminals + ] + ] + ) self.set_required_outputs(task_output_opt) @@ -2278,6 +2290,24 @@ def load_graph(self): for tdef in self.taskdefs.values(): tdef.tweak_outputs() + def check_outputs(self, tasks_and_outputs: Iterable[Tuple[str]]) -> None: + """Check that task outputs have been registered with tasks. + + Args: tasks_and_outputs: ((task, output), ...) + + Raises: WorkflowConfigError is a user has defined a task with a + custom output, but has not registered a custom output. + """ + for task, output in tasks_and_outputs: + registered_outputs = self.cfg['runtime'][task]['outputs'] + if ( + not TaskOutputs.is_valid_std_name(output) + and output not in registered_outputs + ): + raise WorkflowConfigError( + f"Undefined custom output: {task}:{output}" + ) + def _proc_triggers(self, parser, seq, task_triggers): """Define graph edges, taskdefs, and triggers, from graph sections.""" suicides = 0 diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index 2804f9a67c..1d0dff38d0 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -470,10 +470,12 @@ def parse_graph(self, graph_string: str) -> None: pairs.add((chain[i], chain[i + 1])) # Get a set of RH nodes which are not at the LH of another pair: - terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs}) + self.terminals = {p[1] for p in pairs}.difference( + {p[0] for p in pairs} + ) for pair in sorted(pairs, key=lambda p: str(p[0])): - self._proc_dep_pair(pair, terminals) + self._proc_dep_pair(pair, self.terminals) @classmethod def _report_invalid_lines(cls, lines: List[str]) -> None: diff --git a/tests/integration/test_dbstatecheck.py b/tests/integration/test_dbstatecheck.py index 16f4a7bb46..44c026ed3a 100644 --- a/tests/integration/test_dbstatecheck.py +++ b/tests/integration/test_dbstatecheck.py @@ -45,7 +45,7 @@ async def checker( }, 'runtime': { 'bad': {'simulation': {'fail cycle points': '1000'}}, - 'output': {'outputs': {'trigger': 'message'}} + 'output': {'outputs': {'trigger': 'message', 'custom_output': 'foo'}} } }) schd: Scheduler = mod_scheduler(wid, paused_start=False) @@ -119,13 +119,13 @@ def test_output(checker): 'output', '10000101T0000Z', "{'submitted': 'submitted', 'started': 'started', 'succeeded': " - "'succeeded', 'trigger': 'message'}", + "'succeeded', 'trigger': 'message', 'custom_output': 'foo'}", ], [ 'output', '10010101T0000Z', "{'submitted': 'submitted', 'started': 'started', 'succeeded': " - "'succeeded', 'trigger': 'message'}", + "'succeeded', 'trigger': 'message', 'custom_output': 'foo'}", ], ] assert result == expect diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index da4ff989b3..04cabdeceb 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -288,11 +288,7 @@ def implicit_completion_config(mod_flow, mod_validate): }, 'runtime': { 'root': { - 'outputs': { - 'x': 'xxx', - 'y': 'yyy', - 'z': 'zzz', - } + 'outputs': {x: f'{x * 3}' for x in 'abcdefghijklxyz'} } } })