From b312116612901257bd2b91efa31ea4745a0fb84e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Fri, 8 Mar 2024 07:54:38 +0100 Subject: [PATCH] Leave job script on disk for PBS and LSF drivers This will enable more complex script with redirection being possible to submit directly throught the submit() function --- ci/testkomodo.sh | 8 ++++++- src/ert/scheduler/lsf_driver.py | 17 +++++++++++++- src/ert/scheduler/openpbs_driver.py | 22 ++++++++++++++++--- .../scheduler/test_generic_driver.py | 22 ++++++++++++++----- 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/ci/testkomodo.sh b/ci/testkomodo.sh index 6c74e915e58..25442927d67 100755 --- a/ci/testkomodo.sh +++ b/ci/testkomodo.sh @@ -59,7 +59,13 @@ start_tests () { # Using presence of "bsub" in PATH to detect onprem vs azure if which bsub >/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then - pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler/test_lsf_driver.py && \ + pytest -v --lsf --basetemp="$basetemp" integration_tests/scheduler && \ + rm -rf "$basetemp" + fi + + if ! which bsub 2>/dev/null && basetemp=$(mktemp -d -p ~/pytest-tmp); then + export _ERT_TESTS_DEFAULT_QUEUE_NAME=permanent + pytest -v --openpbs --basetemp="$basetemp" integration_tests/scheduler && \ rm -rf "$basetemp" fi popd diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 3d4ac51a259..dd2606f3811 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -3,9 +3,11 @@ import asyncio import json import logging +import os import re import shlex import shutil +import stat from pathlib import Path from typing import ( Dict, @@ -106,10 +108,23 @@ async def submit( name: str = "dummy", runpath: Optional[str] = None, ) -> None: + if runpath is None: + runpath = os.getcwd() + arg_queue_name = ["-q", self._queue_name] if self._queue_name else [] + script = ( + "#!/usr/bin/env bash\n" + f"cd {shlex.quote(str(runpath))}\n" + f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n" + # f"{executable} {shlex.join(args)}\n" + ) + script_path = Path(runpath) / "job_script.sh" + script_path.write_text(script, encoding="utf-8") + script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC) + bsub_with_args: List[str] = ( - [str(self._bsub_cmd)] + arg_queue_name + ["-J", name, executable, *args] + [str(self._bsub_cmd)] + arg_queue_name + ["-J", name, str(script_path)] ) logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}") process = await asyncio.create_subprocess_exec( diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index bd5a48cce77..3afd3d3ec03 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -2,7 +2,10 @@ import asyncio import logging +import os import shlex +import stat +from pathlib import Path from typing import ( Dict, Iterable, @@ -191,8 +194,13 @@ async def submit( /, *args: str, name: str = "dummy", - runpath: Optional[str] = None, + runpath: Optional[Union[Path, str]] = None, ) -> None: + if runpath is None: + runpath = os.getcwd() + + print(f"{runpath=}") + arg_queue_name = ["-q", self._queue_name] if self._queue_name else [] arg_keep_qsub_output = ( [] if self._keep_qsub_output else "-o /dev/null -e /dev/null".split() @@ -200,6 +208,15 @@ async def submit( resource_string = self._resource_string() arg_resource_string = ["-l", resource_string] if resource_string else [] + script = ( + "#!/usr/bin/env bash\n" + f"cd {shlex.quote(str(runpath))}\n" + f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n" + # f"{executable} {shlex.join(args)}\n" + ) + script_path = Path(runpath) / "job_script.sh" + script_path.write_text(script, encoding="utf-8") + script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC) name_prefix = self._job_prefix or "" qsub_with_args: List[str] = [ "qsub", @@ -209,8 +226,7 @@ async def submit( *arg_keep_qsub_output, *arg_resource_string, "--", - executable, - *args, + str(script_path), ] logger.debug(f"Submitting to PBS with command {shlex.join(qsub_with_args)}") diff --git a/tests/integration_tests/scheduler/test_generic_driver.py b/tests/integration_tests/scheduler/test_generic_driver.py index 01223aee6e7..94424922dce 100644 --- a/tests/integration_tests/scheduler/test_generic_driver.py +++ b/tests/integration_tests/scheduler/test_generic_driver.py @@ -1,4 +1,6 @@ +import os import signal +import sys import pytest @@ -18,26 +20,34 @@ def driver(request, pytestconfig, monkeypatch, tmp_path): if class_ is OpenPBSDriver and pytestconfig.getoption("openpbs"): # User provided --openpbs, which means we should use the actual OpenPBS # cluster without mocking anything. - pass + if str(tmp_path).startswith("/tmp"): + print( + "Please use --basetemp option to pytest, PBS tests needs a shared disk" + ) + sys.exit(1) elif class_ is LsfDriver and pytestconfig.getoption("lsf"): # User provided --lsf, which means we should use the actual LSF # cluster without mocking anything."" - pass + if str(tmp_path).startswith("/tmp"): + print( + "Please use --basetemp option to pytest, LSF tests needs a shared disk" + ) + sys.exit(1) else: mock_bin(monkeypatch, tmp_path) - return class_() + return class_(queue_name=os.getenv("_ERT_TESTS_DEFAULT_QUEUE_NAME")) @pytest.mark.integration_test async def test_submit(driver, tmp_path): - await driver.submit(0, f"echo test > {tmp_path}/test") + await driver.submit(0, "sh", "-c", f"echo test > {tmp_path}/test") # yay with lsf await poll(driver, {0}) assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n" -async def test_submit_something_that_fails(driver): +async def test_submit_something_that_fails(driver, tmp_path): finished_called = False expected_returncode = 42 @@ -54,7 +64,7 @@ async def finished(iens, returncode, aborted): nonlocal finished_called finished_called = True - await driver.submit(0, f"exit {expected_returncode}") + await driver.submit(0, "sh", "-c", f"exit {expected_returncode}", runpath=tmp_path) await poll(driver, {0}, finished=finished) assert finished_called