From b4f4bb10630b1606c4bf4c6f36bb41696c1adc7a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 11 Mar 2024 09:54:37 +0000 Subject: [PATCH] Broadcast changes to simulated tasks in task_job_manager to allow non simulation settings to be modified and to avoid changing the itask.tdef.rtconfig. --- cylc/flow/broadcast_mgr.py | 15 ++++++++++++++- cylc/flow/simulation.py | 19 +++++-------------- cylc/flow/task_job_mgr.py | 9 +++++++-- tests/integration/test_simulation.py | 7 +++++-- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/cylc/flow/broadcast_mgr.py b/cylc/flow/broadcast_mgr.py index 81504055052..6cd007ee25a 100644 --- a/cylc/flow/broadcast_mgr.py +++ b/cylc/flow/broadcast_mgr.py @@ -30,11 +30,12 @@ from cylc.flow.cfgspec.workflow import SPEC from cylc.flow.cycling.loader import get_point, standardise_point_string from cylc.flow.exceptions import PointParsingError -from cylc.flow.parsec.util import listjoin +from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride from cylc.flow.parsec.validate import BroadcastConfigValidator if TYPE_CHECKING: from cylc.flow.id import Tokens + from cylc.flow.task_proxy import TaskProxy ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"] @@ -179,6 +180,18 @@ def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict: addict(ret, self.broadcasts[cycle][namespace]) return ret + def get_updated_rtconfig(self, itask: 'TaskProxy') -> dict: + """Retrieve updated rtconfig for a single task proxy""" + overrides = self.get_broadcast( + itask.tokens + ) + if overrides: + rtconfig = pdeepcopy(itask.tdef.rtconfig) + poverride(rtconfig, overrides, prepend=True) + else: + rtconfig = itask.tdef.rtconfig + return rtconfig + def load_db_broadcast_states(self, row_idx, row): """Load broadcast variables from runtime DB broadcast states row.""" if row_idx == 0: diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py index 72c4db879d2..edf52d322d3 100644 --- a/cylc/flow/simulation.py +++ b/cylc/flow/simulation.py @@ -23,10 +23,6 @@ from cylc.flow import LOG from cylc.flow.cycling.loader import get_point from cylc.flow.exceptions import PointParsingError -from cylc.flow.parsec.util import ( - pdeepcopy, - poverride -) from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM from cylc.flow.task_state import ( TASK_STATUS_RUNNING, @@ -38,7 +34,6 @@ from metomi.isodatetime.parsers import DurationParser if TYPE_CHECKING: - from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.task_proxy import TaskProxy from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager @@ -79,8 +74,8 @@ class ModeSettings: def __init__( self, itask: 'TaskProxy', - broadcast_mgr: 'BroadcastMgr', db_mgr: 'WorkflowDatabaseManager', + rtconfig ): # itask.summary['started_time'] and mode_settings.timeout need @@ -107,11 +102,7 @@ def __init__( try_num = db_info["try_num"] # Update anything changed by broadcast: - overrides = broadcast_mgr.get_broadcast(itask.tokens) - if overrides: - rtconfig = pdeepcopy(itask.tdef.rtconfig) - poverride(rtconfig, overrides, prepend=True) - + if rtconfig != itask.tdef.rtconfig: try: rtconfig["simulation"][ "fail cycle points" @@ -127,8 +118,6 @@ def __init__( rtconfig['simulation'][ 'fail cycle points' ] = itask.tdef.rtconfig['simulation']['fail cycle points'] - else: - rtconfig = itask.tdef.rtconfig # Calculate simulation info: self.simulated_run_length = ( @@ -280,10 +269,12 @@ def sim_time_check( # This occurs if the workflow has been restarted. if itask.mode_settings is None: + rtconfig = task_events_manager.broadcast_mgr.get_updated_rtconfig( + itask) itask.mode_settings = ModeSettings( itask, - task_events_manager.broadcast_mgr, db_mgr, + rtconfig ) if now > itask.mode_settings.timeout: diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 936bc38d0fc..f52c88b85e7 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -1001,13 +1001,18 @@ def _simulation_submit_task_jobs(self, itasks, workflow): now = time() now_str = get_time_string_from_unix_time(now) for itask in itasks: + # Handle broadcasts + rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig( + itask) + itask.summary['started_time'] = now self._set_retry_timers(itask) itask.mode_settings = ModeSettings( itask, - self.task_events_mgr.broadcast_mgr, - self.workflow_db_mgr + self.workflow_db_mgr, + rtconfig ) + itask.waiting_on_job_prep = False itask.submit_num += 1 diff --git a/tests/integration/test_simulation.py b/tests/integration/test_simulation.py index efcbef7d4cd..bad9b94835b 100644 --- a/tests/integration/test_simulation.py +++ b/tests/integration/test_simulation.py @@ -365,9 +365,11 @@ async def test_settings_broadcast( 'runtime': { 'one': { 'execution time limit': 'PT1S', + 'execution retry delays': '2*PT5S', 'simulation': { 'speedup factor': 1, 'fail cycle points': '1066', + 'fail try 1 only': False } }, } @@ -427,9 +429,10 @@ async def test_settings_broadcast( # Broadcast tasks will reparse correctly: schd.task_events_mgr.broadcast_mgr.put_broadcast( ['1066'], ['one'], [{ - 'simulation': {'fail cycle points': '1945, 1977, 1066'} + 'simulation': {'fail cycle points': '1945, 1977, 1066'}, + 'execution retry delays': '5*PT2S' }]) schd.task_job_mgr._simulation_submit_task_jobs( [itask], schd.workflow) assert itask.mode_settings.sim_task_fails is True - + assert itask.tdef.rtconfig['execution retry delays'] == [5.0, 5.0]