From a07b2e461caad3802bda9af7b75df066446e8dcf Mon Sep 17 00:00:00 2001
From: "Yngve S. Kristiansen"
Date: Wed, 6 Mar 2024 12:34:13 +0100
Subject: [PATCH] Group observations by response (WIP)

---
 src/ert/analysis/_es_update.py              |  29 +--
 src/ert/config/observation_vector.py        |   3 +
 src/ert/config/observations.py              | 226 ++++++++++++++++++--
 src/ert/dark_storage/common.py              | 107 +++++----
 src/ert/dark_storage/endpoints/records.py   |  34 ++-
 src/ert/storage/local_experiment.py         |  11 +-
 tests/unit_tests/analysis/test_es_update.py |   1 +
 7 files changed, 319 insertions(+), 92 deletions(-)

diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py
index 9a904d2162c..b8d5b01ea34 100644
--- a/src/ert/analysis/_es_update.py
+++ b/src/ert/analysis/_es_update.py
@@ -201,6 +201,7 @@ def _get_obs_and_measure_data(
     observation_values = []
     observation_errors = []
     observations = ensemble.experiment.observations
+
     for group in ensemble.experiment.response_info:
         if group not in observations:
             continue
@@ -224,24 +225,24 @@ def _get_obs_and_measure_data(
                 f"Observation: {observation} attached to response: {group}"
             ) from e
 
-        df = filtered_response.to_dataframe().reset_index()
+        grouped = filtered_response.groupby(
+            "obs_name", squeeze=True, restore_coord_dims=False
+        )
 
-        observation_keys.append(df["name"].to_list())
-        observation_values.append(df["observations"].to_list())
-        observation_errors.append(df["std"].to_list())
+        for obs_name, group_ds in grouped:
+            # dropna() drops the NaN padding that appears when observations
+            # of different lengths share one dataset
+            df = group_ds.to_dataframe().dropna().reset_index()
+
+            observation_keys.append(df["obs_name"].to_list())
+            observation_values.append(df["observations"].to_list())
+            observation_errors.append(df["std"].to_list())
+            measured_data.append(
+                df["values"]
+                .to_numpy()
+                .reshape((-1, len(filtered_response.realization)))
+            )
 
-        measured_data.append(
-            filtered_response["values"]
-            .transpose(..., "realization")
-            .values.reshape((-1, len(filtered_response.realization)))
-        )
     ensemble.load_responses.cache_clear()
-    source_fs.load_responses.cache_clear()
-    # Measured_data, an array of 3 dimensions
-    # Outer dimension: One array per observation
-    # Mid dimension is ??? Sometimes length 1, sometimes nreals?
-    # Inner dimension: value is "values", index is realization
+    # measured_data is a list of 2D arrays, one per observation, each with
+    # shape (num_points, num_realizations)
     return (
         np.concatenate(measured_data, axis=0),
         np.concatenate(observation_values),
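
For reference, the groupby("obs_name") / dropna() pattern introduced above can
be sketched on a toy dataset. All names, shapes and values here are invented
for illustration; the real datasets come from ensemble.experiment.observations:

    import numpy as np
    import xarray as xr

    # Two observations of different lengths padded into one dataset;
    # OBS_B has no value at index=1, so that slot is NaN.
    ds = xr.Dataset(
        data_vars={
            "observations": (["obs_name", "index"], [[1.0, 2.0], [3.0, np.nan]]),
            "std": (["obs_name", "index"], [[0.1, 0.2], [0.3, np.nan]]),
        },
        coords={"obs_name": ["OBS_A", "OBS_B"], "index": [0, 1]},
    )

    for obs_name, group_ds in ds.groupby("obs_name"):
        # dropna() removes the padding rows before values are collected
        df = group_ds.to_dataframe().dropna().reset_index()
        print(obs_name, df["observations"].to_list())  # OBS_A [1.0, 2.0], ...
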
diff --git a/src/ert/config/observation_vector.py b/src/ert/config/observation_vector.py
index f77364b0ade..699a9b8f51e 100644
--- a/src/ert/config/observation_vector.py
+++ b/src/ert/config/observation_vector.py
@@ -27,6 +27,9 @@ def __iter__(self) -> Iterable[Union[SummaryObservation, GenObservation]]:
     def __len__(self) -> int:
         return len(self.observations)
 
+    def to_dataset_info(self, active_list: List[int]) -> List[Any]:
+        pass  # WIP: not implemented yet
+
     def to_dataset(self, active_list: List[int]) -> xr.Dataset:
         if self.observation_type == EnkfObservationImplementationType.GEN_OBS:
             datasets = []
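
to_dataset_info is still an unimplemented stub. For context, a single
observation vector's dataset, tagged with its observation name through an
"obs_name" dimension, could look roughly like the sketch below; the layout is
assumed from this patch and the values are invented:

    import xarray as xr

    # One GEN_OBS vector as a dataset; expand_dims attaches the observation
    # name as a dimension (the same trick the test at the end of this patch uses)
    ds = xr.Dataset(
        data_vars={
            "observations": (["report_step", "index"], [[10.0, 12.5]]),
            "std": (["report_step", "index"], [[1.0, 1.2]]),
        },
        coords={"report_step": [0], "index": [0, 1]},
    ).expand_dims(obs_name=["SOME_OBS"])

    print(ds["observations"].sel(obs_name="SOME_OBS").values)  # [[10.  12.5]]
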
diff --git a/src/ert/config/observations.py b/src/ert/config/observations.py
index 002a162dc5d..578f8ea5bbe 100644
--- a/src/ert/config/observations.py
+++ b/src/ert/config/observations.py
@@ -1,9 +1,19 @@
 import os
 from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
 
 import numpy as np
+import pandas as pd
 import xarray as xr
+from pydantic import BaseModel, Field
 
 from ert.validation import rangestring_to_list
@@ -35,34 +45,214 @@ def history_key(key: str) -> str:
     return ":".join([keyword + "H"] + rest)
 
 
+class _SummaryObsDataset(BaseModel):
+    observations: List[float] = Field(default_factory=list)
+    stds: List[float] = Field(default_factory=list)
+    times: List[datetime] = Field(default_factory=list)
+    summary_keywords: List[str] = Field(default_factory=list)
+    obs_names: List[str] = Field(default_factory=list)
+
+    def to_xarray(self) -> xr.Dataset:
+        return (
+            pd.DataFrame(
+                data={
+                    "obs_name": self.obs_names,
+                    "name": self.summary_keywords,
+                    "time": self.times,
+                    "observations": self.observations,
+                    "std": self.stds,
+                },
+            )
+            .set_index(["obs_name", "name", "time"])
+            .to_xarray()
+        )
+
+
+class _GenObsDataset(BaseModel):
+    observations: List[float] = Field(default_factory=list)
+    stds: List[float] = Field(default_factory=list)
+    indexes: List[int] = Field(default_factory=list)
+    report_steps: List[int] = Field(default_factory=list)
+    obs_names: List[str] = Field(default_factory=list)
+
+    def to_xarray(self) -> xr.Dataset:
+        return (
+            pd.DataFrame(
+                data={
+                    "obs_name": self.obs_names,
+                    "report_step": self.report_steps,
+                    "index": self.indexes,
+                    "observations": self.observations,
+                    "std": self.stds,
+                }
+            )
+            .set_index(["obs_name", "report_step", "index"])
+            .to_xarray()
+        )
+
+
+class _GenObsAccumulator:
+    def __init__(self) -> None:
+        self.obs: Dict[str, _GenObsDataset] = {}
+
+    def write(
+        self,
+        response_key: str,
+        obs_name: str,
+        report_step: int,
+        observations: List[float],
+        stds: List[float],
+        indexes: List[int],
+    ) -> None:
+        # We assume the input lists all have the same length
+        if response_key not in self.obs:
+            self.obs[response_key] = _GenObsDataset()
+
+        vecs = self.obs[response_key]
+
+        vecs.observations.extend(observations)
+        vecs.stds.extend(stds)
+        vecs.indexes.extend(indexes)
+
+        # obs_name and report_step apply to every value in this call
+        for _ in observations:
+            vecs.obs_names.append(obs_name)
+            vecs.report_steps.append(report_step)
+
+    def to_xarrays_grouped_by_response(self) -> Dict[str, xr.Dataset]:
+        return {response_key: ds.to_xarray() for response_key, ds in self.obs.items()}
+
+
+class _SummaryObsAccumulator:
+    def __init__(self) -> None:
+        self.obs: Dict[str, _SummaryObsDataset] = {}
+
+    def write(
+        self,
+        response_key: str,
+        obs_names: List[str],
+        summary_keyword: str,
+        observations: List[float],
+        stds: List[float],
+        times: List[datetime],
+    ) -> None:
+        # We assume the input lists all have the same length
+        if response_key not in self.obs:
+            self.obs[response_key] = _SummaryObsDataset()
+
+        vecs = self.obs[response_key]
+
+        vecs.obs_names.extend(obs_names)
+        vecs.observations.extend(observations)
+        vecs.stds.extend(stds)
+        vecs.times.extend(times)
+
+        # summary_keyword applies to every value in this call
+        for _ in observations:
+            vecs.summary_keywords.append(summary_keyword)
+
+    def to_xarrays_grouped_by_response(self) -> Dict[str, xr.Dataset]:
+        return {response_key: ds.to_xarray() for response_key, ds in self.obs.items()}
+
+
 class EnkfObs:
     def __init__(self, obs_vectors: Dict[str, ObsVector], obs_time: List[datetime]):
         self.obs_vectors = obs_vectors
         self.obs_time = obs_time
 
         vecs: List[ObsVector] = [*self.obs_vectors.values()]
-        response_keys = set([x.data_key for x in vecs])
-        observations_by_response: Dict[str, List[xr.Dataset]] = {
-            k: [] for k in response_keys
-        }
-
-        for vec in vecs:
-            k = vec.data_key
-            ds = vec.to_dataset([])
-            assert k in observations_by_response
-            if "name" not in ds.dims:
-                ds = ds.expand_dims(name=[vec.observation_key])
+        gen_obs = _GenObsAccumulator()
+        sum_obs = _SummaryObsAccumulator()
 
-            observations_by_response[k].append(ds)
+        # Faster to accumulate plain 1d vectors and build one dataset per
+        # response than to create an xr.Dataset per observation and
+        # merge/concat them afterwards
+        for vec in vecs:
+            if vec.observation_type == EnkfObservationImplementationType.GEN_OBS:
+                for report_step, node in vec.observations.items():
+                    gen_obs.write(
+                        response_key=vec.data_key,
+                        obs_name=vec.observation_key,
+                        report_step=report_step,
+                        observations=node.values,
+                        stds=node.stds,
+                        indexes=node.indices,
+                    )
 
-        merged_by_response: Dict[str, xr.Dataset] = {}
+            elif vec.observation_type == EnkfObservationImplementationType.SUMMARY_OBS:
+                observations = []
+                stds = []
+                dates = []
+                obs_keys = []
+
+                for the_date, obs in vec.observations.items():
+                    assert isinstance(obs, SummaryObservation)
+                    observations.append(obs.value)
+                    stds.append(obs.std)
+                    dates.append(the_date)
+                    obs_keys.append(obs.observation_key)
+
+                sum_obs.write(
+                    response_key=vec.data_key,
+                    obs_names=obs_keys,
+                    summary_keyword=vec.observation_key,
+                    observations=observations,
+                    stds=stds,
+                    times=dates,
+                )
+            else:
+                raise ValueError(f"Unknown observation type: {vec.observation_type}")
 
-        for k in observations_by_response:
-            datasets = observations_by_response[k]
-            merged_by_response[k] = xr.concat(datasets, dim="name")
+        self.datasets: Dict[str, xr.Dataset] = {
+            **gen_obs.to_xarrays_grouped_by_response(),
+            **sum_obs.to_xarrays_grouped_by_response(),
+        }
 
-        self.datasets: Dict[str, xr.Dataset] = merged_by_response
+        for response_key, ds in self.datasets.items():
+            ds.attrs["response"] = response_key
+
+        # Alternate approach: merge xr.Datasets, one per observation. This is
+        # a lot slower, probably because merging validates the inputs; it is
+        # faster to accumulate 1d vectors and build the larger datasets
+        # directly, as above:
+        #
+        # vecs: List[ObsVector] = [*self.obs_vectors.values()]
+        # response_keys = set([x.data_key for x in vecs])
+        # observations_by_response: Dict[str, List[xr.Dataset]] = {
+        #     k: [] for k in response_keys
+        # }
+        #
+        # for vec in vecs:
+        #     k = vec.data_key
+        #     ds = vec.to_dataset([])
+        #     assert k in observations_by_response
+        #
+        #     if "obs_name" not in ds.dims:
+        #         ds = ds.expand_dims(obs_name=[vec.observation_key])
+        #
+        #     observations_by_response[k].append(ds)
+        #
+        # merged_by_response: Dict[str, xr.Dataset] = {}
+        #
+        # for k in observations_by_response:
+        #     datasets = observations_by_response[k]
+        #     merged_by_response[k] = xr.combine_by_coords(datasets, join="inner")
+        #
+        # self.datasets: Dict[str, xr.Dataset] = merged_by_response
 
     def __len__(self) -> int:
         return len(self.obs_vectors)
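
The accumulators above rely on pandas' set_index(...).to_xarray() to build one
dataset per response. On invented data the conversion works like this, which
also shows where the NaN padding that _es_update.py's dropna() removes comes
from:

    import pandas as pd

    df = pd.DataFrame(
        {
            "obs_name": ["OBS_A", "OBS_A", "OBS_B"],
            "report_step": [0, 0, 0],
            "index": [0, 1, 0],
            "observations": [1.0, 2.0, 3.0],
            "std": [0.1, 0.2, 0.3],
        }
    )
    ds = df.set_index(["obs_name", "report_step", "index"]).to_xarray()

    # The cube is rectangular over the index levels, so the missing
    # combination (OBS_B, 0, 1) is padded with NaN
    print(ds["observations"].shape)  # (2, 1, 2)
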
diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py
index 3e426aa78c9..22d248bd8ee 100644
--- a/src/ert/dark_storage/common.py
+++ b/src/ert/dark_storage/common.py
@@ -148,17 +148,22 @@ def data_for_key(
 
 def get_all_observations(experiment: Experiment) -> List[Dict[str, Any]]:
     observations = []
-    for key, dataset in experiment.observations.items():
-        observation = {
-            "name": key,
-            "values": list(dataset["observations"].values.flatten()),
-            "errors": list(dataset["std"].values.flatten()),
-        }
-        if "time" in dataset.coords:
-            observation["x_axis"] = _prepare_x_axis(dataset["time"].values.flatten())  # type: ignore
-        else:
-            observation["x_axis"] = _prepare_x_axis(dataset["index"].values.flatten())  # type: ignore
-        observations.append(observation)
+    for response_key, dataset in experiment.observations.items():
+        x_coord_key = "time" if "time" in dataset.coords else "index"
+
+        for obs_name in dataset["obs_name"].values.flatten():
+            ds_for_name = dataset.sel(obs_name=obs_name)
+            df_for_name = ds_for_name.to_dataframe().dropna().reset_index()
+            observations.append(
+                {
+                    "name": obs_name,
+                    "values": df_for_name["observations"].to_list(),
+                    "errors": df_for_name["std"].to_list(),
+                    "x_axis": _prepare_x_axis(df_for_name[x_coord_key].to_list()),
+                }
+            )
 
     observations.sort(key=lambda x: x["x_axis"])  # type: ignore
     return observations
@@ -169,18 +174,21 @@ def get_observations_for_obs_keys(
 ) -> List[Dict[str, Any]]:
     observations = []
     experiment_observations = ensemble.experiment.observations
-    for key in observation_keys:
-        dataset = experiment_observations[key]
-        observation = {
-            "name": key,
-            "values": list(dataset["observations"].values.flatten()),
-            "errors": list(dataset["std"].values.flatten()),
-        }
-        if "time" in dataset.coords:
-            observation["x_axis"] = _prepare_x_axis(dataset["time"].values.flatten())  # type: ignore
-        else:
-            observation["x_axis"] = _prepare_x_axis(dataset["index"].values.flatten())  # type: ignore
-        observations.append(observation)
+
+    for _, ds in experiment_observations.items():
+        for obs_key, obs_ds in ds.groupby("obs_name"):
+            if obs_key not in observation_keys:
+                continue
+
+            df = obs_ds.to_dataframe().dropna().reset_index()
+            observation = {
+                "name": obs_key,
+                "values": df["observations"].to_list(),
+                "errors": df["std"].to_list(),
+            }
+            if "time" in obs_ds.coords:
+                observation["x_axis"] = _prepare_x_axis(df["time"].to_list())  # type: ignore
+            else:
+                observation["x_axis"] = _prepare_x_axis(df["index"].to_list())  # type: ignore
+            observations.append(observation)
 
     observations.sort(key=lambda x: x["x_axis"])  # type: ignore
     return observations
@@ -204,32 +212,39 @@ def get_observation_keys_for_response(
     """
 
     if response_key in gen_data_keys(ensemble):
-        response_key_parts = response_key.split("@")
-        data_key = response_key_parts[0]
-        data_report_step = (
-            int(response_key_parts[1]) if len(response_key_parts) > 1 else 0
-        )
+        data_key = response_key.split("@")[0]
+        if data_key not in ensemble.experiment.observations:
+            return []
+
+        # WIP: the report_step part of the response key is not filtered on yet
+        return list(
+            ensemble.experiment.observations[data_key].groupby("obs_name").groups
+        )
 
-        for observation_key, dataset in ensemble.experiment.observations.items():
-            if (
-                "report_step" in dataset.coords
-                and data_key == dataset.attrs["response"]
-                and data_report_step == min(dataset["report_step"].values)
-            ):
-                return [observation_key]
-        return []
-
-    elif response_key in ensemble.get_summary_keyset():
-        observation_keys = []
-        for observation_key, dataset in ensemble.experiment.observations.items():
-            if (
-                dataset.attrs["response"] == "summary"
-                and dataset.name.values.flatten()[0] == response_key
-            ):
-                observation_keys.append(observation_key)
-        return observation_keys
+    obs_by_response = ensemble.experiment.observations
+
+    if response_key in ensemble.get_summary_keyset() and "summary" in obs_by_response:
+        # Keep only the observations attached to this summary keyword; other
+        # (obs_name, name) combinations are NaN padding from the dataset build
+        values_for_key = obs_by_response["summary"].sel(name=response_key)[
+            "observations"
+        ]
+        return (
+            values_for_key.dropna("obs_name", how="all")["obs_name"]
+            .to_numpy()
+            .tolist()
+        )
 
     return []
 
 
 def _prepare_x_axis(
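
The per-observation payloads built above follow the pattern sketched here
(toy summary-style data with invented names; the real code formats the x axis
through _prepare_x_axis):

    import pandas as pd
    import xarray as xr

    ds = xr.Dataset(
        data_vars={
            "observations": (["obs_name", "time"], [[1.0, 2.0]]),
            "std": (["obs_name", "time"], [[0.1, 0.2]]),
        },
        coords={
            "obs_name": ["OBS_A"],
            "time": pd.to_datetime(["2020-01-01", "2020-01-02"]),
        },
    )

    for obs_key, obs_ds in ds.groupby("obs_name"):
        df = obs_ds.to_dataframe().dropna().reset_index()
        x_coord_key = "time" if "time" in obs_ds.coords else "index"
        payload = {
            "name": obs_key,
            "values": df["observations"].to_list(),
            "errors": df["std"].to_list(),
            "x_axis": df[x_coord_key].to_list(),
        }
        print(payload["name"], payload["values"])  # OBS_A [1.0, 2.0]
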
diff --git a/src/ert/dark_storage/endpoints/records.py b/src/ert/dark_storage/endpoints/records.py
index 0dc89754e79..3bfd4eaca19 100644
--- a/src/ert/dark_storage/endpoints/records.py
+++ b/src/ert/dark_storage/endpoints/records.py
@@ -35,20 +35,34 @@ async def get_record_observations(
     response_name: str,
 ) -> List[js.ObservationOut]:
     ensemble = storage.get_ensemble(ensemble_id)
-    obs_keys = get_observation_keys_for_response(ensemble, response_name)
-    obss = get_observations_for_obs_keys(ensemble, obs_keys)
 
-    if not obss:
+    observations = ensemble.experiment.observations
+
+    datasets = []
+    if response_name in observations:
+        # Not a summary keyword: observations are keyed directly on the response
+        datasets.append(observations[response_name])
+    elif (
+        "summary" in observations
+        and response_name in ensemble.get_summary_keyset()
+    ):
+        # Select this keyword and drop the NaN padding of unrelated observations
+        summary_obs = observations["summary"].sel(name=response_name)
+        summary_obs = summary_obs.dropna("obs_name", how="all")
+        datasets = [ds for _, ds in summary_obs.groupby("obs_name")]
+
+    if not datasets:
         return []
 
     return [
         js.ObservationOut(
             id=uuid4(),
             userdata={},
-            errors=list(chain.from_iterable([obs["errors"] for obs in obss])),
-            values=list(chain.from_iterable([obs["values"] for obs in obss])),
-            x_axis=list(chain.from_iterable([obs["x_axis"] for obs in obss])),
-            name=get_observation_name(ensemble, obs_keys),
+            errors=list(
+                chain.from_iterable(ds["std"].to_numpy().flatten() for ds in datasets)
+            ),
+            values=list(
+                chain.from_iterable(
+                    ds["observations"].to_numpy().flatten() for ds in datasets
+                )
+            ),
+            x_axis=list(
+                chain.from_iterable(
+                    ds["time" if "time" in ds.coords else "index"]
+                    .to_numpy()
+                    .flatten()
+                    for ds in datasets
+                )
+            ),
+            # WIP: uses the first observation name attached to this response
+            name=str(datasets[0]["obs_name"].to_numpy().flatten()[0]),
         )
     ]
@@ -113,12 +127,12 @@ def get_ensemble_responses(
     response_names_with_observations = set()
     for dataset in ensemble.experiment.observations.values():
         if dataset.attrs["response"] == "summary" and "name" in dataset.coords:
-            response_name = dataset.name.values.flatten()[0]
-            response_names_with_observations.add(response_name)
+            summary_kw_names = dataset.name.values.flatten()
+            response_names_with_observations.update(summary_kw_names)
         else:
             response_name = dataset.attrs["response"]
             if "report_step" in dataset.coords:
-                report_step = dataset.report_step.values.flatten()[0]
-            response_names_with_observations.add(response_name + "@" + str(report_step))
+                for report_step in dataset.report_step.values.flatten():
+                    response_names_with_observations.add(
+                        response_name + "@" + str(report_step)
+                    )
 
     for name in ensemble.get_summary_keyset():
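
The dataset selection in get_record_observations boils down to the toy example
below: gen_data observations are keyed directly on the response, while summary
observations live under "summary" and need to be narrowed to one keyword first.
Everything here is invented data:

    import numpy as np
    import xarray as xr

    # Observations keyed by response name; the "summary" dataset is indexed on
    # (obs_name, name, time) like _SummaryObsDataset.to_xarray builds it.
    # O1 targets FOPR, O2 targets WOPR; the off-diagonal slots are NaN padding.
    observations = {
        "summary": xr.Dataset(
            {
                "observations": (
                    ["obs_name", "name", "time"],
                    [[[1.0], [np.nan]], [[np.nan], [2.0]]],
                )
            },
            coords={"obs_name": ["O1", "O2"], "name": ["FOPR", "WOPR"], "time": [0]},
        )
    }

    response_name = "FOPR"
    datasets = []
    if response_name in observations:
        # gen_data responses: observations are keyed directly on the response
        datasets.append(observations[response_name])
    elif "summary" in observations:
        # summary responses: select the keyword, drop unrelated NaN-padded rows
        summary = observations["summary"].sel(name=response_name)
        summary = summary.dropna("obs_name", how="all")
        datasets = [ds for _, ds in summary.groupby("obs_name")]

    print(len(datasets))  # 1 -- only O1 is attached to FOPR
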
diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py
index 1ab4ece9629..4add3dbcd48 100644
--- a/src/ert/storage/local_experiment.py
+++ b/src/ert/storage/local_experiment.py
@@ -208,7 +208,10 @@ def update_parameters(self) -> List[str]:
     @cached_property
     def observations(self) -> Dict[str, xr.Dataset]:
         observations = list(self.mount_point.glob("observations/*"))
-        return {
-            observation.name: xr.open_dataset(observation, engine="scipy")
-            for observation in observations
-        }
+        obs_by_response_name: Dict[str, xr.Dataset] = {}
+
+        for obs_file in observations:
+            ds = xr.open_dataset(obs_file, engine="scipy")
+            # One file per response, keyed by the "response" attribute
+            obs_by_response_name[ds.attrs["response"]] = ds
+
+        return obs_by_response_name
diff --git a/tests/unit_tests/analysis/test_es_update.py b/tests/unit_tests/analysis/test_es_update.py
index 1d3bfada05e..487a20ab1e2 100644
--- a/tests/unit_tests/analysis/test_es_update.py
+++ b/tests/unit_tests/analysis/test_es_update.py
@@ -388,6 +388,7 @@ def g(X):
         coords={"report_step": [0], "index": np.arange(len(observations))},
         attrs={"response": "RESPONSE"},
     )
+    obs = obs.expand_dims({"obs_name": ["oops"]})
 
     param_group = "PARAM_FIELD"
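
Finally, a round-trip sketch of the on-disk layout read by
LocalExperiment.observations: one file per response, keyed by the dataset's
"response" attribute. The file name and values are invented:

    import xarray as xr

    obs = xr.Dataset(
        {"observations": (["report_step", "index"], [[1.0, 2.0]])},
        coords={"report_step": [0], "index": [0, 1]},
        attrs={"response": "RESPONSE"},
    ).expand_dims({"obs_name": ["OBS_1"]})

    obs.to_netcdf("observations_RESPONSE.nc", engine="scipy")
    loaded = xr.open_dataset("observations_RESPONSE.nc", engine="scipy")
    print(loaded.attrs["response"])  # RESPONSE
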