Group observations by response (WIP)
Yngve S. Kristiansen committed Mar 6, 2024
1 parent a084976 commit d1bfe0f
Showing 2 changed files with 56 additions and 19 deletions.
50 changes: 33 additions & 17 deletions src/ert/analysis/_es_update.py
@@ -201,10 +201,17 @@ def _get_obs_and_measure_data(
     observation_values = []
     observation_errors = []
     observations = ensemble.experiment.observations
-    for obs in selected_observations:
-        observation = observations[obs]
-        group = observation.attrs["response"]
+    for group in ensemble.experiment.response_info:
+        if group not in observations:
+            continue
+
+        observation = observations[group]
         response = ensemble.load_responses(group, tuple(iens_active_index))
+
+        # for obs in selected_observations:
+        #     observation = observations[obs]
+        #     group = observation.attrs["response"]
+        #     response = source_fs.load_responses(group, tuple(iens_active_index))
         if "time" in observation.coords:
             response = response.reindex(
                 time=observation.time, method="nearest", tolerance="1s"  # type: ignore
@@ -214,18 +221,27 @@
         except KeyError as e:
             raise ErtAnalysisError(
                 f"Mismatched index for: "
-                f"Observation: {obs} attached to response: {group}"
+                f"Observation: {observation} attached to response: {group}"
             ) from e

-        observation_keys.append([obs] * filtered_response["observations"].size)
-        observation_values.append(filtered_response["observations"].data.ravel())
-        observation_errors.append(filtered_response["std"].data.ravel())
+        df = filtered_response.to_dataframe().reset_index()
+
+        observation_keys.append(df["name"].to_list())
+        observation_values.append(df["observations"].to_list())
+        observation_errors.append(df["std"].to_list())

         measured_data.append(
             filtered_response["values"]
             .transpose(..., "realization")
             .values.reshape((-1, len(filtered_response.realization)))
         )
-    ensemble.load_responses.cache_clear()
+    source_fs.load_responses.cache_clear()

+    # measured_data is a list with one 2D array per response group; each array
+    # has shape (n_observation_points, n_active_realizations): rows are the
+    # flattened observation points, columns are realizations. np.concatenate
+    # below stacks the groups along the point axis.
     return (
         np.concatenate(measured_data, axis=0),
         np.concatenate(observation_values),
@@ -512,16 +528,16 @@ def adaptive_localization_progress_callback(
     start = time.time()
     for param_batch_idx in batches:
         X_local = temp_storage[param_group][param_batch_idx, :]
-        temp_storage[param_group][param_batch_idx, :] = (
-            smoother_adaptive_es.assimilate(
-                X=X_local,
-                Y=S,
-                D=D,
-                alpha=1.0,  # The user is responsible for scaling observation covariance (esmda usage)
-                correlation_threshold=module.correlation_threshold,
-                cov_YY=cov_YY,
-                progress_callback=adaptive_localization_progress_callback,
-            )
-        )
+        temp_storage[param_group][
+            param_batch_idx, :
+        ] = smoother_adaptive_es.assimilate(
+            X=X_local,
+            Y=S,
+            D=D,
+            alpha=1.0,  # The user is responsible for scaling observation covariance (esmda usage)
+            correlation_threshold=module.correlation_threshold,
+            cov_YY=cov_YY,
+            progress_callback=adaptive_localization_progress_callback,
+        )
     _logger.info(
         f"Adaptive Localization of {param_group} completed in {(time.time() - start) / 60} minutes"
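To make the new data flow concrete, here is a minimal, self-contained sketch of the alignment performed in the hunks above. The toy datasets, their values, and the observation name "obs_a" are invented for illustration and do not reflect ert's real response or observation schemas; only the reindex/merge/to_dataframe steps mirror the diff:

import numpy as np
import pandas as pd
import xarray as xr

# Toy response for one group: 2 realizations x 3 daily time steps.
response = xr.Dataset(
    {"values": (("realization", "time"), np.arange(6.0).reshape(2, 3))},
    coords={
        "realization": [0, 1],
        "time": pd.date_range("2020-01-01", periods=3),
    },
)

# Toy observations for the same group, keyed by observation name and time.
observation = xr.Dataset(
    {
        "observations": ("time", [2.1, 3.2]),
        "std": ("time", [0.1, 0.2]),
    },
    coords={"time": pd.to_datetime(["2020-01-02", "2020-01-03"])},
).expand_dims(name=["obs_a"])

# Snap the response onto the observation times (nearest step within 1s),
# then left-join so only observed points remain.
response = response.reindex(time=observation.time, method="nearest", tolerance="1s")
filtered_response = observation.merge(response, join="left")

# Flattening to a dataframe yields one row per (name, realization, time) point.
df = filtered_response.to_dataframe().reset_index()
print(df[["name", "time", "realization", "observations", "std", "values"]])

Each row of df is one (name, realization, time) point, so the flattened frame is where the new code reads its observation keys, values, and errors, while measured_data is still built from the unflattened "values" variable.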
25 changes: 23 additions & 2 deletions src/ert/config/observations.py
@@ -39,10 +39,31 @@ class EnkfObs:
     def __init__(self, obs_vectors: Dict[str, ObsVector], obs_time: List[datetime]):
         self.obs_vectors = obs_vectors
         self.obs_time = obs_time
-        self.datasets: Dict[str, xr.Dataset] = {
-            name: obs.to_dataset([]) for name, obs in sorted(self.obs_vectors.items())
-        }
+
+        vecs: List[ObsVector] = [*self.obs_vectors.values()]
+        response_keys = set([x.data_key for x in vecs])
+        observations_by_response: Dict[str, List[xr.Dataset]] = {
+            k: [] for k in response_keys
+        }
+
+        for vec in vecs:
+            k = vec.data_key
+            ds = vec.to_dataset([])
+            assert k in observations_by_response
+
+            if "name" not in ds.dims:
+                ds = ds.expand_dims(name=[vec.observation_key])
+
+            observations_by_response[k].append(ds)
+
+        merged_by_response: Dict[str, xr.Dataset] = {}
+
+        for k in observations_by_response:
+            datasets = observations_by_response[k]
+            merged_by_response[k] = xr.concat(datasets, dim="name")
+
+        self.datasets: Dict[str, xr.Dataset] = merged_by_response

     def __len__(self) -> int:
         return len(self.obs_vectors)
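The per-response grouping in EnkfObs.__init__ can likewise be sketched in isolation. The two toy datasets below stand in for what ObsVector.to_dataset returns; the observation keys OBS_A/OBS_B and the "index" dimension are invented for the example:

import xarray as xr

# Two toy observation datasets that target the same response key; each gets a
# "name" dimension (as EnkfObs.__init__ now does) before being concatenated.
obs_a = xr.Dataset(
    {"observations": ("index", [1.0, 2.0]), "std": ("index", [0.1, 0.1])},
    coords={"index": [0, 1]},
).expand_dims(name=["OBS_A"])

obs_b = xr.Dataset(
    {"observations": ("index", [3.0]), "std": ("index", [0.2])},
    coords={"index": [0]},
).expand_dims(name=["OBS_B"])

# One merged dataset per response key, indexed by observation name; shorter
# observations are padded with NaN along the shared "index" dimension.
merged = xr.concat([obs_a, obs_b], dim="name")
print(merged["observations"].sel(name="OBS_A").values)  # [1. 2.]

Observations targeting the same response key end up in a single dataset indexed by observation name, which is what lets _get_obs_and_measure_data load and align responses once per group instead of once per observation.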
