From 3320c050a350a3f0c2622405a2739d28ff827eb9 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 19 Dec 2023 12:28:47 +0000 Subject: [PATCH] dump: restrict window to n=0 (#5600) This restores the old `cylc dump` behaviour of displaying only the pool contents. --- changes.d/5600.break.md | 3 ++ cylc/flow/scripts/dump.py | 40 +++++++++----- tests/functional/runahead/06-release-update.t | 1 - tests/integration/scripts/test_dump.py | 52 +++++++++++++++++++ 4 files changed, 81 insertions(+), 15 deletions(-) create mode 100644 changes.d/5600.break.md create mode 100644 tests/integration/scripts/test_dump.py diff --git a/changes.d/5600.break.md b/changes.d/5600.break.md new file mode 100644 index 00000000000..fd11f650ff5 --- /dev/null +++ b/changes.d/5600.break.md @@ -0,0 +1,3 @@ +The `cylc dump` command now only shows active tasks (e.g. running & queued +tasks). This restores its behaviour of only showing the tasks which currently +exist in the pool as it did in Cylc 7 and earlier versions of Cylc 8. diff --git a/cylc/flow/scripts/dump.py b/cylc/flow/scripts/dump.py index c62bf6719aa..d3ffda59fce 100755 --- a/cylc/flow/scripts/dump.py +++ b/cylc/flow/scripts/dump.py @@ -20,6 +20,9 @@ Print information about a running workflow. +This command can provide information about active tasks, e.g. running or queued +tasks. For more detailed view of the workflow see `cylc tui` or `cylc gui`. + For command line monitoring: * `cylc tui` * `watch cylc dump WORKFLOW_ID` works for small simple workflows @@ -28,20 +31,21 @@ its prerequisites and outputs, see 'cylc show'. Examples: - # Display the state of all running tasks, sorted by cycle point: + # Display the state of all active tasks, sorted by cycle point: $ cylc dump --tasks --sort WORKFLOW_ID | grep running - # Display the state of all tasks in a particular cycle point: + # Display the state of all active in a particular cycle point: $ cylc dump -t WORKFLOW_ID | grep 2010082406 """ -from graphene.utils.str_converters import to_snake_case +import asyncio import json -import sys from typing import TYPE_CHECKING +from graphene.utils.str_converters import to_snake_case + from cylc.flow.exceptions import CylcError -from cylc.flow.id_cli import parse_id +from cylc.flow.id_cli import parse_id_async from cylc.flow.option_parsers import ( WORKFLOW_ID_ARG_DOC, CylcOptionParser as COP, @@ -59,6 +63,7 @@ name cyclePoint state + graphDepth isHeld isQueued isRunahead @@ -179,7 +184,11 @@ def get_option_parser(): @cli_function(get_option_parser) def main(_, options: 'Values', workflow_id: str) -> None: - workflow_id, *_ = parse_id( + asyncio.run(dump(workflow_id, options)) + + +async def dump(workflow_id, options, write=print): + workflow_id, *_ = await parse_id_async( workflow_id, constraint='workflows', ) @@ -195,6 +204,9 @@ def main(_, options: 'Values', workflow_id: str) -> None: else: sort_args = {'keys': ['name', 'cyclePoint']} + # retrict to the n=0 window + graph_depth = 0 + if options.disp_form == "raw": query = f''' {TASK_SUMMARY_FRAGMENT} @@ -203,10 +215,10 @@ def main(_, options: 'Values', workflow_id: str) -> None: query ($wFlows: [ID]!, $sortBy: SortArgs) {{ workflows (ids: $wFlows, stripNull: false) {{ ...wFlow - taskProxies (sort: $sortBy) {{ + taskProxies (sort: $sortBy, graphDepth: {graph_depth}) {{ ...tProxy }} - familyProxies (sort: $sortBy) {{ + familyProxies (sort: $sortBy, graphDepth: {graph_depth}) {{ ...fProxy }} }} @@ -224,7 +236,7 @@ def main(_, options: 'Values', workflow_id: str) -> None: {TASK_SUMMARY_FRAGMENT} query ($wFlows: [ID]!, $sortBy: SortArgs) {{ workflows (ids: $wFlows, stripNull: false) {{ - taskProxies (sort: $sortBy) {{ + taskProxies (sort: $sortBy, graphDepth: {graph_depth}) {{ ...tProxy }} }} @@ -235,15 +247,15 @@ def main(_, options: 'Values', workflow_id: str) -> None: 'variables': {'wFlows': [workflow_id], 'sortBy': sort_args} } - workflows = pclient('graphql', query_kwargs) + workflows = await pclient.async_request('graphql', query_kwargs) try: for summary in workflows['workflows']: if options.disp_form == "raw": if options.pretty: - sys.stdout.write(json.dumps(summary, indent=4) + '\n') + write(json.dumps(summary, indent=4)) else: - print(summary) + write(summary) else: if options.disp_form != "tasks": node_urls = { @@ -261,7 +273,7 @@ def main(_, options: 'Values', workflow_id: str) -> None: del summary['families'] del summary['meta'] for key, value in sorted(summary.items()): - print( + write( f'{to_snake_case(key).replace("_", " ")}={value}') else: for item in summary['taskProxies']: @@ -282,7 +294,7 @@ def main(_, options: 'Values', workflow_id: str) -> None: else 'not-runahead') if options.show_flows: values.append(item['flowNums']) - print(', '.join(values)) + write(', '.join(values)) except Exception as exc: raise CylcError( json.dumps(workflows, indent=4) + '\n' + str(exc) + '\n') diff --git a/tests/functional/runahead/06-release-update.t b/tests/functional/runahead/06-release-update.t index d52b12272a6..c4ab28530e3 100644 --- a/tests/functional/runahead/06-release-update.t +++ b/tests/functional/runahead/06-release-update.t @@ -35,7 +35,6 @@ sleep 10 # (gratuitous use of --flows for test coverage) cylc dump --flows -t "${WORKFLOW_NAME}" | awk '{print $1 $2 $3 $7}' >'log' cmp_ok 'log' - <<__END__ -bar,$NEXT1,waiting,[1] foo,$NEXT1,waiting,[1] __END__ diff --git a/tests/integration/scripts/test_dump.py b/tests/integration/scripts/test_dump.py new file mode 100644 index 00000000000..681167a564f --- /dev/null +++ b/tests/integration/scripts/test_dump.py @@ -0,0 +1,52 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test the "cylc dump" command.""" + +from cylc.flow.option_parsers import ( + Options, +) +from cylc.flow.scripts.dump import ( + dump, + get_option_parser, +) + + +DumpOptions = Options(get_option_parser()) + + +async def test_dump_tasks(flow, scheduler, start): + """It should show n=0 tasks. + + See: https://github.com/cylc/cylc-flow/pull/5600 + """ + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'true', + }, + 'scheduling': { + 'graph': { + 'R1': 'a => b => c', + }, + }, + }) + schd = scheduler(id_) + async with start(schd): + # schd.release_queued_tasks() + await schd.update_data_structure() + ret = [] + await dump(id_, DumpOptions(disp_form='tasks'), write=ret.append) + assert ret == ['a, 1, waiting, not-held, queued, not-runahead']