Skip to content

Commit

Permalink
Add tests for various scenarios for a faulty bjobs (#7193)
Browse files Browse the repository at this point in the history
Add tests for faulty bjobs behaviour

Currently no result from bjobs will take down Ert.

Also, there is currently no retry functionality for some valid
error scenarios; this is to be fixed later.
  • Loading branch information
berland authored Feb 29, 2024
1 parent c6ce683 commit 3575519
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 8 deletions.
32 changes: 24 additions & 8 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Optional,
Tuple,
Union,
get_args,
)

from pydantic import BaseModel, Field
Expand Down Expand Up @@ -56,13 +57,20 @@ class _Stat(BaseModel):
jobs: Mapping[str, AnyJob]


def parse_bjobs(bjobs_output: str) -> Dict[str, Dict[str, Dict[str, str]]]:
    """Extract the state of every job listed in the text printed by ``bjobs``.

    Only lines whose first character is a digit are treated as job lines;
    everything else (header line, free-text messages, blank lines) is skipped.
    A job line is split on whitespace: the first token is taken as the job id
    and the third token as the job state, the rest of the line is ignored.
    Job lines with fewer than three tokens are skipped silently.

    Args:
        bjobs_output: Already decoded standard output from ``bjobs``.

    Returns:
        ``{"jobs": {jobid: {"job_state": state}}}`` with one entry for each
        job line that carries a state listed in ``JobState``. Lines with any
        other state are logged as errors and left out.
    """
    # Loop invariant: look up the accepted states once, not once per line.
    known_states = get_args(JobState)
    data: Dict[str, Dict[str, str]] = {}
    for line in bjobs_output.splitlines():
        # Slicing keeps this safe for empty lines ("".isdigit() is False).
        if not line[:1].isdigit():
            continue
        tokens = line.split(maxsplit=3)
        if len(tokens) < 3:
            # Too short to contain a state column; nothing to record.
            continue
        jobid, _, state = tokens[:3]
        if state not in known_states:
            logger.error(
                "Unknown state %s obtained from LSF for jobid %s, ignored.",
                state,
                jobid,
            )
            continue
        data[jobid] = {"job_state": state}
    return {"jobs": data}


