Skip to content

Commit

Permalink
Combine OpenPBS & LSF integration tests
Browse files Browse the repository at this point in the history
There is a lot of overlap between the two drivers, so we combine some of
the generic integration tests into a single parameterised pytest.
  • Loading branch information
pinkwah committed Mar 4, 2024
1 parent c385d31 commit ae1bdd5
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 186 deletions.
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,10 @@ def pytest_addoption(parser):
help="Defaults to not running tests that require eclipse.",
)
parser.addoption(
"--torque",
"--openpbs",
action="store_true",
default=False,
help="Run TORQUE tests against the real cluster",
help="Run OpenPBS tests against the real cluster",
)
parser.addoption(
"--lsf",
Expand Down
13 changes: 13 additions & 0 deletions tests/integration_tests/scheduler/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from __future__ import annotations

import os
import sys
from pathlib import Path


def mock_bin(monkeypatch, tmp_path):
bin_path = Path(__file__).parent / "bin"

monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
monkeypatch.setenv("PYTEST_TMP_PATH", str(tmp_path))
monkeypatch.setenv("PYTHON", sys.executable)
84 changes: 84 additions & 0 deletions tests/integration_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import signal

import pytest

from ert.scheduler.lsf_driver import LsfDriver
from ert.scheduler.openpbs_driver import OpenPBSDriver
from tests.utils import poll

from .conftest import mock_bin


@pytest.fixture(params=[LsfDriver, OpenPBSDriver])
def driver(request, pytestconfig, monkeypatch, tmp_path):
class_ = request.param

# It's not possible to dynamically choose a pytest fixture in a fixture, so
# we copy some code here
if class_ is OpenPBSDriver and pytestconfig.getoption("openpbs"):
# User provided --openpbs, which means we should use the actual OpenPBS
# cluster without mocking anything.
pass
elif class_ is LsfDriver and pytestconfig.getoption("lsf"):
# User provided --lsf, which means we should use the actual LSF
# cluster without mocking anything.""
pass
else:
mock_bin(monkeypatch, tmp_path)

return class_()


@pytest.mark.integration_test
async def test_submit(driver, tmp_path):
await driver.submit(0, f"echo test > {tmp_path}/test")
await poll(driver, {0})

assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n"


async def test_submit_something_that_fails(driver):
finished_called = False

expected_returncode = 42
if isinstance(driver, LsfDriver):
expected_returncode = 1

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == expected_returncode

if isinstance(driver, LsfDriver):
assert aborted is True

nonlocal finished_called
finished_called = True

await driver.submit(0, f"exit {expected_returncode}")
await poll(driver, {0}, finished=finished)

assert finished_called


async def test_kill(driver):
aborted_called = False

expected_returncode = 1
if isinstance(driver, OpenPBSDriver):
expected_returncode = 256 + signal.SIGTERM

async def started(iens):
nonlocal driver
await driver.kill(iens)

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == expected_returncode
assert aborted is True

nonlocal aborted_called
aborted_called = True

await driver.submit(0, "sleep 3; exit 2")
await poll(driver, {0}, started=started, finished=finished)
assert aborted_called
81 changes: 5 additions & 76 deletions tests/integration_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Set

import pytest

from ert.scheduler import Driver, LsfDriver
from ert.scheduler.event import FinishedEvent, StartedEvent
from ert.scheduler import LsfDriver
from tests.utils import poll

from .conftest import mock_bin


@pytest.fixture(autouse=True)
Expand All @@ -17,78 +17,7 @@ def mock_lsf(pytestconfig, monkeypatch, tmp_path):
# User provided --lsf, which means we should use the actual LSF
# cluster without mocking anything.""
return

bin_path = Path(__file__).parent / "bin"

monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
monkeypatch.setenv("PYTEST_TMP_PATH", str(tmp_path))
monkeypatch.setenv("PYTHON", sys.executable)


async def poll(driver: Driver, expected: Set[int], *, started=None, finished=None):
poll_task = asyncio.create_task(driver.poll())
completed = set()
try:
while True:
event = await driver.event_queue.get()
if isinstance(event, StartedEvent):
if started:
await started(event.iens)
elif isinstance(event, FinishedEvent):
if finished is not None:
await finished(event.iens, event.returncode, event.aborted)
completed.add(event.iens)
if completed == expected:
break
finally:
poll_task.cancel()


@pytest.mark.integration_test
async def test_submit(tmp_path):
driver = LsfDriver()
await driver.submit(0, f"echo test > {tmp_path}/test")
await poll(driver, {0})

assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n"


async def test_submit_something_that_fails():
driver = LsfDriver()
finished_called = False

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == 1
assert aborted is True
nonlocal finished_called
finished_called = True

await driver.submit(0, "exit 1")
await poll(driver, {0}, finished=finished)

assert finished_called


async def test_kill():
driver = LsfDriver()
aborted_called = False

async def started(iens):
nonlocal driver
await driver.kill(iens)

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == 1 # LSF cant get returncodes
assert aborted is True

nonlocal aborted_called
aborted_called = True

await driver.submit(0, "sleep 3")
await poll(driver, {0}, started=started, finished=finished)
assert aborted_called
mock_bin(monkeypatch, tmp_path)


@pytest.mark.parametrize("runpath_supplied", [(True), (False)])
Expand Down
93 changes: 6 additions & 87 deletions tests/integration_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,18 @@
import asyncio
import os
import signal
import sys
from typing import Set

import pytest

from ert.cli import ENSEMBLE_EXPERIMENT_MODE
from ert.scheduler.event import FinishedEvent, StartedEvent
from ert.scheduler.openpbs_driver import Driver, OpenPBSDriver
from tests.integration_tests.run_cli import run_cli

from .conftest import mock_bin


@pytest.fixture(autouse=True)
def mock_torque(pytestconfig, monkeypatch, tmp_path):
if pytestconfig.getoption("torque"):
# User provided --torque, which means we should use the actual TORQUE
def mock_openpbs(pytestconfig, monkeypatch, tmp_path):
if pytestconfig.getoption("openpbs"):
# User provided --openpbs, which means we should use the actual OpenPBS
# cluster without mocking anything.
return

bin_path = os.path.join(os.path.dirname(__file__), "bin")

monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
monkeypatch.setenv("PYTEST_TMP_PATH", str(tmp_path))
monkeypatch.setenv("PYTHON", sys.executable)


async def poll(driver: Driver, expected: Set[int], *, started=None, finished=None):
poll_task = asyncio.create_task(driver.poll())
completed = set()
try:
while True:
event = await driver.event_queue.get()
if isinstance(event, StartedEvent):
if started:
await started(event.iens)

elif isinstance(event, FinishedEvent):
if finished is not None:
await finished(event.iens, event.returncode, event.aborted)

completed.add(event.iens)
if completed == expected:
break
finally:
poll_task.cancel()


@pytest.mark.integration_test
async def test_submit(tmp_path):
driver = OpenPBSDriver()
await driver.submit(0, f"echo test > {tmp_path}/test")
await poll(driver, {0})

assert (tmp_path / "test").read_text() == "test\n"


@pytest.mark.integration_test
async def test_returncode():
driver = OpenPBSDriver()
finished_called = False

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == 42
assert aborted is False

nonlocal finished_called
finished_called = True

await driver.submit(0, "exit 42")
await poll(driver, {0}, finished=finished)
assert finished_called


@pytest.mark.integration_test
async def test_kill():
driver = OpenPBSDriver()
aborted_called = False

async def started(iens):
nonlocal driver
await driver.kill(iens)

async def finished(iens, returncode, aborted):
assert iens == 0
assert returncode == 256 + signal.SIGTERM
assert aborted is True

nonlocal aborted_called
aborted_called = True

await driver.submit(0, "sleep 60; exit 2")
await poll(driver, {0}, started=started, finished=finished)
assert aborted_called
mock_bin(monkeypatch, tmp_path)


@pytest.mark.timeout(180)
Expand Down
23 changes: 2 additions & 21 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import stat
from contextlib import ExitStack as does_not_raise
from pathlib import Path
from typing import Collection, Set, get_args
from typing import Collection, get_args

import pytest
from hypothesis import given
from hypothesis import strategies as st
from tests.utils import poll

from ert.scheduler import LsfDriver
from ert.scheduler.event import FinishedEvent, StartedEvent
from ert.scheduler.lsf_driver import JobState, parse_bjobs

valid_jobstates: Collection[str] = list(get_args(JobState))
Expand Down Expand Up @@ -265,25 +265,6 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
)


async def poll(driver: LsfDriver, expected: Set[int], *, started=None, finished=None):
poll_task = asyncio.create_task(driver.poll())
completed = set()
try:
while True:
event = await driver.event_queue.get()
if isinstance(event, StartedEvent):
if started:
await started(event.iens)
elif isinstance(event, FinishedEvent):
if finished is not None:
await finished(event.iens, event.returncode, event.aborted)
completed.add(event.iens)
if completed == expected:
break
finally:
poll_task.cancel()


@pytest.mark.parametrize(
"bjobs_script, expectation",
[
Expand Down
Loading

0 comments on commit ae1bdd5

Please sign in to comment.