Skip to content

Commit

Permalink
Simplify simulation mode stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Feb 10, 2025
1 parent f15526b commit 5b00e12
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 58 deletions.
12 changes: 7 additions & 5 deletions cylc/flow/run_modes/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
from cylc.flow.run_modes import RunMode
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.util import serialise_set
from cylc.flow.wallclock import get_unix_time_from_time_string


Expand Down Expand Up @@ -92,22 +94,22 @@ def submit_task_job(
itask.jobs.append(
task_job_mgr.get_simulation_job_conf(itask)
)
task_job_mgr.task_events_mgr.process_message(
itask, INFO, TASK_OUTPUT_SUBMITTED, event_time=now[1]
)
for output in (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED):
task_job_mgr.task_events_mgr.process_message(
itask, INFO, output, event_time=now[1]
)
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
itask, {
'time_submit': now[1],
'time_run': now[1],
'try_num': itask.get_try_num(),
'flow_nums': str(list(itask.flow_nums)),
'flow_nums': serialise_set(itask.flow_nums),
'is_manual_submit': itask.is_manual_submit,
'job_runner_name': RunMode.SIMULATION.value,
'platform_name': RunMode.SIMULATION.value,
'submit_status': 0 # Submission has succeeded
}
)
itask.state.status = TASK_STATUS_RUNNING
return True


Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/run_modes/skip.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
TASK_OUTPUT_STARTED
)
from cylc.flow.run_modes import RunMode
from cylc.flow.util import serialise_set

if TYPE_CHECKING:
from cylc.flow.taskdef import TaskDef
Expand Down Expand Up @@ -71,7 +72,7 @@ def submit_task_job(
itask, {
'time_submit': now[1],
'try_num': itask.get_try_num(),
'flow_nums': str(list(itask.flow_nums)),
'flow_nums': serialise_set(itask.flow_nums),
'is_manual_submit': itask.is_manual_submit,
'job_runner_name': RunMode.SKIP.value,
'platform_name': RunMode.SKIP.value,
Expand Down
75 changes: 23 additions & 52 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,17 +777,12 @@ def process_message(
self._process_message_submitted(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED)

# ... but either way update the job ID in the job proxy (it only
# update the job ID in the job proxy (it only
# comes in via the submission message).
if itask.run_mode != RunMode.SIMULATION:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
self.data_store_mgr.delta_job_attr(
job_tokens, 'job_id', itask.summary['submit_method_id'])
else:
# In simulation mode submitted implies started:
self.spawn_children(itask, TASK_OUTPUT_STARTED)
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
self.data_store_mgr.delta_job_attr(
job_tokens, 'job_id', itask.summary['submit_method_id']
)

elif message.startswith(FAIL_MESSAGE_PREFIX):
# Task received signal.
Expand Down Expand Up @@ -1503,54 +1498,30 @@ def _process_message_submitted(
)

itask.set_summary_time('submitted', event_time)
if itask.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')
if itask.state.status == TASK_STATUS_PREPARING:
# The job started message can (rarely) come in before the
# submit command returns - in which case do not go back to
# 'submitted'.
if itask.state_reset(TASK_STATUS_SUBMITTED, forced=forced):
itask.state_reset(is_queued=False, forced=forced)
self.setup_event_handlers(
itask,
TASK_OUTPUT_SUBMITTED,
f'job {TASK_OUTPUT_SUBMITTED}',
)
self.data_store_mgr.delta_task_state(itask)
itask.state.outputs.set_message_complete(TASK_OUTPUT_STARTED)
self.data_store_mgr.delta_task_output(itask, TASK_OUTPUT_STARTED)

else:
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')
if itask.state.status == TASK_STATUS_PREPARING:
# The job started message can (rarely) come in before the
# submit command returns - in which case do not go back to
# 'submitted'.
if itask.state_reset(TASK_STATUS_SUBMITTED, forced=forced):
itask.state_reset(is_queued=False, forced=forced)
self.setup_event_handlers(
itask,
TASK_OUTPUT_SUBMITTED,
f'job {TASK_OUTPUT_SUBMITTED}',
)
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
self._reset_job_timers(itask)

# Register the newly submitted job with the database and datastore.
# Do after itask has changed state
self._insert_task_job(
itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced)
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
self.data_store_mgr.delta_job_time(
job_tokens,
'submitted',
event_time,
)
if itask.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
'started',
event_time,
)
else:
self.data_store_mgr.delta_job_state(
job_tokens,
TASK_STATUS_SUBMITTED,
)
self.data_store_mgr.delta_job_time(job_tokens, 'submitted', event_time)
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_SUBMITTED)

def _insert_task_job(
self,
Expand Down Expand Up @@ -1850,7 +1821,7 @@ def _get_handler_template_variables(
def _reset_job_timers(self, itask):
"""Set up poll timer and timeout for task."""

if itask.transient:
if itask.run_mode == RunMode.SIMULATION or itask.transient:
return

if not itask.state(*TASK_STATUSES_ACTIVE):
Expand Down

0 comments on commit 5b00e12

Please sign in to comment.