Skip to content

Commit

Permalink
Merge branch '8.2.x' into xtrigger-efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver authored Jan 11, 2024
2 parents c43a644 + 9948532 commit 2615e2d
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 239 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ jobs:
fail-fast: false # Don't let a failed MacOS run stop the Ubuntu runs
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.10', '3.11']
include:
# mac os test
- os: 'macos-11'
python-version: '3.7'
python-version: '3.7' # oldest supported version
# non-utc timezone test
- os: 'ubuntu-latest'
python-version: '3.9' # not the oldest, not the most recent version
time-zone: 'XXX-09:35'
env:
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes
steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions changes.d/5893.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug in computing a time interval-based runahead limit when future triggers are present.
2 changes: 2 additions & 0 deletions changes.d/5909.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug where Cylc VIP did not remove --workflow-name=<name> from
Cylc play arguments.
16 changes: 0 additions & 16 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,6 @@ def get_stop_point(self):
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
    """Collect up to n consecutive points of this sequence.

    Args:
        n: Maximum number of points wanted. The starting point, when
            one exists, is always included, even for n < 1.
        point: Optional point to start from. If None, begin at the
            sequence start point; otherwise begin at the first
            sequence point found via get_first_point(point).

    Returns:
        A list of points in sequence order. Empty if there is no
        starting point; shorter than n if the sequence runs out.
    """
    current = (
        self.get_start_point()
        if point is None
        else self.get_first_point(point)
    )
    if current is None:
        return []
    collected = [current]
    # One point is already collected, so at most n - 1 more steps.
    for _ in range(n - 1):
        current = self.get_next_point_on_sequence(current)
        if current is None:
            # Sequence exhausted before n points were found.
            break
        collected.append(current)
    return collected

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
16 changes: 13 additions & 3 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,22 @@ def _interval_parse(interval_string):

def point_parse(point_string: str) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object."""
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
return _point_parse(
point_string,
WorkflowSpecifics.DUMP_FORMAT,
WorkflowSpecifics.ASSUMED_TIME_ZONE
)


@lru_cache(10000)
def _point_parse(point_string, _dump_fmt):
"""Parse a point_string into a proper TimePoint object."""
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object.
Args:
point_string: The string to parse.
_dump_fmt: Dump format (only used to avoid invalid cache hits).
_tz: Cycle point time zone (only used to avoid invalid cache hits).
"""
if "%" in WorkflowSpecifics.DUMP_FORMAT:
# May be a custom not-quite ISO 8601 dump format.
with contextlib.suppress(IsodatetimeError):
Expand Down
107 changes: 80 additions & 27 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import sys
from textwrap import dedent
from typing import Any, Dict, Optional, List, Tuple, Union
from typing import Any, Dict, Iterable, Optional, List, Tuple, Union

from cylc.flow import LOG
from cylc.flow.terminal import supports_color, DIM
Expand Down Expand Up @@ -820,17 +820,33 @@ def combine_options(*args, modify=None):


