Skip to content

Commit

Permalink
Simplify and fix runahead computation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 19, 2023
1 parent 8f975c4 commit 5e7f4d4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 172 deletions.
16 changes: 0 additions & 16 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,22 +415,6 @@ def get_stop_point(self):
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
    """Return a list of up to the first n points of this sequence.

    Args:
        n: Maximum number of points to collect. Because the first point
            is always included when it exists, a value below 1 still
            yields a single-element list.
        point: Optional point to start from. If given, collection starts
            at ``self.get_first_point(point)``; otherwise it starts at
            ``self.get_start_point()``.

    Returns:
        The collected points in sequence order. The list is empty if no
        first point exists, and shorter than n if the sequence runs out
        (``get_next_point_on_sequence`` returns None).
    """
    if point is not None:
        current = self.get_first_point(point)
    else:
        current = self.get_start_point()

    collected = []
    if current is not None:
        collected.append(current)
        # One point is already held, so at most n - 1 more are needed.
        for _ in range(1, n):
            current = self.get_next_point_on_sequence(current)
            if current is None:
                # Ran off the end of the sequence.
                break
            collected.append(current)
    return collected

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
124 changes: 51 additions & 73 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,73 +318,45 @@ def compute_runahead(self, force=False) -> bool:
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
base_point: 'PointBase' = None

# First get the runahead base point.
if not self.main_pool:
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

# Find the earliest sequence point beyond the workflow start point.
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
if (
points # got the limit already so this point too
or any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
)
or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failed tasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
for itask in itasks
if any(
not itask.state(
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_EXPIRED
) or (
# For Cylc 7 back-compat, ignore incomplete tasks.
# (Success is required in back-compat mode, so
# failed tasks end up as incomplete; and Cylc 7
# ignores failed tasks in computing the limit).
itask.state.outputs.is_incomplete()
and not cylc.flow.flags.cylc7_back_compat
)
for itask in itasks
):
points.append(point)
base_point = point
break

if not points:
return False
base_point = min(points)
if base_point is None:
return False

LOG.debug(f"Runahead: base point {base_point}")

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -401,8 +373,10 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
# Now generate all possible cycle points from the base point and stop
# at the runahead limit point. Note both cycle count and time interval
# limits involve all possible cycles, not just active cycles.
sequence_points: Set['PointBase'] = set()
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -412,44 +386,48 @@ def compute_runahead(self, force=False) -> bool:
sequence_points = self._prev_runahead_sequence_points
else:
# Recompute possible points.
sequence_points = set()
for sequence in self.config.sequences:
seq_point = sequence.get_next_point(base_point)
seq_point = sequence.get_first_point(base_point)
count = 1
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + ilimit:
# this point may be beyond the runahead limit
break
else:
# PT0H allows only the base cycle point to run.
if seq_point > base_point + limit:
# this point can not be beyond the runahead limit
break
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point

points = set(points).union(sequence_points)

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(ilimit + 1)][-1]
# (len(list) may be less than ilimit due to sequence end)
limit_point = sorted(sequence_points)[:ilimit + 1][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
limit_point = max(sequence_points)

# Adjust for future offset and stop point, if necessary.
# Adjust for future offset and stop point.
pre_adj_limit = limit_point
if self.max_future_offset is not None:
limit_point += self.max_future_offset
LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)")
LOG.debug(
"Runahead (future trigger adjust):"
f" {pre_adj_limit} -> {limit_point}"
)
if self.stop_point and limit_point > self.stop_point:
limit_point = self.stop_point
LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)")
LOG.debug(f"Runahead limit: {limit_point}")
LOG.debug(
"Runahead (stop point adjust):"
f" {pre_adj_limit} -> {limit_point} (stop point)"
)

LOG.debug(f"Runahead limit: {limit_point}")
self.runahead_limit_point = limit_point
return True

Expand Down
83 changes: 0 additions & 83 deletions tests/unit/cycling/test_cycling.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,86 +88,3 @@ def test_parse_bad_exclusion(expression):
"""Tests incorrectly formatted exclusions"""
with pytest.raises(Exception):
parse_exclusion(expression)


@pytest.mark.parametrize(
    'sequence, wf_start_point, expected',
    [
        # No workflow start point given: begin at the sequence start.
        (('R/2/P2', 1), None, [2, 4, 6, 8, 10]),
        # Workflow start point given: begin at the first point on the
        # sequence at or after it.
        (('R/2/P2', 1), 3, [4, 6, 8, 10, 12]),
    ],
)
def test_get_first_n_points_integer(
    set_cycling_type,
    sequence, wf_start_point, expected
):
    """Check get_first_n_points against an integer cycling sequence.

    (Sequence classes get this method from the base class.)
    """
    set_cycling_type(INTEGER_CYCLING_TYPE)

    seq = IntegerSequence(*sequence)
    start = (
        None if wf_start_point is None
        else IntegerPoint(wf_start_point)
    )
    wanted = [IntegerPoint(value) for value in expected]

    # Ask for exactly as many points as we expect to get back.
    actual = seq.get_first_n_points(len(wanted), start)
    assert wanted == actual


@pytest.mark.parametrize(
    'sequence, wf_start_point, expected',
    [
        # No workflow start point given: begin at the sequence start.
        (
            ('R/2008/P2Y', '2001'),
            None,
            ['2008', '2010', '2012', '2014', '2016'],
        ),
        # Workflow start point given: begin at the first point on the
        # sequence at or after it.
        (
            ('R/2008/P2Y', '2001'),
            '2009',
            ['2010', '2012', '2014', '2016', '2018'],
        ),
    ],
)
def test_get_first_n_points_iso8601(
    set_cycling_type,
    sequence, wf_start_point, expected
):
    """Check get_first_n_points against an ISO 8601 cycling sequence.

    (Sequence classes get this method from the base class.)
    """
    set_cycling_type(ISO8601_CYCLING_TYPE, 'Z')

    seq = ISO8601Sequence(*sequence)
    start = (
        None if wf_start_point is None
        else ISO8601Point(wf_start_point)
    )
    wanted = [ISO8601Point(value) for value in expected]

    # Ask for exactly as many points as we expect to get back.
    actual = seq.get_first_n_points(len(wanted), start)
    assert wanted == actual

0 comments on commit 5e7f4d4

Please sign in to comment.