Skip to content

Commit

Permalink
Revert "Make forward_model_ok run in its own process"
Browse files (browse the repository at this point in the history)
This reverts commit 9a8441f.
  • Loading branch information
oyvindeide committed Sep 1, 2023
1 parent c26f2f2 commit 26144a1
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 154 deletions.
110 changes: 4 additions & 106 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
from __future__ import annotations

import logging
import multiprocessing as mp
import random
import sys
import time
import traceback
from ctypes import c_int
from threading import Lock, Semaphore, Thread
from typing import TYPE_CHECKING, Any, Optional, Tuple
from typing import TYPE_CHECKING, Optional

from cwrap import BaseCClass
from ecl.util.util import StringList

from ert._clib.queue import _refresh_status # pylint: disable=import-error
from ert.load_status import LoadStatus
from ert.realization_state import RealizationState

from . import ResPrototype
from .job_status_type_enum import JobStatusType
from .job_submit_status_type_enum import JobSubmitStatusType
from .thread_status_type_enum import ThreadStatus

if TYPE_CHECKING:
from multiprocessing.sharedctypes import Synchronized, SynchronizedString

from ert.callbacks import Callback, CallbackArgs

from .driver import Driver
Expand Down Expand Up @@ -184,25 +177,9 @@ def submit(self, driver: "Driver") -> JobSubmitStatusType:
return self._submit(driver) # type: ignore

def run_done_callback(self) -> Optional[LoadStatus]:
if sys.platform == "linux":
callback_status, status_msg = self.run_done_callback_forking()
else:
try:
callback_status, status_msg = self.done_callback_function(
*self.callback_arguments
)
except Exception as err: # pylint: disable=broad-exception-caught
exception_with_stack = "".join(
traceback.format_exception(type(err), err, err.__traceback__)
)
error_message = (
"got exception while running forward_model_ok "
f"callback:\n{exception_with_stack}"
)
print(error_message)
logger.exception(err)
callback_status = LoadStatus.LOAD_FAILURE
status_msg = error_message
callback_status, status_msg = self.done_callback_function(
*self.callback_arguments
)
if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._set_queue_status(JobStatusType.JOB_QUEUE_SUCCESS)
elif callback_status == LoadStatus.TIME_MAP_FAILURE:
Expand All @@ -219,62 +196,6 @@ def run_timeout_callback(self) -> None:
if self.callback_timeout:
self.callback_timeout(*self.callback_arguments)

# this function only works on systems where multiprocessing.Process uses forking
def run_done_callback_forking(self) -> Tuple[LoadStatus, str]:
# status_msg has a maximum length of 1024 bytes.
# the size is immutable after creation due to being backed by a c array.
status_msg: "SynchronizedString" = mp.Array("c", b" " * 1024) # type: ignore
callback_status: "Synchronized[c_int]" = mp.Value("i", 2) # type: ignore
pcontext = ProcessWithException(
target=self.done_callback_wrapper,
kwargs={
"callback_arguments": self.callback_arguments,
"callback_status_shared": callback_status,
"status_msg_shared": status_msg,
},
)
pcontext.start()
try:
pcontext.wait_and_throw_if_exception()
except Exception as err: # pylint: disable=broad-exception-caught
exception_with_stack = "".join(
traceback.format_exception(type(err), err, err.__traceback__)
)
error_message = (
"got exception while running forward_model_ok "
f"callback:\n{exception_with_stack}"
)
print(error_message)
logger.exception(err)
pcontext.join()

load_status = LoadStatus(callback_status.value)

# this step was added because the state_map update in
# forward_model_ok does not propagate from the spawned process.
run_arg = self.callback_arguments[0]
run_arg.ensemble_storage.state_map[run_arg.iens] = (
RealizationState.HAS_DATA
if load_status == LoadStatus.LOAD_SUCCESSFUL
else RealizationState.LOAD_FAILURE
)

return load_status, status_msg.value.decode("utf-8")

def done_callback_wrapper(
self,
callback_arguments: CallbackArgs,
callback_status_shared: "Synchronized[c_int]",
status_msg_shared: "SynchronizedString",
) -> None:
callback_status: Optional[LoadStatus]
status_msg: str
callback_status, status_msg = self.done_callback_function(*callback_arguments)

if callback_status is not None:
callback_status_shared.value = callback_status.value # type: ignore
status_msg_shared.value = bytes(status_msg, "utf-8")

