Skip to content

Commit

Permalink
Let PBS and LSF make explicit job scripts for test robustness
Browse files Browse the repository at this point in the history
For OpenPBS the explicit job script only exists in memory and
is provided to qsub via stdin.

For LSF, the bsub command seemingly cannot read stdin, and the
job script is placed on runpath with a unique filename that
will be left behind.

This enables more complex jobs (e.g. redirection) submitted to the
drivers submit() function.

Align Python LSF driver with C-driver in supplying the runpath as the
last argument. This is needed to pass a legacy test. Note that legacy
LSF and legacy Torque drivers differ in this behaviour, the latter does
not supply runpath.

The type of the runpath object supplied is changed from str to
pathlib.Path
  • Loading branch information
berland committed Mar 13, 2024
1 parent 2a451b8 commit 22ce428
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 51 deletions.
9 changes: 7 additions & 2 deletions ci/testkomodo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ start_tests () {

# Using presence of "bsub" in PATH to detect onprem vs azure
if which bsub >/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then
pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler/test_lsf_driver.py && \
rm -rf "$basetemp"
pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler && \
rm -rf "$basetemp"
fi
if ! which bsub 2>/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then
export _ERT_TESTS_DEFAULT_QUEUE_NAME=permanent
pytest -v --openpbs --basetemp="$basetemp" integration_tests/scheduler && \
rm -rf "$basetemp"
fi
popd

Expand Down
3 changes: 2 additions & 1 deletion src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Optional

from ert.scheduler.event import Event
Expand All @@ -27,7 +28,7 @@ async def submit(
/,
*args: str,
name: str = "dummy",
runpath: Optional[str] = None,
runpath: Optional[Path] = None,
) -> None:
"""Submit a program to execute on the cluster.
Expand Down
2 changes: 1 addition & 1 deletion src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
self.real.job_script,
self.real.run_arg.runpath,
name=self.real.run_arg.job_name,
runpath=self.real.run_arg.runpath,
runpath=Path(self.real.run_arg.runpath),
)

await self._send(State.PENDING)
Expand Down
3 changes: 2 additions & 1 deletion src/ert/scheduler/local_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import os
from asyncio.subprocess import Process
from pathlib import Path
from typing import MutableMapping, Optional

from ert.scheduler.driver import Driver
Expand All @@ -23,7 +24,7 @@ async def submit(
/,
*args: str,
name: str = "dummy",
runpath: Optional[str] = None,
runpath: Optional[Path] = None,
) -> None:
await self.kill(iens)
self._tasks[iens] = asyncio.create_task(self._run(iens, executable, *args))
Expand Down
37 changes: 31 additions & 6 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import re
import shlex
import shutil
import stat
import tempfile
from pathlib import Path
from typing import (
Dict,
Expand Down Expand Up @@ -123,12 +125,36 @@ async def submit(
/,
*args: str,
name: str = "dummy",
runpath: Optional[str] = None,
runpath: Optional[Path] = None,
) -> None:
if runpath is None:
runpath = Path.cwd()

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []

script = (
"#!/usr/bin/env bash\n"
f"cd {shlex.quote(str(runpath))}\n"
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
script_path: Optional[Path] = None
with tempfile.NamedTemporaryFile(
dir=runpath,
prefix=".lsf_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)

bsub_with_args: List[str] = (
[str(self._bsub_cmd)] + arg_queue_name + ["-J", name, executable, *args]
[str(self._bsub_cmd)]
+ arg_queue_name
+ ["-J", name, str(script_path), str(runpath)]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
process = await asyncio.create_subprocess_exec(
Expand All @@ -153,10 +179,9 @@ async def submit(
job_id = match[1]
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")

if runpath is not None:
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id] = (iens, QueuedJob(job_state="PEND"))
self._iens2jobid[iens] = job_id

Expand Down
20 changes: 14 additions & 6 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import logging
import shlex
from pathlib import Path
from typing import (
Dict,
Iterable,
Expand Down Expand Up @@ -160,17 +161,18 @@ async def _execute_with_retry(
cmd_with_args: List[str],
retry_codes: Iterable[int] = (),
accept_codes: Iterable[int] = (),
stdin: Optional[bytes] = None,
) -> Tuple[bool, str]:
error_message: Optional[str] = None

for _ in range(self._num_pbs_cmd_retries):
process = await asyncio.create_subprocess_exec(
*cmd_with_args,
stdin=asyncio.subprocess.PIPE if stdin else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()

stdout, stderr = await process.communicate(stdin)
assert process.returncode is not None
if process.returncode == 0:
return True, stdout.decode(errors="ignore").strip()
Expand Down Expand Up @@ -202,13 +204,21 @@ async def submit(
/,
*args: str,
name: str = "dummy",
runpath: Optional[str] = None,
runpath: Optional[Path] = None,
) -> None:
if runpath is None:
runpath = Path.cwd()

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []
arg_keep_qsub_output = (
[] if self._keep_qsub_output else "-o /dev/null -e /dev/null".split()
)

script = (
"#!/usr/bin/env bash\n"
f"cd {shlex.quote(str(runpath))}\n"
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
name_prefix = self._job_prefix or ""
qsub_with_args: List[str] = [
"qsub",
Expand All @@ -217,9 +227,6 @@ async def submit(
*arg_queue_name,
*arg_keep_qsub_output,
*self._resources,
"--",
executable,
*args,
]
logger.debug(f"Submitting to PBS with command {shlex.join(qsub_with_args)}")

Expand All @@ -230,6 +237,7 @@ async def submit(
QSUB_PREMATURE_END_OF_MESSAGE,
QSUB_CONNECTION_REFUSED,
),
stdin=script.encode(encoding="utf-8"),
)
if not process_success:
raise RuntimeError(process_message)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/scheduler/bin/qsub
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobdir="${PYTEST_TMP_PATH:-.}/mock_jobs"
jobid="test${RANDOM}.localhost"

mkdir -p "${PYTEST_TMP_PATH:-.}/mock_jobs"
echo $@ > "${jobdir}/${jobid}.script"
cat <&0 > "${jobdir}/${jobid}.script"
echo "$name" > "${PYTEST_TMP_PATH:-.}/mock_jobs/${jobid}.name"


Expand Down
25 changes: 19 additions & 6 deletions tests/integration_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import signal
import sys

import pytest

Expand All @@ -12,32 +14,43 @@
@pytest.fixture(params=[LsfDriver, OpenPBSDriver])
def driver(request, pytestconfig, monkeypatch, tmp_path):
class_ = request.param
queue_name = None

# It's not possible to dynamically choose a pytest fixture in a fixture, so
# we copy some code here
if class_ is OpenPBSDriver and pytestconfig.getoption("openpbs"):
# User provided --openpbs, which means we should use the actual OpenPBS
# cluster without mocking anything.
pass
if str(tmp_path).startswith("/tmp"):
print(
"Please use --basetemp option to pytest, PBS tests needs a shared disk"
)
sys.exit(1)
queue_name = os.getenv("_ERT_TESTS_DEFAULT_QUEUE_NAME")
elif class_ is LsfDriver and pytestconfig.getoption("lsf"):
# User provided --lsf, which means we should use the actual LSF
# cluster without mocking anything.""
pass
if str(tmp_path).startswith("/tmp"):
print(
"Please use --basetemp option to pytest, "
"the real LSF cluster needs a shared disk"
)
sys.exit(1)
else:
mock_bin(monkeypatch, tmp_path)

return class_()
return class_(queue_name=queue_name)


@pytest.mark.integration_test
async def test_submit(driver, tmp_path):
await driver.submit(0, f"echo test > {tmp_path}/test")
await driver.submit(0, "sh", "-c", f"echo test > {tmp_path}/test")
await poll(driver, {0})

assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n"


async def test_submit_something_that_fails(driver):
async def test_submit_something_that_fails(driver, tmp_path):
finished_called = False

expected_returncode = 42
Expand All @@ -54,7 +67,7 @@ async def finished(iens, returncode, aborted):
nonlocal finished_called
finished_called = True

await driver.submit(0, f"exit {expected_returncode}")
await driver.submit(0, "sh", "-c", f"exit {expected_returncode}", runpath=tmp_path)
await poll(driver, {0}, finished=finished)

assert finished_called
Expand Down
33 changes: 17 additions & 16 deletions tests/integration_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import json
import os
from pathlib import Path

import pytest

Expand All @@ -20,30 +19,32 @@ def mock_lsf(pytestconfig, monkeypatch, tmp_path):
mock_bin(monkeypatch, tmp_path)


@pytest.mark.parametrize("runpath_supplied", [(True), (False)])
async def test_lsf_info_file_in_runpath(runpath_supplied, tmp_path):
@pytest.mark.parametrize("explicit_runpath", [(True), (False)])
async def test_lsf_info_file_in_runpath(explicit_runpath, tmp_path):
driver = LsfDriver()
(tmp_path / "some_runpath").mkdir()
os.chdir(tmp_path)
if runpath_supplied:
await driver.submit(0, "exit 0", runpath=str(tmp_path))
else:
await driver.submit(0, "exit 0")
effective_runpath = tmp_path / "some_runpath" if explicit_runpath else tmp_path
await driver.submit(
0,
"sh",
"-c",
"exit 0",
runpath=tmp_path / "some_runpath" if explicit_runpath else None,
)

await poll(driver, {0})

if runpath_supplied:
assert json.loads(
(tmp_path / "lsf_info.json").read_text(encoding="utf-8")
).keys() == {"job_id"}

else:
assert not Path("lsf_info.json").exists()
effective_runpath = tmp_path / "some_runpath" if explicit_runpath else tmp_path
assert json.loads(
(effective_runpath / "lsf_info.json").read_text(encoding="utf-8")
).keys() == {"job_id"}


async def test_job_name():
driver = LsfDriver()
iens: int = 0
await driver.submit(iens, "sleep 99", name="my_job")
await driver.submit(iens, "sh", "-c", "sleep 99", name="my_job")
jobid = driver._iens2jobid[iens]
bjobs_process = await asyncio.create_subprocess_exec(
"bjobs",
Expand Down Expand Up @@ -77,5 +78,5 @@ async def finished(iens, returncode, aborted):
assert returncode == returncode_that_ert_sees
assert aborted == (returncode_that_ert_sees != 0)

await driver.submit(0, f"exit {actual_returncode}")
await driver.submit(0, "sh", "-c", f"exit {actual_returncode}")
await poll(driver, {0}, finished=finished)
20 changes: 14 additions & 6 deletions tests/integration_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from functools import partial

import pytest
Expand All @@ -19,12 +20,19 @@ def mock_openpbs(pytestconfig, monkeypatch, tmp_path):
mock_bin(monkeypatch, tmp_path)


@pytest.mark.timeout(30)
@pytest.fixture()
def queue_name_config():
if queue_name := os.getenv("_ERT_TESTS_DEFAULT_QUEUE_NAME"):
return f"\nQUEUE_OPTION TORQUE QUEUE {queue_name}"
return ""


@pytest.mark.integration_test
@pytest.mark.usefixtures("copy_poly_case")
def test_openpbs_driver_with_poly_example():
def test_openpbs_driver_with_poly_example(queue_name_config):
with open("poly.ert", mode="a+", encoding="utf-8") as f:
f.write("QUEUE_SYSTEM TORQUE\nNUM_REALIZATIONS 2")
f.write(queue_name_config)
run_cli(
ENSEMBLE_EXPERIMENT_MODE,
"--enable-scheduler",
Expand All @@ -36,17 +44,17 @@ async def mock_failure(message, *args, **kwargs):
raise RuntimeError(message)


@pytest.mark.timeout(30)
@pytest.mark.integration_test
@pytest.mark.usefixtures("copy_poly_case")
def test_openpbs_driver_with_poly_example_failing_submit_fails_ert_and_propagates_exception_to_user(
monkeypatch, caplog
monkeypatch, caplog, queue_name_config
):
monkeypatch.setattr(
OpenPBSDriver, "submit", partial(mock_failure, "Submit job failed")
)
with open("poly.ert", mode="a+", encoding="utf-8") as f:
f.write("QUEUE_SYSTEM TORQUE\nNUM_REALIZATIONS 2")
f.write(queue_name_config)
with pytest.raises(ErtCliError):
run_cli(
ENSEMBLE_EXPERIMENT_MODE,
Expand All @@ -56,17 +64,17 @@ def test_openpbs_driver_with_poly_example_failing_submit_fails_ert_and_propagate
assert "RuntimeError: Submit job failed" in caplog.text


@pytest.mark.timeout(30)
@pytest.mark.integration_test
@pytest.mark.usefixtures("copy_poly_case")
def test_openpbs_driver_with_poly_example_failing_poll_fails_ert_and_propagates_exception_to_user(
monkeypatch, caplog
monkeypatch, caplog, queue_name_config
):
monkeypatch.setattr(
OpenPBSDriver, "poll", partial(mock_failure, "Status polling failed")
)
with open("poly.ert", mode="a+", encoding="utf-8") as f:
f.write("QUEUE_SYSTEM TORQUE\nNUM_REALIZATIONS 2")
f.write(queue_name_config)
with pytest.raises(ErtCliError):
run_cli(
ENSEMBLE_EXPERIMENT_MODE,
Expand Down
Loading

0 comments on commit 22ce428

Please sign in to comment.