Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save response info in storage #6099

Merged
merged 4 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pathlib import Path
from typing import Iterable

from ert.config import EnsembleConfig, ParameterConfig, SummaryConfig
from ert.config import ParameterConfig, ResponseConfig, SummaryConfig
from ert.run_arg import RunArg

from .load_status import LoadResult, LoadStatus
Expand Down Expand Up @@ -38,10 +38,10 @@ def _read_parameters(


def _write_responses_to_storage(
ens_config: EnsembleConfig, run_arg: RunArg
run_arg: RunArg, response_configs: Iterable[ResponseConfig]
) -> LoadResult:
errors = []
for config in ens_config.response_configs.values():
for config in response_configs:
if isinstance(config, SummaryConfig) and not config.keys:
# Nothing to load, should not be handled here, should never be
# added in the first place
Expand All @@ -64,7 +64,6 @@ def _write_responses_to_storage(

def forward_model_ok(
run_arg: RunArg,
ens_conf: EnsembleConfig,
) -> LoadResult:
parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
Expand All @@ -78,7 +77,10 @@ def forward_model_ok(
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = _write_responses_to_storage(ens_conf, run_arg)
response_result = _write_responses_to_storage(
run_arg,
run_arg.ensemble_storage.experiment.response_configuration.values(),
)

except Exception as err:
logging.exception(f"Failed to load results for realization {run_arg.iens}")
Expand Down
3 changes: 2 additions & 1 deletion src/ert/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ def run_cli(args: Namespace, _: Any = None) -> None:

evaluator_server_config = EvaluatorServerConfig(custom_port_range=args.port_range)
experiment = storage.create_experiment(
parameters=ert.ensembleConfig().parameter_configuration
parameters=ert.ensembleConfig().parameter_configuration,
responses=ert.ensembleConfig().response_configuration,
)

# Note that asyncio.run should be called once in ert/shared/main.py
Expand Down
2 changes: 2 additions & 0 deletions src/ert/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .parsing import ConfigValidationError, ConfigWarning
from .queue_config import QueueConfig
from .queue_system import QueueSystem
from .response_config import ResponseConfig
from .summary_config import SummaryConfig
from .summary_observation import SummaryObservation
from .surface_config import SurfaceConfig
Expand Down Expand Up @@ -53,6 +54,7 @@
"PriorDict",
"QueueConfig",
"QueueSystem",
"ResponseConfig",
"SummaryConfig",
"SummaryObservation",
"SurfaceConfig",
Expand Down
4 changes: 4 additions & 0 deletions src/ert/config/ensemble_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,7 @@ def get_summary_keys(self) -> List[str]:
@property
def parameter_configuration(self) -> List[ParameterConfig]:
return list(self.parameter_configs.values())

@property
def response_configuration(self) -> List[ResponseConfig]:
return list(self.response_configs.values())
11 changes: 6 additions & 5 deletions src/ert/config/gen_data_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import numpy as np
import xarray as xr
from sortedcontainers import SortedList
from typing_extensions import Self

from ert.validation import rangestring_to_list
Expand All @@ -18,11 +17,11 @@
@dataclass
class GenDataConfig(ResponseConfig):
input_file: str = ""
report_steps: Optional[SortedList] = None
report_steps: Optional[List[int]] = None

def __post_init__(self) -> None:
if isinstance(self.report_steps, list):
self.report_steps = SortedList(set(self.report_steps))
self.report_steps = list(set(self.report_steps))

@classmethod
def from_config_list(cls, gen_data: List[str]) -> Self:
Expand All @@ -35,8 +34,10 @@ def from_config_list(cls, gen_data: List[str]) -> Self:
f"Missing or unsupported RESULT_FILE for GEN_DATA key {name!r}", name
)

report_steps = rangestring_to_list(options.get("REPORT_STEPS", ""))
report_steps = SortedList(report_steps) if report_steps else None
report_steps: Optional[List[int]] = rangestring_to_list(
options.get("REPORT_STEPS", "")
)
report_steps = sorted(list(report_steps)) if report_steps else None
if os.path.isabs(res_file):
result_file_context = next(
x for x in gen_data if x.startswith("RESULT_FILE:")
Expand Down
8 changes: 6 additions & 2 deletions src/ert/config/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def _handle_history_observation(
if history_type is None:
raise ValueError("Need a history type in order to use history observations")

response_config.keys.append(summary_key)
if summary_key not in response_config.keys:
response_config.keys.append(summary_key)
oyvindeide marked this conversation as resolved.
Show resolved Hide resolved
error = history_observation["ERROR"]
error_min = history_observation["ERROR_MIN"]
error_mode = history_observation["ERROR_MODE"]
Expand Down Expand Up @@ -323,7 +324,10 @@ def _handle_summary_observation(
time_map: List[datetime],
) -> Dict[str, ObsVector]:
summary_key = summary_dict["KEY"]
ensemble_config["summary"].keys.append(summary_key) # type: ignore
summary_config = ensemble_config["summary"]
assert isinstance(summary_config, SummaryConfig)
if summary_key not in summary_config.keys:
summary_config.keys.append(summary_key)
value, std_dev = cls._make_value_and_std_dev(summary_dict)
try:

Expand Down
2 changes: 2 additions & 0 deletions src/ert/config/parameter_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def __init__(self, data: List[Tuple[Any, Any]]) -> None:
for i, (key, value) in enumerate(data):
if isinstance(value, Path):
data[i] = (key, str(value))
if isinstance(value, set):
data[i] = (key, list(value))
super().__init__(data)


Expand Down
8 changes: 8 additions & 0 deletions src/ert/config/response_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import dataclasses
from abc import ABC, abstractmethod
from typing import Any, Dict

import xarray as xr

from ert.config.parameter_config import CustomDict


@dataclasses.dataclass
class ResponseConfig(ABC):
Expand All @@ -11,3 +14,8 @@ class ResponseConfig(ABC):
@abstractmethod
def read_from_file(self, run_path: str, iens: int) -> xr.Dataset:
...

def to_dict(self) -> Dict[str, Any]:
data = dataclasses.asdict(self, dict_factory=CustomDict)
data["_ert_kind"] = self.__class__.__name__
return data
9 changes: 7 additions & 2 deletions src/ert/config/summary_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Set
from typing import TYPE_CHECKING, Set, Union

import xarray as xr
from ecl.summary import EclSum
Expand All @@ -23,7 +23,11 @@
class SummaryConfig(ResponseConfig):
input_file: str
keys: List[str]
refcase: Optional[Set[datetime]] = None
refcase: Union[Set[datetime], List[str], None] = None

def __post_init__(self) -> None:
if isinstance(self.refcase, list):
self.refcase = {datetime.fromisoformat(val) for val in self.refcase}

def read_from_file(self, run_path: str, iens: int) -> xr.Dataset:
filename = self.input_file.replace("<IENS>", str(iens))
Expand All @@ -42,6 +46,7 @@ def read_from_file(self, run_path: str, iens: int) -> xr.Dataset:
c_time = summary.alloc_time_vector(True)
time_map = [t.datetime() for t in c_time]
if self.refcase:
assert isinstance(self.refcase, set)
oyvindeide marked this conversation as resolved.
Show resolved Hide resolved
missing = self.refcase.difference(time_map)
if missing:
first, last = min(missing), max(missing)
Expand Down
1 change: 0 additions & 1 deletion src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ def loadFromForwardModel(
run_context = self.ensemble_context(fs, realization, iteration)
nr_loaded = fs.load_from_run_path(
self.getEnsembleSize(),
self.ensembleConfig(),
run_context.run_args,
run_context.mask,
)
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/_builder/_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

SOURCE_TEMPLATE_STEP = "/step/{step_id}"
if TYPE_CHECKING:
from ert.config.ensemble_config import EnsembleConfig
from ert.run_arg import RunArg


Expand All @@ -27,7 +26,6 @@ class LegacyStep:
name: str
max_runtime: Optional[int]
run_arg: "RunArg"
ensemble_config: "EnsembleConfig"
num_cpu: int
run_path: Path
job_script: str
Expand Down
3 changes: 2 additions & 1 deletion src/ert/gui/ertwidgets/caselist.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def addItem(self):
new_case_name = dialog.showAndTell()
if new_case_name != "":
ensemble = self.storage.create_experiment(
parameters=self.facade.ensemble_config.parameter_configuration
parameters=self.facade.ensemble_config.parameter_configuration,
responses=self.facade.ensemble_config.response_configuration,
).create_ensemble(
name=new_case_name,
ensemble_size=self.facade.get_ensemble_size(),
Expand Down
3 changes: 2 additions & 1 deletion src/ert/gui/simulation/simulation_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def runSimulation(self):
try:
args = self.getSimulationArguments()
experiment = self._notifier.storage.create_experiment(
parameters=self.ert.ensembleConfig().parameter_configuration
parameters=self.ert.ensembleConfig().parameter_configuration,
responses=self.ert.ensembleConfig().response_configuration,
)
model = create_model(
self.ert,
Expand Down
7 changes: 1 addition & 6 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from .thread_status import ThreadStatus

if TYPE_CHECKING:
from ..config import EnsembleConfig
from ..run_arg import RunArg
from .driver import Driver

Expand Down Expand Up @@ -97,13 +96,11 @@ def __init__(
status_file: str,
exit_file: str,
run_arg: "RunArg",
ensemble_config: "EnsembleConfig",
max_runtime: Optional[int] = None,
callback_timeout: Optional[Callable[[int], None]] = None,
):
self.callback_timeout = callback_timeout
self.run_arg = run_arg
self.ensemble_config = ensemble_config
argc = 1
argv = StringList()
argv.append(run_path)
Expand Down Expand Up @@ -177,9 +174,7 @@ def submit(self, driver: "Driver") -> SubmitStatus:
return self._submit(driver)

def run_done_callback(self) -> Optional[LoadStatus]:
callback_status, status_msg = forward_model_ok(
self.run_arg, self.ensemble_config
)
callback_status, status_msg = forward_model_ok(self.run_arg)
if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._set_queue_status(JobStatus.SUCCESS)
elif callback_status == LoadStatus.TIME_MAP_FAILURE:
Expand Down
6 changes: 1 addition & 5 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from . import ResPrototype

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.ensemble_evaluator import LegacyStep
from ert.run_arg import RunArg

Expand Down Expand Up @@ -478,13 +477,12 @@ async def execute_queue_comms_via_bus( # pylint: disable=too-many-arguments
def add_job_from_run_arg(
self,
run_arg: "RunArg",
ert_config: "ErtConfig",
job_script: str,
max_runtime: Optional[int],
num_cpu: int,
) -> None:
job_name = run_arg.job_name
run_path = run_arg.runpath
job_script = ert_config.queue_config.job_script

job = JobQueueNode(
job_script=job_script,
Expand All @@ -494,7 +492,6 @@ def add_job_from_run_arg(
status_file=self.status_file,
exit_file=self.exit_file,
run_arg=run_arg,
ensemble_config=ert_config.ensemble_config,
max_runtime=max_runtime,
)

Expand All @@ -515,7 +512,6 @@ def add_ee_stage(
status_file=self.status_file,
exit_file=self.exit_file,
run_arg=stage.run_arg,
ensemble_config=stage.ensemble_config,
max_runtime=stage.max_runtime,
callback_timeout=callback_timeout,
)
Expand Down
1 change: 0 additions & 1 deletion src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ def _build_ensemble(
name="legacy step",
max_runtime=self.ert().analysisConfig().max_runtime,
run_arg=run_arg,
ensemble_config=self.ert().resConfig().ensemble_config,
num_cpu=self.ert().get_num_cpu(),
run_path=Path(run_arg.runpath),
job_script=self.ert().resConfig().queue_config.job_script,
Expand Down
8 changes: 3 additions & 5 deletions src/ert/simulator/batch_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from sortedcontainers import SortedList

from ert.config import ErtConfig, ExtParamConfig, GenDataConfig
from ert.enkf_main import EnKFMain
Expand Down Expand Up @@ -112,9 +111,7 @@ def callback(*args, **kwargs):

for key in results:
ens_config.addNode(
GenDataConfig(
name=key, input_file=f"{key}_%d", report_steps=SortedList([0])
)
GenDataConfig(name=key, input_file=f"{key}_%d", report_steps=[0])
)

def _setup_sim(
Expand Down Expand Up @@ -237,7 +234,8 @@ def start(
batch complete before you start a new batch.
"""
experiment = storage.create_experiment(
self.ert_config.ensemble_config.parameter_configuration
parameters=self.ert_config.ensemble_config.parameter_configuration,
responses=self.ert_config.ensemble_config.response_configuration,
)
ensemble = storage.create_ensemble(
experiment.id, name=case_name, ensemble_size=self.ert.getEnsembleSize()
Expand Down
2 changes: 1 addition & 1 deletion src/ert/simulator/simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _run_forward_model(
continue
job_queue.add_job_from_run_arg(
run_arg,
ert.resConfig(),
ert.resConfig().queue_config.job_script,
max_runtime,
ert.get_num_cpu(),
)
Expand Down
7 changes: 2 additions & 5 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
if TYPE_CHECKING:
import numpy.typing as npt

from ert.config import EnsembleConfig
from ert.run_arg import RunArg
from ert.storage.local_experiment import (
LocalExperimentAccessor,
Expand All @@ -34,15 +33,14 @@
def _load_realization(
sim_fs: LocalEnsembleAccessor,
realisation: int,
ensemble_config: EnsembleConfig,
run_args: List[RunArg],
) -> Tuple[LoadResult, int]:
sim_fs.update_realization_state(
realisation,
[RealizationState.UNDEFINED],
RealizationState.INITIALIZED,
)
result = forward_model_ok(run_args[realisation], ensemble_config)
result = forward_model_ok(run_args[realisation])
sim_fs.state_map[realisation] = (
RealizationState.HAS_DATA
if result.status == LoadStatus.LOAD_SUCCESSFUL
Expand Down Expand Up @@ -280,7 +278,6 @@ def sync(self) -> None:
def load_from_run_path(
self,
ensemble_size: int,
ensemble_config: EnsembleConfig,
run_args: List[RunArg],
active_realizations: List[bool],
) -> int:
Expand All @@ -290,7 +287,7 @@ def load_from_run_path(
async_result = [
pool.apply_async(
_load_realization,
(self, iens, ensemble_config, run_args),
(self, iens, run_args),
)
for iens in range(ensemble_size)
if active_realizations[iens]
Expand Down
Loading
Loading