Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤖 Merge 8.2.x-sync into master #5910

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ jobs:
time-zone: 'XXX-09:35'

env:
# Use non-UTC time zone
TZ: ${{ matrix.time-zone }}
PYTEST_ADDOPTS: --cov --cov-append -n 5 --color=yes

Expand Down
1 change: 1 addition & 0 deletions changes.d/5893.fix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug in computing a time interval-based runahead limit when future triggers are present.
16 changes: 0 additions & 16 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,6 @@ def get_stop_point(self):
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
"""Return a list of first n points of this sequence."""
if point is None:
p1 = self.get_start_point()
else:
p1 = self.get_first_point(point)
if p1 is None:
return []
result = [p1]
for _ in range(1, n):
p1 = self.get_next_point_on_sequence(p1)
if p1 is None:
break
result.append(p1)
return result

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
16 changes: 13 additions & 3 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,12 +947,22 @@ def _interval_parse(interval_string):

def point_parse(point_string: str) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object."""
return _point_parse(point_string, WorkflowSpecifics.DUMP_FORMAT)
return _point_parse(
point_string,
WorkflowSpecifics.DUMP_FORMAT,
WorkflowSpecifics.ASSUMED_TIME_ZONE
)


@lru_cache(10000)
def _point_parse(point_string, _dump_fmt):
"""Parse a point_string into a proper TimePoint object."""
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object.

Args:
point_string: The string to parse.
_dump_fmt: Dump format (only used to avoid invalid cache hits).
_tz: Cycle point time zone (only used to avoid invalid cache hits).
"""
if "%" in WorkflowSpecifics.DUMP_FORMAT:
# May be a custom not-quite ISO 8601 dump format.
with contextlib.suppress(IsodatetimeError):
Expand Down
118 changes: 45 additions & 73 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,82 +307,48 @@ def compute_runahead(self, force=False) -> bool:

With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""

"""
limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
base_point: Optional['PointBase'] = None

# First get the runahead base point.
if not self.main_pool:
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

# Find the earliest sequence point beyond the workflow start point.
base_point = min(
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
)
else:
# Find the earliest point with unfinished tasks.
# Find the earliest point with incomplete tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
# All n=0 tasks are incomplete by definition, but Cylc 7
# ignores failed ones (it does not ignore submit-failed!).
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failedtasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
cylc.flow.flags.cylc7_back_compat and
all(
itask.state(TASK_STATUS_FAILED)
for itask in itasks
)
):
points.append(point)
continue
base_point = point
break

if not points:
return False
base_point = min(points)
if base_point is None:
return False

LOG.debug(f"Runahead: base point {base_point}")

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -399,8 +365,10 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
# Now generate all possible cycle points from the base point and stop
# at the runahead limit point. Note both cycle count and time interval
# limits involve all possible cycles, not just active cycles.
sequence_points: Set['PointBase'] = set()
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -410,44 +378,48 @@ def compute_runahead(self, force=False) -> bool:
sequence_points = self._prev_runahead_sequence_points
else:
# Recompute possible points.
sequence_points = set()
for sequence in self.config.sequences:
seq_point = sequence.get_next_point(base_point)
seq_point = sequence.get_first_point(base_point)
count = 1
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + ilimit:
# this point may be beyond the runahead limit
break
else:
# PT0H allows only the base cycle point to run.
if seq_point > base_point + limit:
# this point can not be beyond the runahead limit
break
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point

points = set(points).union(sequence_points)

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(ilimit + 1)][-1]
# (len(list) may be less than ilimit due to sequence end)
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
limit_point = max(sequence_points)

# Adjust for future offset and stop point, if necessary.
# Adjust for future offset and stop point.
pre_adj_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(
"Runahead (future trigger adjust):"
f" {pre_adj_limit} -> {limit_point}"
)
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.debug(f"Runahead limit: {limit_point}")
LOG.debug(
"Runahead (stop point adjust):"
f" {pre_adj_limit} -> {limit_point} (stop point)"
)

LOG.debug(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point
return True

Expand Down
Loading
Loading