From dde28ec319b6c96f5e4037c120afb7a0a9440871 Mon Sep 17 00:00:00 2001 From: WXTIM <26465611+wxtim@users.noreply.github.com> Date: Thu, 18 Jan 2024 10:50:34 +0000 Subject: [PATCH] wip --- cylc/flow/simulation.py | 43 ++++++++++++++++------------ cylc/flow/task_job_mgr.py | 2 ++ tests/integration/test_simulation.py | 2 ++ 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py index f34e9d8b260..c6d043070ad 100644 --- a/cylc/flow/simulation.py +++ b/cylc/flow/simulation.py @@ -53,7 +53,11 @@ class ModeSettings: simulated_run_length: float = 0.0 sim_task_fails: bool = False - def __init__(self, itask: 'TaskProxy', broadcast_mgr: 'BroadcastMgr'): + def __init__( + self, + itask: 'TaskProxy', + broadcast_mgr: 'BroadcastMgr', + db_mgr: 'WorkflowDatabaseManager' = None): overrides = broadcast_mgr.get_broadcast(itask.tokens) if overrides: rtconfig = pdeepcopy(itask.tdef.rtconfig) @@ -68,6 +72,22 @@ def __init__(self, itask: 'TaskProxy', broadcast_mgr: 'BroadcastMgr'): itask.submit_num ) + # itask.summary['started_time'] and mode_settings.timeout need + # repopulating from the DB on workflow restart: + started_time = itask.summary['started_time'] + if started_time is None: + started_time = int( + TimePointParser() + .parse( + db_mgr.pub_dao.select_task_job( + *itask.tokens.relative_id.split("/") + )["time_submit"] + ) + .seconds_since_unix_epoch + ) + itask.summary['started_time'] = started_time + + self.timeout = started_time + self.simulated_run_length def configure_sim_modes(taskdefs, sim_mode): """Adjust task defs for simulation and dummy mode. @@ -207,24 +227,11 @@ def sim_time_check( ): continue - # Started time and mode_settings are not set on restart: - started_time = itask.summary['started_time'] - if started_time is None: - started_time = int( - TimePointParser() - .parse( - db_mgr.pub_dao.select_task_job( - *itask.tokens.relative_id.split("/") - )["time_submit"] - ) - .seconds_since_unix_epoch - ) - itask.summary['started_time'] = started_time - if itask.mode_settings is None: - itask.mode_settings = ModeSettings(itask, broadcast_mgr) - timeout = started_time + itask.mode_settings.simulated_run_length - if now > timeout: + if itask.mode_settings is None:# + itask.mode_settings = ModeSettings(itask, broadcast_mgr, db_mgr) + + if now > itask.mode_settings.timeout: job_d = itask.tokens.duplicate(job=str(itask.submit_num)) now_str = get_current_time_string() diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index cc2d70a9b61..276d3da7d8d 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -997,7 +997,9 @@ def _set_retry_timers( def _simulation_submit_task_jobs(self, itasks, workflow): """Simulation mode task jobs submission.""" + now = time() for itask in itasks: + itask.summary['started_time'] = now itask.mode_settings = ModeSettings( itask, self.task_events_mgr.broadcast_mgr) itask.waiting_on_job_prep = False diff --git a/tests/integration/test_simulation.py b/tests/integration/test_simulation.py index fe562ba63ce..d3220546757 100644 --- a/tests/integration/test_simulation.py +++ b/tests/integration/test_simulation.py @@ -134,11 +134,13 @@ def test_sim_time_check_sets_started_time( """ schd, _, msg_q = sim_time_check_setup one_1066 = schd.pool.get_task(ISO8601Point('1066'), 'one') + # Add info to databse as if it's be started before shutdown: schd.task_job_mgr._simulation_submit_task_jobs( [one_1066], schd.workflow) schd.workflow_db_mgr.process_queued_ops() one_1066.summary['started_time'] = None one_1066.state.is_queued = False + one_1066.mode_settings = None assert one_1066.summary['started_time'] is None assert sim_time_check( msg_q, [one_1066], schd.task_events_mgr.broadcast_mgr,