Skip to content

Commit

Permalink
Have scheduler handle exceptions in Job.__call__()
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Mar 11, 2024
1 parent bbcb52b commit d9ad8ac
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
1 change: 0 additions & 1 deletion src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ async def __call__(
) -> None:
self._requested_max_submit = max_submit
await start.wait()

for attempt in range(max_submit):
await self._submit_and_run_once(sem)

Expand Down
46 changes: 24 additions & 22 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,35 +214,37 @@ async def gather_realization_jobs() -> list[BaseException | None]:
for scheduling_task in scheduling_tasks:
scheduling_task.cancel()

job_results: Optional[list[BaseException | None]] = None
try:
# there are two types of tasks and each type is handled differently:
# -`gather_realization_jobs` does not raise; rather, each job task's result
# is collected into the returning list. Exceptions from the job tasks are
# handled in the `else` branch after the evaluation has stopped.
# -If exception occurs, it must necessarily come from `scheduling tasks`
job_results = (
await asyncio.gather(gather_realization_jobs(), *scheduling_tasks)
)[0]

await asyncio.gather(gather_realization_jobs(), *scheduling_tasks)

except (asyncio.CancelledError, Exception) as e:
for job_task in self._tasks.values():
job_task.cancel()
# suppress potential error during driver.kill
with suppress(Exception):
await job_task
for scheduling_task in scheduling_tasks:
scheduling_task.cancel()
# Log and re-raise non-cancellation errors.
if not isinstance(e, asyncio.CancelledError):
logger.error(str(e))
raise e
else:
for result in job_results or []:
if not isinstance(result, asyncio.CancelledError) and isinstance(
result, Exception
):
logger.error(str(result))
raise result
if isinstance(e, asyncio.CancelledError):
for result in self._tasks.values() or []:
try:
await result
except asyncio.CancelledError:
continue
except Exception as e:
logger.error(str(result))
raise e
else:
for job_task in self._tasks.values():
job_task.cancel()
# suppress potential error during driver.kill
with suppress(Exception):
await job_task
for scheduling_task in scheduling_tasks:
scheduling_task.cancel()
# Log and re-raise non-cancellation errors.
if not isinstance(e, asyncio.CancelledError):
logger.error(str(e))
raise e

await self.driver.finish()

Expand Down
12 changes: 12 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,15 @@ async def test_that_process_event_queue_exceptions_are_propagated(

with pytest.raises(RuntimeError, match="Processing event queue failed"):
await sch.execute()


@pytest.mark.timeout(15)
async def test_that_driver_kill_exceptions_from_job_call_are_propagated(
mock_driver, realization
):
driver = mock_driver()
driver.kill = partial(mock_failure, "Driver kill failed")
sch = scheduler.Scheduler(driver, [realization])

with pytest.raises(RuntimeError, match=r"Driver kill failed"):
await sch.execute()

0 comments on commit d9ad8ac

Please sign in to comment.