Skip to content

Commit

Permalink
Test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Sep 25, 2023
1 parent ece80b9 commit 4afec8b
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 103 deletions.
33 changes: 32 additions & 1 deletion tests/unit_tests/job_queue/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import stat
from pathlib import Path
from unittest.mock import MagicMock
import ert

import pytest

import ert
from ert.load_status import LoadStatus


Expand All @@ -10,3 +13,31 @@ def mock_fm_ok(monkeypatch):
fm_ok = MagicMock(return_value=(LoadStatus.LOAD_SUCCESSFUL, ""))
monkeypatch.setattr(ert.job_queue.job_queue_node, "forward_model_ok", fm_ok)
yield fm_ok


@pytest.fixture
def simple_script(tmp_path):
SIMPLE_SCRIPT = """#!/bin/sh
echo "finished successfully" > STATUS
"""
fout = Path(tmp_path / "job_script")
fout.write_text(SIMPLE_SCRIPT, encoding="utf-8")
fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
yield str(fout)


@pytest.fixture
def failing_script(tmp_path):
"""
This script is susceptible to race conditions. Python works
better than sh."""
FAILING_SCRIPT = """#!/usr/bin/env python
import sys
with open("one_byte_pr_invocation", "a") as f:
f.write(".")
sys.exit(1)
"""
fout = Path(tmp_path / "failing_script")
fout.write_text(FAILING_SCRIPT, encoding="utf-8")
fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
yield str(fout)
81 changes: 28 additions & 53 deletions tests/unit_tests/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from pathlib import Path
from threading import BoundedSemaphore
from typing import Any, Callable, Dict, Optional
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

import ert.callbacks
from ert.config import QueueSystem
Expand All @@ -28,71 +30,47 @@ def wait_for(
)


def dummy_exit_callback(*args):
print(args)


DUMMY_CONFIG: Dict[str, Any] = {
"job_script": "job_script.py",
"num_cpu": 1,
"job_name": "dummy_job_{}",
"run_path": "dummy_path_{}",
"ok_callback": lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""),
"exit_callback": dummy_exit_callback,
}

SIMPLE_SCRIPT = """#!/usr/bin/env python
print('hello')
"""

NEVER_ENDING_SCRIPT = """#!/usr/bin/env python
@pytest.fixture
def never_ending_script(tmp_path):
NEVER_ENDING_SCRIPT = """#!/usr/bin/env python
import time
while True:
time.sleep(0.5)
"""

FAILING_SCRIPT = """#!/usr/bin/env python
import sys
sys.exit(1)
"""


@dataclass
class RunArg:
iens: int
"""
fout = Path(tmp_path / "never_ending_job_script")
fout.write_text(NEVER_ENDING_SCRIPT, encoding="utf-8")
fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
yield str(fout)


def create_local_queue(
monkeypatch,
executable_script: str,
max_submit: int = 1,
max_runtime: Optional[int] = None,
callback_timeout: Optional["Callable[[int], None]"] = None,
):
monkeypatch.setattr(
ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
)
monkeypatch.setattr(
JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
)

driver = Driver(driver_type=QueueSystem.LOCAL)
job_queue = JobQueue(driver, max_submit=max_submit)

scriptpath = Path(DUMMY_CONFIG["job_script"])
scriptpath.write_text(executable_script, encoding="utf-8")
scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)

for iens in range(10):
Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir(exist_ok=False)
job = JobQueueNode(
job_script=DUMMY_CONFIG["job_script"],
job_script=executable_script,
job_name=DUMMY_CONFIG["job_name"].format(iens),
run_path=DUMMY_CONFIG["run_path"].format(iens),
num_cpu=DUMMY_CONFIG["num_cpu"],
status_file=job_queue.status_file,
exit_file=job_queue.exit_file,
run_arg=RunArg(iens),
run_arg=MagicMock(),
max_runtime=max_runtime,
callback_timeout=callback_timeout,
)
Expand All @@ -109,9 +87,9 @@ def start_all(job_queue, sema_pool):
job = job_queue.fetch_next_waiting()


def test_kill_jobs(tmpdir, monkeypatch):
def test_kill_jobs(tmpdir, monkeypatch, never_ending_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, NEVER_ENDING_SCRIPT)
job_queue = create_local_queue(never_ending_script)

assert job_queue.queue_size == 10
assert job_queue.is_active()
Expand Down Expand Up @@ -140,9 +118,9 @@ def test_kill_jobs(tmpdir, monkeypatch):
job.wait_for()


def test_add_jobs(tmpdir, monkeypatch):
def test_add_jobs(tmpdir, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
job_queue = create_local_queue(simple_script)

assert job_queue.queue_size == 10
assert job_queue.is_active()
Expand All @@ -160,9 +138,9 @@ def test_add_jobs(tmpdir, monkeypatch):
job.wait_for()


def test_failing_jobs(tmpdir, monkeypatch):
def test_failing_jobs(tmpdir, monkeypatch, failing_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, FAILING_SCRIPT, max_submit=1)
job_queue = create_local_queue(failing_script, max_submit=1)

assert job_queue.queue_size == 10
assert job_queue.is_active()
Expand All @@ -186,20 +164,17 @@ def test_failing_jobs(tmpdir, monkeypatch):
assert job_queue.snapshot()[iens] == str(JobStatus.FAILED)


def test_timeout_jobs(tmpdir, monkeypatch):
def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script):
monkeypatch.chdir(tmpdir)
job_numbers = set()

