From 4180da78ce4c8f5cb51e0bacdd2eecc6d5ec70e9 Mon Sep 17 00:00:00 2001 From: Feda Curic Date: Sat, 4 Feb 2023 11:57:23 +0100 Subject: [PATCH 1/5] Temporarily add flow_config.yml to enable running with flow --- .../ert/forward-models/res/script/flow_config.yml | 4 ++++ tests/unit_tests/cli/test_integration_cli.py | 14 ++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 src/ert/shared/share/ert/forward-models/res/script/flow_config.yml diff --git a/src/ert/shared/share/ert/forward-models/res/script/flow_config.yml b/src/ert/shared/share/ert/forward-models/res/script/flow_config.yml new file mode 100644 index 00000000000..9a5d2463f14 --- /dev/null +++ b/src/ert/shared/share/ert/forward-models/res/script/flow_config.yml @@ -0,0 +1,4 @@ +versions: + '2023.04-pre': + scalar: + executable: /Users/fedacuric/opm-simulators/build/bin/flow diff --git a/tests/unit_tests/cli/test_integration_cli.py b/tests/unit_tests/cli/test_integration_cli.py index e9edd7f8bf1..3ae50a7d8d9 100644 --- a/tests/unit_tests/cli/test_integration_cli.py +++ b/tests/unit_tests/cli/test_integration_cli.py @@ -78,6 +78,20 @@ def test_runpath_file(tmpdir, source_root): assert os.path.isfile("RUNPATH_WORKFLOW_1.OK") +# def test_reek_case(): +# parser = ArgumentParser(prog="test_reek") +# parsed = ert_parser( +# parser, +# [ +# ENSEMBLE_SMOOTHER_MODE, +# "--target-case", +# "ES_loc", +# "/Users/fedacuric/reek/ert/config.ert", +# ], +# ) +# run_cli(parsed) + + @pytest.mark.integration_test def test_ensemble_evaluator(tmpdir, source_root): shutil.copytree( From d80f169b16e38d05436eeb6288ab29112b828477 Mon Sep 17 00:00:00 2001 From: Feda Curic Date: Fri, 10 Mar 2023 09:07:57 +0100 Subject: [PATCH 2/5] Implement adaptive localization Add option of running adaptive localization that can simply be turned on and does not need any user input. Only parameters that are significantly correlated to responses will be updated. Default value of what constitutes significant correlation is calculated based on theory, but can be set by the user. --- src/ert/analysis/_es_update.py | 128 +++++++++++++++--- src/ert/config/analysis_module.py | 124 +++++++++++++---- .../analysismodulevariablespanel.py | 19 +++ test-data/poly_example/poly_loc.ert | 21 +++ test-data/poly_example/poly_loc_1.ert | 22 +++ test-data/poly_example/poly_no_loc.ert | 19 +++ test-data/poly_example/poly_zero_loc.ert | 22 +++ test-data/poly_example/test_localization.sh | 9 ++ .../analysis/test_analysis_module.py | 8 +- 9 files changed, 329 insertions(+), 43 deletions(-) create mode 100644 test-data/poly_example/poly_loc.ert create mode 100644 test-data/poly_example/poly_loc_1.ert create mode 100644 test-data/poly_example/poly_no_loc.ert create mode 100644 test-data/poly_example/poly_zero_loc.ert create mode 100644 test-data/poly_example/test_localization.sh diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 621f017391e..61f25513c94 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -25,6 +25,7 @@ from iterative_ensemble_smoother.experimental import ( ensemble_smoother_update_step_row_scaling, ) +from tqdm import tqdm from ert.config import Field, GenKwConfig, SurfaceConfig from ert.realization_state import RealizationState @@ -370,6 +371,12 @@ def _load_observations_and_responses( ) +def _split_by_batchsize( + arr: npt.NDArray[np.int_], batch_size: int +) -> List[npt.NDArray[np.int_]]: + return np.array_split(arr, int((arr.shape[0] / batch_size)) + 1) + + def analysis_ES( updatestep: UpdateConfiguration, obs: EnkfObs, @@ -417,21 +424,17 @@ def analysis_ES( # pylint: disable=unsupported-assignment-operation smoother_snapshot.update_step_snapshots[update_step.name] = update_snapshot - if len(observation_values) == 0: + + num_obs = len(observation_values) + if num_obs == 0: raise ErtAnalysisError( f"No active observations for update step: {update_step.name}." ) - noise = rng.standard_normal(size=(len(observation_values), S.shape[1])) + smoother = ies.ES() - smoother.fit( - S, - observation_errors, - observation_values, - noise=noise, - truncation=module.get_truncation(), - inversion=ies.InversionType(module.inversion), - param_ensemble=param_ensemble, - ) + truncation = module.get_truncation() + noise = rng.standard_normal(size=(num_obs, ensemble_size)) + for param_group in update_step.parameters: source: Union[EnsembleReader, EnsembleAccessor] if target_fs.has_parameter_group(param_group.name): @@ -441,15 +444,92 @@ def analysis_ES( temp_storage = _create_temporary_parameter_storage( source, iens_active_index, param_group.name ) - progress_callback(Progress(Task("Updating data", 2, 3), None)) - if active_indices := param_group.index_list: - temp_storage[param_group.name][active_indices, :] = smoother.update( - temp_storage[param_group.name][active_indices, :] + + if module.localization(): + Y_prime = S - S.mean(axis=1, keepdims=True) + C_YY = Y_prime @ Y_prime.T / (ensemble_size - 1) + Sigma_Y = np.diag(np.sqrt(np.diag(C_YY))) + batch_size: int = 1000 + correlation_threshold = module.localization_correlation_threshold( + ensemble_size ) + # for parameter in update_step.parameters: + num_params = temp_storage[param_group.name].shape[0] + + print( + ( + f"Running localization on {num_params} parameters,", + f"{num_obs} responses and {ensemble_size} realizations...", + ) + ) + batches = _split_by_batchsize(np.arange(0, num_params), batch_size) + for param_batch_idx in tqdm(batches): + X_local = temp_storage[param_group.name][param_batch_idx, :] + A = X_local - X_local.mean(axis=1, keepdims=True) + C_AA = A @ A.T / (ensemble_size - 1) + + # State-measurement covariance matrix + C_AY = A @ Y_prime.T / (ensemble_size - 1) + Sigma_A = np.diag(np.sqrt(np.diag(C_AA))) + + # State-measurement correlation matrix + c_AY = np.abs( + np.linalg.inv(Sigma_A) @ C_AY @ np.linalg.inv(Sigma_Y) + ) + c_bool = c_AY > correlation_threshold + # Some parameters might be significantly correlated + # to the exact same responses, + # making up what we call a `parameter group``. + # We want to call the update only once per such parameter group + # to speed up computation. + param_groups = np.unique(c_bool, axis=0) + + # Drop the parameter group that does not correlate to any responses. + row_with_all_false = np.all(~param_groups, axis=1) + param_groups = param_groups[~row_with_all_false] + + for grp in param_groups: + # Find the rows matching the parameter group + matching_rows = np.all(c_bool == grp, axis=1) + # Get the indices of the matching rows + row_indices = np.where(matching_rows)[0] + X_chunk = temp_storage[param_group.name][param_batch_idx, :][ + row_indices, : + ] + S_chunk = S[grp, :] + observation_errors_loc = observation_errors[grp] + observation_values_loc = observation_values[grp] + smoother.fit( + S_chunk, + observation_errors_loc, + observation_values_loc, + noise=noise[grp], + truncation=truncation, + inversion=ies.InversionType(module.inversion), + param_ensemble=param_ensemble, + ) + temp_storage[param_group.name][ + param_batch_idx[row_indices], : + ] = smoother.update(X_chunk) else: - temp_storage[param_group.name] = smoother.update( - temp_storage[param_group.name] + smoother.fit( + S, + observation_errors, + observation_values, + noise=noise, + truncation=truncation, + inversion=ies.InversionType(module.inversion), + param_ensemble=param_ensemble, ) + if active_indices := param_group.index_list: + temp_storage[param_group.name][active_indices, :] = smoother.update( + temp_storage[param_group.name][active_indices, :] + ) + else: + temp_storage[param_group.name] = smoother.update( + temp_storage[param_group.name] + ) + if params_with_row_scaling := _get_params_with_row_scaling( temp_storage, update_step.row_scaling_parameters ): @@ -465,7 +545,19 @@ def analysis_ES( for row_scaling_parameter, (A, _) in zip( update_step.row_scaling_parameters, params_with_row_scaling ): - _save_to_temp_storage(temp_storage, [row_scaling_parameter], A) + params_with_row_scaling = ensemble_smoother_update_step_row_scaling( + S, + params_with_row_scaling, + observation_errors, + observation_values, + noise, + module.get_truncation(), + ies.InversionType(module.inversion), + ) + for row_scaling_parameter, (A, _) in zip( + update_step.row_scaling_parameters, params_with_row_scaling + ): + _save_to_temp_storage(temp_storage, [row_scaling_parameter], A) progress_callback(Progress(Task("Storing data", 3, 3), None)) _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) diff --git a/src/ert/config/analysis_module.py b/src/ert/config/analysis_module.py index 51a25f2ee96..03694244f43 100644 --- a/src/ert/config/analysis_module.py +++ b/src/ert/config/analysis_module.py @@ -1,4 +1,5 @@ import logging +import math import sys from typing import TYPE_CHECKING, Dict, List, Type, TypedDict, Union @@ -33,6 +34,23 @@ class VariableInfo(TypedDict): DEFAULT_IES_DEC_STEPLENGTH = 2.50 DEFAULT_ENKF_TRUNCATION = 0.98 DEFAULT_IES_INVERSION = 0 +DEFAULT_LOCALIZATION = False +# Default threshold is a function of ensemble size which is not available here. +DEFAULT_LOCALIZATION_CORRELATION_THRESHOLD = -1 + + +def correlation_threshold(ensemble_size: int, user_defined_threshold: float) -> float: + """Decides whether or not to use user-defined or default threshold. + + Default threshold taken from luo2022, + Continuous Hyper-parameter OPtimization (CHOP) in an ensemble Kalman filter + Section 2.3 - Localization in the CHOP problem + """ + default_threshold = 3 / math.sqrt(ensemble_size) + if user_defined_threshold == -1: + return default_threshold + + return user_defined_threshold class AnalysisMode(StrEnum): @@ -58,6 +76,22 @@ def get_mode_variables(mode: AnalysisMode) -> Dict[str, "VariableInfo"]: "step": 0.01, "labelname": "Singular value truncation", }, + "LOCALIZATION": { + "type": bool, + "min": 0.0, + "value": DEFAULT_LOCALIZATION, + "max": 1.0, + "step": 1.0, + "labelname": "Adaptive localization", + }, + "LOCALIZATION_CORRELATION_THRESHOLD": { + "type": float, + "min": 0.0, + "value": DEFAULT_LOCALIZATION_CORRELATION_THRESHOLD, + "max": 1.0, + "step": 0.1, + "labelname": "Adaptive localization correlation threshold", + }, } ies_variables: Dict[str, "VariableInfo"] = { "IES_MAX_STEPLENGTH": { @@ -169,31 +203,47 @@ def set_var(self, var_name: str, value: Union[float, int, bool, str]) -> None: self.handle_special_key_set(var_name, value) elif var_name in self._variables: var = self._variables[var_name] - try: - new_value = var["type"](value) - if new_value > var["max"]: - var["value"] = var["max"] - logger.warning( - f"New value {new_value} for key" - f" {var_name} is out of [{var['min']}, {var['max']}] " - f"using max value {var['max']}" - ) - elif new_value < var["min"]: - var["value"] = var["min"] - logger.warning( - f"New value {new_value} for key" - f" {var_name} is out of [{var['min']}, {var['max']}] " - f"using min value {var['min']}" + + if var["type"] is not bool: + try: + new_value = var["type"](value) + if new_value > var["max"]: + var["value"] = var["max"] + logger.warning( + f"New value {new_value} for key" + f" {var_name} is out of [{var['min']}, {var['max']}] " + f"using max value {var['max']}" + ) + elif new_value < var["min"]: + var["value"] = var["min"] + logger.warning( + f"New value {new_value} for key" + f" {var_name} is out of [{var['min']}, {var['max']}] " + f"using min value {var['min']}" + ) + else: + var["value"] = new_value + + except ValueError as e: + raise ConfigValidationError( + f"Variable {var_name!r} with value {value!r} has " + f"incorrect type." + f" Expected type {var['type'].__name__!r} but received" + f" value {value!r} of type {type(value).__name__!r}" + ) from e + else: + if not isinstance(var["value"], bool): + raise ValueError( + f"Variable {var_name} expected type {var['type']}" + f" received value `{value}` of type `{type(value)}`" ) - else: - var["value"] = new_value - - except ValueError as e: - raise ConfigValidationError( - f"Variable {var_name!r} with value {value!r} has incorrect type." - f" Expected type {var['type'].__name__!r} but received" - f" value {value!r} of type {type(value).__name__!r}" - ) from e + # When config is first read, `value` is a string + # that's either "False" or "True", + # but since bool("False") is True we need to convert it to bool. + if not isinstance(value, bool): + value = str(value).lower() != "false" + + var["value"] = var["type"](value) else: raise ConfigValidationError( f"Variable {var_name!r} not found in {self.name!r} analysis module" @@ -210,6 +260,32 @@ def inversion(self, value: int) -> None: def get_truncation(self) -> float: return self.get_variable_value("ENKF_TRUNCATION") + def localization(self) -> bool: + return bool(self.get_variable_value("LOCALIZATION")) + + def localization_correlation_threshold(self, ensemble_size: int) -> float: + return correlation_threshold( + ensemble_size, self.get_variable_value("LOCALIZATION_CORRELATION_THRESHOLD") + ) + + def get_steplength(self, iteration_nr: int) -> float: + """ + This is an implementation of Eq. (49), which calculates a suitable + step length for the update step, from the book: + Geir Evensen, Formulating the history matching problem with + consistent error statistics, Computational Geosciences (2021) 25:945 –970 + + Function not really used moved from C to keep the class interface consistent + should be investigated for possible removal. + """ + min_step_length = self.get_variable_value("IES_MIN_STEPLENGTH") + max_step_length = self.get_variable_value("IES_MAX_STEPLENGTH") + dec_step_length = self.get_variable_value("IES_DEC_STEPLENGTH") + step_length = min_step_length + (max_step_length - min_step_length) * pow( + 2, -(iteration_nr - 1) / (dec_step_length - 1) + ) + return step_length + def __repr__(self) -> str: return f"AnalysisModule(name = {self.name})" diff --git a/src/ert/gui/ertwidgets/analysismodulevariablespanel.py b/src/ert/gui/ertwidgets/analysismodulevariablespanel.py index 0ebc75a4cb4..9e8985a1cf4 100644 --- a/src/ert/gui/ertwidgets/analysismodulevariablespanel.py +++ b/src/ert/gui/ertwidgets/analysismodulevariablespanel.py @@ -11,6 +11,7 @@ QWidget, ) +from ert.config.analysis_module import correlation_threshold from ert.gui.ertwidgets.models.analysismodulevariablesmodel import ( AnalysisModuleVariablesModel, ) @@ -41,10 +42,16 @@ def __init__(self, analysis_module_name: str, facade: LibresFacade): variable_type = analysis_module_variables_model.getVariableType( variable_name ) + variable_value = analysis_module_variables_model.getVariableValue( self.facade, self._analysis_module_name, variable_name ) + if variable_name == "LOCALIZATION_CORRELATION_THRESHOLD": + variable_value = correlation_threshold( + self.facade.get_ensemble_size(), variable_value + ) + label_name = analysis_module_variables_model.getVariableLabelName( variable_name ) @@ -123,6 +130,17 @@ def __init__(self, analysis_module_name: str, facade: LibresFacade): lambda value: self.update_truncation_spinners(value, truncation_spinner) ) + localization_checkbox = self.widget_from_layout(layout, "LOCALIZATION") + localization_correlation_spinner = self.widget_from_layout( + layout, "LOCALIZATION_CORRELATION_THRESHOLD" + ) + localization_correlation_spinner.setEnabled(localization_checkbox.isChecked()) + localization_checkbox.stateChanged.connect( + lambda localization_is_on: localization_correlation_spinner.setEnabled(True) + if localization_is_on + else localization_correlation_spinner.setEnabled(False) + ) + self.setLayout(layout) self.blockSignals(False) @@ -172,6 +190,7 @@ def createSpinBox( def createCheckBox(self, variable_name, variable_value, variable_type): spinner = QCheckBox() spinner.setChecked(variable_value) + spinner.setObjectName(variable_name) spinner.clicked.connect( partial(self.valueChanged, variable_name, variable_type, spinner) ) diff --git a/test-data/poly_example/poly_loc.ert b/test-data/poly_example/poly_loc.ert new file mode 100644 index 00000000000..333f930100c --- /dev/null +++ b/test-data/poly_example/poly_loc.ert @@ -0,0 +1,21 @@ +JOBNAME poly_%d + +RANDOM_SEED 1234 + +QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH runpath/es_loc/realization-/iter- + +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True + +OBS_CONFIG observations + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff.tmpl coeffs.json coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval diff --git a/test-data/poly_example/poly_loc_1.ert b/test-data/poly_example/poly_loc_1.ert new file mode 100644 index 00000000000..49065d9b6ce --- /dev/null +++ b/test-data/poly_example/poly_loc_1.ert @@ -0,0 +1,22 @@ +JOBNAME poly_%d + +RANDOM_SEED 1234 + +QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH runpath/es_loc_1/realization-/iter- + +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 1.0 + +OBS_CONFIG observations + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff.tmpl coeffs.json coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval diff --git a/test-data/poly_example/poly_no_loc.ert b/test-data/poly_example/poly_no_loc.ert new file mode 100644 index 00000000000..1953ec87e77 --- /dev/null +++ b/test-data/poly_example/poly_no_loc.ert @@ -0,0 +1,19 @@ +JOBNAME poly_%d + +RANDOM_SEED 1234 + +QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH runpath/es_no_loc/realization-/iter- + +OBS_CONFIG observations + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff.tmpl coeffs.json coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval diff --git a/test-data/poly_example/poly_zero_loc.ert b/test-data/poly_example/poly_zero_loc.ert new file mode 100644 index 00000000000..4c9e7873009 --- /dev/null +++ b/test-data/poly_example/poly_zero_loc.ert @@ -0,0 +1,22 @@ +JOBNAME poly_%d + +RANDOM_SEED 1234 + +QUEUE_SYSTEM LOCAL +QUEUE_OPTION LOCAL MAX_RUNNING 50 + +RUNPATH runpath/es_zero_loc/realization-/iter- + +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True +ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 0.0 + +OBS_CONFIG observations + +NUM_REALIZATIONS 100 +MIN_REALIZATIONS 1 + +GEN_KW COEFFS coeff.tmpl coeffs.json coeff_priors +GEN_DATA POLY_RES RESULT_FILE:poly.out + +INSTALL_JOB poly_eval POLY_EVAL +SIMULATION_JOB poly_eval diff --git a/test-data/poly_example/test_localization.sh b/test-data/poly_example/test_localization.sh new file mode 100644 index 00000000000..3f04a258692 --- /dev/null +++ b/test-data/poly_example/test_localization.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +ert ensemble_smoother poly_no_loc.ert --target ES_no_loc + +ert ensemble_smoother poly_zero_loc.ert --target ES_zero_loc + +ert ensemble_smoother poly_loc.ert --target ES_loc + +ert ensemble_smoother poly_loc_1.ert --target ES_loc_1 diff --git a/tests/unit_tests/analysis/test_analysis_module.py b/tests/unit_tests/analysis/test_analysis_module.py index e1a26f89ed8..ca1f4734e03 100644 --- a/tests/unit_tests/analysis/test_analysis_module.py +++ b/tests/unit_tests/analysis/test_analysis_module.py @@ -4,7 +4,6 @@ AnalysisMode, AnalysisModule, ConfigValidationError, - get_mode_variables, ) from ert.config.analysis_module import ( DEFAULT_ENKF_TRUNCATION, @@ -12,6 +11,9 @@ DEFAULT_IES_INVERSION, DEFAULT_IES_MAX_STEPLENGTH, DEFAULT_IES_MIN_STEPLENGTH, + DEFAULT_LOCALIZATION, + DEFAULT_LOCALIZATION_CORRELATION_THRESHOLD, + get_mode_variables, ) @@ -25,12 +27,16 @@ def test_analysis_module_default_values(): "IES_DEC_STEPLENGTH": DEFAULT_IES_DEC_STEPLENGTH, "IES_INVERSION": DEFAULT_IES_INVERSION, "ENKF_TRUNCATION": DEFAULT_ENKF_TRUNCATION, + "LOCALIZATION": DEFAULT_LOCALIZATION, + "LOCALIZATION_CORRELATION_THRESHOLD": DEFAULT_LOCALIZATION_CORRELATION_THRESHOLD, # noqa } es_am = AnalysisModule.ens_smoother_module() assert es_am.variable_value_dict() == { "IES_INVERSION": DEFAULT_IES_INVERSION, "ENKF_TRUNCATION": DEFAULT_ENKF_TRUNCATION, + "LOCALIZATION": DEFAULT_LOCALIZATION, + "LOCALIZATION_CORRELATION_THRESHOLD": DEFAULT_LOCALIZATION_CORRELATION_THRESHOLD, # noqa } From 270170c78db3459402450a5d216734cd8194f72a Mon Sep 17 00:00:00 2001 From: Feda Curic Date: Tue, 17 Oct 2023 08:56:58 +0200 Subject: [PATCH 3/5] Introduce param_correlation_sets Fix typing --- src/ert/analysis/_es_update.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 61f25513c94..44ba2f7dc0d 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -478,32 +478,38 @@ def analysis_ES( ) c_bool = c_AY > correlation_threshold # Some parameters might be significantly correlated - # to the exact same responses, - # making up what we call a `parameter group``. + # to the exact same responses. # We want to call the update only once per such parameter group # to speed up computation. - param_groups = np.unique(c_bool, axis=0) - - # Drop the parameter group that does not correlate to any responses. - row_with_all_false = np.all(~param_groups, axis=1) - param_groups = param_groups[~row_with_all_false] + # Here we create a collection of unique sets of parameter-to-observation + # correlations. + param_correlation_sets: npt.NDArray[np.bool_] = np.unique( + c_bool, axis=0 + ) + # Drop the correlation set that does not correlate to any responses. + row_with_all_false = np.all(~param_correlation_sets, axis=1) + param_correlation_sets = param_correlation_sets[~row_with_all_false] - for grp in param_groups: + for param_correlation_set in param_correlation_sets: # Find the rows matching the parameter group - matching_rows = np.all(c_bool == grp, axis=1) + matching_rows = np.all(c_bool == param_correlation_set, axis=1) # Get the indices of the matching rows row_indices = np.where(matching_rows)[0] X_chunk = temp_storage[param_group.name][param_batch_idx, :][ row_indices, : ] - S_chunk = S[grp, :] - observation_errors_loc = observation_errors[grp] - observation_values_loc = observation_values[grp] + S_chunk = S[param_correlation_set, :] + observation_errors_loc = observation_errors[ + param_correlation_set + ] + observation_values_loc = observation_values[ + param_correlation_set + ] smoother.fit( S_chunk, observation_errors_loc, observation_values_loc, - noise=noise[grp], + noise=noise[param_correlation_set], truncation=truncation, inversion=ies.InversionType(module.inversion), param_ensemble=param_ensemble, From c8fa508288b2edb188aa6e68df49422626fd3c81 Mon Sep 17 00:00:00 2001 From: Anna Kvashchuk Date: Fri, 6 Oct 2023 12:13:58 +0200 Subject: [PATCH 4/5] Add tests for adaptive localization with threshold 0.0 and 1.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored by: Berent Å. S. Lunde --- .../analysis/test_adaptive_localization.py | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/unit_tests/analysis/test_adaptive_localization.py diff --git a/tests/unit_tests/analysis/test_adaptive_localization.py b/tests/unit_tests/analysis/test_adaptive_localization.py new file mode 100644 index 00000000000..a87276ffe29 --- /dev/null +++ b/tests/unit_tests/analysis/test_adaptive_localization.py @@ -0,0 +1,104 @@ +from argparse import ArgumentParser +from textwrap import dedent + +import numpy as np +import pytest + +from ert.__main__ import ert_parser +from ert.cli import ENSEMBLE_SMOOTHER_MODE +from ert.cli.main import run_cli +from ert.config import ErtConfig +from ert.storage import open_storage + + +def run_cli_ES_with_case(poly_config): + config_name = poly_config.split(".")[0] + prior_sample_name = "prior_sample" + "_" + config_name + posterior_sample_name = "posterior_sample" + "_" + config_name + parser = ArgumentParser(prog="test_main") + parsed = ert_parser( + parser, + [ + ENSEMBLE_SMOOTHER_MODE, + "--current-case", + prior_sample_name, + "--target-case", + posterior_sample_name, + "--realizations", + "1-50", + poly_config, + "--port-range", + "1024-65535", + ], + ) + run_cli(parsed) + storage_path = ErtConfig.from_file(poly_config).ens_path + with open_storage(storage_path) as storage: + prior_ensemble = storage.get_ensemble_by_name(prior_sample_name) + prior_sample = prior_ensemble.load_parameters("COEFFS") + posterior_ensemble = storage.get_ensemble_by_name(posterior_sample_name) + posterior_sample = posterior_ensemble.load_parameters("COEFFS") + return prior_sample, posterior_sample + + +@pytest.mark.integration_test +def test_that_adaptive_localization_with_cutoff_1_equals_ensemble_prior(copy_case): + copy_case("poly_example") + random_seed_line = "RANDOM_SEED 1234\n\n" + set_adaptive_localization_1 = dedent( + """ + ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True + ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 1.0 + """ + ) + + with open("poly.ert", "r+", encoding="utf-8") as f: + lines = f.readlines() + lines.insert(2, random_seed_line) + lines.insert(9, set_adaptive_localization_1) + + with open("poly_localization_1.ert", "w", encoding="utf-8") as f: + f.writelines(lines) + prior_sample, posterior_sample = run_cli_ES_with_case("poly_localization_1.ert") + + # Check prior and posterior samples are equal + assert np.allclose(posterior_sample, prior_sample) + + +@pytest.mark.integration_test +def test_that_adaptive_localization_with_cutoff_0_equals_ESupdate(copy_case): + """ + Note that "RANDOM_SEED" in both ert configs needs to be the same to obtain + the same sample from the prior. + """ + copy_case("poly_example") + + random_seed_line = "RANDOM_SEED 1234\n\n" + set_adaptive_localization_0 = dedent( + """ + ANALYSIS_SET_VAR STD_ENKF LOCALIZATION True + ANALYSIS_SET_VAR STD_ENKF LOCALIZATION_CORRELATION_THRESHOLD 0.0 + """ + ) + + with open("poly.ert", "r+", encoding="utf-8") as f: + lines = f.readlines() + lines.insert(2, random_seed_line) + + with open("poly_no_localization.ert", "w", encoding="utf-8") as f: + f.writelines(lines) + + lines.insert(9, set_adaptive_localization_0) + + with open("poly_localization_0.ert", "w", encoding="utf-8") as f: + f.writelines(lines) + + prior_sample_loc0, posterior_sample_loc0 = run_cli_ES_with_case( + "poly_localization_0.ert" + ) + prior_sample_noloc, posterior_sample_noloc = run_cli_ES_with_case( + "poly_no_localization.ert" + ) + + # Check posterior sample without adaptive localization and with cut-off 0 are equal + assert np.allclose(posterior_sample_loc0, posterior_sample_noloc) From 2662448bd667d3f20b46fdccf9844909139a547e Mon Sep 17 00:00:00 2001 From: Blunde1 Date: Mon, 16 Oct 2023 14:49:52 +0200 Subject: [PATCH 5/5] Compute cross-correlation matrices without matrix inversion --- src/ert/analysis/_es_update.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index 44ba2f7dc0d..df465a5f43b 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -444,11 +444,10 @@ def analysis_ES( temp_storage = _create_temporary_parameter_storage( source, iens_active_index, param_group.name ) - if module.localization(): Y_prime = S - S.mean(axis=1, keepdims=True) C_YY = Y_prime @ Y_prime.T / (ensemble_size - 1) - Sigma_Y = np.diag(np.sqrt(np.diag(C_YY))) + Sigma_Y = np.std(S, axis=1, ddof=1) batch_size: int = 1000 correlation_threshold = module.localization_correlation_threshold( ensemble_size @@ -465,17 +464,16 @@ def analysis_ES( batches = _split_by_batchsize(np.arange(0, num_params), batch_size) for param_batch_idx in tqdm(batches): X_local = temp_storage[param_group.name][param_batch_idx, :] + # Parameter standard deviations + Sigma_A = np.std(X_local, axis=1, ddof=1) + # Cross-covariance between parameters and measurements A = X_local - X_local.mean(axis=1, keepdims=True) - C_AA = A @ A.T / (ensemble_size - 1) - - # State-measurement covariance matrix C_AY = A @ Y_prime.T / (ensemble_size - 1) - Sigma_A = np.diag(np.sqrt(np.diag(C_AA))) - - # State-measurement correlation matrix + # Cross-correlation between parameters and measurements c_AY = np.abs( - np.linalg.inv(Sigma_A) @ C_AY @ np.linalg.inv(Sigma_Y) + (C_AY / Sigma_Y.reshape(1, -1)) / Sigma_A.reshape(-1, 1) ) + # Absolute values of the correlation matrix c_bool = c_AY > correlation_threshold # Some parameters might be significantly correlated # to the exact same responses.