Skip to content

Commit

Permalink
Add remaining states to the working STATES for LSF driver
Browse files Browse the repository at this point in the history
This commit moves status update into its own function
(_process_job_update). Additionally, it adds two standalone
states FinishedJobFailure and FinishedJobSuccess.
It also adds a test verifying that _process_job_update handles all the states properly.
  • Loading branch information
xjules committed Mar 12, 2024
1 parent 55b9e3a commit d649a76
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 43 deletions.
109 changes: 68 additions & 41 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,45 @@
logger = logging.getLogger(__name__)

# Every raw job state string that bjobs may report and the driver recognizes.
JobState = Literal[
    "EXIT", "DONE", "PEND", "RUN", "ZOMBI", "PDONE", "SSUSP", "USUSP", "PSUSP", "UNKWN"
]


class IgnoredJobstates(BaseModel):
    """Job states that carry no usable status information.

    "UNKWN" is logged and otherwise ignored by the driver.
    """

    job_state: Literal["UNKWN"]


class FinishedJobSuccess(BaseModel):
    """Terminal bjobs states treated as a successful job completion."""

    job_state: Literal["DONE", "PDONE"]


class FinishedJobFailure(BaseModel):
    """Terminal bjobs states treated as a failed job (exited or zombie)."""

    job_state: Literal["EXIT", "ZOMBI"]


class QueuedJob(BaseModel):
    """The job is pending in the LSF queue and has not started running."""

    job_state: Literal["PEND"]


class RunningJob(BaseModel):
    """The job has started; suspended states count as started-but-paused."""

    job_state: Literal["RUN", "SSUSP", "USUSP", "PSUSP"]


# Discriminated union over all job-state models; pydantic picks the concrete
# model from the "job_state" field when validating bjobs output.
AnyJob = Annotated[
    Union[
        FinishedJobSuccess, FinishedJobFailure, QueuedJob, RunningJob, IgnoredJobstates
    ],
    Field(discriminator="job_state"),
]

# Precedence of the job-state models. A state update is acted upon only when
# the new state ranks strictly higher than the stored one, so repeated or
# backward updates are ignored. IgnoredJobstates ranks below everything, and
# both terminal variants share the top rank so neither can overwrite the other.
_STATE_ORDER: dict[type[BaseModel], int] = {
    IgnoredJobstates: -1,
    QueuedJob: 0,
    RunningJob: 1,
    FinishedJobSuccess: 2,
    FinishedJobFailure: 2,
}

LSF_INFO_JSON_FILENAME = "lsf_info.json"


Expand Down Expand Up @@ -90,7 +109,7 @@ def __init__(
self._bjobs_cmd = Path(bjobs_cmd or shutil.which("bjobs") or "bjobs")
self._bkill_cmd = Path(bkill_cmd or shutil.which("bkill") or "bkill")

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._retry_sleep_period = 3
Expand Down Expand Up @@ -138,7 +157,7 @@ async def submit(
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id] = (iens, "PEND")
self._jobs[job_id] = (iens, QueuedJob(job_state="PEND"))
self._iens2jobid[iens] = job_id

async def kill(self, iens: int) -> None:
Expand Down Expand Up @@ -195,46 +214,54 @@ async def poll(self) -> None:
)
stat = _Stat(**parse_bjobs(stdout.decode(errors="ignore")))
for job_id, job in stat.jobs.items():
if job_id not in self._jobs:
continue

iens, old_state = self._jobs[job_id]
new_state = job.job_state
if old_state == new_state:
continue

self._jobs[job_id] = (iens, new_state)
event: Optional[Event] = None
if isinstance(job, RunningJob):
logger.debug(f"Realization {iens} is running.")
event = StartedEvent(iens=iens)
elif isinstance(job, FinishedJob):
aborted = job.job_state == "EXIT"
event = FinishedEvent(
iens=iens,
returncode=1 if job.job_state == "EXIT" else 0,
aborted=aborted,
)
if aborted:
logger.warning(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed."
)
else:
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
)
del self._jobs[job_id]
del self._iens2jobid[iens]

if event:
await self.event_queue.put(event)

await self._process_job_update(job_id, job)
missing_in_bjobs_output = set(self._jobs) - set(stat.jobs.keys())
if missing_in_bjobs_output:
logger.warning(
f"bjobs did not give status for job_ids {missing_in_bjobs_output}"
)
await asyncio.sleep(_POLL_PERIOD)

