Skip to content

Commit

Permalink
Response to review
Browse files Browse the repository at this point in the history
- Re-order the ModeSettings.__init__ method to allow
  db loading before setting the start time.
- Add `try_num` to database.
- Fix/streamline the tests - add explicit broadcast test.
  • Loading branch information
wxtim committed Feb 6, 2024
1 parent 8449871 commit 2600125
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 73 deletions.
46 changes: 27 additions & 19 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,50 @@ class ModeSettings:
"""
simulated_run_length: float = 0.0
sim_task_fails: bool = False
timeout: Optional[float] = None

def __init__(
self,
itask: 'TaskProxy',
broadcast_mgr: 'BroadcastMgr',
db_mgr: 'Optional[WorkflowDatabaseManager]' = None
):

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
try_num = None
if started_time is None and db_mgr:
# Get DB info
db_info = db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/"))

# Get the started time:
started_time = get_unix_time_from_time_string(
db_info["time_submit"])
itask.summary['started_time'] = started_time

# Get the try number:
try_num = db_info["try_num"]

# Update anything changed by broadcast:
overrides = broadcast_mgr.get_broadcast(itask.tokens)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
poverride(rtconfig, overrides, prepend=True)
else:
rtconfig = itask.tdef.rtconfig

# Calculate simulation info:
self.simulated_run_length = (
get_simulated_run_len(rtconfig))
self.sim_task_fails = sim_task_failed(
rtconfig['simulation'],
itask.point,
itask.get_try_num()
try_num or itask.get_try_num()
)

# itask.summary['started_time'] and mode_settings.timeout need
# repopulating from the DB on workflow restart:
started_time = itask.summary['started_time']
if started_time is None and db_mgr:
started_time_str = db_mgr.pub_dao.select_task_job(
*itask.tokens.relative_id.split("/"))["time_submit"]
started_time = get_unix_time_from_time_string(
started_time_str)
itask.summary['started_time'] = started_time
from cylc.flow import LOG
LOG.critical(try_num or itask.get_try_num())
self.timeout = started_time + self.simulated_run_length


Expand Down Expand Up @@ -217,12 +231,7 @@ def sim_time_check(
sim_task_state_changed: bool = False

for itask in itasks:
if (
itask.state.status != TASK_STATUS_RUNNING
or itask.state.is_queued
or itask.state.is_held
or itask.state.is_runahead
):
if itask.state.status != TASK_STATUS_RUNNING:
continue

if itask.mode_settings is None:
Expand Down Expand Up @@ -250,7 +259,6 @@ def sim_time_check(
# We've finished this psuedojob, so delete all the mode settings.
itask.mode_settings = None
sim_task_state_changed = True
itask.mode_settings = None
return sim_task_state_changed


Expand All @@ -267,5 +275,5 @@ def sim_task_failed(
sim_conf['fail cycle points'] is None # i.e. "all"
or point in sim_conf['fail cycle points']
) and (
try_num == 0 or not sim_conf['fail try 1 only']
try_num == 1 or not sim_conf['fail try 1 only']
)
10 changes: 7 additions & 3 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,11 +1002,11 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
now_str = get_time_string_from_unix_time(now)
for itask in itasks:
itask.summary['started_time'] = now
self._set_retry_timers(itask, itask.tdef.rtconfig)
itask.mode_settings = ModeSettings(
itask, self.task_events_mgr.broadcast_mgr)
itask.waiting_on_job_prep = False
itask.submit_num += 1
self._set_retry_timers(itask)

itask.platform = {'name': 'SIMULATION'}
itask.summary['job_runner_name'] = 'SIMULATION'
Expand All @@ -1020,8 +1020,12 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
itask, INFO, TASK_OUTPUT_SUBMITTED,
)
self.workflow_db_mgr.put_insert_task_jobs(
itask, {'time_submit': now_str})

itask, {
'time_submit': now_str,
'try_num': itask.get_try_num(),
}
)
self.workflow_db_mgr.process_queued_ops()
return itasks

def _submit_task_jobs_callback(self, ctx, workflow, itasks):
Expand Down
118 changes: 67 additions & 51 deletions tests/integration/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch):
ISO8601Point(point), itask)

for i, result in enumerate(results):
itask.try_timers['execution-retry'].num = i - 1
itask.try_timers['execution-retry'].num = i
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is result
Expand Down Expand Up @@ -256,7 +256,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime):
) is True


async def test_simulation_mode_settings_restart(
async def test_settings_restart(
monkeytime, flow, scheduler, start
):
"""Check that simulation mode settings are correctly restored
Expand Down Expand Up @@ -286,22 +286,19 @@ async def test_simulation_mode_settings_restart(
}
})
schd = scheduler(id_)
msg_q = Queue()

