Skip to content

Commit

Permalink
Let bsub retry on identified SSH failure
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Mar 14, 2024
1 parent fb10128 commit 3834dd8
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 22 deletions.
29 changes: 12 additions & 17 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class RunningJob(BaseModel):
}

LSF_INFO_JSON_FILENAME = "lsf_info.json"
BSUB_FLAKY_SSH = 255


class _Stat(BaseModel):
Expand Down Expand Up @@ -114,7 +115,8 @@ def __init__(
self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._retry_sleep_period = 3
self._sleep_time_between_cmd_retries = 3
self._bsub_retries = 10

self._poll_period = _POLL_PERIOD

Expand Down Expand Up @@ -157,25 +159,18 @@ async def submit(
+ ["-J", name, str(script_path), str(runpath)]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
process_success, process_message = await self._execute_with_retry(
bsub_with_args,
retry_codes=(BSUB_FLAKY_SSH,),
retries=self._bsub_retries,
retry_interval=self._sleep_time_between_cmd_retries,
)
stdout, stderr = await process.communicate()
if process.returncode:
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with "
f"returncode {process.returncode} and error message: "
f"{stderr.decode(errors='ignore') or '<empty>'}"
)
raise RuntimeError(stderr.decode(errors="ignore"))

stdout_decoded = stdout.decode(errors="ignore")
if not process_success:
raise RuntimeError(process_message)

match = re.search("Job <([0-9]+)> is submitted to .+ queue", stdout_decoded)
match = re.search("Job <([0-9]+)> is submitted to .+ queue", process_message)
if match is None:
raise RuntimeError(f"Could not understand '{stdout_decoded}' from bsub")
raise RuntimeError(f"Could not understand '{process_message}' from bsub")
job_id = match[1]
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")

Expand Down
6 changes: 3 additions & 3 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(
self._cluster_label: Optional[str] = cluster_label
self._job_prefix = job_prefix
self._num_pbs_cmd_retries = 10
self._retry_pbs_cmd_interval = 2
self._sleep_time_between_cmd_retries = 2

self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
Expand Down Expand Up @@ -197,7 +197,7 @@ async def submit(
),
stdin=script.encode(encoding="utf-8"),
retries=self._num_pbs_cmd_retries,
retry_interval=self._retry_pbs_cmd_interval,
retry_interval=self._sleep_time_between_cmd_retries,
driverlogger=logger,
)
if not process_success:
Expand Down Expand Up @@ -236,7 +236,7 @@ async def kill(self, iens: int) -> None:
retry_codes=(QDEL_REQUEST_INVALID,),
accept_codes=(QDEL_JOB_HAS_FINISHED,),
retries=self._num_pbs_cmd_retries,
retry_interval=self._retry_pbs_cmd_interval,
retry_interval=self._sleep_time_between_cmd_retries,
driverlogger=logger,
)
if not process_success:
Expand Down
68 changes: 68 additions & 0 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import stat
from contextlib import ExitStack as does_not_raise
from pathlib import Path
from textwrap import dedent
from typing import Collection, List, get_args

import pytest
Expand All @@ -12,6 +13,7 @@

