diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index b9577182f10..22235a415e6 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -18,7 +18,7 @@ "B", # Begun "E", # Exiting with or without errors "F", # Finished (completed, failed or deleted) - "H", # Held, + "H", # Held "M", # Moved to another server "Q", # Queued "R", # Running @@ -28,7 +28,6 @@ "W", # Waiting "X", # Expired (subjobs only) ] -JOBSTATE_INITIAL: JobState = "Q" QSUB_INVALID_CREDENTIAL: int = 171 QSUB_PREMATURE_END_OF_MESSAGE: int = 183 @@ -46,12 +45,12 @@ class FinishedJob(BaseModel): - job_state: Literal["F"] + job_state: Literal["E", "F"] returncode: Annotated[int, Field(alias="Exit_status")] class QueuedJob(BaseModel): - job_state: Literal["H", "Q"] + job_state: Literal["H", "Q"] = "H" class RunningJob(BaseModel): @@ -59,7 +58,7 @@ class RunningJob(BaseModel): class IgnoredJobstates(BaseModel): - job_state: Literal["B", "E", "M", "S", "T", "U", "W", "X"] + job_state: Literal["B", "M", "S", "T", "U", "W", "X"] AnyJob = Annotated[ @@ -98,7 +97,7 @@ def __init__( self._num_pbs_cmd_retries = 10 self._retry_pbs_cmd_interval = 2 - self._jobs: MutableMapping[str, Tuple[int, JobState]] = {} + self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {} self._iens2jobid: MutableMapping[int, str] = {} def _resource_string(self) -> str: @@ -187,7 +186,7 @@ async def submit( job_id_ = process_message logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}") - self._jobs[job_id_] = (iens, JOBSTATE_INITIAL) + self._jobs[job_id_] = (iens, QueuedJob()) self._iens2jobid[iens] = job_id_ async def kill(self, iens: int) -> None: @@ -230,23 +229,21 @@ async def poll(self) -> None: await asyncio.sleep(_POLL_PERIOD) async def _process_job_update(self, job_id: str, job: AnyJob) -> None: - significant_transitions = {"Q": ["R", "F"], "R": ["F"]} - muted_transitions = {"H": ["Q", "E"], "Q": ["H", "E"], "R": ["E"]} if job_id not in self._jobs: return iens, old_state = self._jobs[job_id] - new_state = job.job_state - if old_state == new_state: - return - if not new_state in significant_transitions[old_state]: - if not new_state in muted_transitions[old_state]: - logger.debug( - "Ignoring transition from " - f"{old_state} to {new_state} in {iens=} {job_id=}" - ) + new_state = job + if isinstance(new_state, IgnoredJobstates): + logger.debug( + f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'" + ) return + self._jobs[job_id] = (iens, new_state) + if type(new_state) is type(old_state): + return + event: Optional[Event] = None if isinstance(job, RunningJob): logger.debug(f"Realization {iens} is running") diff --git a/test-data/block_storage b/test-data/block_storage index ef83a469581..4b41bf0617f 160000 --- a/test-data/block_storage +++ b/test-data/block_storage @@ -1 +1 @@ -Subproject commit ef83a46958180d9dd7391b8cb71d688f30e3550d +Subproject commit 4b41bf0617fa1ded3d59be3215785d923ae09f70 diff --git a/tests/integration_tests/scheduler/bin/qstat.py b/tests/integration_tests/scheduler/bin/qstat.py index 9081e6ceea7..fa761086b68 100644 --- a/tests/integration_tests/scheduler/bin/qstat.py +++ b/tests/integration_tests/scheduler/bin/qstat.py @@ -2,6 +2,7 @@ import json import os +import random from argparse import ArgumentParser, Namespace from pathlib import Path from typing import Any, Dict, Optional @@ -37,7 +38,7 @@ def main() -> None: state = "Q" if returncode is not None: - state = "F" + state = random.choice("EF") elif pid is not None: state = "R"