# Start the workflow:
async with start(schd):
# Pick the task proxy, Mock its start time, set state to running:
itask = schd.pool.get_tasks()[0]
itask.summary['started_time'] = 0
itask.state.status = 'running'

# Submit it, then mock the wallclock and assert that it's not finshed.
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
monkeytime(0)

og_timeout = itask.mode_settings.timeout

# Mock wallclock < sim end timeout
monkeytime(itask.mode_settings.timeout - 1)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is False

Expand All @@ -310,60 +307,27 @@ async def test_simulation_mode_settings_restart(
async with start(schd):
# Get our tasks and fix wallclock:
itask = schd.pool.get_tasks()[0]
monkeytime(12)
itask.state.status = 'running'

# Check that we haven't got started time & mode settings back:
assert itask.summary['started_time'] is None
assert itask.mode_settings is None

# Set the start time in the database to 0 to make the
# test simpler:
schd.workflow_db_mgr.put_insert_task_jobs(
itask, {'time_submit': '1970-01-01T00:00:00Z'})
schd.workflow_db_mgr.process_queued_ops()

# Set the current time:
monkeytime(12)
monkeytime(og_timeout - 1)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is False

# Check that the itask.mode_settings is now re-created
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 60.0,
'sim_task_fails': False,
'timeout': 60.0
}

# Set the current time > timeout
monkeytime(61)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

assert itask.mode_settings is None

schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'execution time limit': 'PT1S'}])

assert itask.mode_settings is None

schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)

assert itask.submit_num == 2
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 1.0,
'sim_task_fails': False,
'timeout': 62.0
'sim_task_fails': True,
'timeout': float(int(og_timeout))
}


async def test_simulation_mode_settings_reload(
async def test_settings_reload(
flow, scheduler, start, run_simjob
):
"""Check that simulation mode settings are changed for future
Expand All @@ -374,9 +338,7 @@ async def test_simulation_mode_settings_reload(
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '1066',
'graph': {
'R1': 'one'
}
'graph': {'R1': 'one'}
},
'runtime': {
'one': {
Expand Down Expand Up @@ -405,3 +367,57 @@ async def test_simulation_mode_settings_reload(

# Submit second psuedo-job and "run" to success:
assert run_simjob(schd) == 'succeeded'


async def test_settings_broadcast(
flow, scheduler, start, complete, monkeytime
):
"""Assert that broadcasting a change in the settings for a task
affects subsequent psuedo-submissions.
"""
id_ = flow({
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '1066',
'graph': {'R1': 'one'}
},
'runtime': {
'one': {
'execution time limit': 'PT1S',
'simulation': {
'speedup factor': 1,
'fail cycle points': '1066',
}
},
}
}, defaults=False)
schd = scheduler(id_, paused_start=False, run_mode='simulation')
async with start(schd):
itask = schd.pool.get_tasks()[0]
itask.state.is_queued = False

# Submit the first - the sim task will fail:
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is True

# Let task finish.
monkeytime(itask.mode_settings.timeout + 1)
assert sim_time_check(
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

# The mode_settings object has been cleared:
assert itask.mode_settings is None

# Change a setting using broadcast:
schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'simulation': {'fail cycle points': ''}
}])

# Submit again - result is different:
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is False

0 comments on commit 2600125

Please sign in to comment.