from ert.scheduler import LsfDriver
from ert.scheduler.lsf_driver import (
BSUB_FLAKY_SSH,
FinishedEvent,
FinishedJobFailure,
FinishedJobSuccess,
Expand Down Expand Up @@ -411,3 +413,69 @@ async def test_faulty_bjobs(monkeypatch, tmp_path, bjobs_script, expectation):
with expectation:
await driver.submit(0, "sleep")
await asyncio.wait_for(poll(driver, {0}), timeout=0.2)


@pytest.mark.parametrize(
    ("exit_code, error_msg"),
    [
        (BSUB_FLAKY_SSH, ""),
        (199, "Not recognized"),
    ],
)
async def test_that_bsub_will_retry_and_fail(
    monkeypatch, tmp_path, exit_code, error_msg
):
    """Check that submit() raises RuntimeError when bsub keeps failing.

    A fake ``bsub`` executable is placed first on PATH. It always prints
    ``error_msg`` to stderr and exits with ``exit_code``. The driver is
    limited to 2 retries with a 0.2 second pause between attempts, and the
    raised error message is matched against the text expected for the
    given exit code.
    """
    # monkeypatch.chdir (unlike os.chdir) is undone at teardown, so the
    # working directory change cannot leak into other tests.
    monkeypatch.chdir(tmp_path)
    bin_path = tmp_path / "bin"
    bin_path.mkdir()
    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")

    # Fake bsub: always fails with the parametrized exit code and message.
    bsub_path = bin_path / "bsub"
    bsub_path.write_text(f"#!/bin/sh\necho {error_msg} >&2\nexit {exit_code}")
    bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)

    driver = LsfDriver()
    driver._bsub_retries = 2
    # Keep the pause between attempts short so the test stays fast.
    driver._sleep_time_between_cmd_retries = 0.2

    # Exit code 199 is expected to surface as a plain failure message that
    # names the exit code; the other case is expected to mention the
    # exhausted retries.
    match_str = (
        f"failed after 2 retries with error {error_msg}"
        if exit_code != 199
        else "failed with exit code 199 and error message: Not recognized"
    )
    with pytest.raises(RuntimeError, match=match_str):
        await driver.submit(0, "sleep 10")


@pytest.mark.parametrize(
    ("exit_code, error_msg"),
    [
        (BSUB_FLAKY_SSH, ""),
    ],
)
async def test_that_bsub_will_retry_and_succeed(
    monkeypatch, tmp_path, exit_code, error_msg
):
    """Check that submit() succeeds when bsub fails once and then works.

    A fake ``bsub`` executable is placed first on PATH. On its first
    invocation it creates a marker file, prints ``error_msg`` to stderr and
    exits with ``exit_code``. On later invocations (marker file present) it
    prints a regular "Job <1> is submitted" line and exits 0. The test
    passes if ``driver.submit`` returns without raising.
    """
    # monkeypatch.chdir (unlike os.chdir) is undone at teardown, so the
    # working directory change cannot leak into other tests.
    monkeypatch.chdir(tmp_path)
    bin_path = tmp_path / "bin"
    bin_path.mkdir()
    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")

    # Fake bsub: fail on the first call, succeed on every call after that.
    # "$TRY_FILE" is quoted everywhere so a path containing whitespace does
    # not break the redirection.
    bsub_path = bin_path / "bsub"
    bsub_path.write_text(
        "#!/bin/sh"
        + dedent(
            f"""
            TRY_FILE="{bin_path}/script_try"
            if [ -f "$TRY_FILE" ]; then
                echo "Job <1> is submitted to normal queue"
                exit 0
            else
                echo "TRIED" > "$TRY_FILE"
                echo "{error_msg}" >&2
                exit {exit_code}
            fi
            """
        )
    )
    bsub_path.chmod(bsub_path.stat().st_mode | stat.S_IEXEC)

    driver = LsfDriver()
    driver._bsub_retries = 2
    # Keep the pause between attempts short so the test stays fast.
    driver._sleep_time_between_cmd_retries = 0.2
    await driver.submit(0, "sleep 10")
4 changes: 2 additions & 2 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ async def test_that_qsub_will_retry_and_fail(
qsub_path.chmod(qsub_path.stat().st_mode | stat.S_IEXEC)
driver = OpenPBSDriver()
driver._num_pbs_cmd_retries = 2
driver._retry_pbs_cmd_interval = 0.2
driver._sleep_time_between_cmd_retries = 0.2
match_str = (
f"failed after 2 retries with error {error_msg}"
if exit_code != 199
Expand Down Expand Up @@ -389,7 +389,7 @@ async def test_that_qsub_will_retry_and_succeed(
qsub_path.chmod(qsub_path.stat().st_mode | stat.S_IEXEC)
driver = OpenPBSDriver()
driver._num_pbs_cmd_retries = 2
driver._retry_pbs_cmd_interval = 0.2
driver._sleep_time_between_cmd_retries = 0.2
await driver.submit(0, "sleep 10")


Expand Down

0 comments on commit 3834dd8

Please sign in to comment.