From 2b67dc7a6a268265595a4b8b9fa297851d84ad14 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 1 Aug 2023 19:08:25 +1200 Subject: [PATCH] implement set-task --pre [skip ci] --- cylc/flow/network/resolvers.py | 10 +++-- cylc/flow/network/schema.py | 1 - cylc/flow/scheduler.py | 5 ++- cylc/flow/scripts/set_task.py | 30 ++++++++++++- cylc/flow/task_pool.py | 81 +++++++++++++++++++++++++--------- 5 files changed, 98 insertions(+), 29 deletions(-) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index dbe68b47d59..24c8bb7ab77 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -811,6 +811,7 @@ def force_spawn_children( self, tasks: Iterable[str], outputs: Optional[Iterable[str]] = None, + prerequisites: Optional[Iterable[str]] = None, flow: Iterable[str] = None, flow_wait: bool = False, flow_descr: str = "" @@ -820,10 +821,10 @@ def force_spawn_children( User-facing method name: set_task. Args: - tasks: List of identifiers or task globs. - outputs: List of outputs to spawn on. - flow (list): - Flow ownership of triggered tasks. + tasks: Identifiers or task globs. + outputs: Outputs to set complete. + prerequisites: Prerequisites to set satisfied. + flow: Flows that spawned tasks should belong to. """ self.schd.command_queue.put( ( @@ -831,6 +832,7 @@ def force_spawn_children( (tasks,), { "outputs": outputs, + "prerequisites": prerequisites, "flow": flow, "flow_wait": flow_wait, "flow_descr": flow_descr, diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 6e16802d718..65e6d0072d6 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2111,7 +2111,6 @@ class Meta: class Arguments(TaskMutation.Arguments, FlowMutationArguments): outputs = graphene.List( String, - default_value=[TASK_OUTPUT_SUCCEEDED], description='List of task outputs to set complete.' 
) prerequisites = graphene.List( diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index cd530f3c1d5..2bb715d269a 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1676,6 +1676,7 @@ async def main_loop(self) -> None: tinit = time() # Useful for debugging core scheduler issues: + # import logging # self.pool.log_task_pool(logging.CRITICAL) if self.incomplete_ri_map: self.manage_remote_init() @@ -2125,14 +2126,14 @@ def command_force_trigger_tasks(self, items, flow, flow_wait, flow_descr): items, flow, flow_wait, flow_descr) def command_force_spawn_children( - self, items, outputs, flow, flow_wait, flow_descr + self, items, outputs, prerequisites, flow, flow_wait, flow_descr ): """Force spawn task successors. User-facing method name: set_task. """ return self.pool.force_spawn_children( - items, outputs, flow, flow_wait, flow_descr + items, outputs, prerequisites, flow, flow_wait, flow_descr ) def _update_profile_info(self, category, amount, amount_format="%s"): diff --git a/cylc/flow/scripts/set_task.py b/cylc/flow/scripts/set_task.py index f3bf47e3ecd..ec5e530966a 100755 --- a/cylc/flow/scripts/set_task.py +++ b/cylc/flow/scripts/set_task.py @@ -31,11 +31,15 @@ - started implies submitted - succeeded and failed imply started - custom outputs and expired do not imply any other outputs + +Specify prerequisites in the form "point/task:message". + """ from functools import partial from optparse import Values +from cylc.flow.exceptions import InputError from cylc.flow.network.client_factory import get_client from cylc.flow.network.multi import call_multi from cylc.flow.option_parsers import ( @@ -53,6 +57,8 @@ ERR_OPT_FLOW_WAIT, validate_flow_opts ) +from cylc.flow.task_id import TaskID +from cylc.flow.task_pool import REC_CLI_PREREQ MUTATION = ''' @@ -105,6 +111,7 @@ def get_option_parser() -> COP: "Set task prerequisites satisfied." ' May be "all", which is equivalent to "cylc trigger".' " (Multiple use allowed, may be comma separated)." 
+ " Prerequisite format: 'point/task:message'." ), action="append", default=None, dest="prerequisites" ) @@ -134,6 +141,25 @@ def get_option_parser() -> COP: return parser +def get_prerequisite_opts(options): + """Convert prerequisite inputs to a single list, and validate. + + This: + --pre=a -pre=b,c + is equivalent to this: + --pre=a,b,c + + Validation: format /: + """ + result = [] + for p in options.prerequisites: + result += p.split(',') + for p in result: + if not REC_CLI_PREREQ.match(p): + raise InputError(f"Bad prerequisite: {p}") + return result + + async def run(options: 'Values', workflow_id: str, *tokens_list) -> None: pclient = get_client(workflow_id, timeout=options.comms_timeout) @@ -146,7 +172,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None: for tokens in tokens_list ], 'outputs': options.outputs, - 'prerequisites': options.prerequisites, + 'prerequisites': get_prerequisite_opts(options), 'flow': options.flow, 'flowWait': options.flow_wait, 'flowDescr': options.flow_descr @@ -158,8 +184,10 @@ async def run(options: 'Values', workflow_id: str, *tokens_list) -> None: @cli_function(get_option_parser) def main(parser: COP, options: 'Values', *ids) -> None: + if options.flow is None: options.flow = [FLOW_ALL] # default to all active flows + validate_flow_opts(options) call_multi( diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 7e828c77502..f7e5b6e4b03 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -16,6 +16,7 @@ """Wrangle task proxies to manage the workflow.""" +import re from contextlib import suppress from collections import Counter import json @@ -85,6 +86,15 @@ Pool = Dict['PointBase', Dict[str, TaskProxy]] +# CLI prerequisite pattern: point/name:label +REC_CLI_PREREQ = re.compile( + f"({TaskID.POINT_RE})" + + f"{TaskID.DELIM2}" + + f"({TaskID.NAME_RE})" + + ':' + '(\w+)' # TODO: formally define qualifier RE? 
+) + + class TaskPool: """Task pool of a workflow.""" @@ -702,7 +712,7 @@ def _get_spawned_or_merged_task( # ntask does not exist: spawn it in the flow. ntask = self.spawn_task(name, point, flow_nums) else: - # ntask already exists (n=0 or incomplete): merge flows. + # ntask already exists (n=0): merge flows. self.merge_flows(ntask, flow_nums) return ntask # may be None @@ -1259,7 +1269,7 @@ def spawn_on_output(self, itask, output, forced=False): Args: tasks: List of identifiers or task globs. - outputs: List of outputs to spawn on. + output: Output to spawn on. forced: If True this is a manual spawn command. """ @@ -1576,45 +1586,74 @@ def spawn_task( self.db_add_new_flow_rows(itask) return itask + # TODO RENAME THIS METHOD def force_spawn_children( self, items: Iterable[str], outputs: List[str], + prerequisites: List[str], flow: List[str], flow_wait: bool = False, flow_descr: str = "", ): - """Spawn downstream children of given outputs, on user command. + """Force set prerequisites satisfied and outputs completed. + + For prerequisites: + - spawn target task if necessary, and set the prerequisites - User-facing command name: set_task. Creates a transient parent just - for the purpose of spawning children. + For outputs: + - spawn child tasks if necessary, and spawn/update prereqs of + children + - TODO: set outputs completed in the target task (DB, and task + proxy if already spawned - but don't spawn a new one) Args: - items: Identifiers for matching task definitions, each with the - form "point/name". 
- outputs: List of outputs to spawn on - flow: Flow number to attribute the outputs + items: Identifiers for matching task definitions + prerequisites: prerequisites to set and spawn children of + outputs: Outputs to set and spawn children of + flow: Flow numbers for spawned or updated tasks + flow_wait: wait for flows to catch up before continuing + flow_descr: description of new flow """ - outputs = outputs or [TASK_OUTPUT_SUCCEEDED] + if not outputs and not prerequisites: + # Default: set all required outputs. + outputs = outputs or [TASK_OUTPUT_SUCCEEDED] + flow_nums = self._flow_cmd_helper(flow) if flow_nums is None: return n_warnings, task_items = self.match_taskdefs(items) for (_, point), taskdef in sorted(task_items.items()): - # This the parent task: - itask = TaskProxy( - self.tokens, - taskdef, - point, - flow_nums=flow_nums, + + itask = self._get_spawned_or_merged_task( + point, taskdef.name, flow_nums ) - # Spawn children of selected outputs. - for trig, out, _ in itask.state.outputs.get_all(): - if trig in outputs: - LOG.info(f"[{itask}] Forced spawning on {out}") - self.spawn_on_output(itask, out, forced=True) + if itask is None: + # Not in pool but was spawned already in this flow. + return + + if outputs: + # Spawn children of outputs, add them to the pool. 
+ # (Don't add the target task to pool if we just spawned it) + for trig, out, _ in itask.state.outputs.get_all(): + if trig in outputs: + LOG.info(f"[{itask}] Forced spawning on {out}") + self.spawn_on_output(itask, out, forced=True) + self.workflow_db_mgr.put_update_task_outputs(itask) + + if prerequisites: + for pre in prerequisites: + m = REC_CLI_PREREQ.match(pre) + itask.state.satisfy_me({m.groups()}) + + self.data_store_mgr.delta_task_prerequisite(itask) + self.add_to_pool(itask) # move from hidden if necessary + if itask.point <= self.runahead_limit_point: + self.rh_release_and_queue(itask) + + def _get_active_flow_nums(self) -> Set[int]: """Return all active, or most recent previous, flow numbers.