Skip to content

Commit

Permalink
Broadcast changes to simulated tasks in task_job_manager
Browse files Browse the repository at this point in the history
to allow non simulation settings to be modified and
to avoid changing the itask.tdef.rtconfig.
  • Loading branch information
wxtim committed Mar 11, 2024
1 parent 269078a commit b4f4bb1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 19 deletions.
15 changes: 14 additions & 1 deletion cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
from cylc.flow.cfgspec.workflow import SPEC
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.exceptions import PointParsingError
from cylc.flow.parsec.util import listjoin
from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride
from cylc.flow.parsec.validate import BroadcastConfigValidator

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.task_proxy import TaskProxy


ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"]
Expand Down Expand Up @@ -179,6 +180,18 @@ def get_broadcast(self, tokens: 'Optional[Tokens]' = None) -> dict:
addict(ret, self.broadcasts[cycle][namespace])
return ret

def get_updated_rtconfig(self, itask: 'TaskProxy') -> dict:
"""Retrieve updated rtconfig for a single task proxy"""
overrides = self.get_broadcast(
itask.tokens
)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
poverride(rtconfig, overrides, prepend=True)
else:
rtconfig = itask.tdef.rtconfig
return rtconfig

def load_db_broadcast_states(self, row_idx, row):
"""Load broadcast variables from runtime DB broadcast states row."""
if row_idx == 0:
Expand Down
19 changes: 5 additions & 14 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
from cylc.flow import LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import PointParsingError
from cylc.flow.parsec.util import (
pdeepcopy,
poverride
)
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
Expand All @@ -38,7 +34,6 @@
from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
Expand Down Expand Up @@ -79,8 +74,8 @@ class ModeSettings:
def __init__(
self,
itask: 'TaskProxy',
broadcast_mgr: 'BroadcastMgr',
db_mgr: 'WorkflowDatabaseManager',
rtconfig
):

# itask.summary['started_time'] and mode_settings.timeout need
Expand All @@ -107,11 +102,7 @@ def __init__(
try_num = db_info["try_num"]

# Update anything changed by broadcast:
overrides = broadcast_mgr.get_broadcast(itask.tokens)
if overrides:
rtconfig = pdeepcopy(itask.tdef.rtconfig)
poverride(rtconfig, overrides, prepend=True)

if rtconfig != itask.tdef.rtconfig:
try:
rtconfig["simulation"][
"fail cycle points"
Expand All @@ -127,8 +118,6 @@ def __init__(
rtconfig['simulation'][
'fail cycle points'
] = itask.tdef.rtconfig['simulation']['fail cycle points']
else:
rtconfig = itask.tdef.rtconfig

# Calculate simulation info:
self.simulated_run_length = (
Expand Down Expand Up @@ -280,10 +269,12 @@ def sim_time_check(

# This occurs if the workflow has been restarted.
if itask.mode_settings is None:
rtconfig = task_events_manager.broadcast_mgr.get_updated_rtconfig(
itask)
itask.mode_settings = ModeSettings(
itask,
task_events_manager.broadcast_mgr,
db_mgr,
rtconfig
)

if now > itask.mode_settings.timeout:
Expand Down
9 changes: 7 additions & 2 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,13 +1001,18 @@ def _simulation_submit_task_jobs(self, itasks, workflow):
now = time()
now_str = get_time_string_from_unix_time(now)
for itask in itasks:
# Handle broadcasts
rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
itask)

itask.summary['started_time'] = now
self._set_retry_timers(itask)
itask.mode_settings = ModeSettings(
itask,
self.task_events_mgr.broadcast_mgr,
self.workflow_db_mgr
self.workflow_db_mgr,
rtconfig
)

itask.waiting_on_job_prep = False
itask.submit_num += 1

Expand Down
7 changes: 5 additions & 2 deletions tests/integration/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,11 @@ async def test_settings_broadcast(
'runtime': {
'one': {
'execution time limit': 'PT1S',
'execution retry delays': '2*PT5S',
'simulation': {
'speedup factor': 1,
'fail cycle points': '1066',
'fail try 1 only': False
}
},
}
Expand Down Expand Up @@ -427,9 +429,10 @@ async def test_settings_broadcast(
# Broadcast tasks will reparse correctly:
schd.task_events_mgr.broadcast_mgr.put_broadcast(
['1066'], ['one'], [{
'simulation': {'fail cycle points': '1945, 1977, 1066'}
'simulation': {'fail cycle points': '1945, 1977, 1066'},
'execution retry delays': '5*PT2S'
}])
schd.task_job_mgr._simulation_submit_task_jobs(
[itask], schd.workflow)
assert itask.mode_settings.sim_task_fails is True

assert itask.tdef.rtconfig['execution retry delays'] == [5.0, 5.0]

0 comments on commit b4f4bb1

Please sign in to comment.