diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py index ead5148e61d..cfea86644a4 100644 --- a/cylc/flow/simulation.py +++ b/cylc/flow/simulation.py @@ -53,6 +53,7 @@ class ModeSettings: """ simulated_run_length: float = 0.0 sim_task_fails: bool = False + timeout: Optional[float] = None def __init__( self, @@ -60,29 +61,42 @@ def __init__( broadcast_mgr: 'BroadcastMgr', db_mgr: 'Optional[WorkflowDatabaseManager]' = None ): + + # itask.summary['started_time'] and mode_settings.timeout need + # repopulating from the DB on workflow restart: + started_time = itask.summary['started_time'] + try_num = None + if started_time is None and db_mgr: + # Get DB info + db_info = db_mgr.pub_dao.select_task_job( + *itask.tokens.relative_id.split("/")) + + # Get the started time: + started_time = get_unix_time_from_time_string( + db_info["time_submit"]) + itask.summary['started_time'] = started_time + + # Get the try number: + try_num = db_info["try_num"] + + # Update anything changed by broadcast: overrides = broadcast_mgr.get_broadcast(itask.tokens) if overrides: rtconfig = pdeepcopy(itask.tdef.rtconfig) poverride(rtconfig, overrides, prepend=True) else: rtconfig = itask.tdef.rtconfig + + # Calculate simulation info: self.simulated_run_length = ( get_simulated_run_len(rtconfig)) self.sim_task_fails = sim_task_failed( rtconfig['simulation'], itask.point, - itask.get_try_num() + try_num or itask.get_try_num() ) - - # itask.summary['started_time'] and mode_settings.timeout need - # repopulating from the DB on workflow restart: - started_time = itask.summary['started_time'] - if started_time is None and db_mgr: - started_time_str = db_mgr.pub_dao.select_task_job( - *itask.tokens.relative_id.split("/"))["time_submit"] - started_time = get_unix_time_from_time_string( - started_time_str) - itask.summary['started_time'] = started_time + from cylc.flow import LOG + LOG.critical(try_num or itask.get_try_num()) self.timeout = started_time + self.simulated_run_length @@ -217,12 +231,7 @@ def sim_time_check( sim_task_state_changed: bool = False for itask in itasks: - if ( - itask.state.status != TASK_STATUS_RUNNING - or itask.state.is_queued - or itask.state.is_held - or itask.state.is_runahead - ): + if itask.state.status != TASK_STATUS_RUNNING: continue if itask.mode_settings is None: @@ -250,7 +259,6 @@ def sim_time_check( # We've finished this psuedojob, so delete all the mode settings. itask.mode_settings = None sim_task_state_changed = True - itask.mode_settings = None return sim_task_state_changed @@ -267,5 +275,5 @@ def sim_task_failed( sim_conf['fail cycle points'] is None # i.e. "all" or point in sim_conf['fail cycle points'] ) and ( - try_num == 0 or not sim_conf['fail try 1 only'] + try_num == 1 or not sim_conf['fail try 1 only'] ) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 58677b6c1c1..47cf419745c 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -1002,11 +1002,11 @@ def _simulation_submit_task_jobs(self, itasks, workflow): now_str = get_time_string_from_unix_time(now) for itask in itasks: itask.summary['started_time'] = now + self._set_retry_timers(itask, itask.tdef.rtconfig) itask.mode_settings = ModeSettings( itask, self.task_events_mgr.broadcast_mgr) itask.waiting_on_job_prep = False itask.submit_num += 1 - self._set_retry_timers(itask) itask.platform = {'name': 'SIMULATION'} itask.summary['job_runner_name'] = 'SIMULATION' @@ -1020,8 +1020,12 @@ def _simulation_submit_task_jobs(self, itasks, workflow): itask, INFO, TASK_OUTPUT_SUBMITTED, ) self.workflow_db_mgr.put_insert_task_jobs( - itask, {'time_submit': now_str}) - + itask, { + 'time_submit': now_str, + 'try_num': itask.get_try_num(), + } + ) + self.workflow_db_mgr.process_queued_ops() return itasks def _submit_task_jobs_callback(self, ctx, workflow, itasks): diff --git a/tests/integration/test_simulation.py b/tests/integration/test_simulation.py index 07d023dec53..cab7b5ec90b 100644 --- a/tests/integration/test_simulation.py +++ b/tests/integration/test_simulation.py @@ -184,7 +184,7 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch): ISO8601Point(point), itask) for i, result in enumerate(results): - itask.try_timers['execution-retry'].num = i - 1 + itask.try_timers['execution-retry'].num = i schd.task_job_mgr._simulation_submit_task_jobs( [itask], schd.workflow) assert itask.mode_settings.sim_task_fails is result @@ -256,7 +256,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime): ) is True -async def test_simulation_mode_settings_restart( +async def test_settings_restart( monkeytime, flow, scheduler, start ): """Check that simulation mode settings are correctly restored @@ -286,22 +286,19 @@ async def test_simulation_mode_settings_restart( } }) schd = scheduler(id_) - msg_q = Queue() # Start the workflow: async with start(schd): - # Pick the task proxy, Mock its start time, set state to running: itask = schd.pool.get_tasks()[0] - itask.summary['started_time'] = 0 - itask.state.status = 'running' - - # Submit it, then mock the wallclock and assert that it's not finshed. schd.task_job_mgr._simulation_submit_task_jobs( [itask], schd.workflow) - monkeytime(0) + og_timeout = itask.mode_settings.timeout + + # Mock wallclock < sim end timeout + monkeytime(itask.mode_settings.timeout - 1) assert sim_time_check( - msg_q, [itask], schd.task_events_mgr.broadcast_mgr, + schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr, schd.workflow_db_mgr ) is False @@ -310,60 +307,27 @@ async def test_simulation_mode_settings_restart( async with start(schd): # Get our tasks and fix wallclock: itask = schd.pool.get_tasks()[0] - monkeytime(12) - itask.state.status = 'running' # Check that we haven't got started time & mode settings back: assert itask.summary['started_time'] is None assert itask.mode_settings is None - # Set the start time in the database to 0 to make the - # test simpler: - schd.workflow_db_mgr.put_insert_task_jobs( - itask, {'time_submit': '1970-01-01T00:00:00Z'}) - schd.workflow_db_mgr.process_queued_ops() - # Set the current time: - monkeytime(12) + monkeytime(og_timeout - 1) assert sim_time_check( - msg_q, [itask], schd.task_events_mgr.broadcast_mgr, + schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr, schd.workflow_db_mgr ) is False # Check that the itask.mode_settings is now re-created assert itask.mode_settings.__dict__ == { 'simulated_run_length': 60.0, - 'sim_task_fails': False, - 'timeout': 60.0 - } - - # Set the current time > timeout - monkeytime(61) - assert sim_time_check( - msg_q, [itask], schd.task_events_mgr.broadcast_mgr, - schd.workflow_db_mgr - ) is True - - assert itask.mode_settings is None - - schd.task_events_mgr.broadcast_mgr.put_broadcast( - ['1066'], ['one'], [{ - 'execution time limit': 'PT1S'}]) - - assert itask.mode_settings is None - - schd.task_job_mgr._simulation_submit_task_jobs( - [itask], schd.workflow) - - assert itask.submit_num == 2 - assert itask.mode_settings.__dict__ == { - 'simulated_run_length': 1.0, - 'sim_task_fails': False, - 'timeout': 62.0 + 'sim_task_fails': True, + 'timeout': float(int(og_timeout)) } -async def test_simulation_mode_settings_reload( +async def test_settings_reload( flow, scheduler, start, run_simjob ): """Check that simulation mode settings are changed for future @@ -374,9 +338,7 @@ async def test_simulation_mode_settings_reload( 'scheduler': {'cycle point format': '%Y'}, 'scheduling': { 'initial cycle point': '1066', - 'graph': { - 'R1': 'one' - } + 'graph': {'R1': 'one'} }, 'runtime': { 'one': { @@ -405,3 +367,57 @@ async def test_simulation_mode_settings_reload( # Submit second psuedo-job and "run" to success: assert run_simjob(schd) == 'succeeded' + + +async def test_settings_broadcast( + flow, scheduler, start, complete, monkeytime +): + """Assert that broadcasting a change in the settings for a task + affects subsequent psuedo-submissions. + """ + id_ = flow({ + 'scheduler': {'cycle point format': '%Y'}, + 'scheduling': { + 'initial cycle point': '1066', + 'graph': {'R1': 'one'} + }, + 'runtime': { + 'one': { + 'execution time limit': 'PT1S', + 'simulation': { + 'speedup factor': 1, + 'fail cycle points': '1066', + } + }, + } + }, defaults=False) + schd = scheduler(id_, paused_start=False, run_mode='simulation') + async with start(schd): + itask = schd.pool.get_tasks()[0] + itask.state.is_queued = False + + # Submit the first - the sim task will fail: + schd.task_job_mgr._simulation_submit_task_jobs( + [itask], schd.workflow) + assert itask.mode_settings.sim_task_fails is True + + # Let task finish. + monkeytime(itask.mode_settings.timeout + 1) + assert sim_time_check( + schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr, + schd.workflow_db_mgr + ) is True + + # The mode_settings object has been cleared: + assert itask.mode_settings is None + + # Change a setting using broadcast: + schd.task_events_mgr.broadcast_mgr.put_broadcast( + ['1066'], ['one'], [{ + 'simulation': {'fail cycle points': ''} + }]) + + # Submit again - result is different: + schd.task_job_mgr._simulation_submit_task_jobs( + [itask], schd.workflow) + assert itask.mode_settings.sim_task_fails is False