From 5b00e12c5819b5f7c82f21b57c26f2d1efbb5556 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 7 Feb 2025 20:50:09 +0000 Subject: [PATCH] Simplify simulation mode stuff --- cylc/flow/run_modes/simulation.py | 12 ++--- cylc/flow/run_modes/skip.py | 3 +- cylc/flow/task_events_mgr.py | 75 ++++++++++--------------------- 3 files changed, 32 insertions(+), 58 deletions(-) diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py index cb39ed020c..685dc3b5e3 100644 --- a/cylc/flow/run_modes/simulation.py +++ b/cylc/flow/run_modes/simulation.py @@ -38,10 +38,12 @@ from cylc.flow.run_modes import RunMode from cylc.flow.task_outputs import ( TASK_OUTPUT_FAILED, + TASK_OUTPUT_STARTED, TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED, ) from cylc.flow.task_state import TASK_STATUS_RUNNING +from cylc.flow.util import serialise_set from cylc.flow.wallclock import get_unix_time_from_time_string @@ -92,22 +94,22 @@ def submit_task_job( itask.jobs.append( task_job_mgr.get_simulation_job_conf(itask) ) - task_job_mgr.task_events_mgr.process_message( - itask, INFO, TASK_OUTPUT_SUBMITTED, event_time=now[1] - ) + for output in (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED): + task_job_mgr.task_events_mgr.process_message( + itask, INFO, output, event_time=now[1] + ) task_job_mgr.workflow_db_mgr.put_insert_task_jobs( itask, { 'time_submit': now[1], 'time_run': now[1], 'try_num': itask.get_try_num(), - 'flow_nums': str(list(itask.flow_nums)), + 'flow_nums': serialise_set(itask.flow_nums), 'is_manual_submit': itask.is_manual_submit, 'job_runner_name': RunMode.SIMULATION.value, 'platform_name': RunMode.SIMULATION.value, 'submit_status': 0 # Submission has succeeded } ) - itask.state.status = TASK_STATUS_RUNNING return True diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 595de4bdcf..2b7bd86412 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -28,6 +28,7 @@ TASK_OUTPUT_STARTED ) from cylc.flow.run_modes import RunMode +from cylc.flow.util import serialise_set if TYPE_CHECKING: from cylc.flow.taskdef import TaskDef @@ -71,7 +72,7 @@ def submit_task_job( itask, { 'time_submit': now[1], 'try_num': itask.get_try_num(), - 'flow_nums': str(list(itask.flow_nums)), + 'flow_nums': serialise_set(itask.flow_nums), 'is_manual_submit': itask.is_manual_submit, 'job_runner_name': RunMode.SKIP.value, 'platform_name': RunMode.SKIP.value, diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 845c85bc2d..7444f57309 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -777,17 +777,12 @@ def process_message( self._process_message_submitted(itask, event_time, forced) self.spawn_children(itask, TASK_OUTPUT_SUBMITTED) - # ... but either way update the job ID in the job proxy (it only + # update the job ID in the job proxy (it only # comes in via the submission message). - if itask.run_mode != RunMode.SIMULATION: - job_tokens = itask.tokens.duplicate( - job=str(itask.submit_num) - ) - self.data_store_mgr.delta_job_attr( - job_tokens, 'job_id', itask.summary['submit_method_id']) - else: - # In simulation mode submitted implies started: - self.spawn_children(itask, TASK_OUTPUT_STARTED) + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) + self.data_store_mgr.delta_job_attr( + job_tokens, 'job_id', itask.summary['submit_method_id'] + ) elif message.startswith(FAIL_MESSAGE_PREFIX): # Task received signal. @@ -1503,54 +1498,30 @@ def _process_message_submitted( ) itask.set_summary_time('submitted', event_time) - if itask.run_mode == RunMode.SIMULATION: - # Simulate job started as well. - itask.set_summary_time('started', event_time) - if itask.state_reset(TASK_STATUS_RUNNING, forced=forced): + # Unset started and finished times in case of resubmission. + itask.set_summary_time('started') + itask.set_summary_time('finished') + if itask.state.status == TASK_STATUS_PREPARING: + # The job started message can (rarely) come in before the + # submit command returns - in which case do not go back to + # 'submitted'. + if itask.state_reset(TASK_STATUS_SUBMITTED, forced=forced): + itask.state_reset(is_queued=False, forced=forced) + self.setup_event_handlers( + itask, + TASK_OUTPUT_SUBMITTED, + f'job {TASK_OUTPUT_SUBMITTED}', + ) self.data_store_mgr.delta_task_state(itask) - itask.state.outputs.set_message_complete(TASK_OUTPUT_STARTED) - self.data_store_mgr.delta_task_output(itask, TASK_OUTPUT_STARTED) - - else: - # Unset started and finished times in case of resubmission. - itask.set_summary_time('started') - itask.set_summary_time('finished') - if itask.state.status == TASK_STATUS_PREPARING: - # The job started message can (rarely) come in before the - # submit command returns - in which case do not go back to - # 'submitted'. - if itask.state_reset(TASK_STATUS_SUBMITTED, forced=forced): - itask.state_reset(is_queued=False, forced=forced) - self.setup_event_handlers( - itask, - TASK_OUTPUT_SUBMITTED, - f'job {TASK_OUTPUT_SUBMITTED}', - ) - self.data_store_mgr.delta_task_state(itask) - self._reset_job_timers(itask) + self._reset_job_timers(itask) # Register the newly submitted job with the database and datastore. # Do after itask has changed state self._insert_task_job( itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced) job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) - self.data_store_mgr.delta_job_time( - job_tokens, - 'submitted', - event_time, - ) - if itask.run_mode == RunMode.SIMULATION: - # Simulate job started as well. - self.data_store_mgr.delta_job_time( - job_tokens, - 'started', - event_time, - ) - else: - self.data_store_mgr.delta_job_state( - job_tokens, - TASK_STATUS_SUBMITTED, - ) + self.data_store_mgr.delta_job_time(job_tokens, 'submitted', event_time) + self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_SUBMITTED) def _insert_task_job( self, @@ -1850,7 +1821,7 @@ def _get_handler_template_variables( def _reset_job_timers(self, itask): """Set up poll timer and timeout for task.""" - if itask.transient: + if itask.run_mode == RunMode.SIMULATION or itask.transient: return if not itask.state(*TASK_STATUSES_ACTIVE):