Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow state other run dir fix #6044

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions cylc/flow/id_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ async def parse_ids_async(
constraint: str = 'tasks',
max_workflows: Optional[int] = None,
max_tasks: Optional[int] = None,
cylc_run_dir: Optional[str] = None,
) -> Tuple[Dict[str, List[Tokens]], Any]:
"""Parse IDs from the command line.

Expand Down Expand Up @@ -232,6 +233,8 @@ async def parse_ids_async(
max_tasks:
Specify the maximum number of tasks permitted to be specified
in the ids.
cylc_run_dir:
Infer from a non-stardard run directory.

Returns:
With src=True":
Expand Down Expand Up @@ -294,7 +297,11 @@ async def parse_ids_async(

# infer the run number if not specified the ID (and if possible)
if infer_latest_runs:
_infer_latest_runs(tokens_list, src_path=src_path)
_infer_latest_runs(
tokens_list,
src_path=src_path,
cylc_run_dir=cylc_run_dir
)

_validate_number(
*tokens_list,
Expand Down Expand Up @@ -409,13 +416,16 @@ def _validate_workflow_ids(*tokens_list, src_path):
detect_both_flow_and_suite(src_path)


def _infer_latest_runs(tokens_list, src_path):
def _infer_latest_runs(tokens_list, src_path, cylc_run_dir):
for ind, tokens in enumerate(tokens_list):
if ind == 0 and src_path:
# source workflow passed in as a path
continue
tokens_list[ind] = tokens.duplicate(
workflow=infer_latest_run_from_id(tokens['workflow'])
workflow=infer_latest_run_from_id(
tokens['workflow'],
cylc_run_dir=cylc_run_dir,
)
)
pass

Expand Down
14 changes: 10 additions & 4 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,26 @@ def get_remote_workflow_run_job_dir(
return get_remote_workflow_run_dir(workflow_id, 'log', 'job', *args)


def get_cylc_run_dir() -> str:
def get_cylc_run_dir(cylc_run_dir: Optional[str] = None) -> str:
"""Return the cylc-run dir path with vars/user expanded."""
return expand_path(_CYLC_RUN_DIR)
if cylc_run_dir is None:
return expand_path(_CYLC_RUN_DIR)
return expand_path(cylc_run_dir)


def get_workflow_run_dir(
workflow_id: Union[Path, str], *args: Union[Path, str]
workflow_id: Union[Path, str],
*args: Union[Path, str],
cylc_run_dir: Optional[str] = None,
) -> str:
"""Return local workflow run directory, joining any extra args, and
expanding vars and user.

Does not check that the directory exists.
"""
return expand_path(_CYLC_RUN_DIR, workflow_id, *args)
if cylc_run_dir is None:
return expand_path(_CYLC_RUN_DIR, workflow_id, *args)
return expand_path(cylc_run_dir, workflow_id, *args)


def get_workflow_run_job_dir(workflow, *args):
Expand Down
8 changes: 3 additions & 5 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.terminal import cli_function
from cylc.flow.cycling.util import add_offset
from cylc.flow.pathutil import expand_path, get_cylc_run_dir
from cylc.flow.pathutil import get_cylc_run_dir

from metomi.isodatetime.parsers import TimePointParser

Expand Down Expand Up @@ -199,6 +199,7 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
workflow_id, *_ = parse_id(
workflow_id,
constraint='workflows',
cylc_run_dir=options.run_dir,
)

if options.use_task_point and options.cycle:
Expand Down Expand Up @@ -232,10 +233,7 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
raise InputError(f"invalid status '{options.status}'")

# this only runs locally
if options.run_dir:
run_dir = expand_path(options.run_dir)
else:
run_dir = get_cylc_run_dir()
run_dir = get_cylc_run_dir(cylc_run_dir=options.run_dir)

pollargs = {
'workflow_id': workflow_id,
Expand Down
17 changes: 13 additions & 4 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,16 +862,25 @@ def check_reserved_dir_names(name: Union[Path, str]) -> None:
raise WorkflowFilesError(err_msg.format('run<number>'))


def infer_latest_run_from_id(workflow_id: str) -> str:
run_dir = Path(get_workflow_run_dir(workflow_id))
_, id_ = infer_latest_run(run_dir)
def infer_latest_run_from_id(
workflow_id: str,
cylc_run_dir: Optional[str] = None,
) -> str:
run_dir = Path(
get_workflow_run_dir(
workflow_id,
cylc_run_dir=cylc_run_dir,
)
)
_, id_ = infer_latest_run(run_dir, cylc_run_dir=cylc_run_dir)
return id_


def infer_latest_run(
path: Path,
implicit_runN: bool = True,
warn_runN: bool = True,
cylc_run_dir: Optional[str] = None,
) -> Tuple[Path, str]:
"""Infer the numbered run dir if the workflow has a runN symlink.

