Skip to content

Commit

Permalink
OpenPBS: Treat 'E' state the same as 'F'
Browse files Browse the repository at this point in the history
Both E and F mean that the process has finished. The difference seems
that jobs in E state might still be using the queue. This distinction is
of no concern to us.
  • Loading branch information
pinkwah committed Mar 6, 2024
1 parent dff37c3 commit 1c467fc
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 20 deletions.
33 changes: 15 additions & 18 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"B", # Begun
"E", # Exiting with or without errors
"F", # Finished (completed, failed or deleted)
"H", # Held,
"H", # Held
"M", # Moved to another server
"Q", # Queued
"R", # Running
Expand All @@ -28,7 +28,6 @@
"W", # Waiting
"X", # Expired (subjobs only)
]
JOBSTATE_INITIAL: JobState = "Q"

QSUB_INVALID_CREDENTIAL: int = 171
QSUB_PREMATURE_END_OF_MESSAGE: int = 183
Expand All @@ -46,20 +45,20 @@


class FinishedJob(BaseModel):
job_state: Literal["F"]
job_state: Literal["E", "F"]
returncode: Annotated[int, Field(alias="Exit_status")]


class QueuedJob(BaseModel):
job_state: Literal["H", "Q"]
job_state: Literal["H", "Q"] = "H"


class RunningJob(BaseModel):
job_state: Literal["R"]


class IgnoredJobstates(BaseModel):
job_state: Literal["B", "E", "M", "S", "T", "U", "W", "X"]
job_state: Literal["B", "M", "S", "T", "U", "W", "X"]


AnyJob = Annotated[
Expand Down Expand Up @@ -98,7 +97,7 @@ def __init__(
self._num_pbs_cmd_retries = 10
self._retry_pbs_cmd_interval = 2

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}

def _resource_string(self) -> str:
Expand Down Expand Up @@ -187,7 +186,7 @@ async def submit(

job_id_ = process_message
logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}")
self._jobs[job_id_] = (iens, JOBSTATE_INITIAL)
self._jobs[job_id_] = (iens, QueuedJob())
self._iens2jobid[iens] = job_id_

async def kill(self, iens: int) -> None:
Expand Down Expand Up @@ -230,23 +229,21 @@ async def poll(self) -> None:
await asyncio.sleep(_POLL_PERIOD)

async def _process_job_update(self, job_id: str, job: AnyJob) -> None:
significant_transitions = {"Q": ["R", "F"], "R": ["F"]}
muted_transitions = {"H": ["Q", "E"], "Q": ["H", "E"], "R": ["E"]}
if job_id not in self._jobs:
return

iens, old_state = self._jobs[job_id]
new_state = job.job_state
if old_state == new_state:
return
if not new_state in significant_transitions[old_state]:
if not new_state in muted_transitions[old_state]:
logger.debug(
"Ignoring transition from "
f"{old_state} to {new_state} in {iens=} {job_id=}"
)
new_state = job
if isinstance(new_state, IgnoredJobstates):
logger.debug(
f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'"
)
return

self._jobs[job_id] = (iens, new_state)
if type(new_state) is type(old_state):
return

event: Optional[Event] = None
if isinstance(job, RunningJob):
logger.debug(f"Realization {iens} is running")
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/scheduler/bin/qstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import os
import random
from argparse import ArgumentParser, Namespace
from pathlib import Path
from typing import Any, Dict, Optional
Expand Down Expand Up @@ -37,7 +38,7 @@ def main() -> None:

state = "Q"
if returncode is not None:
state = "F"
state = random.choice("EF")
elif pid is not None:
state = "R"

Expand Down

0 comments on commit 1c467fc

Please sign in to comment.