Skip to content

Commit

Permalink
Test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Sep 25, 2023
1 parent 600a843 commit ece80b9
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 105 deletions.
4 changes: 0 additions & 4 deletions tests/unit_tests/ensemble_evaluator/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0
)
)

@dataclass
class RunArg:
iens: int

for iens in range(0, num_reals):
run_path = Path(tmpdir / f"real_{iens}")
os.mkdir(run_path)
Expand Down
12 changes: 12 additions & 0 deletions tests/unit_tests/job_queue/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from unittest.mock import MagicMock
import ert
import pytest

from ert.load_status import LoadStatus


@pytest.fixture
def mock_fm_ok(monkeypatch):
fm_ok = MagicMock(return_value=(LoadStatus.LOAD_SUCCESSFUL, ""))
monkeypatch.setattr(ert.job_queue.job_queue_node, "forward_model_ok", fm_ok)
yield fm_ok
61 changes: 11 additions & 50 deletions tests/unit_tests/job_queue/test_job_queue_manager.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,28 @@
import os
import stat
from dataclasses import dataclass
from pathlib import Path
from threading import BoundedSemaphore
from typing import Callable, List, TypedDict
from typing import List, TypedDict
from unittest.mock import MagicMock

import pytest

import ert.callbacks
from ert.config import QueueSystem
from ert.job_queue import Driver, JobQueue, JobQueueManager, JobQueueNode, JobStatus
from ert.load_status import LoadStatus


@dataclass
class RunArg:
iens: int


class Config(TypedDict):
job_script: str
num_cpu: int
job_name: str
run_path: str
ok_callback: Callable
exit_callback: Callable


def dummy_ok_callback(runarg, path):
(Path(path) / "OK").write_text("success", encoding="utf-8")
return (LoadStatus.LOAD_SUCCESSFUL, "")


def dummy_exit_callback(self):
Path("ERROR").write_text("failure", encoding="utf-8")


DUMMY_CONFIG: Config = {
"job_script": "job_script.py",
"num_cpu": 1,
"job_name": "dummy_job_{}",
"run_path": "dummy_path_{}",
"ok_callback": dummy_ok_callback,
"exit_callback": dummy_exit_callback,
}

SIMPLE_SCRIPT = """#!/bin/sh
Expand All @@ -67,15 +47,8 @@ def dummy_exit_callback(self):


def create_local_queue(
monkeypatch, executable_script: str, max_submit: int = 2, num_realizations: int = 10
executable_script: str, max_submit: int = 2, num_realizations: int = 10
):
monkeypatch.setattr(
ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
)
monkeypatch.setattr(
JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
)

driver = Driver(driver_type=QueueSystem.LOCAL)
job_queue = JobQueue(driver, max_submit=max_submit)

Expand All @@ -92,23 +65,16 @@ def create_local_queue(
num_cpu=DUMMY_CONFIG["num_cpu"],
status_file=job_queue.status_file,
exit_file=job_queue.exit_file,
run_arg=RunArg(iens),
ensemble_config=Path(DUMMY_CONFIG["run_path"].format(iens)).resolve(),
run_arg=MagicMock(),
)
job_queue.add_job(job, iens)
return job_queue


def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch):
@pytest.mark.usefixtures("use_tmpdir")
def test_num_cpu_submitted_correctly_lsf(tmpdir, mock_fm_ok):
"""Assert that num_cpu from the ERT configuration is passed on to the bsub
command used to submit jobs to LSF"""
monkeypatch.setattr(
ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"]
)
monkeypatch.setattr(
JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"]
)
monkeypatch.chdir(tmpdir)
os.putenv("PATH", os.getcwd() + ":" + os.getenv("PATH"))
driver = Driver(driver_type=QueueSystem.LSF)

Expand All @@ -131,8 +97,7 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch):
num_cpu=4,
status_file="STATUS",
exit_file="ERROR",
run_arg=RunArg(iens=job_id),
ensemble_config=Path(DUMMY_CONFIG["run_path"].format(job_id)).resolve(),
run_arg=MagicMock(),
)

pool_sema = BoundedSemaphore(value=2)
Expand All @@ -153,14 +118,13 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch):
assert found_cpu_arg is True


def test_execute_queue(tmpdir, monkeypatch):
def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT)
job_queue = create_local_queue(SIMPLE_SCRIPT)
manager = JobQueueManager(job_queue)
manager.execute_queue()

for job in job_queue.job_list:
assert (Path(job.run_path) / "OK").read_text(encoding="utf-8") == "success"
assert len(mock_fm_ok.mock_calls) == len(job_queue.job_list)


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
Expand All @@ -170,7 +134,6 @@ def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch):
monkeypatch.chdir(tmpdir)
num_realizations = 2
job_queue = create_local_queue(
monkeypatch,
FAILING_SCRIPT,
max_submit=max_submit_num,
num_realizations=num_realizations,
Expand All @@ -195,9 +158,7 @@ def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch):
@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_kill_queue(tmpdir, max_submit_num, monkeypatch):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(
monkeypatch, SIMPLE_SCRIPT, max_submit=max_submit_num
)
job_queue = create_local_queue(SIMPLE_SCRIPT, max_submit=max_submit_num)
manager = JobQueueManager(job_queue)
job_queue.kill_all_jobs()
manager.execute_queue()
Expand Down
71 changes: 21 additions & 50 deletions tests/unit_tests/job_queue/test_job_queue_manager_torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
from dataclasses import dataclass
from pathlib import Path
from threading import BoundedSemaphore
from typing import Callable, TypedDict
from typing import TypedDict
from unittest.mock import MagicMock

import pytest

import ert.job_queue.job_queue_node
from ert.config import QueueSystem
from ert.job_queue import Driver, JobQueueNode, JobStatus
from ert.load_status import LoadStatus


@pytest.fixture(name="temp_working_directory")
Expand All @@ -22,49 +21,24 @@ def fixture_temp_working_directory(tmpdir, monkeypatch):
@pytest.fixture(name="dummy_config")
def fixture_dummy_config():
return JobConfig(
{
"job_script": "job_script.py",
"num_cpu": 1,
"job_name": "dummy_job_{}",
"run_path": "dummy_path_{}",
"ok_callback": dummy_ok_callback,
"exit_callback": dummy_exit_callback,
}
job_script="job_script.py",
num_cpu=1,
job_name="dummy_job_{}",
run_path="dummy_path_{}",
)


@dataclass
class RunArg:
iens: int


class JobConfig(TypedDict):
job_script: str
num_cpu: int
job_name: str
run_path: str
ok_callback: Callable
exit_callback: Callable


def dummy_ok_callback(runargs, path):
(Path(path) / "OK").write_text("success", encoding="utf-8")
return (LoadStatus.LOAD_SUCCESSFUL, "")


def dummy_exit_callback(*_args):
Path("ERROR").write_text("failure", encoding="utf-8")


SIMPLE_SCRIPT = """#!/bin/sh
echo "finished successfully" > STATUS
"""

FAILING_FORWARD_MODEL = """#!/usr/bin/env python
import sys
sys.exit(1)
"""

MOCK_QSUB = """#!/bin/sh
echo "torque job submitted" > job_output
echo "$@" >> job_output
Expand Down Expand Up @@ -143,14 +117,7 @@ def _deploy_script(scriptname: Path, scripttext: str):
script.chmod(stat.S_IRWXU)