Expand All @@ -87,6 +95,8 @@ def __init__(
self._max_attempt: int = 100
self._retry_sleep_period = 3

self._poll_period = _POLL_PERIOD

async def submit(
self,
iens: int,
Expand Down Expand Up @@ -167,17 +177,23 @@ async def kill(self, iens: int) -> None:
async def poll(self) -> None:
while True:
if not self._jobs.keys():
await asyncio.sleep(_POLL_PERIOD)
await asyncio.sleep(self._poll_period)
continue
proc = await asyncio.create_subprocess_exec(
process = await asyncio.create_subprocess_exec(
self._bjobs_cmd,
*self._jobs.keys(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, _ = await proc.communicate()
stat = _Stat(**parse_bjobs(stdout))
stdout, stderr = await process.communicate()
if process.returncode:
# bjobs may give nonzero return code even when it is providing
# at least some correct information
logger.warning(
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
stat = _Stat(**parse_bjobs(stdout.decode(errors="ignore")))
for job_id, job in stat.jobs.items():
if job_id not in self._jobs:
continue
Expand Down
163 changes: 163 additions & 0 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import asyncio
import os
import stat
from contextlib import ExitStack as does_not_raise
from pathlib import Path
from typing import Collection, Set, get_args

import pytest
from hypothesis import given
from hypothesis import strategies as st

from ert.scheduler import LsfDriver
from ert.scheduler.event import FinishedEvent, StartedEvent
from ert.scheduler.lsf_driver import JobState, parse_bjobs

# Every value that typing.get_args() reports for the driver's JobState type;
# used below to filter out strings that are valid states.
valid_jobstates: Collection[str] = list(get_args(JobState))


def nonempty_string_without_whitespace():
    """Return a Hypothesis strategy for text containing at least one character.

    Characters are drawn only from the Unicode categories for uppercase
    letters (Lu), lowercase letters (Ll), decimal digits (Nd) and
    punctuation (P), so no whitespace is generated.
    """
    allowed_categories = ("Lu", "Ll", "Nd", "P")
    alphabet = st.characters(whitelist_categories=allowed_categories)
    return st.text(alphabet, min_size=1)


@pytest.fixture
Expand Down Expand Up @@ -189,3 +203,152 @@ async def test_kill(
mocked_iens2jobid[iens_to_kill]
== Path("bkill_args").read_text(encoding="utf-8").strip()
)


@given(st.text())
def test_parse_bjobs_gives_empty_result_on_random_input(some_text):
    """Arbitrary generated text is expected to parse to an empty job mapping."""
    no_jobs = {"jobs": {}}
    parsed = parse_bjobs(some_text)
    assert parsed == no_jobs


@pytest.mark.parametrize(
    "bjobs_output, expected",
    [
        pytest.param(
            "JOBID USER STAT\n1 foobart RUN",
            {"1": {"job_state": "RUN"}},
            id="basic",
        ),
        pytest.param(
            "1 foobart RUN",
            {"1": {"job_state": "RUN"}},
            id="header_missing_ok",
        ),
        pytest.param(
            "1 _ RUN asdf asdf asdf",
            {"1": {"job_state": "RUN"}},
            id="line_remainder_ignored",
        ),
        pytest.param(
            "1 _ DONE",
            {"1": {"job_state": "DONE"}},
            id="done",
        ),
        pytest.param(
            "1 _ DONE\n2 _ RUN",
            {"1": {"job_state": "DONE"}, "2": {"job_state": "RUN"}},
            id="two_jobs",
        ),
    ],
)
def test_parse_bjobs_happy_path(bjobs_output, expected):
    """Well-formed bjobs output is turned into the expected per-job states."""
    parsed = parse_bjobs(bjobs_output)
    assert parsed == {"jobs": expected}


@given(
    st.integers(min_value=1),
    nonempty_string_without_whitespace(),
    st.from_type(JobState),
)
def test_parse_bjobs(job_id, username, job_state):
    """A single generated "<id> <user> <state>" line yields exactly that job."""
    bjobs_line = f"{job_id} {username} {job_state}"
    expected_jobs = {str(job_id): {"job_state": job_state}}
    assert parse_bjobs(bjobs_line) == {"jobs": expected_jobs}


@given(
    nonempty_string_without_whitespace().filter(
        lambda state: state not in valid_jobstates
    )
)
def test_parse_bjobs_invalid_state_is_ignored(random_state):
    """A job line whose state is not a valid job state produces no entry."""
    parsed = parse_bjobs(f"1 _ {random_state}")
    assert parsed == {"jobs": {}}


def test_parse_bjobs_invalid_state_is_logged(caplog):
    """The unknown state FOO shows up in the captured log text."""
    # Kept as a plain example-based test: caplog cannot be combined with hypothesis.
    parse_bjobs("1 _ FOO")
    captured_log = caplog.text
    assert "Unknown state FOO" in captured_log


# Header line that the fake bjobs scripts in the parametrized cases below
# print ahead of their job lines.
BJOBS_HEADER = (
"JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME"
)


async def poll(driver: LsfDriver, expected: Set[int], *, started=None, finished=None):
    """Run ``driver.poll()`` in the background until the expected jobs finish.

    Events are read one at a time from ``driver.event_queue``. The helper
    returns once the set of realization indices seen in ``FinishedEvent``s
    equals ``expected``; it does not time out by itself, so callers that need
    a deadline must wrap it (e.g. in ``asyncio.wait_for``).

    Args:
        driver: The driver whose ``poll()`` coroutine is started as a task.
        expected: Realization indices (``iens``) that must all be finished.
        started: Optional async callback, awaited as ``started(iens)`` for
            every ``StartedEvent``.
        finished: Optional async callback, awaited as
            ``finished(iens, returncode, aborted)`` for every ``FinishedEvent``.
    """
    poll_task = asyncio.create_task(driver.poll())
    completed = set()
    try:
        while True:
            event = await driver.event_queue.get()
            if isinstance(event, StartedEvent):
                # Both callbacks are tested the same way: None means "not given".
                if started is not None:
                    await started(event.iens)
            elif isinstance(event, FinishedEvent):
                if finished is not None:
                    await finished(event.iens, event.returncode, event.aborted)
                completed.add(event.iens)
                if completed == expected:
                    break
    finally:
        # Always stop the background polling, also when this coroutine is
        # cancelled or a callback raises.
        poll_task.cancel()


@pytest.mark.parametrize(
    "bjobs_script, expectation",
    [
        pytest.param(
            f"echo '{BJOBS_HEADER}\n1 someuser DONE foo'; exit 0",
            does_not_raise(),
            id="all-good",
        ),
        pytest.param(
            "echo 'No unfinished job found'; exit 0",
            pytest.raises(asyncio.TimeoutError),
            id="empty_cluster",
        ),
        pytest.param(
            "echo 'Job <1> is not found' >&2; exit 0",
            # Actual command is seen to return zero in such a scenario
            pytest.raises(asyncio.TimeoutError),
            id="empty_cluster_specific_id",
        ),
        pytest.param(
            "echo '1 someuser DONE foo'",
            does_not_raise(),
            id="missing_header_is_accepted",  # (debatable)
        ),
        pytest.param(
            f"echo '{BJOBS_HEADER}\n1 someuser DONE foo'; "
            "echo 'Job <2> is not found' >&2 ; exit 255",
            # If we have some success and some failures, actual command returns 255
            does_not_raise(),
            id="error_for_irrelevant_job_id",
        ),
        pytest.param(
            f"echo '{BJOBS_HEADER}\n2 someuser DONE foo'",
            pytest.raises(asyncio.TimeoutError),
            id="wrong-job-id",
        ),
        pytest.param(
            "exit 1",
            pytest.raises(asyncio.TimeoutError),
            id="exit-1",
        ),
        pytest.param(
            f"echo '{BJOBS_HEADER}\n1 someuser DONE foo'; exit 1",
            # (this is not observed in reality)
            does_not_raise(),
            id="correct_output_but_exitcode_1",
        ),
        pytest.param(
            f"echo '{BJOBS_HEADER}\n1 someuser'; exit 0",
            pytest.raises(asyncio.TimeoutError),
            id="unparsable_output",
        ),
    ],
)
async def test_faulty_bjobs(monkeypatch, tmp_path, bjobs_script, expectation):
    """Submit one job and poll it against a fake, possibly misbehaving ``bjobs``.

    Fake ``bsub`` and ``bjobs`` shell scripts are written to ``tmp_path/bin``,
    which is put first on ``PATH``. ``bsub`` always reports job id 1, while
    ``bjobs`` runs the parametrized ``bjobs_script``. ``expectation`` is the
    context manager describing the outcome: either nothing is raised, or the
    0.2 second polling timeout expires.
    """
    bin_path = tmp_path / "bin"
    bin_path.mkdir()
    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
    fake_commands = (
        ("bsub", "echo 'Job <1> is submitted to default queue'"),
        ("bjobs", bjobs_script),
    )
    for command_name, script_body in fake_commands:
        script_path = bin_path / command_name
        script_path.write_text(f"#!/bin/sh\n{script_body}")
        # Add the executable bit to this script's own mode, not another file's.
        script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)
    driver = LsfDriver()
    with expectation:
        await driver.submit(0, "sleep")
        await asyncio.wait_for(poll(driver, {0}), timeout=0.2)

0 comments on commit 3575519

Please sign in to comment.