Skip to content

Commit

Permalink
Implement run_exit_callback in disguise.
Browse files Browse the repository at this point in the history
Ensure storage state is set correctly when runs fail.

Adjust error string building

Passes test_tracking_integration
  • Loading branch information
berland committed Nov 29, 2023
1 parent a69d92b commit b32b42b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 30 deletions.
6 changes: 3 additions & 3 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ert.run_arg import RunArg

from .load_status import LoadResult, LoadStatus
from .realization_state import RealizationState
from .realization_state import RealizationState as RealizationStorageState

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,9 +108,9 @@ async def forward_model_ok(
final_result = response_result

run_arg.ensemble_storage.state_map[run_arg.iens] = (
RealizationState.HAS_DATA
RealizationStorageState.HAS_DATA
if final_result.status == LoadStatus.LOAD_SUCCESSFUL
else RealizationState.LOAD_FAILURE
else RealizationStorageState.LOAD_FAILURE
)

return final_result
16 changes: 11 additions & 5 deletions src/ert/scheduler/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
self.start_time: Optional[datetime.datetime] = None
self.retries_left: int = retries
self._max_submit = retries + 1
self._callback_status_msg: Optional[str] = None
self._callback_status_msg: str = ""
super().__init__()

allocate = UNKNOWN.to(NOT_ACTIVE)
Expand Down Expand Up @@ -123,17 +123,18 @@ def on_enter_RUNNING(self) -> None:
self.start_time = datetime.datetime.now()

def on_enter_EXIT(self) -> None:
self.realization.run_arg.ensemble_storage.state_map[
self.realization.run_arg.iens
] = RealizationStorageState.LOAD_FAILURE

if self.retries_left > 0:
self.retry()
self.retries_left -= 1
else:
self.realization.run_arg.ensemble_storage.state_map[
self.realization.run_arg.iens
] = RealizationStorageState.LOAD_FAILURE

logger.error(
f"Realization: {self.realization.run_arg.iens} "
f"failed after reaching max submit ({self._max_submit}):"
f"\n\t{self._callback_status_msg}"
)
log_info_from_exit_file(
Path(self.realization.run_arg.runpath) / self.realization.exit_file
Expand All @@ -146,6 +147,11 @@ def on_enter_DONE(self) -> None:
def on_enter_DO_KILL(self) -> None:
    """Start an asynchronous kill of this realization via the queue driver.

    Schedules ``self.jobqueue.driver.kill(self)`` as an asyncio task and
    returns immediately without awaiting it. A running event loop is
    required, as for any ``asyncio.create_task`` call.
    """
    # The event loop only keeps weak references to tasks; hold a strong
    # reference on the instance so the kill task cannot be garbage
    # collected before it has finished.
    self._kill_task = asyncio.create_task(self.jobqueue.driver.kill(self))

def on_enter_IS_KILLED(self) -> None:
    """Mark this realization as ``LOAD_FAILURE`` in the ensemble storage.

    Writes ``RealizationStorageState.LOAD_FAILURE`` into the ensemble
    storage ``state_map`` at the index given by the realization's ``iens``.
    """
    # NOTE(review): presumably invoked by the state machine when the
    # IS_KILLED state is entered -- confirm against the state definitions.
    run_arg = self.realization.run_arg
    state_map = run_arg.ensemble_storage.state_map
    state_map[run_arg.iens] = RealizationStorageState.LOAD_FAILURE


def log_info_from_exit_file(exit_file_path: Path) -> None:
if not exit_file_path.exists():
Expand Down
27 changes: 19 additions & 8 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ def is_active(self) -> bool:
return any(
real.current_state
in (
RealizationState.WAITING,
RealizationState.SUBMITTED,
RealizationState.DONE,
RealizationState.DO_KILL,
RealizationState.PENDING,
RealizationState.RUNNING,
RealizationState.DONE,
RealizationState.SUBMITTED,
RealizationState.WAITING,
)
for real in self._realizations
)
Expand All @@ -124,18 +125,27 @@ async def run_done_callback(self, state: RealizationState) -> Optional[LoadStatu
f"Running done callback for {state.realization.run_arg.iens} ..(partly blocking)..",
end="",
)
callback_status, state._callback_status_msg = await forward_model_ok(
state.realization.run_arg
)
callback_status, status_msg = await forward_model_ok(state.realization.run_arg)
print(" done")

# Keep any message already recorded for this realization and append the
# done-callback status to it; only when nothing has been recorded yet does
# the callback status become the whole message. (The branches were
# previously swapped, which discarded an existing message and prefixed an
# empty one.)
if state._callback_status_msg:
    state._callback_status_msg += f"\nstatus from done callback: {status_msg}"
else:
    state._callback_status_msg = status_msg

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
state.validate()
elif callback_status == LoadStatus.TIME_MAP_FAILURE:
state.invalidate()
else: # LoadStatus.LOAD_FAILURE
state.loadfailed()

return callback_status

async def run_timeout_callback(self, state: RealizationState) -> None:
    """Invoke the realization's timeout callback, if one is set.

    The callback is called with the realization's ``iens``. Nothing happens
    when ``callback_timeout`` is ``None`` or otherwise falsy.
    """
    realization = state.realization
    on_timeout = realization.callback_timeout
    if not on_timeout:
        return
    on_timeout(realization.run_arg.iens)

@property
def stopped(self) -> bool:
    """Return the current value of the ``_queue_stopped`` flag."""
    is_stopped = self._queue_stopped
    return is_stopped
Expand Down Expand Up @@ -322,8 +332,6 @@ async def execute(
real.activate()
try:
while True:
await asyncio.sleep(1)

for func in evaluators:
func()

Expand All @@ -339,6 +347,7 @@ async def execute(
> datetime.timedelta(seconds=real.realization.max_runtime)
):
real.dokill()
await self.run_timeout_callback(real)

if self.stopped:
print("WE ARE STOPPED")
Expand All @@ -347,6 +356,8 @@ async def execute(
await self._statechanges_to_publish.put(CLOSE_PUBLISHER_SENTINEL)
return EVTYPE_ENSEMBLE_CANCELLED

await asyncio.sleep(1)

if not self.is_active():
print("not active, breaking out")
await asyncio.sleep(0.1) # Let changes be propagated to the queue
Expand Down
27 changes: 13 additions & 14 deletions tests/unit_tests/status/test_tracking_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
JOB_STATE_START,
REALIZATION_STATE_FINISHED,
)
from ert.realization_state import RealizationState
from ert.realization_state import RealizationState as RealizationStorageState
from ert.shared.feature_toggling import FeatureToggling


Expand All @@ -50,12 +50,13 @@ def check_expression(original, path_expression, expected, msg_start):
@pytest.mark.integration_test
@pytest.mark.parametrize(
(
"extra_config, extra_poly_eval, cmd_line_arguments,"
"num_successful,num_iters,progress,assert_present_in_snapshot, expected_state"
"extra_config, extra_poly_eval, cmd_line_arguments, "
"num_successful, num_iters, progress, "
"assert_present_in_snapshot, expected_storage_state"
),
[
pytest.param(
"MAX_RUNTIME 5",
"MAX_RUNTIME 1",
" import time; time.sleep(1000)",
[
ENSEMBLE_EXPERIMENT_MODE,
Expand All @@ -74,9 +75,8 @@ def check_expression(original, path_expression, expected, msg_start):
"The run is cancelled due to reaching MAX_RUNTIME",
),
],
[RealizationState.LOAD_FAILURE] * 2,
[RealizationStorageState.LOAD_FAILURE] * 2,
id="ee_poly_experiment_cancelled_by_max_runtime",
marks=pytest.mark.xfail(reason="Needs reimplementation"),
),
pytest.param(
"",
Expand All @@ -91,7 +91,7 @@ def check_expression(original, path_expression, expected, msg_start):
1,
1.0,
[(".*", "reals.*.jobs.*.status", JOB_STATE_FINISHED)],
[RealizationState.HAS_DATA] * 2,
[RealizationStorageState.HAS_DATA] * 2,
id="ee_poly_experiment",
),
pytest.param(
Expand All @@ -109,7 +109,7 @@ def check_expression(original, path_expression, expected, msg_start):
2,
1.0,
[(".*", "reals.*.jobs.*.status", JOB_STATE_FINISHED)],
[RealizationState.HAS_DATA] * 2,
[RealizationStorageState.HAS_DATA] * 2,
id="ee_poly_smoother",
),
pytest.param(
Expand All @@ -133,11 +133,10 @@ def check_expression(original, path_expression, expected, msg_start):
(".*", "reals.'1'.jobs.*.status", JOB_STATE_FINISHED),
],
[
RealizationState.LOAD_FAILURE,
RealizationState.HAS_DATA,
RealizationStorageState.LOAD_FAILURE,
RealizationStorageState.HAS_DATA,
],
id="ee_failing_poly_smoother",
marks=pytest.mark.skip(reason="Needs reimplementation"),
),
],
)
Expand All @@ -149,7 +148,7 @@ def test_tracking(
num_iters,
progress,
assert_present_in_snapshot,
expected_state,
expected_storage_state,
tmpdir,
source_root,
storage,
Expand Down Expand Up @@ -247,6 +246,7 @@ def test_tracking(
expected,
) in assert_present_in_snapshot:
for i, snapshot in snapshots.items():
print(i)
if re.match(iter_expression, str(i)):
check_expression(
snapshot.to_dict(),
Expand All @@ -256,7 +256,7 @@ def test_tracking(
)
thread.join()
state_map = storage.get_ensemble_by_name("default").state_map
assert state_map[:2] == expected_state
assert state_map[:2] == expected_storage_state
FeatureToggling.reset()


Expand Down Expand Up @@ -365,7 +365,6 @@ def run_sim(start_date):
summary.fwrite()


@pytest.mark.skip(reason="Needs reimplementation")
@pytest.mark.integration_test
def test_tracking_missing_ecl(
tmpdir,
Expand Down

0 comments on commit b32b42b

Please sign in to comment.