diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index e3d8a2fe64a..c47d242b5c5 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -415,22 +415,6 @@ def get_stop_point(self): """Return the last point of this sequence, or None if unbounded.""" pass - def get_first_n_points(self, n, point=None): - """Return a list of first n points of this sequence.""" - if point is None: - p1 = self.get_start_point() - else: - p1 = self.get_first_point(point) - if p1 is None: - return [] - result = [p1] - for _ in range(1, n): - p1 = self.get_next_point_on_sequence(p1) - if p1 is None: - break - result.append(p1) - return result - @abstractmethod def __eq__(self, other) -> bool: # Return True if other (sequence) is equal to self. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 6ddc62d39f2..a70abd466e0 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -318,73 +318,45 @@ def compute_runahead(self, force=False) -> bool: ilimit = int(limit) # type: ignore count_cycles = True - base_point: 'PointBase' - points: List['PointBase'] = [] + base_point: 'PointBase' = None + # First get the runahead base point. if not self.main_pool: - # No tasks yet, just consider sequence points. - if count_cycles: - # Get the first ilimit points in each sequence. - # (After workflow start point - sequence may begin earlier). - points = [ - point - for plist in [ - seq.get_first_n_points( - ilimit, self.config.start_point) - for seq in self.config.sequences - ] - for point in plist - ] - # Drop points beyond the limit. - points = sorted(points)[:ilimit + 1] - base_point = min(points) - - else: - # Start at first point in each sequence. - # (After workflow start point - sequence may begin earlier). - points = [ - point - for point in { - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - } - if point is not None - ] - base_point = min(points) - # Drop points beyond the limit. - points = [ - point - for point in points - if point <= base_point + limit - ] - + # Find the earliest sequence point beyond the workflow start point. + points = [ + point + for point in { + seq.get_first_point(self.config.start_point) + for seq in self.config.sequences + } + if point is not None + ] + base_point = min(points) else: # Find the earliest point with unfinished tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): - if ( - points # got the limit already so this point too - or any( - not itask.state( - TASK_STATUS_FAILED, - TASK_STATUS_SUCCEEDED, - TASK_STATUS_EXPIRED - ) - or ( - # For Cylc 7 back-compat, ignore incomplete tasks. - # (Success is required in back-compat mode, so - # failedtasks end up as incomplete; and Cylc 7 - # ignores failed tasks in computing the limit). - itask.state.outputs.is_incomplete() - and not cylc.flow.flags.cylc7_back_compat - ) - for itask in itasks + if any( + not itask.state( + TASK_STATUS_FAILED, + TASK_STATUS_SUCCEEDED, + TASK_STATUS_EXPIRED + ) or ( + # For Cylc 7 back-compat, ignore incomplete tasks. + # (Success is required in back-compat mode, so + # failed tasks end up as incomplete; and Cylc 7 + # ignores failed tasks in computing the limit). + itask.state.outputs.is_incomplete() + and not cylc.flow.flags.cylc7_back_compat ) + for itask in itasks ): - points.append(point) + base_point = point + break - if not points: - return False - base_point = min(points) + if base_point is None: + return False + + LOG.debug(f"Runahead: base point {base_point}") if self._prev_runahead_base_point is None: self._prev_runahead_base_point = base_point @@ -401,8 +373,10 @@ def compute_runahead(self, force=False) -> bool: # change or the runahead limit is already at stop point. return False - # Get all cycle points possible after the base point. - sequence_points: Set['PointBase'] + # Now generate all possible cycle points from the base point and stop + # at the runahead limit point. Note both cycle count and time interval + # limits involve all possible cycles, not just active cycles. + sequence_points: Set['PointBase'] = set() if ( not force and self._prev_runahead_sequence_points @@ -412,18 +386,19 @@ def compute_runahead(self, force=False) -> bool: sequence_points = self._prev_runahead_sequence_points else: # Recompute possible points. - sequence_points = set() for sequence in self.config.sequences: - seq_point = sequence.get_next_point(base_point) + seq_point = sequence.get_first_point(base_point) count = 1 while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. if count > 1 + ilimit: + # this point may be beyond the runahead limit break else: # PT0H allows only the base cycle point to run. if seq_point > base_point + limit: + # this point can not be beyond the runahead limit break count += 1 sequence_points.add(seq_point) @@ -431,25 +406,28 @@ def compute_runahead(self, force=False) -> bool: self._prev_runahead_sequence_points = sequence_points self._prev_runahead_base_point = base_point - points = set(points).union(sequence_points) - if count_cycles: - # Some sequences may have different intervals. - limit_point = sorted(points)[:(ilimit + 1)][-1] + # (len(list) may less than ilimit due to sequence end) + limit_point = sorted(sequence_points)[:ilimit + 1][-1] else: - # We already stopped at the runahead limit. - limit_point = sorted(points)[-1] + limit_point = max(sequence_points) - # Adjust for future offset and stop point, if necessary. + # Adjust for future offset and stop point. pre_adj_limit = limit_point if self.max_future_offset is not None: limit_point += self.max_future_offset - LOG.debug(f"{pre_adj_limit} -> {limit_point} (future offset)") + LOG.debug( + "Runahead (future trigger adjust):" + f" {pre_adj_limit} -> {limit_point}" + ) if self.stop_point and limit_point > self.stop_point: limit_point = self.stop_point - LOG.debug(f"{pre_adj_limit} -> {limit_point} (stop point)") - LOG.debug(f"Runahead limit: {limit_point}") + LOG.debug( + "Runahead (stop point adjust):" + f" {pre_adj_limit} -> {limit_point} (stop point)" + ) + LOG.debug(f"Runahead limit: {limit_point}") self.runahead_limit_point = limit_point return True diff --git a/tests/unit/cycling/test_cycling.py b/tests/unit/cycling/test_cycling.py index ae90b68e62f..eaf48aa1e35 100644 --- a/tests/unit/cycling/test_cycling.py +++ b/tests/unit/cycling/test_cycling.py @@ -88,86 +88,3 @@ def test_parse_bad_exclusion(expression): """Tests incorrectly formatted exclusions""" with pytest.raises(Exception): parse_exclusion(expression) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2/P2', 1), - None, - [2,4,6,8,10] - ), - ( - ('R/2/P2', 1), - 3, - [4,6,8,10,12] - ), - ), -) -def test_get_first_n_points_integer( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). - """ - set_cycling_type(INTEGER_CYCLING_TYPE) - sequence = IntegerSequence(*sequence) - if wf_start_point is not None: - wf_start_point = IntegerPoint(wf_start_point) - expected = [ - IntegerPoint(p) - for p in expected - ] - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - ) - - -@pytest.mark.parametrize( - 'sequence, wf_start_point, expected', - ( - ( - ('R/2008/P2Y', '2001'), - None, - ['2008', '2010', '2012', '2014', '2016'] - ), - ( - ('R/2008/P2Y', '2001'), - '2009', - ['2010', '2012', '2014', '2016', '2018'] - ), - ), -) -def test_get_first_n_points_iso8601( - set_cycling_type, - sequence, wf_start_point, expected -): - """Test sequence get_first_n_points method. - - (The method is implemented in the base class). - """ - set_cycling_type(ISO8601_CYCLING_TYPE, 'Z') - sequence = ISO8601Sequence(*sequence) - if wf_start_point is not None: - wf_start_point = ISO8601Point(wf_start_point) - expected = [ - ISO8601Point(p) - for p in expected - ] - - assert ( - expected == ( - sequence.get_first_n_points( - len(expected), - wf_start_point - ) - ) - )