def cleanup_sysargv(
script_name,
workflow_id,
options,
compound_script_opts,
script_opts,
source,
):
script_name: str,
workflow_id: str,
options: 'Values',
compound_script_opts: Iterable['OptionSettings'],
script_opts: Iterable['OptionSettings'],
source: str,
) -> None:
"""Remove unwanted options from sys.argv
Some cylc scripts (notably Cylc Play when it is re-invoked on a scheduler
server) require the correct content in sys.argv.
server) require the correct content in sys.argv: This function
subtracts the unwanted options from sys.argv.
Args:
script_name:
Name of the target script. For example if we are
using this for the play step of cylc vip then this
will be "play".
workflow_id:
options:
Actual options provided to the compound script.
compound_script_opts:
Options available in compound script.
script_opts:
Options available in target script.
source:
Source directory.
"""
# Organize Options by dest.
script_opts_by_dest = {
Expand All @@ -841,30 +857,67 @@ def cleanup_sysargv(
x.kwargs.get('dest', x.args[0].strip(DOUBLEDASH)): x
for x in compound_script_opts
}
# Filter out non-cylc-play options.
args = [i.split('=')[0] for i in sys.argv]
for unwanted_opt in (set(options.__dict__)) - set(script_opts_by_dest):
for arg in compound_opts_by_dest[unwanted_opt].args:
if arg in sys.argv:
index = sys.argv.index(arg)
sys.argv.pop(index)
if (
compound_opts_by_dest[unwanted_opt].kwargs['action']
not in ['store_true', 'store_false']
):
sys.argv.pop(index)
elif arg in args:
index = args.index(arg)
sys.argv.pop(index)

# Get a list of unwanted args:
unwanted_compound: List[str] = []
unwanted_simple: List[str] = []
for unwanted_dest in (set(options.__dict__)) - set(script_opts_by_dest):
for unwanted_arg in compound_opts_by_dest[unwanted_dest].args:
if (
compound_opts_by_dest[unwanted_dest].kwargs.get('action', None)
in ['store_true', 'store_false']
):
unwanted_simple.append(unwanted_arg)
else:
unwanted_compound.append(unwanted_arg)

new_args = filter_sysargv(sys.argv, unwanted_simple, unwanted_compound)

# replace compound script name:
sys.argv[1] = script_name
new_args[1] = script_name

# replace source path with workflow ID.
if str(source) in sys.argv:
sys.argv.remove(str(source))
new_args.remove(str(source))
if workflow_id not in sys.argv:
sys.argv.append(workflow_id)
new_args.append(workflow_id)

sys.argv = new_args


def filter_sysargv(
    sysargs, unwanted_simple: List, unwanted_compound: List
) -> List:
    """Return a new argument list with the unwanted options stripped out.

    Args:
        sysargs: The argument list to filter (it is not modified).
        unwanted_simple: Option strings to drop that take no value.
        unwanted_compound: Option strings to drop that take a value,
            given either as "--opt=value" or as "--opt value".

    Examples:
        >>> this = filter_sysargv
        >>> this(['--foo', 'expects-a-value', '--bar'], [], ['--foo'])
        ['--bar']
        >>> this(['--foo=expects-a-value', '--bar'], [], ['--foo'])
        ['--bar']
        >>> this(['--foo', '--bar'], ['--foo'], [])
        ['--bar']
    """
    kept: List = []
    skip_value: bool = False
    for token in sysargs:
        # Only the text before the first "=" identifies the option.
        name, equals, _ = token.partition('=')
        if skip_value:
            # This token is the value of the option dropped just before.
            skip_value = False
        elif name in unwanted_compound:
            # Without an attached "=value" the value is the next token,
            # which must be dropped as well.
            skip_value = not equals
        elif name not in unwanted_simple:
            kept.append(token)
    return kept


def log_subcommand(*args):
Expand Down
118 changes: 45 additions & 73 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,82 +309,48 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""
"""
limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
base_point: Optional['PointBase'] = None

# First get the runahead base point.
if not self.main_pool:
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

# Find the earliest sequence point beyond the workflow start point.
base_point = min(
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
)
else:
# Find the earliest point with unfinished tasks.
# Find the earliest point with incomplete tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
# All n=0 tasks are incomplete by definition, but Cylc 7
# ignores failed ones (it does not ignore submit-failed!).
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failedtasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
cylc.flow.flags.cylc7_back_compat and
all(
itask.state(TASK_STATUS_FAILED)
for itask in itasks
)
):
points.append(point)
continue
base_point = point
break

if not points:
return False
base_point = min(points)
if base_point is None:
return False

LOG.debug(f"Runahead: base point {base_point}")

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -401,8 +367,10 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
# Now generate all possible cycle points from the base point and stop
# at the runahead limit point. Note both cycle count and time interval
# limits involve all possible cycles, not just active cycles.
sequence_points: Set['PointBase'] = set()
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -412,44 +380,48 @@ def compute_runahead(self, force=False) -> bool:
sequence_points = self._prev_runahead_sequence_points
else:
# Recompute possible points.
sequence_points = set()
for sequence in self.config.sequences:
seq_point = sequence.get_next_point(base_point)
seq_point = sequence.get_first_point(base_point)
count = 1
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + ilimit:
# this point may be beyond the runahead limit
break
else:
# PT0H allows only the base cycle point to run.
if seq_point > base_point + limit:
# this point is beyond the runahead limit
break
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point

points = set(points).union(sequence_points)

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(ilimit + 1)][-1]
# (len(list) may be less than ilimit due to sequence end)
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
limit_point = max(sequence_points)

# Adjust for future offset and stop point, if necessary.
# Adjust for future offset and stop point.
pre_adj_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(
"Runahead (future trigger adjust):"
f" {pre_adj_limit} -> {limit_point}"
)
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.debug(f"Runahead limit: {limit_point}")
LOG.debug(
"Runahead (stop point adjust):"
f" {pre_adj_limit} -> {limit_point} (stop point)"
)

LOG.debug(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point
return True

Expand Down
Loading

0 comments on commit 2615e2d

Please sign in to comment.