Skip to content

Commit

Permalink
Ensure that mode_settings are deleted from the task proxy when
Browse files Browse the repository at this point in the history
a psuedo-job finishes. Add test for workflow reload.
  • Loading branch information
wxtim committed Jan 30, 2024
1 parent ec74145 commit bac32c9
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
8 changes: 6 additions & 2 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(

self.timeout = started_time + self.simulated_run_length


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
Expand Down Expand Up @@ -219,6 +220,7 @@ def sim_time_check(
"""
now = time()
sim_task_state_changed: bool = False

for itask in itasks:
if (
itask.state.status != TASK_STATUS_RUNNING
Expand All @@ -228,8 +230,6 @@ def sim_time_check(
):
continue



if itask.mode_settings is None:
itask.mode_settings = ModeSettings(itask, broadcast_mgr, db_mgr)

Expand All @@ -251,7 +251,11 @@ def sim_time_check(
message_queue.put(
TaskMsg(job_d, now_str, 'DEBUG', msg)
)

# We've finished this psuedojob, so delete all the mode settings.
itask.mode_settings = None
sim_task_state_changed = True
itask.mode_settings = None
return sim_task_state_changed


Expand Down
102 changes: 99 additions & 3 deletions tests/integration/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import pytest
from pytest import param
from queue import Queue
Expand Down Expand Up @@ -239,7 +240,7 @@ def test_task_sped_up(sim_time_check_setup, monkeytime):


async def test_simulation_mode_settings_restart(
monkeytime, flow, scheduler, run, start
monkeytime, flow, scheduler, start
):
"""Check that simulation mode settings are correctly restored
upon restart.
Expand All @@ -258,8 +259,11 @@ async def test_simulation_mode_settings_restart(
'runtime': {
'one': {
'execution time limit': 'PT1M',
'execution retry delays': 'P0Y',
'simulation': {
'speedup factor': 1
'speedup factor': 1,
'fail cycle points': 'all',
'fail try 1 only': True,
}
},
}
Expand Down Expand Up @@ -292,8 +296,9 @@ async def test_simulation_mode_settings_restart(
monkeytime(12)
itask.state.status = 'running'

# Check that we haven't got started time back
# Check that we haven't got started time & mode settings back:
assert itask.summary['started_time'] is None
assert itask.mode_settings is None

# Set the start time in the database to 0 to make the
# test simpler:
Expand All @@ -308,9 +313,100 @@ async def test_simulation_mode_settings_restart(
schd.workflow_db_mgr
) is False

# Check that the itask.mode_settings is now re-created
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 60.0,
'sim_task_fails': False,
'timeout': 60.0
}

# Set the current time > timeout
monkeytime(61)
assert sim_time_check(
msg_q, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

assert itask.mode_settings is None

schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'execution time limit': 'PT1S'}])

assert itask.mode_settings is None

schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)

assert itask.submit_num == 2
assert itask.mode_settings.__dict__ == {
'simulated_run_length': 1.0,
'sim_task_fails': False,
'timeout': 62.0
}


async def test_simulation_mode_settings_reload(
monkeytime, flow, scheduler, start
):
"""Check that simulation mode settings are changed for future
pseudo jobs on reload.
"""
def sim_run(schd):
"""Submit and run a psuedo-job.
"""
# Get the only task proxy, submit the psuedo job:
itask = schd.pool.get_tasks()[0]
itask.state.is_queued = False
monkeytime(0)
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
monkeytime(61)

# Run Time Check
assert sim_time_check(
schd.message_queue, [itask], schd.task_events_mgr.broadcast_mgr,
schd.workflow_db_mgr
) is True

# Capture result process queue.
out = schd.message_queue.queue[0].message
schd.process_queued_task_messages()
return out

id_ = flow({
'scheduler': {'cycle point format': '%Y'},
'scheduling': {
'initial cycle point': '1066',
'graph': {
'R1': 'one'
}
},
'runtime': {
'one': {
'execution time limit': 'PT1M',
'execution retry delays': 'P0Y',
'simulation': {
'speedup factor': 1,
'fail cycle points': 'all',
'fail try 1 only': False,
}
},
}
})
schd = scheduler(id_)
async with start(schd):
# Submit first psuedo-job and "run" to failure:
assert sim_run(schd) == 'failed'

# Modify config as if reinstall had taken place:
conf_file = Path(schd.workflow_run_dir) / 'flow.cylc'
conf_file.write_text(
conf_file.read_text().replace('False', 'True'))

# Reload Workflow:
await schd.command_reload_workflow()

# Submit second psuedo-job and "run" to success:
assert sim_run(schd) == 'succeeded'

0 comments on commit bac32c9

Please sign in to comment.