Skip to content

Commit

Permalink
dump: restrict window to n=0 (#5600)
Browse files Browse the repository at this point in the history
This restores the old `cylc dump` behaviour of displaying only the pool contents.
  • Loading branch information
oliver-sanders authored Dec 19, 2023
1 parent 9f738da commit 3320c05
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 15 deletions.
3 changes: 3 additions & 0 deletions changes.d/5600.break.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `cylc dump` command now only shows active tasks (e.g. running & queued
tasks). This restores its behaviour of only showing the tasks which currently
exist in the pool as it did in Cylc 7 and earlier versions of Cylc 8.
40 changes: 26 additions & 14 deletions cylc/flow/scripts/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
Print information about a running workflow.
This command can provide information about active tasks, e.g. running or queued
tasks. For more detailed view of the workflow see `cylc tui` or `cylc gui`.
For command line monitoring:
* `cylc tui`
* `watch cylc dump WORKFLOW_ID` works for small simple workflows
Expand All @@ -28,20 +31,21 @@
its prerequisites and outputs, see 'cylc show'.
Examples:
# Display the state of all running tasks, sorted by cycle point:
# Display the state of all active tasks, sorted by cycle point:
$ cylc dump --tasks --sort WORKFLOW_ID | grep running
# Display the state of all tasks in a particular cycle point:
# Display the state of all active in a particular cycle point:
$ cylc dump -t WORKFLOW_ID | grep 2010082406
"""

from graphene.utils.str_converters import to_snake_case
import asyncio
import json
import sys
from typing import TYPE_CHECKING

from graphene.utils.str_converters import to_snake_case

from cylc.flow.exceptions import CylcError
from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
Expand All @@ -59,6 +63,7 @@
name
cyclePoint
state
graphDepth
isHeld
isQueued
isRunahead
Expand Down Expand Up @@ -179,7 +184,11 @@ def get_option_parser():

@cli_function(get_option_parser)
def main(_, options: 'Values', workflow_id: str) -> None:
workflow_id, *_ = parse_id(
asyncio.run(dump(workflow_id, options))


async def dump(workflow_id, options, write=print):
workflow_id, *_ = await parse_id_async(
workflow_id,
constraint='workflows',
)
Expand All @@ -195,6 +204,9 @@ def main(_, options: 'Values', workflow_id: str) -> None:
else:
sort_args = {'keys': ['name', 'cyclePoint']}

# retrict to the n=0 window
graph_depth = 0

if options.disp_form == "raw":
query = f'''
{TASK_SUMMARY_FRAGMENT}
Expand All @@ -203,10 +215,10 @@ def main(_, options: 'Values', workflow_id: str) -> None:
query ($wFlows: [ID]!, $sortBy: SortArgs) {{
workflows (ids: $wFlows, stripNull: false) {{
...wFlow
taskProxies (sort: $sortBy) {{
taskProxies (sort: $sortBy, graphDepth: {graph_depth}) {{
...tProxy
}}
familyProxies (sort: $sortBy) {{
familyProxies (sort: $sortBy, graphDepth: {graph_depth}) {{
...fProxy
}}
}}
Expand All @@ -224,7 +236,7 @@ def main(_, options: 'Values', workflow_id: str) -> None:
{TASK_SUMMARY_FRAGMENT}
query ($wFlows: [ID]!, $sortBy: SortArgs) {{
workflows (ids: $wFlows, stripNull: false) {{
taskProxies (sort: $sortBy) {{
taskProxies (sort: $sortBy, graphDepth: {graph_depth}) {{
...tProxy
}}
}}
Expand All @@ -235,15 +247,15 @@ def main(_, options: 'Values', workflow_id: str) -> None:
'variables': {'wFlows': [workflow_id], 'sortBy': sort_args}
}

workflows = pclient('graphql', query_kwargs)
workflows = await pclient.async_request('graphql', query_kwargs)

try:
for summary in workflows['workflows']:
if options.disp_form == "raw":
if options.pretty:
sys.stdout.write(json.dumps(summary, indent=4) + '\n')
write(json.dumps(summary, indent=4))
else:
print(summary)
write(summary)
else:
if options.disp_form != "tasks":
node_urls = {
Expand All @@ -261,7 +273,7 @@ def main(_, options: 'Values', workflow_id: str) -> None:
del summary['families']
del summary['meta']
for key, value in sorted(summary.items()):
print(
write(
f'{to_snake_case(key).replace("_", " ")}={value}')
else:
for item in summary['taskProxies']:
Expand All @@ -282,7 +294,7 @@ def main(_, options: 'Values', workflow_id: str) -> None:
else 'not-runahead')
if options.show_flows:
values.append(item['flowNums'])
print(', '.join(values))
write(', '.join(values))
except Exception as exc:
raise CylcError(
json.dumps(workflows, indent=4) + '\n' + str(exc) + '\n')
1 change: 0 additions & 1 deletion tests/functional/runahead/06-release-update.t
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ sleep 10
# (gratuitous use of --flows for test coverage)
cylc dump --flows -t "${WORKFLOW_NAME}" | awk '{print $1 $2 $3 $7}' >'log'
cmp_ok 'log' - <<__END__
bar,$NEXT1,waiting,[1]
foo,$NEXT1,waiting,[1]
__END__

Expand Down
52 changes: 52 additions & 0 deletions tests/integration/scripts/test_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test the "cylc dump" command."""

from cylc.flow.option_parsers import (
Options,
)
from cylc.flow.scripts.dump import (
dump,
get_option_parser,
)


DumpOptions = Options(get_option_parser())


async def test_dump_tasks(flow, scheduler, start):
"""It should show n=0 tasks.
See: https://github.com/cylc/cylc-flow/pull/5600
"""
id_ = flow({
'scheduler': {
'allow implicit tasks': 'true',
},
'scheduling': {
'graph': {
'R1': 'a => b => c',
},
},
})
schd = scheduler(id_)
async with start(schd):
# schd.release_queued_tasks()
await schd.update_data_structure()
ret = []
await dump(id_, DumpOptions(disp_form='tasks'), write=ret.append)
assert ret == ['a, 1, waiting, not-held, queued, not-runahead']

0 comments on commit 3320c05

Please sign in to comment.