Expand All @@ -890,7 +899,7 @@ def infer_latest_run(
- WorkflowFilesError if the runN symlink is not valid.
- InputError if the path does not exist.
"""
cylc_run_dir = get_cylc_run_dir()
cylc_run_dir = get_cylc_run_dir(cylc_run_dir=cylc_run_dir)
try:
id_ = str(path.relative_to(cylc_run_dir))
except ValueError:
Expand Down
12 changes: 6 additions & 6 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from cylc.flow.cycling.util import add_offset
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.pathutil import expand_path, get_cylc_run_dir
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.workflow_files import infer_latest_run


Expand Down Expand Up @@ -78,13 +78,13 @@ def workflow_state(
to this xtrigger.

"""
if cylc_run_dir:
cylc_run_dir = expand_path(cylc_run_dir)
else:
cylc_run_dir = get_cylc_run_dir()
cylc_run_dir = get_cylc_run_dir(cylc_run_dir=cylc_run_dir)
if offset is not None:
point = str(add_offset(point, offset))
_, workflow = infer_latest_run(Path(cylc_run_dir, workflow))
_, workflow = infer_latest_run(
Path(cylc_run_dir, workflow),
cylc_run_dir=cylc_run_dir,
)
try:
checker = CylcWorkflowDBChecker(cylc_run_dir, workflow)
except (OSError, sqlite3.Error):
Expand Down
31 changes: 31 additions & 0 deletions tests/functional/workflow-state/08-other-run-dir.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test cylc workflow-state "template" option
. "$(dirname "$0")/test_header"
#-------------------------------------------------------------------------------
set_test_number 1
#-------------------------------------------------------------------------------
install_workflow "${TEST_NAME_BASE}" other-run-dir
#-------------------------------------------------------------------------------
TEST_NAME="${TEST_NAME_BASE}-run"
workflow_run_ok "${TEST_NAME}" cylc play --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
rm -rfv ~/cylc-run-other-test
purge
#-------------------------------------------------------------------------------
exit 0
39 changes: 39 additions & 0 deletions tests/functional/workflow-state/other-run-dir/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[meta]
title = "Checks task state of alternate run dir"
[scheduler]
UTC mode = True
[[events]]
inactivity timeout = PT1M
abort on inactivity timeout = True
[scheduling]
initial cycle point = 20240101T00
[[xtriggers]]
up_dir_foo = workflow_state(\
workflow=other-run-dir, \
task=foo, \
point=20240101T0000Z, \
cylc_run_dir=~/cylc-run-other-test \
):PT10S
[[graph]]
R1 = """
foo
@up_dir_foo => bar
"""
[runtime]
[[foo]]
script = """
mkdir -pv ~/cylc-run-other-test
cd ~/cylc-run-other-test
echo 'Current directory:'
pwd
echo 'Creating link to original run dir:'
ln -sfv ${CYLC_RUN_DIR}/${CYLC_WORKFLOW_NAME} other-run-dir
"""
[[bar]]
script = """
cylc workflow-state --debug other-run-dir \
--run-dir=~/cylc-run-other-test \
--task=foo \
--point=20240101T0000Z
rm -rfv ~/cylc-run-other-test
"""
Loading