
Let bsub retry on identified SSH failure
berland committed Mar 12, 2024
1 parent 45815b6 commit d7bebaf
Showing 5 changed files with 86 additions and 24 deletions.
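The change routes bsub submission through the driver's shared retry helper and treats exit code 255 as a transient, retryable SSH failure instead of a hard error. For background, 255 is the status ssh itself reports when the connection fails (rather than forwarding the remote command's exit code), so a bsub wrapped in ssh can exit 255 without LSF ever having seen the job. A quick, self-contained way to observe that convention (the unreachable host name is hypothetical; this is only an illustration, not part of the commit):

import subprocess

# ssh exits with 255 when the connection itself fails (unresolvable host,
# refused connection, authentication error, ...) instead of forwarding the
# remote command's own exit status.
result = subprocess.run(
    ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=1", "no-such-host.invalid", "true"],
    capture_output=True,
)
print(result.returncode)  # expected: 255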
2 changes: 1 addition & 1 deletion src/ert/scheduler/driver.py
@@ -98,7 +98,7 @@ async def _execute_with_retry(
             await asyncio.sleep(retry_interval)
         error_message = (
             f'Command "{shlex.join(cmd_with_args)}" failed after {retries} retries'
-            f" with error {error_message}"
+            f" with error {error_message or '<empty>'}"
         )
         _logger.error(error_message)
         return False, error_message
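The hunk above only shows the tail of _execute_with_retry; the helper itself is not reproduced in this commit. As a rough illustrative sketch only, assuming the keyword arguments visible at the call sites below (retry_codes, accept_codes, retries, retry_interval, driverlogger) and not the actual ert implementation, such a retry wrapper can look like this:

import asyncio
import logging
import shlex
from typing import Iterable, Optional, Sequence, Tuple


async def execute_with_retry(
    cmd_with_args: Sequence[str],
    retry_codes: Iterable[int] = (),
    accept_codes: Iterable[int] = (),
    retries: int = 1,
    retry_interval: float = 1.0,
    driverlogger: Optional[logging.Logger] = None,
) -> Tuple[bool, str]:
    # Illustrative sketch, not the actual ert.scheduler.driver implementation.
    # Run a command, retrying on exit codes known to be transient.
    _logger = driverlogger or logging.getLogger(__name__)
    error_message: Optional[str] = None
    for _ in range(retries):
        process = await asyncio.create_subprocess_exec(
            *cmd_with_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return True, stdout.decode(errors="ignore").strip()
        if process.returncode in accept_codes:
            # Non-zero exit code that should nevertheless be treated as success.
            return True, stderr.decode(errors="ignore").strip()
        if process.returncode in retry_codes:
            # Transient failure (e.g. ssh exiting 255): wait and try again.
            error_message = stderr.decode(errors="ignore").strip()
            await asyncio.sleep(retry_interval)
            continue
        # Unknown failure: give up immediately.
        error_message = (
            f'Command "{shlex.join(cmd_with_args)}" failed with exit code '
            f"{process.returncode} and error message: "
            f"{stderr.decode(errors='ignore') or '<empty>'}"
        )
        _logger.error(error_message)
        return False, error_message
    error_message = (
        f'Command "{shlex.join(cmd_with_args)}" failed after {retries} retries'
        f" with error {error_message or '<empty>'}"
    )
    _logger.error(error_message)
    return False, error_message

The point of the one-line change above is simply that when every attempt fails with an empty stderr, the final log line now says '<empty>' instead of trailing off with nothing.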
29 changes: 12 additions & 17 deletions src/ert/scheduler/lsf_driver.py
@@ -51,6 +51,7 @@ class RunningJob(BaseModel):
 ]

 LSF_INFO_JSON_FILENAME = "lsf_info.json"
+BSUB_FLAKY_SSH = 255


 class _Stat(BaseModel):
@@ -93,7 +94,8 @@ def __init__(
         self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
         self._iens2jobid: MutableMapping[int, str] = {}
         self._max_attempt: int = 100
-        self._retry_sleep_period = 3
+        self._sleep_time_between_cmd_retries = 3
+        self._bsub_retries = 10

         self._poll_period = _POLL_PERIOD

@@ -112,25 +114,18 @@ async def submit(
             [str(self._bsub_cmd)] + arg_queue_name + ["-J", name, executable, *args]
         )
         logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
-        process = await asyncio.create_subprocess_exec(
-            *bsub_with_args,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
+        process_success, process_message = await self._execute_with_retry(
+            bsub_with_args,
+            retry_codes=(BSUB_FLAKY_SSH,),
+            retries=self._bsub_retries,
+            retry_interval=self._sleep_time_between_cmd_retries,
         )
-        stdout, stderr = await process.communicate()
-        if process.returncode:
-            logger.error(
-                f"Command \"{' '.join(bsub_with_args)}\" failed with "
-                f"returncode {process.returncode} and error message: "
-                f"{stderr.decode(errors='ignore') or '<empty>'}"
-            )
-            raise RuntimeError(stderr.decode(errors="ignore"))
-
-        stdout_decoded = stdout.decode(errors="ignore")
+        if not process_success:
+            raise RuntimeError(process_message)

-        match = re.search("Job <([0-9]+)> is submitted to .+ queue", stdout_decoded)
+        match = re.search("Job <([0-9]+)> is submitted to .+ queue", process_message)
         if match is None:
-            raise RuntimeError(f"Could not understand '{stdout_decoded}' from bsub")
+            raise RuntimeError(f"Could not understand '{process_message}' from bsub")
         job_id = match[1]
         logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")
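With the subprocess plumbing gone, submit parses the job id out of process_message, the captured output returned by the retry helper. A tiny stand-alone illustration of that parsing step (the sample output string is made up, but follows the pattern bsub prints):

import re

process_message = "Job <12345> is submitted to default queue <normal>."
match = re.search("Job <([0-9]+)> is submitted to .+ queue", process_message)
if match is None:
    raise RuntimeError(f"Could not understand '{process_message}' from bsub")
job_id = match[1]
print(job_id)  # -> 12345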
6 changes: 3 additions & 3 deletions src/ert/scheduler/openpbs_driver.py
@@ -124,7 +124,7 @@ def __init__(
         self._cluster_label: Optional[str] = cluster_label
         self._job_prefix = job_prefix
         self._num_pbs_cmd_retries = 10
-        self._retry_pbs_cmd_interval = 2
+        self._sleep_time_between_cmd_retries = 2

         self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
         self._iens2jobid: MutableMapping[int, str] = {}
@@ -190,7 +190,7 @@ async def submit(
                 QSUB_CONNECTION_REFUSED,
             ),
             retries=self._num_pbs_cmd_retries,
-            retry_interval=self._retry_pbs_cmd_interval,
+            retry_interval=self._sleep_time_between_cmd_retries,
             driverlogger=logger,
         )
         if not process_success:
@@ -229,7 +229,7 @@ async def kill(self, iens: int) -> None:
             retry_codes=(QDEL_REQUEST_INVALID,),
             accept_codes=(QDEL_JOB_HAS_FINISHED,),
             retries=self._num_pbs_cmd_retries,
-            retry_interval=self._retry_pbs_cmd_interval,
+            retry_interval=self._sleep_time_between_cmd_retries,
             driverlogger=logger,
         )
         if not process_success:
69 changes: 68 additions & 1 deletion tests/unit_tests/scheduler/test_lsf_driver.py
@@ -3,6 +3,7 @@
 import stat
 from contextlib import ExitStack as does_not_raise
 from pathlib import Path
+from textwrap import dedent
 from typing import Collection, get_args

 import pytest
@@ -11,7 +12,7 @@
 from tests.utils import poll

 from ert.scheduler import LsfDriver
-from ert.scheduler.lsf_driver import JobState, parse_bjobs
+from ert.scheduler.lsf_driver import BSUB_FLAKY_SSH, JobState, parse_bjobs

 valid_jobstates: Collection[str] = list(get_args(JobState))

@@ -333,3 +334,69 @@ async def test_faulty_bjobs(monkeypatch, tmp_path, bjobs_script, expectation):
     with expectation:
         await driver.submit(0, "sleep")
         await asyncio.wait_for(poll(driver, {0}), timeout=0.2)
+
+
+@pytest.mark.parametrize(
+    ("exit_code, error_msg"),
+    [
+        (BSUB_FLAKY_SSH, ""),
+        (199, "Not recognized"),
+    ],
+)
+async def test_that_bsub_will_retry_and_fail(
+    monkeypatch, tmp_path, exit_code, error_msg
+):
+    os.chdir(tmp_path)
+    bin_path = tmp_path / "bin"
+    bin_path.mkdir()
+    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
+    bsub_path = bin_path / "bsub"
+    bsub_path.write_text(f"#!/bin/sh\necho {error_msg} >&2\nexit {exit_code}")
+    bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)
+    driver = LsfDriver()
+    driver._bsub_retries = 2
+    driver._sleep_time_between_cmd_retries = 0.2
+    match_str = (
+        f"failed after 2 retries with error {error_msg}"
+        if exit_code != 199
+        else "failed with exit code 199 and error message: Not recognized"
+    )
+    with pytest.raises(RuntimeError, match=match_str):
+        await driver.submit(0, "sleep 10")
+
+
+@pytest.mark.parametrize(
+    ("exit_code, error_msg"),
+    [
+        (BSUB_FLAKY_SSH, ""),
+    ],
+)
+async def test_that_bsub_will_retry_and_succeed(
+    monkeypatch, tmp_path, exit_code, error_msg
+):
+    os.chdir(tmp_path)
+    bin_path = tmp_path / "bin"
+    bin_path.mkdir()
+    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
+    bsub_path = bin_path / "bsub"
+    bsub_path.write_text(
+        "#!/bin/sh"
+        + dedent(
+            f"""
+            TRY_FILE="{bin_path}/script_try"
+            if [ -f "$TRY_FILE" ]; then
+                echo "Job <1> is submitted to normal queue"
+                exit 0
+            else
+                echo "TRIED" > $TRY_FILE
+                echo "{error_msg}" >&2
+                exit {exit_code}
+            fi
+            """
+        )
+    )
+    bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)
+    driver = LsfDriver()
+    driver._bsub_retries = 2
+    driver._sleep_time_between_cmd_retries = 0.2
+    await driver.submit(0, "sleep 10")
4 changes: 2 additions & 2 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
@@ -344,7 +344,7 @@ async def test_that_qsub_will_retry_and_fail(
     qsub_path.chmod(qsub_path.stat().st_mode | stat.S_IEXEC)
     driver = OpenPBSDriver()
     driver._num_pbs_cmd_retries = 2
-    driver._retry_pbs_cmd_interval = 0.2
+    driver._sleep_time_between_cmd_retries = 0.2
     match_str = (
         f"failed after 2 retries with error {error_msg}"
         if exit_code != 199
@@ -389,7 +389,7 @@ async def test_that_qsub_will_retry_and_succeed(
     qsub_path.chmod(qsub_path.stat().st_mode | stat.S_IEXEC)
     driver = OpenPBSDriver()
     driver._num_pbs_cmd_retries = 2
-    driver._retry_pbs_cmd_interval = 0.2
+    driver._sleep_time_between_cmd_retries = 0.2
     await driver.submit(0, "sleep 10")

