diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index 9661bf345ec..3d48b5f9353 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -1,21 +1,16 @@ from __future__ import annotations import logging -import multiprocessing as mp import random -import sys import time -import traceback -from ctypes import c_int from threading import Lock, Semaphore, Thread -from typing import TYPE_CHECKING, Any, Optional, Tuple +from typing import TYPE_CHECKING, Optional from cwrap import BaseCClass from ecl.util.util import StringList from ert._clib.queue import _refresh_status # pylint: disable=import-error from ert.load_status import LoadStatus -from ert.realization_state import RealizationState from . import ResPrototype from .job_status_type_enum import JobStatusType @@ -23,8 +18,6 @@ from .thread_status_type_enum import ThreadStatus if TYPE_CHECKING: - from multiprocessing.sharedctypes import Synchronized, SynchronizedString - from ert.callbacks import Callback, CallbackArgs from .driver import Driver @@ -184,25 +177,9 @@ def submit(self, driver: "Driver") -> JobSubmitStatusType: return self._submit(driver) # type: ignore def run_done_callback(self) -> Optional[LoadStatus]: - if sys.platform == "linux": - callback_status, status_msg = self.run_done_callback_forking() - else: - try: - callback_status, status_msg = self.done_callback_function( - *self.callback_arguments - ) - except Exception as err: # pylint: disable=broad-exception-caught - exception_with_stack = "".join( - traceback.format_exception(type(err), err, err.__traceback__) - ) - error_message = ( - "got exception while running forward_model_ok " - f"callback:\n{exception_with_stack}" - ) - print(error_message) - logger.exception(err) - callback_status = LoadStatus.LOAD_FAILURE - status_msg = error_message + callback_status, status_msg = self.done_callback_function( + *self.callback_arguments + ) if callback_status == LoadStatus.LOAD_SUCCESSFUL: self._set_queue_status(JobStatusType.JOB_QUEUE_SUCCESS) elif callback_status == LoadStatus.TIME_MAP_FAILURE: @@ -219,62 +196,6 @@ def run_timeout_callback(self) -> None: if self.callback_timeout: self.callback_timeout(*self.callback_arguments) - # this function only works on systems where multiprocessing.Process uses forking - def run_done_callback_forking(self) -> Tuple[LoadStatus, str]: - # status_msg has a maximum length of 1024 bytes. - # the size is immutable after creation due to being backed by a c array. - status_msg: "SynchronizedString" = mp.Array("c", b" " * 1024) # type: ignore - callback_status: "Synchronized[c_int]" = mp.Value("i", 2) # type: ignore - pcontext = ProcessWithException( - target=self.done_callback_wrapper, - kwargs={ - "callback_arguments": self.callback_arguments, - "callback_status_shared": callback_status, - "status_msg_shared": status_msg, - }, - ) - pcontext.start() - try: - pcontext.wait_and_throw_if_exception() - except Exception as err: # pylint: disable=broad-exception-caught - exception_with_stack = "".join( - traceback.format_exception(type(err), err, err.__traceback__) - ) - error_message = ( - "got exception while running forward_model_ok " - f"callback:\n{exception_with_stack}" - ) - print(error_message) - logger.exception(err) - pcontext.join() - - load_status = LoadStatus(callback_status.value) - - # this step was added because the state_map update in - # forward_model_ok does not propagate from the spawned process. - run_arg = self.callback_arguments[0] - run_arg.ensemble_storage.state_map[run_arg.iens] = ( - RealizationState.HAS_DATA - if load_status == LoadStatus.LOAD_SUCCESSFUL - else RealizationState.LOAD_FAILURE - ) - - return load_status, status_msg.value.decode("utf-8") - - def done_callback_wrapper( - self, - callback_arguments: CallbackArgs, - callback_status_shared: "Synchronized[c_int]", - status_msg_shared: "SynchronizedString", - ) -> None: - callback_status: Optional[LoadStatus] - status_msg: str - callback_status, status_msg = self.done_callback_function(*callback_arguments) - - if callback_status is not None: - callback_status_shared.value = callback_status.value # type: ignore - status_msg_shared.value = bytes(status_msg, "utf-8") - def run_exit_callback(self) -> None: self.exit_callback_function(*self.callback_arguments) @@ -473,26 +394,3 @@ def wait_for(self) -> None: def _set_thread_status(self, new_status: ThreadStatus) -> None: self._thread_status = new_status - - -class ProcessWithException(mp.Process): - """Used to run something in a subprocess, and capture exceptions. In order to catch - an exception, wait_and_throw_if_exception should be tried - before one joins the - process!""" - - def __init__(self, *args: Any, **kwargs: Any): - mp.Process.__init__(self, *args, **kwargs) - self._parent_connection, self._child_connection = mp.Pipe(False) - self._exception = None - - def run(self) -> None: - try: - mp.Process.run(self) - self._child_connection.send(None) - except Exception as err: # pylint: disable=broad-exception-caught - self._child_connection.send(err) - - def wait_and_throw_if_exception(self) -> None: - exception = self._parent_connection.recv() - if exception: - raise exception diff --git a/tests/unit_tests/cli/test_integration_cli.py b/tests/unit_tests/cli/test_integration_cli.py index 8bdb9ab08c7..ffcd4082a96 100644 --- a/tests/unit_tests/cli/test_integration_cli.py +++ b/tests/unit_tests/cli/test_integration_cli.py @@ -8,7 +8,7 @@ from argparse import ArgumentParser from pathlib import Path from textwrap import dedent -from unittest.mock import Mock, call, patch +from unittest.mock import Mock, call import numpy as np import pandas as pd @@ -892,43 +892,3 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config): assert lines[2].strip() == "TheThirdValue" # now MYVAR now set, so should be expanded inside the value of FOURTH assert lines[3].strip() == "fourth:foo" - - -@pytest.mark.integration_test -def test_cli_test_run_catches_forward_model_ok_callback_exception( - tmpdir, source_root, caplog -): - shutil.copytree( - os.path.join(source_root, "test-data", "poly_example"), - os.path.join(str(tmpdir), "poly_example"), - ) - with open( - os.path.join(str(tmpdir), "poly_example", "poly.ert"), - mode="a", - encoding="utf-8", - ) as config_file: - config_file.writelines(["MAX_SUBMIT 1"]) - caplog.set_level(logging.ERROR) - - error_message = "Argh" - - with tmpdir.as_cwd(), patch( - "ert.run_models.base_run_model.forward_model_ok" - ) as bad_fm_ok_callback: - bad_fm_ok_callback.side_effect = RuntimeError(error_message) - parser = ArgumentParser(prog="test_main") - parsed = ert_parser( - parser, - [ - TEST_RUN_MODE, - "poly_example/poly.ert", - "--port-range", - "1024-65535", - ], - ) - with pytest.raises(ErtCliError): - run_cli(parsed) - - assert "RuntimeError" in caplog.text - assert error_message in caplog.text - assert "Traceback" in caplog.text diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index 480f15ac280..5d63a2b6b1f 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -3,7 +3,7 @@ import stat from dataclasses import dataclass from pathlib import Path -from unittest.mock import MagicMock, Mock +from unittest.mock import Mock import pytest @@ -99,7 +99,6 @@ def _make_ensemble_builder(tmpdir, num_reals, num_jobs, job_sleep=0): @dataclass class RunArg: iens: int - ensemble_storage = MagicMock() for iens in range(0, num_reals): run_path = Path(tmpdir / f"real_{iens}") diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py index 698aaa2a4ca..92dc444c6fb 100644 --- a/tests/unit_tests/job_queue/test_job_queue.py +++ b/tests/unit_tests/job_queue/test_job_queue.py @@ -62,7 +62,6 @@ def dummy_exit_callback(*args): @dataclass class RunArg: iens: int - ensemble_storage = MagicMock() def create_local_queue( diff --git a/tests/unit_tests/job_queue/test_job_queue_manager.py b/tests/unit_tests/job_queue/test_job_queue_manager.py index 84f2e445938..5c9a5ed28a4 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager.py @@ -4,7 +4,6 @@ from pathlib import Path from threading import BoundedSemaphore from typing import Callable, List, TypedDict -from unittest.mock import MagicMock import pytest @@ -16,7 +15,6 @@ @dataclass class RunArg: iens: int - ensemble_storage = MagicMock() class Config(TypedDict): diff --git a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py index c3076ae1540..ffbf0519e94 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py @@ -4,7 +4,6 @@ from pathlib import Path from threading import BoundedSemaphore from typing import Callable, TypedDict -from unittest.mock import MagicMock import pytest @@ -36,7 +35,6 @@ def fixture_dummy_config(): @dataclass class RunArg: iens: int - ensemble_storage = MagicMock() class JobConfig(TypedDict):