def run_exit_callback(self) -> None:
self.exit_callback_function(*self.callback_arguments)

Expand Down Expand Up @@ -473,26 +394,3 @@ def wait_for(self) -> None:

def _set_thread_status(self, new_status: ThreadStatus) -> None:
self._thread_status = new_status


class ProcessWithException(mp.Process):
"""Used to run something in a subprocess, and capture exceptions. In order to catch
an exception, wait_and_throw_if_exception should be tried - before one joins the
process!"""

def __init__(self, *args: Any, **kwargs: Any):
mp.Process.__init__(self, *args, **kwargs)
self._parent_connection, self._child_connection = mp.Pipe(False)
self._exception = None

def run(self) -> None:
try:
mp.Process.run(self)
self._child_connection.send(None)
except Exception as err: # pylint: disable=broad-exception-caught
self._child_connection.send(err)

def wait_and_throw_if_exception(self) -> None:
exception = self._parent_connection.recv()
if exception:
raise exception
42 changes: 1 addition & 41 deletions tests/unit_tests/cli/test_integration_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from argparse import ArgumentParser
from pathlib import Path
from textwrap import dedent
from unittest.mock import Mock, call, patch
from unittest.mock import Mock, call

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -892,43 +892,3 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):
assert lines[2].strip() == "TheThirdValue"
# MYVAR is now set, so it should be expanded inside the value of FOURTH
assert lines[3].strip() == "fourth:foo"


@pytest.mark.integration_test
def test_cli_test_run_catches_forward_model_ok_callback_exception(
tmpdir, source_root, caplog
):
shutil.copytree(
os.path.join(source_root, "test-data", "poly_example"),
os.path.join(str(tmpdir), "poly_example"),
)
with open(
os.path.join(str(tmpdir), "poly_example", "poly.ert"),
mode="a",
encoding="utf-8",
) as config_file:
config_file.writelines(["MAX_SUBMIT 1"])
caplog.set_level(logging.ERROR)

error_message = "Argh"

with tmpdir.as_cwd(), patch(
"ert.run_models.base_run_model.forward_model_ok"
) as bad_fm_ok_callback:
bad_fm_ok_callback.side_effect = RuntimeError(error_message)
parser = ArgumentParser(prog="test_main")
parsed = ert_parser(
parser,
[
TEST_RUN_MODE,
"poly_example/poly.ert",
"--port-range",
"1024-65535",
],
)
with pytest.raises(ErtCliError):
run_cli(parsed)

assert "RuntimeError" in caplog.text
assert error_message in caplog.text
assert "Traceback" in caplog.text
3 changes: 1 addition & 2 deletions tests/unit_tests/ensemble_evaluator/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import stat
from dataclasses import dataclass
from pathlib import Path
from unittest.mock import MagicMock, Mock
from unittest.mock import Mock

import pytest

Expand Down Expand Up @@ -99,7 +99,6 @@ def _make_ensemble_builder(tmpdir, num_reals, num_jobs, job_sleep=0):
@dataclass
class RunArg:
iens: int
ensemble_storage = MagicMock()

for iens in range(0, num_reals):
run_path = Path(tmpdir / f"real_{iens}")
Expand Down
1 change: 0 additions & 1 deletion tests/unit_tests/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def dummy_exit_callback(*args):
@dataclass
class RunArg:
iens: int
ensemble_storage = MagicMock()


def create_local_queue(
Expand Down
2 changes: 0 additions & 2 deletions tests/unit_tests/job_queue/test_job_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path
from threading import BoundedSemaphore
from typing import Callable, List, TypedDict
from unittest.mock import MagicMock

import pytest

Expand All @@ -16,7 +15,6 @@
@dataclass
class RunArg:
iens: int
ensemble_storage = MagicMock()


class Config(TypedDict):
Expand Down
2 changes: 0 additions & 2 deletions tests/unit_tests/job_queue/test_job_queue_manager_torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path
from threading import BoundedSemaphore
from typing import Callable, TypedDict
from unittest.mock import MagicMock

import pytest

Expand Down Expand Up @@ -36,7 +35,6 @@ def fixture_dummy_config():
@dataclass
class RunArg:
iens: int
ensemble_storage = MagicMock()


class JobConfig(TypedDict):
Expand Down

0 comments on commit 26144a1

Please sign in to comment.