Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop after cycle point: support offsets #5943

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5943.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `stop after cycle point` can now be specified as an offset from the inital cycle point.
21 changes: 19 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,20 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')
# NOTE: final cycle point is not a V_CYCLE_POINT to allow expressions
# such as '+P1Y' (relative to initial cycle point)
Conf('final cycle point', VDR.V_STRING, desc='''
Conf('final cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
The (optional) last cycle point at which tasks are run.

Once all tasks have reached this cycle point, the
workflow will shut down.

This item can be overridden on the command line using
``cylc play --final-cycle-point`` or ``--fcp``.

Examples:

- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one day and one year.
''')
Conf('initial cycle point constraints', VDR.V_STRING_LIST, desc='''
Rules to allow only some initial datetime cycle points.
Expand Down Expand Up @@ -599,7 +605,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):

{REPLACES}``[scheduling]hold after point``.
''')
Conf('stop after cycle point', VDR.V_CYCLE_POINT, desc='''
Conf('stop after cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
Shut down the workflow after all tasks pass this cycle point.

The stop cycle point can be overridden on the command line using
Expand All @@ -612,7 +618,18 @@ def get_script_common_text(this: str, example: Optional[str] = None):
choosing not to run that part of the graph. You can play
the workflow and continue.

Examples:

- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one day and one year.

.. versionadded:: 8.0.0

.. versionchanged:: 8.3.0

This now supports offsets (e.g. ``+P1D``) in the same way the
:cylc:conf:`[..]final cycle point` does.
''')
Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN,
options=list(Calendar.MODES) + ['integer'], desc='''
Expand Down
13 changes: 10 additions & 3 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,9 +851,16 @@ def process_stop_cycle_point(self) -> None:
if stopcp_str is None:
stopcp_str = self.cfg['scheduling']['stop after cycle point']

if stopcp_str is not None:
self.stop_point = get_point(stopcp_str).standardise()
if self.final_point and (self.stop_point > self.final_point):
if stopcp_str:
self.stop_point = get_point_relative(
stopcp_str,
self.initial_point,
).standardise()
if (
self.final_point is not None
and self.stop_point is not None
and self.stop_point > self.final_point
):
LOG.warning(
f"Stop cycle point '{self.stop_point}' will have no "
"effect as it is after the final cycle "
Expand Down
24 changes: 24 additions & 0 deletions cylc/flow/parsec/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ class CylcConfigValidator(ParsecValidator):
V_CYCLE_POINT = 'V_CYCLE_POINT'
V_CYCLE_POINT_FORMAT = 'V_CYCLE_POINT_FORMAT'
V_CYCLE_POINT_TIME_ZONE = 'V_CYCLE_POINT_TIME_ZONE'
V_CYCLE_POINT_WITH_OFFSETS = 'V_CYCLE_POINT_WITH_OFFSETS'
V_INTERVAL = 'V_INTERVAL'
V_INTERVAL_LIST = 'V_INTERVAL_LIST'
V_PARAMETER_LIST = 'V_PARAMETER_LIST'
Expand Down Expand Up @@ -699,6 +700,26 @@ class CylcConfigValidator(ParsecValidator):
'-0830': 'UTC minus 8 hours and 30 minutes.'
}
),
V_CYCLE_POINT_WITH_OFFSETS: (
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new parsec validator type which does the same thing as V_STRING.

It exists purely for documentation purposes.

'cycle point with support for offsets',
'An integer or date-time cycle point, with optional offset(s).',
{
'1': 'An integer cycle point.',
'1 +P5': (
'An integer cycle point with an offset'
' (this evaluates as ``6``).'
),
'+P5': (
'An integer cycle point offset.'
' This offset is added to the initial cycle point'
),
'2000-01-01T00:00Z': 'A date-time cycle point.',
'2000-02-29T00:00Z +P1D +P1M': (
'A date-time cycle point with offsets'
' (this evaluates as ``2000-04-01T00:00Z``).'
),
}
),
V_INTERVAL: (
'time interval',
'An ISO8601 duration.',
Expand Down Expand Up @@ -751,6 +772,9 @@ def __init__(self):
self.V_CYCLE_POINT: self.coerce_cycle_point,
self.V_CYCLE_POINT_FORMAT: self.coerce_cycle_point_format,
self.V_CYCLE_POINT_TIME_ZONE: self.coerce_cycle_point_time_zone,
# NOTE: This type exists for documentation reasons
# it doesn't actually process offsets, that happens later
self.V_CYCLE_POINT_WITH_OFFSETS: self.coerce_str,
self.V_INTERVAL: self.coerce_interval,
self.V_INTERVAL_LIST: self.coerce_interval_list,
self.V_PARAMETER_LIST: self.coerce_parameter_list,
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ def command_stop(
point = TaskID.get_standardised_point(cycle_point)
if point is not None and self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.config.stop_point = point
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spotted during testing, the config stop_point isn't updated when it is modified via the CLI, but the config hold_point is.

self.workflow_db_mgr.put_workflow_stop_cycle_point(
self.options.stopcp)
elif clock_time is not None:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ def set_stop_point(self, stop_point: 'PointBase') -> bool:
LOG.info(f"Stop point unchanged: {stop_point}")
return False

LOG.info("Setting stop point: {stop_point}")
LOG.info(f"Setting stop point: {stop_point}")
self.stop_point = stop_point

if (
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/cylc-play/09-invalid-cp-opt.t
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
run_fail "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" --no-detach --stopcp='potato'

grep_ok "ERROR - Workflow shutting down .* potato" "${TEST_NAME_BASE}-run.stderr"
grep_ok "ERROR - Workflow shutting down .*potato" "${TEST_NAME_BASE}-run.stderr"

# Check that we haven't got a database
exists_ok "${WORKFLOW_RUN_DIR}/.service"
Expand Down
132 changes: 132 additions & 0 deletions tests/integration/test_stop_after_cycle_point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test logic pertaining to the stop after cycle points.

This may be defined in different ways:
* In the workflow configuration.
* On the command line.
* Or loaded from the database.

When the workflow hits the "stop after" point, it should be wiped (i.e. set
to None).
"""

from typing import Optional

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.id import Tokens
from cylc.flow.workflow_status import StopMode


async def test_stop_after_cycle_point(
flow,
scheduler,
run,
reflog,
complete,
):
"""Test the stop after cycle point.

This ensures:
* The stop after point gets loaded from the config.
* The workflow stops when it hits this point.
* The point gets wiped when the workflow hits this point.
* The point is stored/retrieved from the DB as appropriate.

"""
async def stops_after_cycle(schd) -> Optional[str]:
"""Run the workflow until it stops and return the cycle point."""
triggers = reflog(schd)
await complete(schd, timeout=2)
assert len(triggers) == 1 # only one task (i.e. cycle) should be run
return Tokens(list(triggers)[0][0], relative=True)['cycle']

def get_db_value(schd) -> Optional[str]:
"""Return the cycle point value stored in the DB."""
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
return dict(pri_dao.select_workflow_params())['stopcp']

config = {
'scheduling': {
'cycling mode': 'integer',
'initial cycle point': '1',
'stop after cycle point': '1',
'graph': {
'P1': 'a[-P1] => a',
},
},
}
id_ = flow(config)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be loaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('1')

# this value should *not* be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 1
assert await stops_after_cycle(schd) == '1'

# change the configured cycle point to "2"
config['scheduling']['stop after cycle point'] = '2'
id_ = flow(config, id_=id_)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be reloaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('2')

# this value should not be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 2
assert await stops_after_cycle(schd) == '2'

# override the configured value via the CLI option
schd = scheduler(id_, paused_start=False, **{'stopcp': '3'})
async with run(schd):
# the CLI should take precedence over the config
assert schd.config.stop_point == IntegerPoint('3')

# this value *should* be written to the database
assert get_db_value(schd) == '3'

# the workflow should stop after cycle 3
assert await stops_after_cycle(schd) == '3'

# once the workflow hits this point, it should get cleared
assert get_db_value(schd) is None

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should fall back to the configured value
assert schd.config.stop_point == IntegerPoint('2')

# override this value whilst the workflow is running
schd.command_stop(
cycle_point=IntegerPoint('4'),
mode=StopMode.REQUEST_CLEAN,
)
assert schd.config.stop_point == IntegerPoint('4')

# the new *should* be written to the database
assert get_db_value(schd) == '4'

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should stop after cycle 4
assert await stops_after_cycle(schd) == '4'
Loading
Loading