From edcd86aabca0bd5b6f5249805d13249ece13d6ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Mon, 18 Sep 2023 13:37:21 +0200 Subject: [PATCH 1/4] Save response info in storage This moves all the information about which responses are saved in storage into storage itself, instead of reading it through the config. This removes the need to pass the ensemble config to the forward_model_ok function. --- src/ert/callbacks.py | 12 ++-- src/ert/cli/main.py | 3 +- src/ert/config/__init__.py | 2 + src/ert/config/ensemble_config.py | 4 ++ src/ert/config/gen_data_config.py | 11 ++-- src/ert/config/observations.py | 8 ++- src/ert/config/parameter_config.py | 2 + src/ert/config/response_config.py | 8 +++ src/ert/config/summary_config.py | 9 ++- src/ert/enkf_main.py | 1 - src/ert/ensemble_evaluator/_builder/_step.py | 2 - src/ert/gui/ertwidgets/caselist.py | 3 +- src/ert/gui/simulation/simulation_panel.py | 3 +- src/ert/job_queue/job_queue_node.py | 7 +- src/ert/job_queue/queue.py | 3 - src/ert/run_models/base_run_model.py | 1 - src/ert/simulator/batch_simulator.py | 8 +-- src/ert/storage/local_ensemble.py | 7 +- src/ert/storage/local_experiment.py | 51 +++++++++++++- src/ert/storage/local_storage.py | 25 +++++-- src/ert/storage/migration/block_fs.py | 41 +++++++++++- src/ert/storage/migration/response_info.py | 25 +++++++ test-data/block_storage | 2 +- .../unit_tests/ensemble_evaluator/conftest.py | 3 +- .../ensemble_evaluator_utils.py | 1 - .../test_ensemble_builder.py | 1 - tests/unit_tests/job_queue/test_job_queue.py | 1 - .../scenarios/test_summary_response.py | 3 +- .../status/test_tracking_integration.py | 3 +- .../storage/migration/test_version_2.py | 36 ++++++++++ .../unit_tests/storage/test_local_storage.py | 66 +++++++++++++++++++ tests/unit_tests/test_load_forward_model.py | 64 +++++++++++------- tests/unit_tests/test_summary_response.py | 4 +- 33 files changed, 341 insertions(+), 79 deletions(-) create mode 100644 src/ert/storage/migration/response_info.py create mode 100644 tests/unit_tests/storage/migration/test_version_2.py diff --git a/src/ert/callbacks.py b/src/ert/callbacks.py index b3904dc9f5c..214222433b7 100644 --- a/src/ert/callbacks.py +++ b/src/ert/callbacks.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Iterable -from ert.config import EnsembleConfig, ParameterConfig, SummaryConfig +from ert.config import ParameterConfig, ResponseConfig, SummaryConfig from ert.run_arg import RunArg from .load_status import LoadResult, LoadStatus @@ -38,10 +38,10 @@ def _read_parameters( def _write_responses_to_storage( - ens_config: EnsembleConfig, run_arg: RunArg + run_arg: RunArg, response_configs: Iterable[ResponseConfig] ) -> LoadResult: errors = [] - for config in ens_config.response_configs.values(): + for config in response_configs: if isinstance(config, SummaryConfig) and not config.keys: # Nothing to load, should not be handled here, should never be # added in the first place @@ -64,7 +64,6 @@ def forward_model_ok( run_arg: RunArg, - ens_conf: EnsembleConfig, ) -> LoadResult: parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") @@ -78,7 +77,10 @@ def forward_model_ok( ) if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL: - response_result = _write_responses_to_storage(ens_conf, run_arg) + response_result = _write_responses_to_storage( + run_arg, + run_arg.ensemble_storage.experiment.response_configuration.values(), + ) except Exception as err:
logging.exception(f"Failed to load results for realization {run_arg.iens}") diff --git a/src/ert/cli/main.py b/src/ert/cli/main.py index 0548c1effe2..61bac00459e 100644 --- a/src/ert/cli/main.py +++ b/src/ert/cli/main.py @@ -83,7 +83,8 @@ def run_cli(args: Namespace, _: Any = None) -> None: evaluator_server_config = EvaluatorServerConfig(custom_port_range=args.port_range) experiment = storage.create_experiment( - parameters=ert.ensembleConfig().parameter_configuration + parameters=ert.ensembleConfig().parameter_configuration, + responses=ert.ensembleConfig().response_configuration, ) # Note that asyncio.run should be called once in ert/shared/main.py diff --git a/src/ert/config/__init__.py b/src/ert/config/__init__.py index 69dfe123f3d..d9577324450 100644 --- a/src/ert/config/__init__.py +++ b/src/ert/config/__init__.py @@ -19,6 +19,7 @@ from .parsing import ConfigValidationError, ConfigWarning from .queue_config import QueueConfig from .queue_system import QueueSystem +from .response_config import ResponseConfig from .summary_config import SummaryConfig from .summary_observation import SummaryObservation from .surface_config import SurfaceConfig @@ -53,6 +54,7 @@ "PriorDict", "QueueConfig", "QueueSystem", + "ResponseConfig", "SummaryConfig", "SummaryObservation", "SurfaceConfig", diff --git a/src/ert/config/ensemble_config.py b/src/ert/config/ensemble_config.py index ce3d2af2e56..24a6caa6ae5 100644 --- a/src/ert/config/ensemble_config.py +++ b/src/ert/config/ensemble_config.py @@ -304,3 +304,7 @@ def get_summary_keys(self) -> List[str]: @property def parameter_configuration(self) -> List[ParameterConfig]: return list(self.parameter_configs.values()) + + @property + def response_configuration(self) -> List[ResponseConfig]: + return list(self.response_configs.values()) diff --git a/src/ert/config/gen_data_config.py b/src/ert/config/gen_data_config.py index 10c384c6556..997c51b852c 100644 --- a/src/ert/config/gen_data_config.py +++ b/src/ert/config/gen_data_config.py @@ -5,7 +5,6 @@ import numpy as np import xarray as xr -from sortedcontainers import SortedList from typing_extensions import Self from ert.validation import rangestring_to_list @@ -18,11 +17,11 @@ @dataclass class GenDataConfig(ResponseConfig): input_file: str = "" - report_steps: Optional[SortedList] = None + report_steps: Optional[List[int]] = None def __post_init__(self) -> None: if isinstance(self.report_steps, list): - self.report_steps = SortedList(set(self.report_steps)) + self.report_steps = list(set(self.report_steps)) @classmethod def from_config_list(cls, gen_data: List[str]) -> Self: @@ -35,8 +34,10 @@ def from_config_list(cls, gen_data: List[str]) -> Self: f"Missing or unsupported RESULT_FILE for GEN_DATA key {name!r}", name ) - report_steps = rangestring_to_list(options.get("REPORT_STEPS", "")) - report_steps = SortedList(report_steps) if report_steps else None + report_steps: Optional[List[int]] = rangestring_to_list( + options.get("REPORT_STEPS", "") + ) + report_steps = sorted(list(report_steps)) if report_steps else None if os.path.isabs(res_file): result_file_context = next( x for x in gen_data if x.startswith("RESULT_FILE:") diff --git a/src/ert/config/observations.py b/src/ert/config/observations.py index 0c5633b3fd0..f569b7c36de 100644 --- a/src/ert/config/observations.py +++ b/src/ert/config/observations.py @@ -123,7 +123,8 @@ def _handle_history_observation( if history_type is None: raise ValueError("Need a history type in order to use history observations") - response_config.keys.append(summary_key) + 
if summary_key not in response_config.keys: + response_config.keys.append(summary_key) error = history_observation["ERROR"] error_min = history_observation["ERROR_MIN"] error_mode = history_observation["ERROR_MODE"] @@ -323,7 +324,10 @@ def _handle_summary_observation( time_map: List[datetime], ) -> Dict[str, ObsVector]: summary_key = summary_dict["KEY"] - ensemble_config["summary"].keys.append(summary_key) # type: ignore + summary_config = ensemble_config["summary"] + assert isinstance(summary_config, SummaryConfig) + if summary_key not in summary_config.keys: + summary_config.keys.append(summary_key) value, std_dev = cls._make_value_and_std_dev(summary_dict) try: diff --git a/src/ert/config/parameter_config.py b/src/ert/config/parameter_config.py index 3e6c59953e5..4da60bdece9 100644 --- a/src/ert/config/parameter_config.py +++ b/src/ert/config/parameter_config.py @@ -22,6 +22,8 @@ def __init__(self, data: List[Tuple[Any, Any]]) -> None: for i, (key, value) in enumerate(data): if isinstance(value, Path): data[i] = (key, str(value)) + if isinstance(value, set): + data[i] = (key, list(value)) super().__init__(data) diff --git a/src/ert/config/response_config.py b/src/ert/config/response_config.py index 98c0660f2fe..6ebf3660191 100644 --- a/src/ert/config/response_config.py +++ b/src/ert/config/response_config.py @@ -1,8 +1,11 @@ import dataclasses from abc import ABC, abstractmethod +from typing import Any, Dict import xarray as xr +from ert.config.parameter_config import CustomDict + @dataclasses.dataclass class ResponseConfig(ABC): @@ -11,3 +14,8 @@ class ResponseConfig(ABC): @abstractmethod def read_from_file(self, run_path: str, iens: int) -> xr.Dataset: ... + + def to_dict(self) -> Dict[str, Any]: + data = dataclasses.asdict(self, dict_factory=CustomDict) + data["_ert_kind"] = self.__class__.__name__ + return data diff --git a/src/ert/config/summary_config.py b/src/ert/config/summary_config.py index 9605bcfa216..68c35089c72 100644 --- a/src/ert/config/summary_config.py +++ b/src/ert/config/summary_config.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass from datetime import datetime -from typing import TYPE_CHECKING, Set +from typing import TYPE_CHECKING, Set, Union import xarray as xr from ecl.summary import EclSum @@ -23,7 +23,11 @@ class SummaryConfig(ResponseConfig): input_file: str keys: List[str] - refcase: Optional[Set[datetime]] = None + refcase: Union[Set[datetime], List[str], None] = None + + def __post_init__(self) -> None: + if isinstance(self.refcase, list): + self.refcase = {datetime.fromisoformat(val) for val in self.refcase} def read_from_file(self, run_path: str, iens: int) -> xr.Dataset: filename = self.input_file.replace("<IENS>", str(iens)) @@ -42,6 +46,7 @@ def read_from_file(self, run_path: str, iens: int) -> xr.Dataset: c_time = summary.alloc_time_vector(True) time_map = [t.datetime() for t in c_time] if self.refcase: + assert isinstance(self.refcase, set) missing = self.refcase.difference(time_map) if missing: first, last = min(missing), max(missing) diff --git a/src/ert/enkf_main.py b/src/ert/enkf_main.py index 0f1caa98f60..0adcc13e18d 100644 --- a/src/ert/enkf_main.py +++ b/src/ert/enkf_main.py @@ -222,7 +222,6 @@ def loadFromForwardModel( run_context = self.ensemble_context(fs, realization, iteration) nr_loaded = fs.load_from_run_path( self.getEnsembleSize(), - self.ensembleConfig(), run_context.run_args, run_context.mask, ) diff --git a/src/ert/ensemble_evaluator/_builder/_step.py b/src/ert/ensemble_evaluator/_builder/_step.py index
c6eaedbcdb1..737b7a6cb38 100644 --- a/src/ert/ensemble_evaluator/_builder/_step.py +++ b/src/ert/ensemble_evaluator/_builder/_step.py @@ -8,7 +8,6 @@ SOURCE_TEMPLATE_STEP = "/step/{step_id}" if TYPE_CHECKING: - from ert.config.ensemble_config import EnsembleConfig from ert.run_arg import RunArg @@ -27,7 +26,6 @@ class LegacyStep: name: str max_runtime: Optional[int] run_arg: "RunArg" - ensemble_config: "EnsembleConfig" num_cpu: int run_path: Path job_script: str diff --git a/src/ert/gui/ertwidgets/caselist.py b/src/ert/gui/ertwidgets/caselist.py index 85900667212..a903b396688 100644 --- a/src/ert/gui/ertwidgets/caselist.py +++ b/src/ert/gui/ertwidgets/caselist.py @@ -97,7 +97,8 @@ def addItem(self): new_case_name = dialog.showAndTell() if new_case_name != "": ensemble = self.storage.create_experiment( - parameters=self.facade.ensemble_config.parameter_configuration + parameters=self.facade.ensemble_config.parameter_configuration, + responses=self.facade.ensemble_config.response_configuration, ).create_ensemble( name=new_case_name, ensemble_size=self.facade.get_ensemble_size(), diff --git a/src/ert/gui/simulation/simulation_panel.py b/src/ert/gui/simulation/simulation_panel.py index 93d6927e67e..8ffb4e08605 100644 --- a/src/ert/gui/simulation/simulation_panel.py +++ b/src/ert/gui/simulation/simulation_panel.py @@ -146,7 +146,8 @@ def runSimulation(self): try: args = self.getSimulationArguments() experiment = self._notifier.storage.create_experiment( - parameters=self.ert.ensembleConfig().parameter_configuration + parameters=self.ert.ensembleConfig().parameter_configuration, + responses=self.ert.ensembleConfig().response_configuration, ) model = create_model( self.ert, diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index 67628ed177b..6c1a4782289 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -20,7 +20,6 @@ from .thread_status import ThreadStatus if TYPE_CHECKING: - from ..config import EnsembleConfig from ..run_arg import RunArg from .driver import Driver @@ -97,13 +96,11 @@ def __init__( status_file: str, exit_file: str, run_arg: "RunArg", - ensemble_config: "EnsembleConfig", max_runtime: Optional[int] = None, callback_timeout: Optional[Callable[[int], None]] = None, ): self.callback_timeout = callback_timeout self.run_arg = run_arg - self.ensemble_config = ensemble_config argc = 1 argv = StringList() argv.append(run_path) @@ -177,9 +174,7 @@ def submit(self, driver: "Driver") -> SubmitStatus: return self._submit(driver) def run_done_callback(self) -> Optional[LoadStatus]: - callback_status, status_msg = forward_model_ok( - self.run_arg, self.ensemble_config - ) + callback_status, status_msg = forward_model_ok(self.run_arg) if callback_status == LoadStatus.LOAD_SUCCESSFUL: self._set_queue_status(JobStatus.SUCCESS) elif callback_status == LoadStatus.TIME_MAP_FAILURE: diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 9d2c45322fb..78d0300d70b 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -41,7 +41,6 @@ from . 
import ResPrototype if TYPE_CHECKING: - from ert.config import ErtConfig from ert.ensemble_evaluator import LegacyStep from ert.run_arg import RunArg @@ -494,7 +493,6 @@ def add_job_from_run_arg( status_file=self.status_file, exit_file=self.exit_file, run_arg=run_arg, - ensemble_config=ert_config.ensemble_config, max_runtime=max_runtime, ) @@ -515,7 +513,6 @@ def add_ee_stage( status_file=self.status_file, exit_file=self.exit_file, run_arg=stage.run_arg, - ensemble_config=stage.ensemble_config, max_runtime=stage.max_runtime, callback_timeout=callback_timeout, ) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index cc5a006b120..f88d50c2df5 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -395,7 +395,6 @@ def _build_ensemble( name="legacy step", max_runtime=self.ert().analysisConfig().max_runtime, run_arg=run_arg, - ensemble_config=self.ert().resConfig().ensemble_config, num_cpu=self.ert().get_num_cpu(), run_path=Path(run_arg.runpath), job_script=self.ert().resConfig().queue_config.job_script, diff --git a/src/ert/simulator/batch_simulator.py b/src/ert/simulator/batch_simulator.py index a019af3cd41..2d9162015a5 100644 --- a/src/ert/simulator/batch_simulator.py +++ b/src/ert/simulator/batch_simulator.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np -from sortedcontainers import SortedList from ert.config import ErtConfig, ExtParamConfig, GenDataConfig from ert.enkf_main import EnKFMain @@ -112,9 +111,7 @@ def callback(*args, **kwargs): for key in results: ens_config.addNode( - GenDataConfig( - name=key, input_file=f"{key}_%d", report_steps=SortedList([0]) - ) + GenDataConfig(name=key, input_file=f"{key}_%d", report_steps=[0]) ) def _setup_sim( @@ -237,7 +234,8 @@ def start( batch complete before you start a new batch. 
""" experiment = storage.create_experiment( - self.ert_config.ensemble_config.parameter_configuration + parameters=self.ert_config.ensemble_config.parameter_configuration, + responses=self.ert_config.ensemble_config.response_configuration, ) ensemble = storage.create_ensemble( experiment.id, name=case_name, ensemble_size=self.ert.getEnsembleSize() diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index 7b78c9300bf..03d8725c9f8 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -20,7 +20,6 @@ if TYPE_CHECKING: import numpy.typing as npt - from ert.config import EnsembleConfig from ert.run_arg import RunArg from ert.storage.local_experiment import ( LocalExperimentAccessor, @@ -34,7 +33,6 @@ def _load_realization( sim_fs: LocalEnsembleAccessor, realisation: int, - ensemble_config: EnsembleConfig, run_args: List[RunArg], ) -> Tuple[LoadResult, int]: sim_fs.update_realization_state( @@ -42,7 +40,7 @@ def _load_realization( [RealizationState.UNDEFINED], RealizationState.INITIALIZED, ) - result = forward_model_ok(run_args[realisation], ensemble_config) + result = forward_model_ok(run_args[realisation]) sim_fs.state_map[realisation] = ( RealizationState.HAS_DATA if result.status == LoadStatus.LOAD_SUCCESSFUL @@ -280,7 +278,6 @@ def sync(self) -> None: def load_from_run_path( self, ensemble_size: int, - ensemble_config: EnsembleConfig, run_args: List[RunArg], active_realizations: List[bool], ) -> int: @@ -290,7 +287,7 @@ def load_from_run_path( async_result = [ pool.apply_async( _load_realization, - (self, iens, ensemble_config, run_args), + (self, iens, run_args), ) for iens in range(ensemble_size) if active_realizations[iens] diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index e91065b8691..0d6d27e4c08 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -8,7 +8,15 @@ import numpy as np import xtgeo -from ert.config import ExtParamConfig, Field, GenKwConfig, SurfaceConfig +from ert.config import ( + ExtParamConfig, + Field, + GenDataConfig, + GenKwConfig, + SummaryConfig, + SurfaceConfig, +) +from ert.config.response_config import ResponseConfig if TYPE_CHECKING: from ert.config.parameter_config import ParameterConfig @@ -24,8 +32,15 @@ } +_KNOWN_RESPONSE_TYPES = { + SummaryConfig.__name__: SummaryConfig, + GenDataConfig.__name__: GenDataConfig, +} + + class LocalExperimentReader: _parameter_file = Path("parameter.json") + _responses_file = Path("responses.json") _simulation_arguments_file = Path("simulation_arguments.json") def __init__(self, storage: LocalStorageReader, uuid: UUID, path: Path) -> None: @@ -57,6 +72,16 @@ def parameter_info(self) -> Dict[str, Any]: info = json.load(f) return info + @property + def response_info(self) -> Dict[str, Any]: + info: Dict[str, Any] + path = self.mount_point / self._responses_file + if not path.exists(): + raise ValueError(f"{str(self._responses_file)} does not exist") + with open(path, encoding="utf-8", mode="r") as f: + info = json.load(f) + return info + def get_surface(self, name: str) -> xtgeo.RegularSurface: return xtgeo.surface_from_file( str(self.mount_point / f"{name}.irap"), @@ -72,14 +97,23 @@ def parameter_configuration(self) -> Dict[str, ParameterConfig]: params[data["name"]] = _KNOWN_PARAMETER_TYPES[param_type](**data) return params + @property + def response_configuration(self) -> Dict[str, ResponseConfig]: + params = {} + for data in self.response_info.values(): + param_type = 
data.pop("_ert_kind") + params[data["name"]] = _KNOWN_RESPONSE_TYPES[param_type](**data) + return params + class LocalExperimentAccessor(LocalExperimentReader): - def __init__( + def __init__( # pylint: disable=too-many-arguments self, storage: LocalStorageAccessor, uuid: UUID, path: Path, parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, ) -> None: self._storage: LocalStorageAccessor = storage self._id = uuid @@ -100,6 +134,19 @@ def __init__( with open(self.mount_point / self._parameter_file, "w", encoding="utf-8") as f: json.dump(parameter_data, f) + responses = [] if responses is None else responses + response_file = self.mount_point / self._responses_file + response_data = ( + json.loads(response_file.read_text(encoding="utf-8")) + if response_file.exists() + else {} + ) + + for response in responses: + response_data.update({response.name: response.to_dict()}) + with open(response_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, default=str) + @property def ensembles(self) -> Generator[LocalEnsembleAccessor, None, None]: yield from super().ensembles # type: ignore diff --git a/src/ert/storage/local_storage.py b/src/ert/storage/local_storage.py index ed0d64bfbf9..213d428e313 100644 --- a/src/ert/storage/local_storage.py +++ b/src/ert/storage/local_storage.py @@ -38,12 +38,12 @@ if TYPE_CHECKING: - from ert.config.parameter_config import ParameterConfig + from ert.config import ParameterConfig, ResponseConfig logger = logging.getLogger(__name__) -_LOCAL_STORAGE_VERSION = 2 +_LOCAL_STORAGE_VERSION = 3 class _Migrations(BaseModel): @@ -180,10 +180,21 @@ def __init__( block_fs.migrate(self.path) self._add_migration_information(0, "block_fs") elif version == 1: - from ert.storage.migration import gen_kw # pylint: disable=C0415 + from ert.storage.migration import ( # pylint: disable=C0415 + gen_kw, + response_info, + ) gen_kw.migrate(self.path) + response_info.migrate(self.path) self._add_migration_information(1, "gen_kw") + elif version == 2: + from ert.storage.migration import ( # pylint: disable=C0415 + response_info, + ) + + response_info.migrate(self.path) + self._add_migration_information(2, "response") except Exception as err: # pylint: disable=broad-exception-caught logger.error(f"Migrating storage at {self.path} failed with {err}") @@ -220,12 +231,16 @@ def to_accessor(self) -> LocalStorageAccessor: return self def create_experiment( - self, parameters: Optional[List[ParameterConfig]] = None + self, + parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, ) -> LocalExperimentAccessor: exp_id = uuid4() path = self._experiment_path(exp_id) path.mkdir(parents=True, exist_ok=False) - exp = LocalExperimentAccessor(self, exp_id, path, parameters=parameters) + exp = LocalExperimentAccessor( + self, exp_id, path, parameters=parameters, responses=responses + ) self._experiments[exp.id] = exp return exp diff --git a/src/ert/storage/migration/block_fs.py b/src/ert/storage/migration/block_fs.py index 7b13452de5f..d48dae36f6b 100644 --- a/src/ert/storage/migration/block_fs.py +++ b/src/ert/storage/migration/block_fs.py @@ -17,8 +17,10 @@ from ert.config import ( EnsembleConfig, Field, + GenDataConfig, GenKwConfig, ParameterConfig, + ResponseConfig, SurfaceConfig, field_transform, ) @@ -95,12 +97,20 @@ def migrate_case(storage: StorageAccessor, path: Path) -> None: parameter_configs.extend(_migrate_gen_kw_info(data_file, ens_config)) 
parameter_configs.extend(_migrate_surface_info(data_file, ens_config)) + # Copy experiment response data + response_configs: List[ResponseConfig] = [] + for data_file in response_files: + response_configs.extend(_migrate_summary_info(data_file, ens_config)) + response_configs.extend(_migrate_gen_data_info(data_file, ens_config)) + # Guess iteration number iteration = 0 if (match := re.search(r"_(\d+)$", path.name)) is not None: iteration = int(match[1]) - experiment = storage.create_experiment(parameters=parameter_configs) + experiment = storage.create_experiment( + parameters=parameter_configs, responses=response_configs + ) ensemble = experiment.create_ensemble( name=path.name, ensemble_size=ensemble_size, @@ -256,6 +266,18 @@ def _migrate_field( ensemble.save_parameters(block.name, block.realization_index, ds) +def _migrate_summary_info( + data_file: DataFile, + ens_config: EnsembleConfig, +) -> List[ResponseConfig]: + seen = set() + for block in data_file.blocks(Kind.SUMMARY): + if block.name in seen: + continue + seen.add(block.name) + return [ens_config["summary"]] if seen else [] # type: ignore + + def _migrate_summary( ensemble: EnsembleAccessor, data_file: DataFile, @@ -291,6 +313,23 @@ def _migrate_summary( ensemble.save_response("summary", ds, realization_index) +def _migrate_gen_data_info( + data_file: DataFile, + ens_config: EnsembleConfig, +) -> List[ResponseConfig]: + seen = set() + configs: List[ResponseConfig] = [] + for block in data_file.blocks(Kind.GEN_DATA): + if block.name in seen: + continue + seen.add(block.name) + config = ens_config[block.name] + assert isinstance(config, GenDataConfig) + + configs.append(config) + return configs + + def _migrate_gen_data( ensemble: EnsembleAccessor, data_file: DataFile, diff --git a/src/ert/storage/migration/response_info.py b/src/ert/storage/migration/response_info.py new file mode 100644 index 00000000000..dd27292288d --- /dev/null +++ b/src/ert/storage/migration/response_info.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +from ert.storage.local_storage import local_storage_get_ert_config + +if TYPE_CHECKING: + from pathlib import Path + + +def migrate(path: Path) -> None: + ert_config = local_storage_get_ert_config() + ens_config = ert_config.ensemble_config + for experiment in path.glob("experiments/*"): + response_info = {} + for response in ens_config.response_configuration: + response_info[response.name] = response.to_dict() + with open(experiment / "responses.json", "w", encoding="utf-8") as fout: + fout.write(json.dumps(response_info, default=str)) + with open(path / "index.json", encoding="utf-8") as f: + index_json = json.load(f) + index_json["version"] = 3 + with open(path / "index.json", "w", encoding="utf-8") as f: + f.write(json.dumps(index_json)) diff --git a/test-data/block_storage b/test-data/block_storage index aac08ccf968..84305c7fb68 160000 --- a/test-data/block_storage +++ b/test-data/block_storage @@ -1 +1 @@ -Subproject commit aac08ccf9687e507d38e3de6c386d3014b5e9865 +Subproject commit 84305c7fb683c8aa49d2e46723307721447658fa diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index 7ff87201b32..5c2957257a9 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -71,7 +71,7 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 monkeypatch.setattr( ert.job_queue.job_queue_node, "forward_model_ok", - 
lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""), + lambda _: (LoadStatus.LOAD_SUCCESSFUL, ""), ) monkeypatch.setattr( JobQueueNode, "run_exit_callback", lambda _: (LoadStatus.LOAD_FAILURE, "") @@ -133,7 +133,6 @@ class RunArg: run_arg=Mock(iens=iens), # the first callback_argument is expected to be a run_arg # from the run_arg, the queue wants to access the iens prop - ensemble_config=None, run_path=run_path, num_cpu=1, name="dummy step", diff --git a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py index fb7f7cfd52d..69451551892 100644 --- a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py +++ b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py @@ -71,7 +71,6 @@ def __init__(self, _iter, reals, steps, jobs, id_): ], name=f"step-{step_no}", max_runtime=0, - ensemble_config=None, num_cpu=0, run_path=None, run_arg=None, diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py index 2f9ef599a46..8562dd94936 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py @@ -29,7 +29,6 @@ def test_build_ensemble(active_real): job_script="job_script", job_name="job_name", num_cpu=1, - ensemble_config=MagicMock(), jobs=[ LegacyJob( ext_job=MagicMock(), diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py index d59ff634d5e..16115faa103 100644 --- a/tests/unit_tests/job_queue/test_job_queue.py +++ b/tests/unit_tests/job_queue/test_job_queue.py @@ -93,7 +93,6 @@ def create_local_queue( status_file=job_queue.status_file, exit_file=job_queue.exit_file, run_arg=RunArg(iens), - ensemble_config=None, max_runtime=max_runtime, callback_timeout=callback_timeout, ) diff --git a/tests/unit_tests/scenarios/test_summary_response.py b/tests/unit_tests/scenarios/test_summary_response.py index 2e296b948dd..eb4ed92a318 100644 --- a/tests/unit_tests/scenarios/test_summary_response.py +++ b/tests/unit_tests/scenarios/test_summary_response.py @@ -18,7 +18,8 @@ def prior_ensemble(storage, setup_configuration): ert_config = setup_configuration.ert_config return storage.create_experiment( - parameters=ert_config.ensemble_config.parameter_configuration + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, ).create_ensemble(ensemble_size=100, name="prior") diff --git a/tests/unit_tests/status/test_tracking_integration.py b/tests/unit_tests/status/test_tracking_integration.py index e1297aef97c..298096fbce5 100644 --- a/tests/unit_tests/status/test_tracking_integration.py +++ b/tests/unit_tests/status/test_tracking_integration.py @@ -406,7 +406,8 @@ def test_tracking_missing_ecl( storage, parsed, storage.create_experiment( - ert_config.ensemble_config.parameter_configuration + parameters=ert_config.ensemble_config.parameter_configuration, + responses=ert_config.ensemble_config.response_configuration, ), ) diff --git a/tests/unit_tests/storage/migration/test_version_2.py b/tests/unit_tests/storage/migration/test_version_2.py new file mode 100644 index 00000000000..d3ce13f248c --- /dev/null +++ b/tests/unit_tests/storage/migration/test_version_2.py @@ -0,0 +1,36 @@ +import json + +import pytest + +from ert.config import ErtConfig +from ert.storage import open_storage +from ert.storage.local_storage import local_storage_set_ert_config + + 
+@pytest.fixture(scope="module", autouse=True) +def set_ert_config(block_storage_path): + ert_config = ErtConfig.from_file( + str(block_storage_path / "version-2/snake_oil/snake_oil.ert") + ) + yield local_storage_set_ert_config(ert_config) + local_storage_set_ert_config(None) + + +def test_migrate_gen_kw(setup_case, set_ert_config): + ert_config = setup_case("block_storage/version-2/snake_oil", "snake_oil.ert") + with open_storage(ert_config.ens_path, "w") as storage: + assert len(list(storage.experiments)) == 1 + experiment = list(storage.experiments)[0] + response_info = json.loads( + (experiment._path / "responses.json").read_text(encoding="utf-8") + ) + assert ( + list(experiment.response_configuration.values()) + == ert_config.ensemble_config.response_configuration + ) + assert list(response_info) == [ + "SNAKE_OIL_OPR_DIFF", + "SNAKE_OIL_WPR_DIFF", + "SNAKE_OIL_GPR_DIFF", + "summary", + ] diff --git a/tests/unit_tests/storage/test_local_storage.py b/tests/unit_tests/storage/test_local_storage.py index 0f85e692ff2..bb1845cefa1 100644 --- a/tests/unit_tests/storage/test_local_storage.py +++ b/tests/unit_tests/storage/test_local_storage.py @@ -1,5 +1,8 @@ +import hypothesis.strategies as st import pytest +from hypothesis import given +from ert.config import GenDataConfig, SummaryConfig from ert.storage import StorageReader, open_storage from ert.storage import local_storage as local @@ -98,3 +101,66 @@ def test_to_accessor(tmp_path): with open_storage(tmp_path, mode="w") as storage_accessor: storage_reader: StorageReader = storage_accessor storage_reader.to_accessor() + + +@pytest.fixture(scope="module") +def shared_storage(tmp_path_factory): + yield tmp_path_factory.mktemp("storage") / "serialize" + + +@given(name=st.text(), input_file=st.text()) +def test_serialize_deserialize_gen_data_responses(shared_storage, name, input_file): + responses = [GenDataConfig(name=name, input_file=input_file)] + with open_storage(shared_storage, "w") as storage: + experiment = storage.create_experiment( + responses=responses, + ) + storage.create_ensemble(experiment, ensemble_size=5) + with open_storage(shared_storage) as storage: + assert ( + list(storage.get_experiment(experiment.id).response_configuration.values()) + == responses + ) + + +@st.composite +def refcase(draw): + datetimes = draw(st.lists(st.datetimes())) + container_type = draw(st.sampled_from([set(), list(), None])) + if isinstance(container_type, set): + return set(datetimes) + elif isinstance(container_type, list): + return [str(date) for date in datetimes] + return None + + +@given(refcase()) +def test_refcase_conversion_to_set(refcase): + SummaryConfig(name="name", input_file="input_file", keys=["keys"], refcase=refcase) + assert isinstance(SummaryConfig.refcase, set) or SummaryConfig.refcase is None + + +@given( + name=st.text(), + input_file=st.text(), + keys=st.lists(st.text()), + refcase=st.sets(st.datetimes()), +) +def test_serialize_deserialize_summary_responses( + shared_storage, name, input_file, keys, refcase +): + if isinstance(refcase, list): + refcase = [str(date) for date in refcase] + responses = [ + SummaryConfig(name=name, input_file=input_file, keys=keys, refcase=refcase) + ] + with open_storage(shared_storage, "w") as storage: + experiment = storage.create_experiment( + responses=responses, + ) + storage.create_ensemble(experiment, ensemble_size=5) + with open_storage(shared_storage) as storage: + assert ( + list(storage.get_experiment(experiment.id).response_configuration.values()) + == responses + ) diff --git 
a/tests/unit_tests/test_load_forward_model.py b/tests/unit_tests/test_load_forward_model.py index f5e9316fd32..2866b7aa434 100644 --- a/tests/unit_tests/test_load_forward_model.py +++ b/tests/unit_tests/test_load_forward_model.py @@ -16,6 +16,28 @@ from ert.storage import open_storage +@pytest.fixture() +@pytest.mark.usefixtures("use_tmpdir") +def setup_case(storage): + def func(config_text): + Path("config.ert").write_text(config_text, encoding="utf-8") + + ert_config = ErtConfig.from_file("config.ert") + ert = EnKFMain(ert_config) + prior_ensemble = storage.create_ensemble( + storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ), + name="prior", + ensemble_size=ert.getEnsembleSize(), + ) + run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) + ert.createRunPath(run_context) + return ert, prior_ensemble + + yield func + + def run_simulator(time_step_count, start_date) -> EclSum: ecl_sum = EclSum.writer("SNAKE_OIL_FIELD", start_date, 10, 10, 10) @@ -146,6 +168,12 @@ def test_load_forward_model_summary( ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) + experiment_id = storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ) + prior_ensemble = storage.create_ensemble( + experiment_id, name="prior", ensemble_size=100 + ) run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) ert.createRunPath(run_context) @@ -159,20 +187,15 @@ def test_load_forward_model_summary( @pytest.mark.usefixtures("use_tmpdir") -def test_load_forward_model_gen_data(prior_ensemble): +def test_load_forward_model_gen_data(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0,1 INPUT_FORMAT:ASCII """ ) - Path("config.ert").write_text(config_text, encoding="utf-8") - - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) + ert, prior_ensemble = setup_case(config_text) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["1", "2", "3"])) @@ -189,20 +212,15 @@ def test_load_forward_model_gen_data(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_single_valued_gen_data_with_active_info_is_loaded(prior_ensemble): +def test_single_valued_gen_data_with_active_info_is_loaded(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0 INPUT_FORMAT:ASCII """ ) - Path("config.ert").write_text(config_text, encoding="utf-8") + ert, prior_ensemble = setup_case(config_text) - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["1"])) @@ -217,20 +235,15 @@ def test_single_valued_gen_data_with_active_info_is_loaded(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_that_all_decativated_values_are_loaded(prior_ensemble): +def test_that_all_decativated_values_are_loaded(setup_case): config_text = dedent( """ NUM_REALIZATIONS 1 GEN_DATA RESPONSE RESULT_FILE:response_%d.out REPORT_STEPS:0 INPUT_FORMAT:ASCII """ ) - Path("config.ert").write_text(config_text, 
encoding="utf-8") + ert, prior_ensemble = setup_case(config_text) - ert_config = ErtConfig.from_file("config.ert") - ert = EnKFMain(ert_config) - - run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) - ert.createRunPath(run_context) run_path = Path("simulations/realization-0/iter-0/") with open(run_path / "response_0.out", "w", encoding="utf-8") as fout: fout.write("\n".join(["-1"])) @@ -248,7 +261,7 @@ def test_that_all_decativated_values_are_loaded(prior_ensemble): @pytest.mark.usefixtures("use_tmpdir") -def test_loading_gen_data_without_restart(prior_ensemble): +def test_loading_gen_data_without_restart(storage): config_text = dedent( """ NUM_REALIZATIONS 1 @@ -259,6 +272,13 @@ def test_loading_gen_data_without_restart(prior_ensemble): ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) + prior_ensemble = storage.create_ensemble( + storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ), + name="prior", + ensemble_size=ert.getEnsembleSize(), + ) run_context = ert.ensemble_context(prior_ensemble, [True], iteration=0) ert.createRunPath(run_context) diff --git a/tests/unit_tests/test_summary_response.py b/tests/unit_tests/test_summary_response.py index 451bf2c346b..df28acc2721 100644 --- a/tests/unit_tests/test_summary_response.py +++ b/tests/unit_tests/test_summary_response.py @@ -31,7 +31,9 @@ def test_load_summary_response_restart_not_zero(tmpdir, snapshot, request, stora ert_config = ErtConfig.from_file("config.ert") ert = EnKFMain(ert_config) - experiment_id = storage.create_experiment() + experiment_id = storage.create_experiment( + responses=ert_config.ensemble_config.response_configuration + ) ensemble = storage.create_ensemble( experiment_id, name="prior", ensemble_size=ert.getEnsembleSize() ) From 3f0e158fbc4f434e9abad4b01967d309d7febf5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Mon, 25 Sep 2023 10:31:47 +0200 Subject: [PATCH 2/4] Add test for migration for storage version 1 --- .../storage/migration/test_version_1.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tests/unit_tests/storage/migration/test_version_1.py diff --git a/tests/unit_tests/storage/migration/test_version_1.py b/tests/unit_tests/storage/migration/test_version_1.py new file mode 100644 index 00000000000..dcf47525c76 --- /dev/null +++ b/tests/unit_tests/storage/migration/test_version_1.py @@ -0,0 +1,27 @@ +import json + +import pytest + +from ert.config import ErtConfig +from ert.storage import open_storage +from ert.storage.local_storage import local_storage_set_ert_config + + +@pytest.fixture(scope="module", autouse=True) +def set_ert_config(block_storage_path): + ert_config = ErtConfig.from_file( + str(block_storage_path / "version-1/poly_example/poly.ert") + ) + yield local_storage_set_ert_config(ert_config) + local_storage_set_ert_config(None) + + +def test_migrate_gen_kw(setup_case, set_ert_config): + setup_case("block_storage/version-1/poly_example", "poly.ert") + with open_storage("storage", "w") as storage: + assert len(list(storage.experiments)) == 1 + experiment = list(storage.experiments)[0] + param_info = json.loads( + (experiment._path / "parameter.json").read_text(encoding="utf-8") + ) + assert "COEFFS" in param_info From d0ee7cd7fd35493898ddcd83960861510990e91c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Mon, 25 Sep 2023 11:42:52 +0200 Subject: [PATCH 3/4] Pass in job_script --- src/ert/job_queue/queue.py | 3 +-- 
src/ert/simulator/simulation_context.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index 78d0300d70b..9b05609216a 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -477,13 +477,12 @@ async def execute_queue_comms_via_bus( # pylint: disable=too-many-arguments def add_job_from_run_arg( self, run_arg: "RunArg", - ert_config: "ErtConfig", + job_script: str, max_runtime: Optional[int], num_cpu: int, ) -> None: job_name = run_arg.job_name run_path = run_arg.runpath - job_script = ert_config.queue_config.job_script job = JobQueueNode( job_script=job_script, diff --git a/src/ert/simulator/simulation_context.py b/src/ert/simulator/simulation_context.py index 91b6dfe700b..a4be80dc8d0 100644 --- a/src/ert/simulator/simulation_context.py +++ b/src/ert/simulator/simulation_context.py @@ -50,7 +50,7 @@ def _run_forward_model( continue job_queue.add_job_from_run_arg( run_arg, - ert.resConfig(), + ert.resConfig().queue_config.job_script, max_runtime, ert.get_num_cpu(), ) From 7d48e315b7bedb2210866743fcb8147b4a60fcb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Mon, 25 Sep 2023 11:43:15 +0200 Subject: [PATCH 4/4] Test refactor This removes some redundant functions and simplifies the mocking as some of it was outdated. --- .../unit_tests/ensemble_evaluator/conftest.py | 5 - tests/unit_tests/job_queue/conftest.py | 43 +++++++++ tests/unit_tests/job_queue/test_job_queue.py | 85 ++++++----------- .../job_queue/test_job_queue_manager.py | 93 ++++--------------- .../test_job_queue_manager_torque.py | 84 +++++------------ .../status/test_tracking_integration.py | 3 +- tests/unit_tests/test_load_forward_model.py | 4 +- 7 files changed, 114 insertions(+), 203 deletions(-) create mode 100644 tests/unit_tests/job_queue/conftest.py diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index 5c2957257a9..288a4ba383f 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -1,7 +1,6 @@ import json import os import stat -from dataclasses import dataclass from pathlib import Path from unittest.mock import Mock @@ -106,10 +105,6 @@ def _make_ensemble_builder(monkeypatch, tmpdir, num_reals, num_jobs, job_sleep=0 ) ) - @dataclass - class RunArg: - iens: int - for iens in range(0, num_reals): run_path = Path(tmpdir / f"real_{iens}") os.mkdir(run_path) diff --git a/tests/unit_tests/job_queue/conftest.py b/tests/unit_tests/job_queue/conftest.py new file mode 100644 index 00000000000..21d14da146a --- /dev/null +++ b/tests/unit_tests/job_queue/conftest.py @@ -0,0 +1,43 @@ +import stat +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +import ert +from ert.load_status import LoadStatus + + +@pytest.fixture +def mock_fm_ok(monkeypatch): + fm_ok = MagicMock(return_value=(LoadStatus.LOAD_SUCCESSFUL, "")) + monkeypatch.setattr(ert.job_queue.job_queue_node, "forward_model_ok", fm_ok) + yield fm_ok + + +@pytest.fixture +def simple_script(tmp_path): + SIMPLE_SCRIPT = """#!/bin/sh +echo "finished successfully" > STATUS +""" + fout = Path(tmp_path / "job_script") + fout.write_text(SIMPLE_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) + + +@pytest.fixture +def failing_script(tmp_path): + """ + This script is susceptible to race conditions. 
Python works + better than sh.""" + FAILING_SCRIPT = """#!/usr/bin/env python +import sys +with open("one_byte_pr_invocation", "a") as f: + f.write(".") +sys.exit(1) + """ + fout = Path(tmp_path / "failing_script") + fout.write_text(FAILING_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) diff --git a/tests/unit_tests/job_queue/test_job_queue.py b/tests/unit_tests/job_queue/test_job_queue.py index 16115faa103..4822b1dd80f 100644 --- a/tests/unit_tests/job_queue/test_job_queue.py +++ b/tests/unit_tests/job_queue/test_job_queue.py @@ -1,16 +1,15 @@ import json import stat import time -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore from typing import Any, Callable, Dict, Optional -from unittest.mock import patch +from unittest.mock import MagicMock, patch + +import pytest -import ert.callbacks from ert.config import QueueSystem from ert.job_queue import Driver, JobQueue, JobQueueNode, JobStatus -from ert.load_status import LoadStatus def wait_for( @@ -28,71 +27,46 @@ def wait_for( ) -def dummy_exit_callback(*args): - print(args) - - DUMMY_CONFIG: Dict[str, Any] = { "job_script": "job_script.py", "num_cpu": 1, "job_name": "dummy_job_{}", "run_path": "dummy_path_{}", - "ok_callback": lambda _, _b: (LoadStatus.LOAD_SUCCESSFUL, ""), - "exit_callback": dummy_exit_callback, } -SIMPLE_SCRIPT = """#!/usr/bin/env python -print('hello') -""" -NEVER_ENDING_SCRIPT = """#!/usr/bin/env python +@pytest.fixture +def never_ending_script(tmp_path): + NEVER_ENDING_SCRIPT = """#!/usr/bin/env python import time while True: time.sleep(0.5) -""" - -FAILING_SCRIPT = """#!/usr/bin/env python -import sys -sys.exit(1) -""" - - -@dataclass -class RunArg: - iens: int + """ + fout = Path(tmp_path / "never_ending_job_script") + fout.write_text(NEVER_ENDING_SCRIPT, encoding="utf-8") + fout.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) + yield str(fout) def create_local_queue( - monkeypatch, executable_script: str, max_submit: int = 1, max_runtime: Optional[int] = None, callback_timeout: Optional["Callable[[int], None]"] = None, ): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - driver = Driver(driver_type=QueueSystem.LOCAL) job_queue = JobQueue(driver, max_submit=max_submit) - scriptpath = Path(DUMMY_CONFIG["job_script"]) - scriptpath.write_text(executable_script, encoding="utf-8") - scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) - for iens in range(10): Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir(exist_ok=False) job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=executable_script, job_name=DUMMY_CONFIG["job_name"].format(iens), run_path=DUMMY_CONFIG["run_path"].format(iens), num_cpu=DUMMY_CONFIG["num_cpu"], status_file=job_queue.status_file, exit_file=job_queue.exit_file, - run_arg=RunArg(iens), + run_arg=MagicMock(), max_runtime=max_runtime, callback_timeout=callback_timeout, ) @@ -109,9 +83,9 @@ def start_all(job_queue, sema_pool): job = job_queue.fetch_next_waiting() -def test_kill_jobs(tmpdir, monkeypatch): +def test_kill_jobs(tmpdir, monkeypatch, never_ending_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, NEVER_ENDING_SCRIPT) + job_queue = create_local_queue(never_ending_script) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -140,9 +114,9 @@ def 
test_kill_jobs(tmpdir, monkeypatch): job.wait_for() -def test_add_jobs(tmpdir, monkeypatch): +def test_add_jobs(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -160,9 +134,9 @@ def test_add_jobs(tmpdir, monkeypatch): job.wait_for() -def test_failing_jobs(tmpdir, monkeypatch): +def test_failing_jobs(tmpdir, monkeypatch, failing_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, FAILING_SCRIPT, max_submit=1) + job_queue = create_local_queue(failing_script, max_submit=1) assert job_queue.queue_size == 10 assert job_queue.is_active() @@ -186,20 +160,17 @@ def test_failing_jobs(tmpdir, monkeypatch): assert job_queue.snapshot()[iens] == str(JobStatus.FAILED) -def test_timeout_jobs(tmpdir, monkeypatch): +def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script): monkeypatch.chdir(tmpdir) job_numbers = set() - def callback(iens): - nonlocal job_numbers - job_numbers.add(iens) + mock_callback = MagicMock() job_queue = create_local_queue( - monkeypatch, - NEVER_ENDING_SCRIPT, + never_ending_script, max_submit=1, max_runtime=5, - callback_timeout=callback, + callback_timeout=mock_callback, ) assert job_queue.queue_size == 10 @@ -222,15 +193,15 @@ def callback(iens): iens = job_queue._differ.qindex_to_iens(q_index) assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED) - assert job_numbers == set(range(10)) + assert len(mock_callback.mock_calls) == 20 for job in job_queue.job_list: job.wait_for() -def test_add_dispatch_info(tmpdir, monkeypatch): +def test_add_dispatch_info(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) ens_id = "some_id" cert = "My very nice cert" token = "my_super_secret_token" @@ -259,9 +230,9 @@ def test_add_dispatch_info(tmpdir, monkeypatch): assert (runpath / cert_file).read_text(encoding="utf-8") == cert -def test_add_dispatch_info_cert_none(tmpdir, monkeypatch): +def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) ens_id = "some_id" dispatch_url = "wss://example.org" cert = None diff --git a/tests/unit_tests/job_queue/test_job_queue_manager.py b/tests/unit_tests/job_queue/test_job_queue_manager.py index fbf7c963f9b..64979b024da 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager.py @@ -1,62 +1,28 @@ import os import stat -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore -from typing import Callable, List, TypedDict +from typing import List, TypedDict +from unittest.mock import MagicMock import pytest -import ert.callbacks from ert.config import QueueSystem from ert.job_queue import Driver, JobQueue, JobQueueManager, JobQueueNode, JobStatus -from ert.load_status import LoadStatus - - -@dataclass -class RunArg: - iens: int class Config(TypedDict): - job_script: str num_cpu: int job_name: str run_path: str - ok_callback: Callable - exit_callback: Callable - - -def dummy_ok_callback(runarg, path): - (Path(path) / "OK").write_text("success", encoding="utf-8") - return (LoadStatus.LOAD_SUCCESSFUL, "") - - -def dummy_exit_callback(self): - 
Path("ERROR").write_text("failure", encoding="utf-8") DUMMY_CONFIG: Config = { - "job_script": "job_script.py", "num_cpu": 1, "job_name": "dummy_job_{}", "run_path": "dummy_path_{}", - "ok_callback": dummy_ok_callback, - "exit_callback": dummy_exit_callback, } -SIMPLE_SCRIPT = """#!/bin/sh -echo "finished successfully" > STATUS -""" - -# This script is susceptible to race conditions. Python works -# better than sh. -FAILING_SCRIPT = """#!/usr/bin/env python -import sys -with open("one_byte_pr_invocation", "a") as f: - f.write(".") -sys.exit(1) -""" MOCK_BSUB = """#!/bin/sh echo "$@" > test.out @@ -67,55 +33,33 @@ def dummy_exit_callback(self): def create_local_queue( - monkeypatch, executable_script: str, max_submit: int = 2, num_realizations: int = 10 + executable_script: str, max_submit: int = 2, num_realizations: int = 10 ): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - driver = Driver(driver_type=QueueSystem.LOCAL) job_queue = JobQueue(driver, max_submit=max_submit) - scriptpath = Path(DUMMY_CONFIG["job_script"]) - scriptpath.write_text(executable_script, encoding="utf-8") - scriptpath.chmod(stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG) - for iens in range(num_realizations): Path(DUMMY_CONFIG["run_path"].format(iens)).mkdir() job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=executable_script, job_name=DUMMY_CONFIG["job_name"].format(iens), run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(iens)), num_cpu=DUMMY_CONFIG["num_cpu"], status_file=job_queue.status_file, exit_file=job_queue.exit_file, - run_arg=RunArg(iens), - ensemble_config=Path(DUMMY_CONFIG["run_path"].format(iens)).resolve(), + run_arg=MagicMock(), ) job_queue.add_job(job, iens) return job_queue -def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): +@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok") +def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script): """Assert that num_cpu from the ERT configuration is passed on to the bsub command used to submit jobs to LSF""" - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", DUMMY_CONFIG["ok_callback"] - ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", DUMMY_CONFIG["exit_callback"] - ) - monkeypatch.chdir(tmpdir) os.putenv("PATH", os.getcwd() + ":" + os.getenv("PATH")) driver = Driver(driver_type=QueueSystem.LSF) - script = Path(DUMMY_CONFIG["job_script"]) - script.write_text(SIMPLE_SCRIPT, encoding="utf-8") - script.chmod(stat.S_IRWXU) - bsub = Path("bsub") bsub.write_text(MOCK_BSUB, encoding="utf-8") bsub.chmod(stat.S_IRWXU) @@ -125,14 +69,13 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): os.mkdir(DUMMY_CONFIG["run_path"].format(job_id)) job = JobQueueNode( - job_script=DUMMY_CONFIG["job_script"], + job_script=simple_script, job_name=DUMMY_CONFIG["job_name"].format(job_id), run_path=os.path.realpath(DUMMY_CONFIG["run_path"].format(job_id)), num_cpu=4, status_file="STATUS", exit_file="ERROR", - run_arg=RunArg(iens=job_id), - ensemble_config=Path(DUMMY_CONFIG["run_path"].format(job_id)).resolve(), + run_arg=MagicMock(), ) pool_sema = BoundedSemaphore(value=2) @@ -153,25 +96,23 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, monkeypatch): assert found_cpu_arg is True -def test_execute_queue(tmpdir, monkeypatch): +def test_execute_queue(tmpdir, monkeypatch, mock_fm_ok, simple_script): monkeypatch.chdir(tmpdir) - 
job_queue = create_local_queue(monkeypatch, SIMPLE_SCRIPT) + job_queue = create_local_queue(simple_script) manager = JobQueueManager(job_queue) manager.execute_queue() - for job in job_queue.job_list: - assert (Path(job.run_path) / "OK").read_text(encoding="utf-8") == "success" + assert len(mock_fm_ok.mock_calls) == len(job_queue.job_list) @pytest.mark.parametrize("max_submit_num", [1, 2, 3]) -def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch): +def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script): """Check that the JobQueueManager will submit exactly the maximum number of resubmissions in the case of scripts that fail.""" monkeypatch.chdir(tmpdir) num_realizations = 2 job_queue = create_local_queue( - monkeypatch, - FAILING_SCRIPT, + failing_script, max_submit=max_submit_num, num_realizations=num_realizations, ) @@ -193,11 +134,9 @@ def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch): @pytest.mark.parametrize("max_submit_num", [1, 2, 3]) -def test_kill_queue(tmpdir, max_submit_num, monkeypatch): +def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script): monkeypatch.chdir(tmpdir) - job_queue = create_local_queue( - monkeypatch, SIMPLE_SCRIPT, max_submit=max_submit_num - ) + job_queue = create_local_queue(simple_script, max_submit=max_submit_num) manager = JobQueueManager(job_queue) job_queue.kill_all_jobs() manager.execute_queue() diff --git a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py index a0b2fd255c2..76a89f78897 100644 --- a/tests/unit_tests/job_queue/test_job_queue_manager_torque.py +++ b/tests/unit_tests/job_queue/test_job_queue_manager_torque.py @@ -1,16 +1,14 @@ import os import stat -from dataclasses import dataclass from pathlib import Path from threading import BoundedSemaphore -from typing import Callable, TypedDict +from typing import TypedDict +from unittest.mock import MagicMock import pytest -import ert.job_queue.job_queue_node from ert.config import QueueSystem from ert.job_queue import Driver, JobQueueNode, JobStatus -from ert.load_status import LoadStatus @pytest.fixture(name="temp_working_directory") @@ -22,48 +20,17 @@ def fixture_temp_working_directory(tmpdir, monkeypatch): @pytest.fixture(name="dummy_config") def fixture_dummy_config(): return JobConfig( - { - "job_script": "job_script.py", - "num_cpu": 1, - "job_name": "dummy_job_{}", - "run_path": "dummy_path_{}", - "ok_callback": dummy_ok_callback, - "exit_callback": dummy_exit_callback, - } + num_cpu=1, + job_name="dummy_job_{}", + run_path="dummy_path_{}", ) -@dataclass -class RunArg: - iens: int - - class JobConfig(TypedDict): - job_script: str num_cpu: int job_name: str run_path: str - ok_callback: Callable - exit_callback: Callable - - -def dummy_ok_callback(runargs, path): - (Path(path) / "OK").write_text("success", encoding="utf-8") - return (LoadStatus.LOAD_SUCCESSFUL, "") - - -def dummy_exit_callback(*_args): - Path("ERROR").write_text("failure", encoding="utf-8") - -SIMPLE_SCRIPT = """#!/bin/sh -echo "finished successfully" > STATUS -""" - -FAILING_FORWARD_MODEL = """#!/usr/bin/env python -import sys -sys.exit(1) -""" MOCK_QSUB = """#!/bin/sh echo "torque job submitted" > job_output @@ -143,30 +110,23 @@ def _deploy_script(scriptname: Path, scripttext: str): script.chmod(stat.S_IRWXU) -def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0): - monkeypatch.setattr( - ert.job_queue.job_queue_node, "forward_model_ok", dummy_config["ok_callback"] 
- ) - monkeypatch.setattr( - JobQueueNode, "run_exit_callback", dummy_config["exit_callback"] - ) - +def _build_jobqueuenode(job_script, dummy_config: JobConfig, job_id=0): runpath = Path(dummy_config["run_path"].format(job_id)) runpath.mkdir() job = JobQueueNode( - job_script=dummy_config["job_script"], + job_script=job_script, job_name=dummy_config["job_name"].format(job_id), run_path=os.path.realpath(dummy_config["run_path"].format(job_id)), num_cpu=1, status_file="STATUS", exit_file="ERROR", - run_arg=RunArg(iens=job_id), - ensemble_config=Path(dummy_config["run_path"].format(job_id)).resolve(), + run_arg=MagicMock(), ) - return (job, runpath) + return job, runpath +@pytest.mark.usefixtures("use_tmpdir") @pytest.mark.parametrize( "qsub_script, qstat_script", [ @@ -183,7 +143,12 @@ def _build_jobqueuenode(monkeypatch, dummy_config: JobConfig, job_id=0): ], ) def test_run_torque_job( - monkeypatch, temp_working_directory, dummy_config, qsub_script, qstat_script + temp_working_directory, + dummy_config, + qsub_script, + qstat_script, + mock_fm_ok, + simple_script, ): """Verify that the torque driver will succeed in submitting and monitoring torque jobs even when the Torque commands qsub and qstat @@ -192,7 +157,6 @@ def test_run_torque_job( A flaky torque command is a shell script that sometimes but not always returns with a non-zero exit code.""" - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", qsub_script) _deploy_script("qstat", qstat_script) @@ -201,7 +165,7 @@ def test_run_torque_job( options=[("QSTAT_CMD", temp_working_directory / "qstat")], ) - (job, runpath) = _build_jobqueuenode(monkeypatch, dummy_config) + job, runpath = _build_jobqueuenode(simple_script, dummy_config) job.run(driver, BoundedSemaphore()) job.wait_for() @@ -210,24 +174,24 @@ def test_run_torque_job( assert Path("job_output").exists() # The "done" callback: - assert (runpath / "OK").read_text(encoding="utf-8") == "success" + mock_fm_ok.assert_called() +@pytest.mark.usefixtures("use_tmpdir") @pytest.mark.parametrize( "user_qstat_option, expected_options", [("", "-f 10001"), ("-x", "-f -x 10001"), ("-f", "-f -f 10001")], ) def test_that_torque_driver_passes_options_to_qstat( - monkeypatch, temp_working_directory, dummy_config, user_qstat_option, expected_options, + simple_script, ): """The driver supports setting options to qstat, but the hard-coded -f option is always there.""" - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", MOCK_QSUB) _deploy_script( "qstat", @@ -245,13 +209,14 @@ def test_that_torque_driver_passes_options_to_qstat( ], ) - job, _runpath = _build_jobqueuenode(monkeypatch, dummy_config) + job, _runpath = _build_jobqueuenode(simple_script, dummy_config) job.run(driver, BoundedSemaphore()) job.wait_for() assert Path("qstat_options").read_text(encoding="utf-8").strip() == expected_options +@pytest.mark.usefixtures("mock_fm_ok", "use_tmpdir") @pytest.mark.parametrize( "job_state, exit_status, expected_status", [ @@ -264,14 +229,13 @@ def test_that_torque_driver_passes_options_to_qstat( ], ) def test_torque_job_status_from_qstat_output( - monkeypatch, temp_working_directory, dummy_config, job_state, exit_status, expected_status, + simple_script, ): - _deploy_script(dummy_config["job_script"], SIMPLE_SCRIPT) _deploy_script("qsub", MOCK_QSUB) _deploy_script( "qstat", @@ -284,7 +248,7 @@ def test_torque_job_status_from_qstat_output( options=[("QSTAT_CMD", temp_working_directory / "qstat")], ) - job, _runpath = 
_build_jobqueuenode(monkeypatch, dummy_config) + job, _runpath = _build_jobqueuenode(simple_script, dummy_config) pool_sema = BoundedSemaphore(value=2) job.run(driver, pool_sema) diff --git a/tests/unit_tests/status/test_tracking_integration.py b/tests/unit_tests/status/test_tracking_integration.py index 298096fbce5..f559638ca46 100644 --- a/tests/unit_tests/status/test_tracking_integration.py +++ b/tests/unit_tests/status/test_tracking_integration.py @@ -185,7 +185,8 @@ def test_tracking( os.chdir(ert_config.config_path) ert = EnKFMain(ert_config) experiment_id = storage.create_experiment( - ert.ensembleConfig().parameter_configuration + parameters=ert.ensembleConfig().parameter_configuration, + responses=ert.ensembleConfig().response_configuration, ) model = create_model( diff --git a/tests/unit_tests/test_load_forward_model.py b/tests/unit_tests/test_load_forward_model.py index 2866b7aa434..006b4778fc4 100644 --- a/tests/unit_tests/test_load_forward_model.py +++ b/tests/unit_tests/test_load_forward_model.py @@ -149,9 +149,7 @@ def test_load_forward_model(snake_oil_default_storage): ), ], ) -def test_load_forward_model_summary( - summary_configuration, prior_ensemble, expected, caplog -): +def test_load_forward_model_summary(summary_configuration, storage, expected, caplog): config_text = dedent( """ NUM_REALIZATIONS 1