Commit bc0880b
(WIP, fixup for getting measured data without going through pandas)
Yngve S. Kristiansen committed Mar 22, 2024
1 parent 7acfad8 commit bc0880b
Showing 2 changed files with 138 additions and 71 deletions.
206 changes: 136 additions & 70 deletions src/ert/storage/local_ensemble.py
@@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from uuid import UUID

import numpy
import numpy as np
import pandas as pd
import xarray as xr
@@ -47,18 +48,20 @@ class _Failure(BaseModel):
time: datetime


class ObservationsAndResponsesDataFrame(pd.DataFrame):
def __init__(self, args, **kwargs):
super().__init__(args, **kwargs)
self._as_np = self.to_numpy()
class ObservationsAndResponsesData:
def __init__(self, np_arr):
self._as_np = np_arr

def to_dataframe(self):
pass

def vec_of_obs_names(self):
"""
Extracts an ndarray with the shape (num_obs,).
Each cell holds the observation name.
The index in this list corresponds to the index of the other
vec_of* getters of this class.
"""
return self.index.get_level_values("obs_name").to_numpy()
return self._as_np[:, 0]

def vec_of_errors(self) -> np.ndarray:
"""
@@ -67,7 +70,7 @@ def vec_of_errors(self) -> np.ndarray:
The index in this list corresponds to the index of the other
vec_of* getters of this class.
"""
return self._as_np[:, 1]
return self._as_np[:, 2].astype(float)

def vec_of_obs_values(self) -> np.ndarray:
"""
@@ -76,7 +79,7 @@
The index in this list corresponds to the index of the other
vec_of* getters of this class.
"""
return self._as_np[:, 0]
return self._as_np[:, 1].astype(float)

def vec_of_realization_values(self) -> np.ndarray:
"""
@@ -85,7 +88,7 @@
indicated by the index. The first index here corresponds to that of other
vec_of* getters of this class.
"""
return self._as_np[:, 2:]
return self._as_np[:, 3:].astype(float)
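
A minimal usage sketch (not part of this commit; observation names and values are hypothetical) showing the column layout the slices above assume:

    import numpy as np

    # Assumed layout: obs_name | observation value | std | one column per realization
    arr = np.array(
        [
            ["FOPR_1", 0.52, 0.10, 0.50, 0.55],
            ["FOPR_2", 0.74, 0.10, 0.70, 0.78],
        ],
        dtype=object,
    )
    data = ObservationsAndResponsesData(arr)
    data.vec_of_obs_names()           # array(['FOPR_1', 'FOPR_2'], dtype=object)
    data.vec_of_obs_values()          # array([0.52, 0.74])
    data.vec_of_errors()              # array([0.1, 0.1])
    data.vec_of_realization_values()  # shape (2, 2), one column per realization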


class LocalEnsemble(BaseMode):
@@ -599,7 +602,7 @@ def get_measured_data(
self,
observation_keys: List[str],
active_realizations: Optional[npt.NDArray[np.int_]] = None,
) -> Optional[ObservationsAndResponsesDataFrame]:
) -> Optional[ObservationsAndResponsesData]:
"""Return a pandas dataframe grouped by observation name, showing the
observation + std values, and accompanying simulated values per realization.
@@ -639,6 +642,7 @@ def set_key_index(df, index: List[str]):

observations_without_responses = []

Check failure on line 643 in src/ert/storage/local_ensemble.py (GitHub Actions / check-style (3.12)): Local variable `observations_without_responses` is assigned to but never used
long_dfs = []

Check failure on line 644 in src/ert/storage/local_ensemble.py (GitHub Actions / check-style (3.12)): Local variable `long_dfs` is assigned to but never used
long_nps = []
for response_type, obs_ds in self.experiment.observations.items():
# obs_keys_ds = xr.Dataset({"obs_name": observation_keys})

@@ -655,92 +659,154 @@ def set_key_index(df, index: List[str]):
realizations=tuple(reals_with_responses_mask),
)

filtered_response = obs_ds.merge(responses_ds, join="left")
filtered_response = obs_ds.merge(responses_ds, join="left").chunk(name=200)
index = ObservationsIndices[response_type]
# (quirk workaround)
# Accommodate for different behavior for different kinds of indexes
# For now, summary's ["time"] index can not be accessed the same as
# gen_data's ["index", "report_step"] index.
obs_ds_index_col = obs_ds[index]

Check failure on line 668 in src/ert/storage/local_ensemble.py (GitHub Actions / check-style (3.12)): Local variable `obs_ds_index_col` is assigned to but never used
if hasattr(obs_ds_index_col, "index"):
obs_ds_missing_response_mask = ~obs_ds_index_col.index.isin(
responses_ds[index].index
)
elif len(index) == 1:
[idx_name] = index
obs_ds_missing_response_mask = ~obs_ds.indexes[idx_name].isin(
responses_ds[idx_name].indexes[idx_name]
)
else:
raise IndexError(
"Invalid index specified for response type, expected one of "
f"{', '.join(ObservationsIndices.keys())}"
)
# if hasattr(obs_ds_index_col, "index"):
# obs_ds_missing_response_mask = ~obs_ds_index_col.index.isin(
# responses_ds[index].index
# )
# elif len(index) == 1:
# [idx_name] = index
# obs_ds_missing_response_mask = ~obs_ds.indexes[idx_name].isin(
# responses_ds[idx_name].indexes[idx_name]
# )
# else:
# raise IndexError(
# "Invalid index specified for response type, expected one of "
# f"{', '.join(ObservationsIndices.keys())}"
# )

obs_ds.close()

obs_missing_response = obs_ds.where(
obs_ds_missing_response_mask
).to_dataframe()
obs_missing_response.dropna(inplace=True)
obs_missing_response.reset_index(inplace=True)
response_vals_per_real = (
filtered_response["values"].stack(key=("name", *index)).values.T
)
# stacked = filtered_response[["obs_name", "observations", "std"]].stack(
# key=("obs_name", *index)
# )
stacked = (
filtered_response[["obs_name", "observations", "std"]]
.sum(dim="name", min_count=1)
.stack(key=("obs_name", *index))
.dropna(dim="key")
)
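
The .sum(dim="name", min_count=1) step leans on the disjointness assumption noted further down: each observation lives under exactly one "name", so the sum only collapses that dimension, and min_count=1 keeps all-NaN slots as NaN instead of turning them into 0. A toy xarray illustration (not part of this commit; the data is made up):

    import numpy as np
    import xarray as xr

    # Two observations that are disjoint along "name": each time step
    # has a value under exactly one name.
    ds = xr.Dataset(
        {"observations": (("name", "time"), [[1.0, np.nan], [np.nan, 2.0]])},
        coords={"name": ["obs_a", "obs_b"], "time": [0, 1]},
    )

    collapsed = ds.sum(dim="name", min_count=1)
    # collapsed["observations"].values -> array([1., 2.]); without
    # min_count=1, an all-NaN column would become 0.0 instead of staying NaN.

    stacked = collapsed.stack(key=("time",)).dropna(dim="key")
    # "key" is a stacked (MultiIndex) dimension, one entry per index combination.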

del obs_ds
filtered_response[["obs_name", "observations", "std"]].sum(
dim="name", min_count=1
)

# Note: This is fast, but causes large memory usage spike
# might be a good idea to revise this to something more memory-effective
# Should be doable "directly" from XArray, or with dask-dataframe
# if the overhead of creating it doesn't cost more than the gain of
# its memory-mapping/chunking functionality
df = filtered_response.to_dataframe()
nreals = len(filtered_response.realization)
filtered_response.close()
del filtered_response
obs_names_1d = stacked["obs_name"].data.reshape(-1, 1)

df.reset_index(inplace=True)
df.dropna(subset="observations", inplace=True)
# (ASSUMPTION, 95% confident this is OK)
# These are all disjoint by name, so in reality this just
# removes the name dimension
# without_name_dim = stacked.sum(dim="name")
obs_vals_1d = stacked["observations"].values.reshape(-1, 1)
std_vals_1d = stacked["std"].values.reshape(-1, 1)

set_key_index(df, index)
# obs_missing_response = obs_ds.where(obs_ds_missing_response_mask)

# columns: OBS, STD
left_cols = df[["OBS", "STD"]][::nreals]
num_obs_names = len(obs_names_1d)
if (
len(response_vals_per_real) != num_obs_names
or len(obs_names_1d) != num_obs_names
or len(std_vals_1d) != num_obs_names
):

# columns: 0, 1, 2, ...(nreals-1)
right_cols = df[["realization", "values"]].pivot(
columns="realization", values="values"
)
raise IndexError(
"Axis 0 misalignment, expected axis0 length to "
f"correspond to observation names {num_obs_names}. Got:\n"
f"len(response_vals_per_real)={len(response_vals_per_real)}\n"
f"len(obs_names_1d)={len(obs_names_1d)}\n"
f"len(std_vals_1d)={len(std_vals_1d)}"
)

if not obs_missing_response.empty:
set_key_index(obs_missing_response, index)
nan_obs_not_in_ds_mask = ~obs_missing_response.index.isin(
left_cols.index
if response_vals_per_real.shape[1] != len(reals_with_responses_mask):
raise IndexError(
"Axis 1 misalignment, expected axis 1 of"
f" response_vals_per_real to be the same as number of realizations"
f" with responses ({len(reals_with_responses_mask)}),"
f"but got response_vals_per_real.shape[1]"
f"={response_vals_per_real.shape[1]}"
)
obs_missing_response = obs_missing_response[nan_obs_not_in_ds_mask]
observations_without_responses.append(obs_missing_response)

long = pd.concat([left_cols, right_cols], axis=1)
long_dfs.append(long)
combined_np_long = np.concatenate(
[
obs_names_1d,
obs_vals_1d,
std_vals_1d,
response_vals_per_real,
],
axis=1,
)
long_nps.append(combined_np_long)
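
Because obs_names_1d is a string column, np.concatenate promotes the combined block to dtype=object, which is why the vec_of_* getters cast the numeric columns back with .astype(float). A small sketch (not part of this commit; values are made up):

    import numpy as np

    obs_names_1d = np.array([["FOPR_1"], ["FOPR_2"]], dtype=object)
    obs_vals_1d = np.array([[0.52], [0.74]])
    std_vals_1d = np.array([[0.10], [0.10]])
    response_vals_per_real = np.array([[0.50, 0.55], [0.70, 0.78]])  # 2 reals

    combined = np.concatenate(
        [obs_names_1d, obs_vals_1d, std_vals_1d, response_vals_per_real],
        axis=1,
    )
    # combined.shape == (2, 5) and combined.dtype == object;
    # columns: obs_name | observations | std | one column per realization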
# .to_dataframe())
# obs_missing_response.dropna(inplace=True)
# obs_missing_response.reset_index(inplace=True)

# del obs_ds

if not long_dfs:
# Note: This is fast, but causes large memory usage spike
# might be a good idea to revise this to something more memory-effective
# Should be doable "directly" from XArray, or with dask-dataframe
# if the overhead of creating it doesn't cost more than the gain of
# its memory-mapping/chunking functionality
# df = filtered_response.to_dataframe()
# nreals = len(filtered_response.realization)
## filtered_response.close()
## del filtered_response
#
# df.reset_index(inplace=True)
# df.dropna(subset="observations", inplace=True)
#
# set_key_index(df, index)
#
## columns: OBS, STD
# left_cols = df[["OBS", "STD"]][::nreals]
#
## columns: 0, 1, 2, ...(nreals-1)
# right_cols = df[["realization", "values"]].pivot(
# columns="realization", values="values"
# )
#
# if not obs_missing_response.empty:
# set_key_index(obs_missing_response, index)
# nan_obs_not_in_ds_mask = ~obs_missing_response.index.isin(
# left_cols.index
# )
# obs_missing_response = obs_missing_response[nan_obs_not_in_ds_mask]
# observations_without_responses.append(obs_missing_response)
#
# long = pd.concat([left_cols, right_cols], axis=1)
# long_dfs.append(long)

if not long_nps:
msg = (
"No observation: "
+ (", ".join(observation_keys) if observation_keys is not None else "*")
+ " in ensemble"
)
raise KeyError(msg)

if len(long_dfs) > 0 and any(
not x.empty for x in observations_without_responses
):
for df in observations_without_responses:
df[long_dfs[0].columns[2:]] = np.nan
df.drop(columns=["name"], inplace=True)
long_dfs.append(df)
# if len(long_dfs) > 0 and any(
# not x.empty for x in observations_without_responses
# ):
# for df in observations_without_responses:
# df[long_dfs[0].columns[2:]] = np.nan
# df.drop(columns=["name"], inplace=True)
# long_dfs.append(df)

long_dataframe = pd.concat(long_dfs, axis=0).sort_values(
by=["obs_name", "key_index"], axis=0
)
assert "obs_name" in long_dataframe.index.names
assert "key_index" in long_dataframe.index.names
long_np = numpy.concatenate(long_nps)

# long_dataframe = pd.concat(long_dfs, axis=0).sort_values(
# by=["obs_name", "key_index"], axis=0
# )
# assert "obs_name" in long_dataframe.index.names
# assert "key_index" in long_dataframe.index.names

return ObservationsAndResponsesDataFrame(long_dataframe)
return ObservationsAndResponsesData(long_np)
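
A sketch of how a caller might consume the new return type (not part of this commit; the ensemble and observation keys are hypothetical):

    measured = ensemble.get_measured_data(["FOPR_1", "FOPR_2"])
    S = measured.vec_of_realization_values()  # (num_obs, num_reals) simulated values
    d = measured.vec_of_obs_values()          # (num_obs,) observed values
    errors = measured.vec_of_errors()         # (num_obs,) observation stds
    residuals = S - d[:, None]                # per-realization misfit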
3 changes: 2 additions & 1 deletion tests/performance_tests/test_memory_usage.py
@@ -42,8 +42,9 @@ def poly_template(monkeypatch):


@pytest.mark.flaky(reruns=5)
@pytest.mark.limit_memory("4 GB")
@pytest.mark.limit_memory("1.5 GB")
@pytest.mark.integration_test
@profile
def test_memory_smoothing(poly_template):
ert_config = ErtConfig.from_file("poly.ert")
fill_storage_with_data(poly_template, ert_config)
