Skip to content

Commit

Permalink
OpenPBS: Treat 'E' state the same as 'F'
Browse files Browse the repository at this point in the history
Both E and F mean that the process has finished. The difference seems
that jobs in E state might still be using the queue. This distinction is
of no concern to us.
  • Loading branch information
pinkwah committed Mar 7, 2024
1 parent 8a94112 commit f8ede6f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 31 deletions.
58 changes: 33 additions & 25 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"B", # Begun
"E", # Exiting with or without errors
"F", # Finished (completed, failed or deleted)
"H", # Held,
"H", # Held
"M", # Moved to another server
"Q", # Queued
"R", # Running
Expand All @@ -28,7 +28,6 @@
"W", # Waiting
"X", # Expired (subjobs only)
]
JOBSTATE_INITIAL: JobState = "Q"

QSUB_INVALID_CREDENTIAL: int = 171
QSUB_PREMATURE_END_OF_MESSAGE: int = 183
Expand All @@ -45,21 +44,21 @@
QDEL_EXIT_CODES = [QDEL_REQUEST_INVALID, QDEL_JOB_HAS_FINISHED]


class FinishedJob(BaseModel):
job_state: Literal["F"]
returncode: Annotated[int, Field(alias="Exit_status")]
class IgnoredJobstates(BaseModel):
job_state: Literal["B", "M", "S", "T", "U", "W", "X"]


class QueuedJob(BaseModel):
job_state: Literal["H", "Q"]
job_state: Literal["H", "Q"] = "H"


class RunningJob(BaseModel):
job_state: Literal["R"]


class IgnoredJobstates(BaseModel):
job_state: Literal["B", "E", "M", "S", "T", "U", "W", "X"]
class FinishedJob(BaseModel):
job_state: Literal["E", "F"]
returncode: Annotated[int, Field(alias="Exit_status")]


AnyJob = Annotated[
Expand All @@ -68,6 +67,14 @@ class IgnoredJobstates(BaseModel):
]


_STATE_ORDER: dict[type[BaseModel], int] = {
IgnoredJobstates: -1,
QueuedJob: 0,
RunningJob: 1,
FinishedJob: 2,
}


class _Stat(BaseModel):
jobs: Annotated[Mapping[str, AnyJob], Field(alias="Jobs")]

Expand Down Expand Up @@ -98,7 +105,7 @@ def __init__(
self._num_pbs_cmd_retries = 10
self._retry_pbs_cmd_interval = 2

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}

def _resource_string(self) -> str:
Expand Down Expand Up @@ -187,7 +194,7 @@ async def submit(

job_id_ = process_message
logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}")
self._jobs[job_id_] = (iens, JOBSTATE_INITIAL)
self._jobs[job_id_] = (iens, QueuedJob())
self._iens2jobid[iens] = job_id_

async def kill(self, iens: int) -> None:
Expand Down Expand Up @@ -229,31 +236,32 @@ async def poll(self) -> None:

await asyncio.sleep(_POLL_PERIOD)

async def _process_job_update(self, job_id: str, job: AnyJob) -> None:
significant_transitions = {"Q": ["R", "F"], "R": ["F"]}
muted_transitions = {"H": ["Q", "E"], "Q": ["H", "E"], "R": ["E"]}
async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
if job_id not in self._jobs:
return

iens, old_state = self._jobs[job_id]
new_state = job.job_state
if old_state == new_state:
if isinstance(new_state, IgnoredJobstates):
logger.debug(
f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'"
)
return
if not new_state in significant_transitions[old_state]:
if not new_state in muted_transitions[old_state]:
logger.debug(
"Ignoring transition from "
f"{old_state} to {new_state} in {iens=} {job_id=}"
)

if _STATE_ORDER[type(new_state)] <= _STATE_ORDER[type(old_state)]:
return

self._jobs[job_id] = (iens, new_state)
event: Optional[Event] = None
if isinstance(job, RunningJob):
if isinstance(new_state, RunningJob):
logger.debug(f"Realization {iens} is running")
event = StartedEvent(iens=iens)
elif isinstance(job, FinishedJob):
aborted = job.returncode >= 256
event = FinishedEvent(iens=iens, returncode=job.returncode, aborted=aborted)
elif isinstance(new_state, FinishedJob):
aborted = new_state.returncode >= 256
event = FinishedEvent(
iens=iens,
returncode=new_state.returncode,
aborted=aborted,
)
if aborted:
logger.debug(
f"Realization {iens} (PBS-id: {self._iens2jobid[iens]}) failed"
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/scheduler/bin/qstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import os
import random
from argparse import ArgumentParser, Namespace
from pathlib import Path
from typing import Any, Dict, Optional
Expand Down Expand Up @@ -37,7 +38,7 @@ def main() -> None:

state = "Q"
if returncode is not None:
state = "F"
state = random.choice("EF")
elif pid is not None:
state = "R"

Expand Down
17 changes: 12 additions & 5 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

from ert.scheduler import OpenPBSDriver
from ert.scheduler.openpbs_driver import (
JOBSTATE_INITIAL,
QDEL_JOB_HAS_FINISHED,
QDEL_REQUEST_INVALID,
QSUB_CONNECTION_REFUSED,
QSUB_INVALID_CREDENTIAL,
QSUB_PREMATURE_END_OF_MESSAGE,
FinishedEvent,
JobState,
QueuedJob,
RunningJob,
StartedEvent,
_Stat,
)
Expand All @@ -31,14 +32,14 @@ async def test_events_produced_from_jobstate_updates(jobstate_sequence: List[str
finished = False
if "R" in jobstate_sequence:
started = True
if "F" in jobstate_sequence:
if "F" in jobstate_sequence or "E" in jobstate_sequence:
finished = True

driver = OpenPBSDriver()

async def mocked_submit(self, iens, *_args, **_kwargs):
"""A mocked submit is speedier than going through a command on disk"""
self._jobs["1"] = (iens, JOBSTATE_INITIAL)
self._jobs["1"] = (iens, QueuedJob())
self._iens2jobid[iens] = "1"

driver.submit = mocked_submit.__get__(driver)
Expand All @@ -56,11 +57,17 @@ async def mocked_submit(self, iens, *_args, **_kwargs):

if started is False and finished is False:
assert len(events) == 0
assert driver._jobs["1"] in [(0, "Q"), (0, "H")]

iens, state = driver._jobs["1"]
assert iens == 0
assert isinstance(state, QueuedJob)
elif started is True and finished is False:
assert len(events) == 1
assert events[0] == StartedEvent(iens=0)
assert driver._jobs["1"] == (0, "R")

iens, state = driver._jobs["1"]
assert iens == 0
assert isinstance(state, RunningJob)
elif started is True and finished is True:
assert len(events) <= 2 # The StartedEvent is not required
assert events[-1] == FinishedEvent(iens=0, returncode=0)
Expand Down

0 comments on commit f8ede6f

Please sign in to comment.