Skip to content

Commit

Permalink
Leave job script on disk for PBS and LSF drivers
Browse files Browse the repository at this point in the history
This will enable more complex script with redirection being
possible to submit directly throught the submit() function
  • Loading branch information
berland committed Mar 8, 2024
1 parent 006499c commit b312116
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 11 deletions.
8 changes: 7 additions & 1 deletion ci/testkomodo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ start_tests () {

# Using presence of "bsub" in PATH to detect onprem vs azure
if which bsub >/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then
pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler/test_lsf_driver.py && \
pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler && \
rm -rf "$basetemp"
fi

if ! which bsub 2>/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then
export _ERT_TESTS_DEFAULT_QUEUE_NAME=permanent
pytest -v --openpbs --basetemp="$basetemp" integration_tests/scheduler && \
rm -rf "$basetemp"
fi
popd
Expand Down
17 changes: 16 additions & 1 deletion src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import asyncio
import json
import logging
import os
import re
import shlex
import shutil
import stat
from pathlib import Path
from typing import (
Dict,
Expand Down Expand Up @@ -106,10 +108,23 @@ async def submit(
name: str = "dummy",
runpath: Optional[str] = None,
) -> None:
if runpath is None:
runpath = os.getcwd()

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []

script = (
"#!/usr/bin/env bash\n"
f"cd {shlex.quote(str(runpath))}\n"
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
# f"{executable} {shlex.join(args)}\n"
)
script_path = Path(runpath) / "job_script.sh"
script_path.write_text(script, encoding="utf-8")
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)

bsub_with_args: List[str] = (
[str(self._bsub_cmd)] + arg_queue_name + ["-J", name, executable, *args]
[str(self._bsub_cmd)] + arg_queue_name + ["-J", name, str(script_path)]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
process = await asyncio.create_subprocess_exec(
Expand Down
22 changes: 19 additions & 3 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import asyncio
import logging
import os
import shlex
import stat
from pathlib import Path
from typing import (
Dict,
Iterable,
Expand Down Expand Up @@ -191,15 +194,29 @@ async def submit(
/,
*args: str,
name: str = "dummy",
runpath: Optional[str] = None,
runpath: Optional[Union[Path, str]] = None,
) -> None:
if runpath is None:
runpath = os.getcwd()

print(f"{runpath=}")

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []
arg_keep_qsub_output = (
[] if self._keep_qsub_output else "-o /dev/null -e /dev/null".split()
)
resource_string = self._resource_string()
arg_resource_string = ["-l", resource_string] if resource_string else []

script = (
"#!/usr/bin/env bash\n"
f"cd {shlex.quote(str(runpath))}\n"
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
# f"{executable} {shlex.join(args)}\n"
)
script_path = Path(runpath) / "job_script.sh"
script_path.write_text(script, encoding="utf-8")
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)
name_prefix = self._job_prefix or ""
qsub_with_args: List[str] = [
"qsub",
Expand All @@ -209,8 +226,7 @@ async def submit(
*arg_keep_qsub_output,
*arg_resource_string,
"--",
executable,
*args,
str(script_path),
]
logger.debug(f"Submitting to PBS with command {shlex.join(qsub_with_args)}")

Expand Down
22 changes: 16 additions & 6 deletions tests/integration_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import signal
import sys

import pytest

Expand All @@ -18,26 +20,34 @@ def driver(request, pytestconfig, monkeypatch, tmp_path):
if class_ is OpenPBSDriver and pytestconfig.getoption("openpbs"):
# User provided --openpbs, which means we should use the actual OpenPBS
# cluster without mocking anything.
pass
if str(tmp_path).startswith("/tmp"):
print(
"Please use --basetemp option to pytest, PBS tests needs a shared disk"
)
sys.exit(1)
elif class_ is LsfDriver and pytestconfig.getoption("lsf"):
# User provided --lsf, which means we should use the actual LSF
# cluster without mocking anything.""
pass
if str(tmp_path).startswith("/tmp"):
print(
"Please use --basetemp option to pytest, LSF tests needs a shared disk"
)
sys.exit(1)
else:
mock_bin(monkeypatch, tmp_path)

return class_()
return class_(queue_name=os.getenv("_ERT_TESTS_DEFAULT_QUEUE_NAME"))


@pytest.mark.integration_test
async def test_submit(driver, tmp_path):
await driver.submit(0, f"echo test > {tmp_path}/test")
await driver.submit(0, "sh", "-c", f"echo test > {tmp_path}/test") # yay with lsf
await poll(driver, {0})

assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n"


async def test_submit_something_that_fails(driver):
async def test_submit_something_that_fails(driver, tmp_path):
finished_called = False

expected_returncode = 42
Expand All @@ -54,7 +64,7 @@ async def finished(iens, returncode, aborted):
nonlocal finished_called
finished_called = True

await driver.submit(0, f"exit {expected_returncode}")
await driver.submit(0, "sh", "-c", f"exit {expected_returncode}", runpath=tmp_path)
await poll(driver, {0}, finished=finished)

assert finished_called
Expand Down

0 comments on commit b312116

Please sign in to comment.