def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0):
monkeypatch.setattr(
ert.job_queue.job_queue_node, "forward_model_ok", dummy_config["ok_callback"]
)
monkeypatch.setattr(
JobQueueNode, "run_exit_callback", dummy_config["exit_callback"]
)

def _build_jobqueuenode(dummy_config: JobConfig, job_id=0):
runpath = Path(dummy_config["run_path"].format(job_id))
runpath.mkdir()

Expand All @@ -161,12 +128,12 @@ def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0):
num_cpu=1,
status_file="STATUS",
exit_file="ERROR",
run_arg=RunArg(iens=job_id),
ensemble_config=Path(dummy_config["run_path"].format(job_id)).resolve(),
run_arg=MagicMock(),
)
return (job, runpath)
return job, runpath


@pytest.mark.usefixtures("use_tmpdir")
@pytest.mark.parametrize(
"qsub_script, qstat_script",
[
Expand All @@ -183,7 +150,11 @@ def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0):
],
)
def test_run_torque_job(
monkeypatch, temp_working_directory, dummy_config, qsub_script, qstat_script
temp_working_directory,
dummy_config,
qsub_script,
qstat_script,
mock_fm_ok,
):
"""Verify that the torque driver will succeed in submitting and
monitoring torque jobs even when the Torque commands qsub and qstat
Expand All @@ -201,7 +172,7 @@ def test_run_torque_job(
options=[("QSTAT_CMD", temp_working_directory / "qstat")],
)

(job, runpath) = _build_jobqueuenode(monkeypatch, dummy_config)
job, runpath = _build_jobqueuenode(dummy_config)
job.run(driver, BoundedSemaphore())
job.wait_for()

Expand All @@ -210,15 +181,15 @@ def test_run_torque_job(
assert Path("job_output").exists()

# The "done" callback:
assert (runpath / "OK").read_text(encoding="utf-8") == "success"
mock_fm_ok.assert_called()


@pytest.mark.usefixtures("use_tmpdir")
@pytest.mark.parametrize(
"user_qstat_option, expected_options",
[("", "-f 10001"), ("-x", "-f -x 10001"), ("-f", "-f -f 10001")],
)
def test_that_torque_driver_passes_options_to_qstat(
monkeypatch,
temp_working_directory,
dummy_config,
user_qstat_option,
Expand All @@ -245,13 +216,14 @@ def test_that_torque_driver_passes_options_to_qstat(
],
)

job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config)
job, _runpath = _build_jobqueuenode(dummy_config)
job.run(driver, BoundedSemaphore())
job.wait_for()

assert Path("qstat_options").read_text(encoding="utf-8").strip() == expected_options


@pytest.mark.usefixtures("mock_fm_ok", "use_tmpdir")
@pytest.mark.parametrize(
"job_state, exit_status, expected_status",
[
Expand All @@ -264,7 +236,6 @@ def test_that_torque_driver_passes_options_to_qstat(
],
)
def test_torque_job_status_from_qstat_output(
monkeypatch,
temp_working_directory,
dummy_config,
job_state,
Expand All @@ -284,7 +255,7 @@ def test_torque_job_status_from_qstat_output(
options=[("QSTAT_CMD", temp_working_directory / "qstat")],
)

job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config)
job, _runpath = _build_jobqueuenode(dummy_config)

pool_sema = BoundedSemaphore(value=2)
job.run(driver, pool_sema)
Expand Down
3 changes: 2 additions & 1 deletion tests/unit_tests/status/test_tracking_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def test_tracking(
os.chdir(ert_config.config_path)
ert = EnKFMain(ert_config)
experiment_id = storage.create_experiment(
ert.ensembleConfig().parameter_configuration
parameters=ert.ensembleConfig().parameter_configuration,
responses=ert.ensembleConfig().response_configuration,
)

model = create_model(
Expand Down

0 comments on commit ece80b9

Please sign in to comment.