def callback(iens):
nonlocal job_numbers
job_numbers.add(iens)
mock_callback = MagicMock()

job_queue = create_local_queue(
monkeypatch,
NEVER_ENDING_SCRIPT,
never_ending_script,
max_submit=1,
max_runtime=5,
callback_timeout=callback,
callback_timeout=mock_callback,
)

assert job_queue.queue_size == 10
Expand All @@ -222,15 +197,15 @@ def callback(iens):
iens = job_queue._differ.qindex_to_iens(q_index)
assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED)

assert job_numbers == set(range(10))
assert len(mock_callback.mock_calls) == 20

for job in job_queue.job_list:
job.wait_for()


def test_add_dispatch_info(tmpdir, monkeypatch):
def test_add_dispatch_info(tmpdir, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
job_queue = create_local_queue(simple_script)
ens_id = "some_id"
cert = "My very nice cert"
token = "my_super_secret_token"
Expand Down Expand Up @@ -259,9 +234,9 @@ def test_add_dispatch_info(tmpdir, monkeypatch):
assert (runpath / cert_file).read_text(encoding="utf-8") == cert


def test_add_dispatch_info_cert_none(tmpdir, monkeypatch):
def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
job_queue = create_local_queue(simple_script)
ens_id = "some_id"
dispatch_url = "wss://example.org"
cert = None
Expand Down
42 changes: 10 additions & 32 deletions tests/unit_tests/job_queue/test_job_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,17 @@


class Config(TypedDict):
job_script: str
num_cpu: int
job_name: str
run_path: str


DUMMY_CONFIG: Config = {
"job_script": "job_script.py",
"num_cpu": 1,
"job_name": "dummy_job_{}",
"run_path": "dummy_path_{}",
}

SIMPLE_SCRIPT = """#!/bin/sh
echo "finished successfully" > STATUS
"""

# This script is susceptible to race conditions. Python works
# better than sh.
FAILING_SCRIPT = """#!/usr/bin/env python
import sys
with open("one_byte_pr_invocation", "a") as f:
f.write(".")
sys.exit(1)
"""

MOCK_BSUB = """#!/bin/sh
echo "$@" > test.out
Expand All @@ -52,14 +38,10 @@ def create_local_queue(
driver = Driver(driver_type=QueueSystem.LOCAL)
job_queue = JobQueue(driver, max_submit=max_submit)

scriptpath = Path(DUMMY_CONFIG["job_script"])
scriptpath.write_text(executable_script, encoding="utf-8")
scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)

for iens in range(num_realizations):
Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir()
job = JobQueueNode(
job_script=DUMMY_CONFIG["job_script"],
job_script=executable_script,
job_name=DUMMY_CONFIG["job_name"].format(iens),
run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(iens)),
num_cpu=DUMMY_CONFIG["num_cpu"],
Expand All @@ -71,17 +53,13 @@ def create_local_queue(
return job_queue


@pytest.mark.usefixtures("use_tmpdir")
def test_num_cpu_submitted_correctly_lsf(tmpdir, mock_fm_ok):
@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok")
def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
"""Assert that num_cpu from the ERT configuration is passed on to the bsub
command used to submit jobs to LSF"""
os.putenv("PATH", os.getcwd() + ":" + os.getenv("PATH"))
driver = Driver(driver_type=QueueSystem.LSF)

script = Path(DUMMY_CONFIG["job_script"])
script.write_text(SIMPLE_SCRIPT, encoding="utf-8")
script.chmod(stat.S_IRWXU)

bsub = Path("bsub")
bsub.write_text(MOCK_BSUB, encoding="utf-8")
bsub.chmod(stat.S_IRWXU)
Expand All @@ -91,7 +69,7 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, mock_fm_ok):
os.mkdir(DUMMY_CONFIG["run_path"].format(job_id))

job = JobQueueNode(
job_script=DUMMY_CONFIG["job_script"],
job_script=simple_script,
job_name=DUMMY_CONFIG["job_name"].format(job_id),
run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(job_id)),
num_cpu=4,
Expand All @@ -118,23 +96,23 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, mock_fm_ok):
assert found_cpu_arg is True


def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok):
def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(SIMPLE_SCRIPT)
job_queue = create_local_queue(simple_script)
manager = JobQueueManager(job_queue)
manager.execute_queue()

assert len(mock_fm_ok.mock_calls) == len(job_queue.job_list)


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch):
def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script):
"""Check that the JobQueueManager will submit exactly the maximum number of
resubmissions in the case of scripts that fail."""
monkeypatch.chdir(tmpdir)
num_realizations = 2
job_queue = create_local_queue(
FAILING_SCRIPT,
failing_script,
max_submit=max_submit_num,
num_realizations=num_realizations,
)
Expand All @@ -156,9 +134,9 @@ def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch):


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_kill_queue(tmpdir, max_submit_num, monkeypatch):
def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(SIMPLE_SCRIPT, max_submit=max_submit_num)
job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
manager = JobQueueManager(job_queue)
job_queue.kill_all_jobs()
manager.execute_queue()
Expand Down
Loading

0 comments on commit 4afec8b

Please sign in to comment.