async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
    """Translate one parsed bjobs state for a job into scheduler events.

    Emits a StartedEvent when the job starts running and a FinishedEvent
    (dropping the job from the bookkeeping) when it terminates. "UNKWN"
    states and non-forward transitions are ignored.
    """
    # Skip jobs this driver is not tracking (or has already forgotten).
    if job_id not in self._jobs:
        return

    iens, old_state = self._jobs[job_id]
    # "UNKWN" carries no information: log it and keep the stored state.
    if isinstance(new_state, IgnoredJobstates):
        logger.debug(
            f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'"
        )
        return

    # Only act on strictly forward transitions in _STATE_ORDER, so e.g.
    # repeated RUN polls do not re-emit a StartedEvent.
    if _STATE_ORDER[type(new_state)] <= _STATE_ORDER[type(old_state)]:
        return

    self._jobs[job_id] = (iens, new_state)
    event: Optional[Event] = None
    if isinstance(new_state, RunningJob):
        logger.debug(f"Realization {iens} is running")
        event = StartedEvent(iens=iens)
    elif isinstance(new_state, FinishedJobFailure):
        logger.debug(
            f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed"
        )
        event = FinishedEvent(
            iens=iens,
            returncode=1,
            aborted=True,
        )

    elif isinstance(new_state, FinishedJobSuccess):
        logger.debug(
            f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
        )
        event = FinishedEvent(iens=iens, returncode=0)

    if event:
        # A finished job is never polled again; drop the bookkeeping
        # before publishing so consumers cannot see a stale entry.
        if isinstance(event, FinishedEvent):
            del self._jobs[job_id]
            del self._iens2jobid[iens]
        await self.event_queue.put(event)

async def finish(self) -> None:
    """No teardown is needed for the LSF driver; finishing is a no-op."""
    return
82 changes: 80 additions & 2 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
import stat
from contextlib import ExitStack as does_not_raise
from pathlib import Path
from typing import Collection, get_args
from typing import Collection, List, get_args

import pytest
from hypothesis import given
from hypothesis import strategies as st
from tests.utils import poll

from ert.scheduler import LsfDriver
from ert.scheduler.lsf_driver import JobState, parse_bjobs
from ert.scheduler.lsf_driver import (
FinishedEvent,
FinishedJobFailure,
FinishedJobSuccess,
JobState,
QueuedJob,
RunningJob,
StartedEvent,
_Stat,
parse_bjobs,
)

valid_jobstates: Collection[str] = list(get_args(JobState))

Expand All @@ -38,6 +48,74 @@ def capturing_bsub(monkeypatch, tmp_path):
bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)


@given(st.lists(st.sampled_from(JobState.__args__)))
async def test_events_produced_from_jobstate_updates(jobstate_sequence: List[str]):
    """Feed _process_job_update an arbitrary bjobs state sequence and
    verify the emitted events against the state categories that occurred."""
    # For each model, check whether any generated state falls in its
    # accepted "job_state" enum (pulled from the pydantic JSON schema).
    started = any(
        state in jobstate_sequence
        for state in RunningJob.model_json_schema()["properties"]["job_state"]["enum"]
    )
    finished_success = any(
        state in jobstate_sequence
        for state in FinishedJobSuccess.model_json_schema()["properties"]["job_state"][
            "enum"
        ]
    )
    finished_failure = any(
        state in jobstate_sequence
        for state in FinishedJobFailure.model_json_schema()["properties"]["job_state"][
            "enum"
        ]
    )

    driver = LsfDriver()

    async def mocked_submit(self, iens, *_args, **_kwargs):
        """A mocked submit is speedier than going through a command on disk"""
        self._jobs["1"] = (iens, QueuedJob(job_state="PEND"))
        self._iens2jobid[iens] = "1"

    driver.submit = mocked_submit.__get__(driver)
    await driver.submit(0, "_")

    # Replicate the behaviour of multiple calls to poll()
    for statestr in jobstate_sequence:
        jobstate = _Stat(**{"jobs": {"1": {"job_state": statestr}}}).jobs["1"]
        await driver._process_job_update("1", jobstate)

    # Drain everything the driver published during the simulated polls.
    events = []
    while not driver.event_queue.empty():
        events.append(await driver.event_queue.get())

    if not started and not finished_success and not finished_failure:
        # Nothing actionable happened; the job must still be queued.
        assert len(events) == 0

        iens, state = driver._jobs["1"]
        assert iens == 0
        assert isinstance(state, QueuedJob)
    elif started and not finished_success and not finished_failure:
        assert len(events) == 1
        assert events[0] == StartedEvent(iens=0)

        iens, state = driver._jobs["1"]
        assert iens == 0
        assert isinstance(state, RunningJob)
    elif started and finished_success and finished_failure:
        # Which terminal event wins depends on sequence order, so only
        # require a FinishedEvent last and the job to be forgotten.
        assert len(events) <= 2  # The StartedEvent is not required
        assert events[-1] == FinishedEvent(
            iens=0, returncode=events[-1].returncode, aborted=events[-1].aborted
        )
        assert "1" not in driver._jobs
    elif started is True and finished_success and not finished_failure:
        assert len(events) <= 2  # The StartedEvent is not required
        assert events[-1] == FinishedEvent(iens=0, returncode=0)
        assert "1" not in driver._jobs
    elif started is True and not finished_success and finished_failure:
        assert len(events) <= 2  # The StartedEvent is not required
        assert events[-1] == FinishedEvent(iens=0, returncode=1, aborted=True)
        assert "1" not in driver._jobs


@pytest.mark.usefixtures("capturing_bsub")
async def test_submit_with_named_queue():
driver = LsfDriver(queue_name="foo")
Expand Down

0 comments on commit d649a76

Please sign in to comment.