diff --git a/docs/how_to_guides/how_to_run_differential_parameter_sweep.rst b/docs/how_to_guides/how_to_run_differential_parameter_sweep.rst index 81cd3d2e88..c6d06723eb 100644 --- a/docs/how_to_guides/how_to_run_differential_parameter_sweep.rst +++ b/docs/how_to_guides/how_to_run_differential_parameter_sweep.rst @@ -185,8 +185,8 @@ With the flowsheet defined and suitably initialized, along with the definitions # Run the parameter sweep global_results = differential_parameter_sweep( - model, - sweep_params, + build_model, + build_sweep_params, differential_sweep_specs, outputs, h5_results_file_name='monte_carlo_results.h5', diff --git a/docs/how_to_guides/how_to_use_loopTool_to_explore_flowsheets.rst b/docs/how_to_guides/how_to_use_loopTool_to_explore_flowsheets.rst index a2a0300614..f848a8a83b 100644 --- a/docs/how_to_guides/how_to_use_loopTool_to_explore_flowsheets.rst +++ b/docs/how_to_guides/how_to_use_loopTool_to_explore_flowsheets.rst @@ -80,7 +80,7 @@ This enables user to have multiple setup files that can be used to run simulatio * *build_defaults* – default arguments that are passed for every sweep. For example in ro_erd flowsheet, shown here, this could be erd_type, but in other flowsheets, this could define any default options, from file locations to the number of stages, etc. The defaults will be passed along with loop values unless the loop value overrides them. * *init_defaults* - defaults for initialization_function, same behavior as build-defaults but used for initialization_function * *optimize_defaults* - defaults for optimize_function, same behavior as build or init defaults, but for optimize function. - * *build_output_kwargs * - a list of keys to only save in .h5 file. loopTool provides a default build_outputs function that takes in a dict containing model key name and model key (e.g., to output only m.fs.costing.LCOW user would include following dict *LCOW : fs.costing.LCOW*). The default function uses model.find_component to construct an output dict. Alternatively user can provide kwargs to a user-provided build_outputs function that would be linked to loopTool (shown below). + * *build_output_kwargs* - a list of keys to only save in .h5 file. loopTool provides a default build_outputs function that takes in a dict containing model key name and model key (e.g., to output only m.fs.costing.LCOW user would include following dict *LCOW : fs.costing.LCOW*). The default function uses model.find_component to construct an output dict. Alternatively user can provide kwargs to a user-provided build_outputs function that would be linked to loopTool (shown below). **loop options:** diff --git a/tutorials/assets_parameter_sweep_demo/parameter_sweep_demo_script.py b/tutorials/assets_parameter_sweep_demo/parameter_sweep_demo_script.py index 6a52b08f26..a39dd6008e 100644 --- a/tutorials/assets_parameter_sweep_demo/parameter_sweep_demo_script.py +++ b/tutorials/assets_parameter_sweep_demo/parameter_sweep_demo_script.py @@ -51,7 +51,6 @@ def build_sweep_params(m, num_samples=1, scenario="A_comp_vs_LCOW"): def run_parameter_sweep(num_samples=100, num_procs=1): - ps1, kwargs_dict1 = create_parameter_sweep_object( num_samples, num_procs, parallel_backend="ConcurrentFutures" ) @@ -103,7 +102,6 @@ def create_parameter_sweep_object( num_procs, parallel_backend="ConcurrentFutures", ): - solver = get_solver() kwargs_dict = { "debugging_data_dir": None, @@ -149,7 +147,6 @@ def create_recursive_parameter_sweep_object( num_procs, parallel_backend="ConcurrentFutures", ): - solver = get_solver() kwargs_dict = { "debugging_data_dir": None, @@ -195,19 +192,20 @@ def create_differential_parameter_sweep_object( num_procs, parallel_backend="ConcurrentFutures", ): - solver = get_solver() m = build_model(read_model_defauls_from_file=False) - differential_sweep_specs = { - "A_comp": { - "diff_mode": "sum", - "diff_sample_type": UniformSample, - "relative_lb": 0.01, - "relative_ub": 0.01, - "pyomo_object": m.fs.RO.A_comp, + def build_spec(model): + differential_sweep_specs = { + "A_comp": { + "diff_mode": "sum", + "diff_sample_type": UniformSample, + "relative_lb": 0.01, + "relative_ub": 0.01, + "pyomo_object": model.fs.RO.A_comp, + } } - } + return differential_sweep_specs kwargs_dict = { "debugging_data_dir": None, @@ -228,8 +226,8 @@ def create_differential_parameter_sweep_object( "build_outputs_kwargs": {}, "optimize_function": optimize, "optimize_kwargs": {"solver": solver, "check_termination": False}, - "num_diff_samples": 2, - "differential_sweep_specs": differential_sweep_specs, + "num_diff_samples": 1, + "build_differential_sweep_specs": build_spec, "initialize_function": None, "update_sweep_params_before_init": False, "initialize_kwargs": {}, @@ -252,7 +250,6 @@ def create_differential_parameter_sweep_object( if __name__ == "__main__": - import sys import time import numpy as np diff --git a/tutorials/parameter_sweep_demo.ipynb b/tutorials/parameter_sweep_demo.ipynb index 91adfcb177..2c81f79cd2 100644 --- a/tutorials/parameter_sweep_demo.ipynb +++ b/tutorials/parameter_sweep_demo.ipynb @@ -192,62 +192,6 @@ ")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "a9de9f80", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "outputs": [], - "source": [ - "??build_model" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d106ae65", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "outputs": [], - "source": [ - "??build_outputs" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b07c1657", - "metadata": { - "slideshow": { - "slide_type": "subslide" - } - }, - "outputs": [], - "source": [ - "??optimize" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "015e21aa", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "outputs": [], - "source": [ - "??build_sweep_params" - ] - }, { "cell_type": "code", "execution_count": null, @@ -442,20 +386,6 @@ "*Run the simple parameter sweep in recursion if some runs fail to ensure that a user-specified number of sample results are generated. This involves resampling the input parameter space to compensate for the failed runs.*" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "0d963217", - "metadata": { - "slideshow": { - "slide_type": "slide" - } - }, - "outputs": [], - "source": [ - "??create_recursive_parameter_sweep_object" - ] - }, { "cell_type": "code", "execution_count": null, @@ -537,16 +467,6 @@ "* The differential sweep is a simple parameter sweep that occurs at every nominal value, where one of the sweep parameters is perturbed keeping the others fixed to their nominal values." ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "5030e928", - "metadata": {}, - "outputs": [], - "source": [ - "??create_differential_parameter_sweep_object" - ] - }, { "cell_type": "code", "execution_count": null, @@ -557,7 +477,8 @@ "outputs": [], "source": [ "num_samples = 2\n", - "num_procs = 2\n", + "num_procs = 1\n", + "\n", "model, dps, dkwargs_dict = create_differential_parameter_sweep_object(num_samples, num_procs)\n", "dps_results_array, dps_results_dict = dps.parameter_sweep(\n", " dkwargs_dict[\"build_model\"],\n", @@ -874,7 +795,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.5" + "version": "3.9.16" }, "rise": { "enable_chalkboard": true, diff --git a/watertap/tools/analysis_tools/loop_tool/loop_tool.py b/watertap/tools/analysis_tools/loop_tool/loop_tool.py index 665f90b132..89b9314fa3 100644 --- a/watertap/tools/analysis_tools/loop_tool/loop_tool.py +++ b/watertap/tools/analysis_tools/loop_tool/loop_tool.py @@ -723,8 +723,12 @@ def run_diff_parameter_sweep(self): ps_kwargs["build_outputs_kwargs"] = self.build_outputs_kwargs ps_kwargs[ - "differential_sweep_specs" - ] = ParameterSweepReader()._dict_to_diff_spec(m, self.differential_sweep_specs) + "build_differential_sweep_specs" + ] = ParameterSweepReader()._dict_to_diff_spec + ps_kwargs["build_differential_sweep_specs_kwargs"] = { + "input_dict": self.differential_sweep_specs + } + ps_kwargs["num_diff_samples"] = self.diff_samples ps = DifferentialParameterSweep(**ps_kwargs) ps.parameter_sweep( diff --git a/watertap/tools/parameter_sweep/model_manager.py b/watertap/tools/parameter_sweep/model_manager.py index d9e8e50456..c1bc801762 100644 --- a/watertap/tools/parameter_sweep/model_manager.py +++ b/watertap/tools/parameter_sweep/model_manager.py @@ -27,6 +27,9 @@ def __init__(self, ps_instance): self.is_initialized = False self.is_solved = False self.is_prior_parameter_solved = False + # internal state option to be used by differntial sweep tool to + # prevent model reintialization and state reset + self._is_rebuild_and_init_enabled = True # this is isused for loggin states if enabled self.solved_states = {"state": [], "local_value_k": []} self.initialized_states = {"state": [], "local_value_k": []} diff --git a/watertap/tools/parameter_sweep/parameter_sweep.py b/watertap/tools/parameter_sweep/parameter_sweep.py index d47c322f08..b5292be6b2 100644 --- a/watertap/tools/parameter_sweep/parameter_sweep.py +++ b/watertap/tools/parameter_sweep/parameter_sweep.py @@ -34,6 +34,10 @@ from watertap.tools.parallel.parallel_manager_factory import create_parallel_manager from watertap.tools.parameter_sweep.model_manager import ModelManager +from watertap.tools.parameter_sweep.paramter_sweep_parallel_utils import ( + _ParameterSweepParallelUtils, + return_none, +) def _default_optimize(model, options=None, tee=False): @@ -254,6 +258,14 @@ class _ParameterSweepBase(ABC): description="Enables loging of model states during serial execution", ), ) + CONFIG.declare( + "index_global_combo_array", + ConfigValue( + default=False, + domain=bool, + description="Will add indexing to global_combo_array, primarily used with differential parameter sweep tool", + ), + ) def __init__( self, @@ -261,6 +273,7 @@ def __init__( ): parallel_manager_class = options.pop("parallel_manager_class", None) self.model = None + self.model_manager = None self.config = self.CONFIG(options) self.parallel_manager = create_parallel_manager( @@ -344,7 +357,10 @@ def _create_global_combo_array(self, d, sampling_type): if not global_combo_array.flags.c_contiguous: # If not, return a copy of this array with row-major memory order global_combo_array = np.ascontiguousarray(global_combo_array) - + # add sample index for tracking of sample number in parallel schema + if self.config.index_global_combo_array: + sample_idx = np.arange(global_combo_array.shape[0]).reshape(-1, 1) + global_combo_array = np.hstack((sample_idx, global_combo_array)) return global_combo_array """ @@ -386,16 +402,21 @@ def _get_object(self, model, pyomo_object): return model.find_component(name) def _update_model_values(self, m, param_dict, values): + # remove index from values + if self.config.index_global_combo_array: + non_indexed_values = values[1:] + else: + non_indexed_values = values + for k, item in enumerate(param_dict.values()): - name = self._get_object(m, item.pyomo_object) - param = m.find_component(name) + param = self._get_object(m, item.pyomo_object) if param.is_variable_type(): # Fix the single value to values[k] - param.fix(values[k]) + param.fix(non_indexed_values[k]) elif param.is_parameter_type(): # Fix the single value to values[k] - param.set_value(values[k]) + param.set_value(non_indexed_values[k]) else: raise RuntimeError(f"Unrecognized Pyomo object {param}") @@ -469,7 +490,6 @@ def _create_local_output_skeleton(self, model, sweep_params, outputs, num_sample ] = self._create_component_output_skeleton( self._get_object(model, pyo_obj), num_samples ) - return output_dict def _create_component_output_skeleton(self, component, num_samples): @@ -593,11 +613,15 @@ def _create_global_output(self, local_output_dict, req_num_samples=None): return global_output_dict def _param_sweep_kernel(self, sweep_params, local_value_k): - initialize_before_sweep = self.config.initialize_before_sweep # Forced reinitialization of the flowsheet if enabled + # and model is not already initalized at givel local sweep param set # or init if model was not initialized or prior solved failed (if solved failed, init state is false) - if initialize_before_sweep or self.model_manager.is_initialized == False: - self.model_manager.build_and_init(sweep_params, local_value_k) + if ( + self.config.initialize_before_sweep + and all(self.model_manager.current_k == local_value_k) == False + ) or self.model_manager.is_initialized == False: + if self.model_manager._is_rebuild_and_init_enabled: + self.model_manager.build_and_init(sweep_params, local_value_k) # try to solve our model self.model_manager.update_model_params(sweep_params, local_value_k) self.model_manager.solve_model() @@ -607,6 +631,7 @@ def _param_sweep_kernel(self, sweep_params, local_value_k): if ( self.model_manager.is_solved == False and self.model_manager.is_prior_parameter_solved == True + and self.model_manager._is_rebuild_and_init_enabled ): self.model_manager.build_and_init(sweep_params, local_value_k) self.model_manager.update_model_params(sweep_params, local_value_k) @@ -654,9 +679,14 @@ def _do_param_sweep(self, sweep_params, outputs, local_values): # build and init model, we also pass first set of paramters incase user wants # to update them before initlizeing the model - self.model_manager.build_and_init( - sweep_params=sweep_params, local_value_k=local_values[0, :] - ) + if ( + self.config.initialize_before_sweep + or self.model_manager.is_initialized == False + ): + if self.model_manager._is_rebuild_and_init_enabled: + self.model_manager.build_and_init( + sweep_params=sweep_params, local_value_k=local_values[0, :] + ) local_num_cases = np.shape(local_values)[0] @@ -692,167 +722,9 @@ def parameter_sweep(self, *args, **kwargs): pass -class ParameterSweep(_ParameterSweepBase): +class ParameterSweep(_ParameterSweepBase, _ParameterSweepParallelUtils): CONFIG = _ParameterSweepBase.CONFIG() - @classmethod - def remove_unpicklable_state(cls, parameter_sweep_instance): - """ - Remove and return any state from the ParameterSweep object that cannot be - pickled, to make the instance picklable. Needed in order to use the - ConcurrentFuturesParallelManager. - """ - saved_state = { - "parallel_manager": parameter_sweep_instance.parallel_manager, - "writer": parameter_sweep_instance.writer, - } - - parameter_sweep_instance.parallel_manager = None - parameter_sweep_instance.writer = None - return saved_state - - @classmethod - def restore_unpicklable_state(cls, parameter_sweep_instance, state): - """ - Restore a collection of saved state that was removed in order to pickle - the ParameterSweep object. - """ - parameter_sweep_instance.parallel_manager = state.get("parallel_manager", None) - parameter_sweep_instance.writer = state.get("writer", None) - - """ - Combine all of the results retrieved from calling gather(). - - all_results is a list of Result objects, each representing the - parameters and results from running the optimization on one process. - """ - - def _combine_gather_results(self, all_results): - if len(all_results) == 0: - return None - - # create the output skeleton based on the first set of results - # we assume the results are in dict format - initial_results = all_results[0].results - - combined_results = copy.deepcopy(initial_results) - - # remove any lingering pyomo objects, and convert inner results to numpy arrays - for key, val in combined_results.items(): - if key != "solve_successful": - for subkey, subval in val.items(): - if "_pyo_obj" in subval: - del subval["_pyo_obj"] - - # for each result, concat the "value" array of results into the - # gathered results to combine them all - - # get length of data in first result for finding missing keys - total_chunk_length = len(all_results[0].results["solve_successful"]) - - for i, result in enumerate(all_results[1:]): - results = result.results - - for key, val in results.items(): - if key == "solve_successful": - combined_results[key] = np.append( - combined_results[key], copy.deepcopy(val) - ) - continue - - for subkey, subval in val.items(): - # lets catch any keys that don' exist in result[0] and - # create empty array with expected length, after which we will add - # additional values, or add nan's instead - if subkey not in combined_results[key]: - # create empty array, as none of results so far had this key\ - - combined_results[key][subkey] = {} - for sub_subkey, value in subval.items(): - if sub_subkey == "value": - combined_results[key][subkey]["value"] = ( - np.zeros(total_chunk_length) * np.nan - ) - else: - combined_results[key][subkey][sub_subkey] = value - combined_results[key][subkey]["value"] = np.append( - combined_results[key][subkey]["value"], - copy.deepcopy( - subval["value"], - ), - ) - # keep track of our subchunk_length - sub_chunk_length = len(subval["value"]) - - # make sure we add any empty value to missing keys - - for subkey in combined_results[key]: - if subkey not in val.keys(): - empty_chunk = np.zeros(sub_chunk_length) * np.nan - combined_results[key][subkey]["value"] = np.append( - combined_results[key][subkey]["value"], empty_chunk - ) - total_chunk_length += sub_chunk_length - return combined_results - - """ - Build up a list of the outputs for each result of the optimization. - Returned as a list of lists, where each inner list is the results from - one process's run. - """ - - def _combine_output_array(self, gathered_results): - outputs = gathered_results["outputs"] - if len(outputs) == 0: - return [] - - # assume all output arrays have the same length - combined_outputs = [ - np.asarray([]) for _ in range(len(list(outputs.values())[0]["value"])) - ] - for _, output in outputs.items(): - for i in range(len(output["value"])): - combined_outputs[i] = np.append(combined_outputs[i], output["value"][i]) - return np.asarray(combined_outputs) - - """ - Use the embedded ParallelManager to fan out and then back in the results. - Args: - - build_model: a function for building the flowsheet model - - build_model_kwargs: any keyword args necessary for the build_model function - - build_sweep_params: a function for building the sweep parameters - - build_sweep_params_kwargs: any keyword args necessary for the build_sweep_params - function - - build_outputs: a function for building the outputs dictionary - - all_parameter_combinations: a list where each element represents the parameters - for a single local run - Returns: - - a list of LocalResults representing the results of the simulation runs - """ - - def run_scatter_gather( - self, - all_parameter_combinations, - ): - # save a reference to the parallel manager since it will be removed - # along with the other unpicklable state - parallel_manager = self.parallel_manager - saved_state = ParameterSweep.remove_unpicklable_state(self) - - do_build_kwargs = {"param_sweep_instance": self} - - parallel_manager.scatter( - do_build, - do_build_kwargs, - do_execute, - all_parameter_combinations, - ) - - # gather the results and combine them into the format we want - all_results = parallel_manager.gather() - ParameterSweep.restore_unpicklable_state(self, saved_state) - - return all_results - def parameter_sweep( self, build_model, @@ -922,18 +794,20 @@ def parameter_sweep( ) all_results = self.run_scatter_gather( - all_parameter_combinations, + all_parameter_combinations, ParameterSweep ) global_sweep_results_dict = self._combine_gather_results(all_results) combined_output_arr = self._combine_output_array(global_sweep_results_dict) - + all_parameter_combinations_solved = self._combine_input_array( + global_sweep_results_dict + ) # save the results for all simulations run by this process and its children for results in self.parallel_manager.results_from_local_tree(all_results): self.writer.save_results( sweep_params, results.parameters, - all_parameter_combinations, + all_parameter_combinations_solved, results.results, global_sweep_results_dict, combined_output_arr, @@ -1216,56 +1090,3 @@ def parameter_sweep( ) return global_save_data, global_filtered_dict - - -def do_build( - param_sweep_instance, -): - """ - Used to pass into the parallel manager to build the parameters necessary - for the sweep function. Defined at the top level so it's picklable. - """ - ps_config = param_sweep_instance.config - model = ps_config.build_model(**ps_config.build_model_kwargs) - sweep_params = ps_config.build_sweep_params( - model, **ps_config.build_sweep_params_kwargs - ) - sweep_params, sampling_type = param_sweep_instance._process_sweep_params( - sweep_params - ) - outputs = ps_config.build_outputs(model, **ps_config.build_outputs_kwargs) - - if outputs is not None: - param_sweep_instance.assign_variable_names(model, outputs) - - return [param_sweep_instance, model, sweep_params, outputs] - - -def do_execute( - local_combo_array, - param_sweep_instance, - model, - sweep_params, - outputs, -): - """ - Used to pass into the parallel manager in order to execute the sweep - for a set of local values. Defined at the top level so it's picklable. - """ - - if param_sweep_instance.config.custom_do_param_sweep is not None: - return param_sweep_instance.config.custom_do_param_sweep( - param_sweep_instance, sweep_params, outputs, local_combo_array - ) - - return param_sweep_instance._do_param_sweep( - sweep_params, outputs, local_combo_array - ) - - -def return_none(model, outputkeys=None): - """ - Used so that build_outputs=None is a valid usage of the parameter sweep tool - without requiring the user to wrap it in a function. - """ - return None diff --git a/watertap/tools/parameter_sweep/parameter_sweep_differential.py b/watertap/tools/parameter_sweep/parameter_sweep_differential.py index d0635c48c5..b417569a7c 100644 --- a/watertap/tools/parameter_sweep/parameter_sweep_differential.py +++ b/watertap/tools/parameter_sweep/parameter_sweep_differential.py @@ -10,23 +10,26 @@ # "https://github.com/watertap-org/watertap/" ################################################################################# import numpy as np +import warnings from pyomo.common.config import ConfigValue from watertap.tools.parameter_sweep.sampling_types import NormalSample from watertap.tools.parameter_sweep.parameter_sweep import ( _ParameterSweepBase, ParameterSweep, - return_none, ) from watertap.tools.parallel.single_process_parallel_manager import ( SingleProcessParallelManager, ) - from pyomo.common.deprecation import deprecation_warning +from watertap.tools.parameter_sweep.paramter_sweep_parallel_utils import ( + _ParameterSweepParallelUtils, + return_none, +) -class DifferentialParameterSweep(_ParameterSweepBase): +class DifferentialParameterSweep(_ParameterSweepBase, _ParameterSweepParallelUtils): CONFIG = _ParameterSweepBase.CONFIG() CONFIG.declare( @@ -50,8 +53,8 @@ class DifferentialParameterSweep(_ParameterSweepBase): CONFIG.declare( "differential_sweep_specs", ConfigValue( - default=dict(), - domain=dict, + default=None, + domain=None, description="Dictionary containing the specifications for the differential sweep", doc=""" A specification dictionary that contains details for how to construct the parameter sweep dictionary for differential sweep. @@ -86,6 +89,53 @@ class DifferentialParameterSweep(_ParameterSweepBase): """, ), ) + CONFIG.declare( + "build_differential_sweep_specs", + ConfigValue( + default=None, + # domain=function, + description="Function for building the differential_sweep_specs", + doc=""" + Must build a specification dictionary that contains details for how to construct the parameter sweep dictionary for differential sweep. + This is a nested dictionary where the first level denotes the variable names for which the differential sweep needs to be carried out. + The second level denotes various options to be used for wach variable. + The number of samples for each differential sweep is specified while initializing the DifferentialParameterSweep object wsing the keyword `num_diff_samples` + e.g. + + { + "fs.a": { + "diff_mode": "sum", + "diff_sample_type": NormalSample, + "std_dev": 0.01, + "pyomo_object": m.fs.input["a"], + }, + "fs.b": { + "diff_mode": "product", + "diff_sample_type": UniformSample, + "relative_lb": 0.01, + "relative_ub": 0.01, + "pyomo_object": m.fs.input["b"], + }, + "fs.c": { + "diff_mode": "sum", + "diff_sample_type": GeomSample, + "relative_lb": 0.01, + "relative_ub": 10.0, + "pyomo_object": m.fs.input["c"], + }, + } + + """, + ), + ) + CONFIG.declare( + "build_differential_sweep_specs_kwargs", + ConfigValue( + default=dict(), + domain=dict, + description="Keyword argument for the building differential sweep function", + ), + ) def __init__( self, @@ -93,16 +143,25 @@ def __init__( ): # Initialize the base Class super().__init__(**options) - + self.config.index_global_combo_array = True if self.config.guarantee_solves: raise NotImplementedError + if self.config.debugging_data_dir is not None: + warnings.warn( + "debugging_data_dir is not configured to work with differential parameter sweep." + ) + def _create_differential_sweep_params(self, local_values): - differential_sweep_specs = self.config.differential_sweep_specs + differential_sweep_specs = self.config.build_differential_sweep_specs( + self.model_manager.model, + **self.config.build_differential_sweep_specs_kwargs, + ) diff_sweep_param = {} + non_indexed_values = local_values[1:] for ctr, (param, specs) in enumerate(differential_sweep_specs.items()): - nominal_val = local_values[self.diff_spec_index[ctr]] + nominal_val = non_indexed_values[self.diff_spec_index[ctr]] pyomo_object = specs["pyomo_object"] if specs["diff_sample_type"] == NormalSample: std_dev = specs["std_dev"] @@ -132,8 +191,10 @@ def _create_differential_sweep_params(self, local_values): return diff_sweep_param - def _check_differential_sweep_key_validity(self, sweep_params): - diff_specs_keys = list(self.config.differential_sweep_specs.keys()) + def _check_differential_sweep_key_validity( + self, differential_sweep_spec, sweep_params + ): + diff_specs_keys = list(differential_sweep_spec.keys()) sweep_param_keys = list(sweep_params.keys()) if all(key in sweep_param_keys for key in diff_specs_keys): @@ -145,11 +206,17 @@ def _check_differential_sweep_key_validity(self, sweep_params): "differential_sweep_specs keys don't match with sweep_param keys" ) - def _define_differential_sweep_outputs(self, sweep_params): - self.differential_outputs = self.outputs - if self.outputs is not None: + def _define_differential_sweep_outputs(self, model, sweep_params): + # Currently used in do_build function only (check paramter_sweep_parallel_utils.py) + self.differential_outputs = self.config.build_outputs( + model, **self.config.build_outputs_kwargs + ) + differential_sweep_spec = self.config.build_differential_sweep_specs( + model, **self.config.build_differential_sweep_specs_kwargs + ) + if self.differential_outputs is not None: for key in sweep_params.keys(): - if key not in self.config.differential_sweep_specs.keys(): + if key not in differential_sweep_spec.keys(): self.differential_outputs[key] = sweep_params[key].pyomo_object def _create_local_output_skeleton(self, model, sweep_params, outputs, num_samples): @@ -162,7 +229,9 @@ def _create_local_output_skeleton(self, model, sweep_params, outputs, num_sample output_dict["differential_idx"] = np.array([np.nan] * num_samples) return output_dict - def _append_differential_results(self, local_output_dict, diff_results_dict): + def _append_differential_results( + self, local_output_dict, diff_results_dict, local_values + ): for idx, diff_sol in diff_results_dict.items(): for key, item in diff_sol.items(): # Solve status @@ -171,28 +240,31 @@ def _append_differential_results(self, local_output_dict, diff_results_dict): local_output_dict["solve_successful"].extend(item) local_output_dict["nominal_idx"] = np.concatenate( ( - local_output_dict["nominal_idx"], - np.array([np.nan] * n_diff_samples), + local_values[:, 0], + np.array( + [np.nan] * np.repeat(local_values[:, 0], n_diff_samples) + ), ), axis=0, ) + local_output_dict["differential_idx"] = np.concatenate( ( - local_output_dict["differential_idx"], - np.array([idx] * n_diff_samples, dtype=float), + np.array(np.nan * local_values[:, 0]), + np.repeat(local_values[:, 0], n_diff_samples), ), axis=0, ) - else: + # TODO review for correct implementation with selected outputs for subkey, subitem in item.items(): - local_output_dict[key][subkey]["value"] = np.concatenate( - ( - local_output_dict[key][subkey]["value"], - subitem["value"], + if subkey in list(local_output_dict[key].keys()): + local_output_dict[key][subkey]["value"] = np.concatenate( + ( + local_output_dict[key][subkey]["value"], + subitem["value"], + ) ) - ) - # We also need to capture sweep_params variables that are not a part of differential_sweep_specs if key == "sweep_params": missing_sub_keys = set(item.keys()) ^ set( @@ -210,58 +282,6 @@ def _append_differential_results(self, local_output_dict, diff_results_dict): ) ) - def _collect_local_inputs(self, local_results_dict): - num_local_samples = len(local_results_dict["solve_successful"]) - local_inputs = np.zeros( - (num_local_samples, len(local_results_dict["sweep_params"])), - dtype=float, - ) - - for i, (key, item) in enumerate(local_results_dict["sweep_params"].items()): - local_inputs[:, i] = item["value"] - - return local_inputs - - def _aggregate_input_arr(self, global_results_dict, num_global_samples): - global_values = np.zeros( - (num_global_samples, len(global_results_dict["sweep_params"])), - dtype=float, - ) - - if self.parallel_manager.is_root_process(): - for i, (key, item) in enumerate( - global_results_dict["sweep_params"].items() - ): - global_values[:, i] = item["value"] - - self.parallel_manager.sync_array_with_peers(global_values) - - return global_values - - def _aggregate_results(self, local_output_dict): - # Create the global results dictionary - global_results_dict = self._create_global_output(local_output_dict) - - # Broadcast the number of global samples to all ranks - num_global_samples = len(global_results_dict["solve_successful"]) - num_global_samples = self.parallel_manager.sync_pyobject_with_peers( - num_global_samples - ) - - global_results_arr = self._aggregate_results_arr( - global_results_dict, num_global_samples - ) - global_input_values = self._aggregate_input_arr( - global_results_dict, num_global_samples - ) - - return ( - global_results_dict, - global_results_arr, - global_input_values, - num_global_samples, - ) - def _create_global_output(self, local_output_dict): # , req_num_samples=None): global_output_dict = super()._create_global_output(local_output_dict) @@ -328,7 +348,9 @@ def _run_differential_sweep(self, local_value): ) # pass model_manager from refernce sweep, to diff sweep # so we don't have to reijnit he model + diff_ps.config.index_global_combo_array = True diff_ps.model_manager = self.model_manager + diff_ps.model_manager._is_rebuild_and_init_enabled = False _, differential_sweep_output_dict = diff_ps.parameter_sweep( diff_ps.model_manager.model, @@ -337,7 +359,7 @@ def _run_differential_sweep(self, local_value): num_samples=self.config.num_diff_samples, seed=self.seed, ) - + diff_ps.model_manager._is_rebuild_and_init_enabled = True return differential_sweep_output_dict def _run_sample( @@ -366,7 +388,7 @@ def _do_param_sweep(self, sweep_params, outputs, local_values): # Now append the outputs of the differential solves self._append_differential_results( - local_output_dict, self.differential_sweep_output_dict + local_output_dict, self.differential_sweep_output_dict, local_values ) return local_output_dict @@ -383,6 +405,7 @@ def parameter_sweep( build_sweep_params_kwargs=None, ): # Create a base sweep_params + build_model_kwargs = ( build_model_kwargs if build_model_kwargs is not None else dict() ) @@ -424,6 +447,15 @@ def parameter_sweep( and will not work with future implementations of parallelism.", version="0.10.0", ) + if not callable(self.config.build_differential_sweep_specs): + _diff_spec = self.config.build_differential_sweep_specs + self.config.build_differential_sweep_specs = lambda model: _diff_spec + deprecation_warning( + "Passing the differential sweep spec dict directly to the parameter_sweep function is deprecated \ + and will not work with future implementations of parallelism. Please instead pass \ + a build_differential_sweep_specs function instead", + version="0.10.0", + ) # This should be depreciated in future versions self.config.build_model = build_model self.config.build_sweep_params = build_sweep_params @@ -435,72 +467,46 @@ def parameter_sweep( model = build_model(**build_model_kwargs) sweep_params = build_sweep_params(model, **build_sweep_params_kwargs) sweep_params, sampling_type = self._process_sweep_params(sweep_params) - + differential_sweep_spec = self.config.build_differential_sweep_specs( + model, **self.config.build_differential_sweep_specs_kwargs + ) # Check if the keys in the differential sweep specs exist in sweep params - self._check_differential_sweep_key_validity(sweep_params) - - # Define differential sweep outputs - self.outputs = self.config.build_outputs( - model, *self.config.build_outputs_kwargs + self._check_differential_sweep_key_validity( + differential_sweep_spec, sweep_params ) - self._define_differential_sweep_outputs(sweep_params) # Set the seed before sampling self.seed = seed np.random.seed(self.seed) # Enumerate/Sample the parameter space - global_values = self._build_combinations( + all_parameter_combinations = self._build_combinations( sweep_params, sampling_type, num_samples ) - # divide the workload between processors - local_values = self._divide_combinations(global_values) - self.n_nominal_local = np.shape(local_values)[0] - - # Check if the outputs have the name attribute. If not, assign one. - if self.outputs is not None: - self.assign_variable_names(model, self.outputs) - - # Create a dictionary to store all the differential ps_objects - self.diff_ps_dict = {} + all_results = self.run_scatter_gather( + all_parameter_combinations, DifferentialParameterSweep + ) + global_sweep_results_dict = self._combine_gather_results(all_results) + combined_output_arr = self._combine_output_array(global_sweep_results_dict) + all_parameter_combinations_solved = self._combine_input_array( + global_sweep_results_dict + ) - # Do the Loop - if self.config.custom_do_param_sweep is None: - local_results_dict = self._do_param_sweep( + # save the results for all simulations run by this process and its children + for results in self.parallel_manager.results_from_local_tree(all_results): + self.writer.save_results( sweep_params, - self.outputs, - local_values, - ) - else: - local_results_dict = self.config.custom_do_param_sweep( - self, - sweep_params, - self.outputs, - local_values, - **self.config.custom_do_param_sweep_kwargs, + results.parameters, + all_parameter_combinations_solved, + results.results, + global_sweep_results_dict, + combined_output_arr, + process_number=results.process_number, ) - # re-writing local_values - local_values = self._collect_local_inputs(local_results_dict) - - # Aggregate results on Master - ( - global_results_dict, - global_results_arr, - global_input_arr, - num_global_samples, - ) = self._aggregate_results(local_results_dict) - - # Save to file - global_save_data = self.writer.save_results( - sweep_params, - local_values, - global_input_arr, - local_results_dict, - global_results_dict, - global_results_arr, - self.parallel_manager.get_rank(), + global_save_data = np.hstack( + (all_parameter_combinations_solved, combined_output_arr) ) - return global_save_data, global_results_dict + return global_save_data, global_sweep_results_dict diff --git a/watertap/tools/parameter_sweep/parameter_sweep_functions.py b/watertap/tools/parameter_sweep/parameter_sweep_functions.py index b961c65b79..7a31eb390c 100644 --- a/watertap/tools/parameter_sweep/parameter_sweep_functions.py +++ b/watertap/tools/parameter_sweep/parameter_sweep_functions.py @@ -323,7 +323,7 @@ def recursive_parameter_sweep( def differential_parameter_sweep( build_model, build_sweep_params, - differential_sweep_specs, + build_differential_sweep_specs, build_outputs=None, csv_results_file_name=None, h5_results_file_name=None, @@ -455,7 +455,7 @@ def differential_parameter_sweep( """ kwargs = {} - kwargs["differential_sweep_specs"] = differential_sweep_specs + kwargs["build_differential_sweep_specs"] = build_differential_sweep_specs if csv_results_file_name is not None: kwargs["csv_results_file_name"] = csv_results_file_name if h5_results_file_name is not None: diff --git a/watertap/tools/parameter_sweep/parameter_sweep_writer.py b/watertap/tools/parameter_sweep/parameter_sweep_writer.py index 2070d87036..8a78d1fa4e 100644 --- a/watertap/tools/parameter_sweep/parameter_sweep_writer.py +++ b/watertap/tools/parameter_sweep/parameter_sweep_writer.py @@ -23,7 +23,6 @@ class ParameterSweepWriter: - CONFIG = ConfigDict() CONFIG.declare( @@ -72,7 +71,6 @@ def __init__( parallel_manager, **options, ): - self.parallel_manager = parallel_manager self.config = self.CONFIG(options) @@ -106,7 +104,6 @@ def _process_results_filename(self, results_file_name): @staticmethod def _interp_nan_values(global_values, global_results): - global_results_clean = np.copy(global_results) n_vals = np.shape(global_values)[1] @@ -142,7 +139,6 @@ def _write_debug_data( write_csv, process_number, ): - if write_h5: fname_h5 = f"local_results_{process_number:03}.h5" self._write_output_to_h5( @@ -173,7 +169,6 @@ def _write_debug_data( ) def _write_outputs(self, output_dict, txt_options="metadata"): - self._write_output_to_h5(output_dict, self.config["h5_results_file_name"]) # We will also create a companion txt file by default which contains @@ -201,7 +196,6 @@ def _write_outputs(self, output_dict, txt_options="metadata"): pprint.pprint(my_dict, log_file) def _write_output_to_h5(self, output_dict, h5_results_file_name): - if self.config.h5_parent_group_name is None: # No parent groups exists, a new file will be created regardless f = h5py.File(h5_results_file_name, "w") @@ -242,7 +236,6 @@ def _write_to_csv( global_results_arr, process_number, ): - # Create the dataframe that is going to be written to a CSV global_save_data = np.hstack((global_values, global_results_arr)) @@ -297,7 +290,6 @@ def save_results( global_results_arr, process_number, ): - if process_number == self.parallel_manager.ROOT_PROCESS_RANK: if self.config["debugging_data_dir"] is not None: os.makedirs(self.config["debugging_data_dir"], exist_ok=True) diff --git a/watertap/tools/parameter_sweep/paramter_sweep_parallel_utils.py b/watertap/tools/parameter_sweep/paramter_sweep_parallel_utils.py new file mode 100644 index 0000000000..c5d890af89 --- /dev/null +++ b/watertap/tools/parameter_sweep/paramter_sweep_parallel_utils.py @@ -0,0 +1,241 @@ +import copy +import numpy as np + + +class _ParameterSweepParallelUtils: + @classmethod + def remove_unpicklable_state(cls, parameter_sweep_instance): + """ + Remove and return any state from the ParameterSweep object that cannot be + pickled, to make the instance picklable. Needed in order to use the + ConcurrentFuturesParallelManager. + """ + saved_state = { + "parallel_manager": parameter_sweep_instance.parallel_manager, + "writer": parameter_sweep_instance.writer, + } + + parameter_sweep_instance.parallel_manager = None + parameter_sweep_instance.writer = None + return saved_state + + @classmethod + def restore_unpicklable_state(cls, parameter_sweep_instance, state): + """ + Restore a collection of saved state that was removed in order to pickle + the ParameterSweep object. + """ + parameter_sweep_instance.parallel_manager = state.get("parallel_manager", None) + parameter_sweep_instance.writer = state.get("writer", None) + + """ + Combine all of the results retrieved from calling gather(). + - all_results is a list of Result objects, each representing the + parameters and results from running the optimization on one process. + """ + + def _combine_gather_results(self, all_results): + if len(all_results) == 0: + return None + + # create the output skeleton based on the first set of results + # we assume the results are in dict format + initial_results = all_results[0].results + + combined_results = copy.deepcopy(initial_results) + + # remove any lingering pyomo objects, and convert inner results to numpy arrays + for key, val in combined_results.items(): + if key != "solve_successful": + if isinstance(val, dict): + for subkey, subval in val.items(): + if "_pyo_obj" in subval: + del subval["_pyo_obj"] + + # for each result, concat the "value" array of results into the + # gathered results to combine them all + + # get length of data in first result for finding missing keys + total_chunk_length = len(all_results[0].results["solve_successful"]) + + for i, result in enumerate(all_results[1:]): + results = result.results + + for key, val in results.items(): + if key == "solve_successful": + combined_results[key] = np.append( + combined_results[key], copy.deepcopy(val) + ) + continue + if key == "nominal_idx" or key == "differential_idx": + combined_results[key] = np.append( + combined_results[key], copy.deepcopy(val) + ) + continue + # print("vall all results!", key, val) + for subkey, subval in val.items(): + # lets catch any keys that don' exist in result[0] and + # create empty array with expected length, after which we will add + # additional values, or add nan's instead + if subkey not in combined_results[key]: + # create empty array, as none of results so far had this key\ + + combined_results[key][subkey] = {} + for sub_subkey, value in subval.items(): + if sub_subkey == "value": + combined_results[key][subkey]["value"] = ( + np.zeros(total_chunk_length) * np.nan + ) + else: + combined_results[key][subkey][sub_subkey] = value + combined_results[key][subkey]["value"] = np.append( + combined_results[key][subkey]["value"], + copy.deepcopy( + subval["value"], + ), + ) + # keep track of our subchunk_length + sub_chunk_length = len(subval["value"]) + + # make sure we add any empty value to missing keys + + for subkey in combined_results[key]: + if subkey not in val.keys(): + empty_chunk = np.zeros(sub_chunk_length) * np.nan + combined_results[key][subkey]["value"] = np.append( + combined_results[key][subkey]["value"], empty_chunk + ) + total_chunk_length += sub_chunk_length + return combined_results + + """ + Build up a list of the outputs for each result of the optimization. + Returned as a list of lists, where each inner list is the results from + one process's run. + """ + + def _combine_output_array(self, gathered_results): + outputs = gathered_results["outputs"] + if len(outputs) == 0: + return [] + + # assume all output arrays have the same length + combined_outputs = [ + np.asarray([]) for _ in range(len(list(outputs.values())[0]["value"])) + ] + for _, output in outputs.items(): + for i in range(len(output["value"])): + combined_outputs[i] = np.append(combined_outputs[i], output["value"][i]) + return np.asarray(combined_outputs) + + """ + Build up a list of the sweep_inputs for each result of the optimization. + Returned as a list of lists, where each inner list is the results from + one process's run. + """ + + def _combine_input_array(self, gathered_results): + inputs = gathered_results["sweep_params"] + if len(inputs) == 0: + return [] + + # assume all output arrays have the same length + combined_inputs = [ + np.asarray([]) for _ in range(len(list(inputs.values())[0]["value"])) + ] + for _, inputv in inputs.items(): + for i in range(len(inputv["value"])): + combined_inputs[i] = np.append(combined_inputs[i], inputv["value"][i]) + return np.asarray(combined_inputs) + + """ + Use the embedded ParallelManager to fan out and then back in the results. + Args: + - build_model: a function for building the flowsheet model + - build_model_kwargs: any keyword args necessary for the build_model function + - build_sweep_params: a function for building the sweep parameters + - build_sweep_params_kwargs: any keyword args necessary for the build_sweep_params + function + - build_outputs: a function for building the outputs dictionary + - all_parameter_combinations: a list where each element represents the parameters + for a single local run + Returns: + - a list of LocalResults representing the results of the simulation runs + """ + + def run_scatter_gather(self, all_parameter_combinations, class_reference): + # save a reference to the parallel manager since it will be removed + # along with the other unpicklable state + parallel_manager = self.parallel_manager + saved_state = class_reference.remove_unpicklable_state(self) + + do_build_kwargs = {"param_sweep_instance": self} + + parallel_manager.scatter( + do_build, + do_build_kwargs, + do_execute, + all_parameter_combinations, + ) + + # gather the results and combine them into the format we want + all_results = parallel_manager.gather() + class_reference.restore_unpicklable_state(self, saved_state) + + return all_results + + +def do_build( + param_sweep_instance, +): + """ + Used to pass into the parallel manager to build the parameters necessary + for the sweep function. Defined at the top level so it's picklable. + """ + ps_config = param_sweep_instance.config + model = ps_config.build_model(**ps_config.build_model_kwargs) + sweep_params = ps_config.build_sweep_params( + model, **ps_config.build_sweep_params_kwargs + ) + sweep_params, sampling_type = param_sweep_instance._process_sweep_params( + sweep_params + ) + outputs = ps_config.build_outputs(model, **ps_config.build_outputs_kwargs) + # for when differential parameter tool is used + if hasattr(param_sweep_instance, "_define_differential_sweep_outputs"): + param_sweep_instance._define_differential_sweep_outputs(model, sweep_params) + + if outputs is not None: + param_sweep_instance.assign_variable_names(model, outputs) + + return [param_sweep_instance, model, sweep_params, outputs] + + +def do_execute( + local_combo_array, + param_sweep_instance, + model, + sweep_params, + outputs, +): + """ + Used to pass into the parallel manager in order to execute the sweep + for a set of local values. Defined at the top level so it's picklable. + """ + + if param_sweep_instance.config.custom_do_param_sweep is not None: + return param_sweep_instance.config.custom_do_param_sweep( + param_sweep_instance, sweep_params, outputs, local_combo_array + ) + + return param_sweep_instance._do_param_sweep( + sweep_params, outputs, local_combo_array + ) + + +def return_none(model, outputkeys=None): + """ + Used so that build_outputs=None is a valid usage of the parameter sweep tool + without requiring the user to wrap it in a function. + """ + return None diff --git a/watertap/tools/parameter_sweep/tests/test_differential_parameter_sweep.py b/watertap/tools/parameter_sweep/tests/test_differential_parameter_sweep.py index 3c2bb8f9b6..f304ff5920 100644 --- a/watertap/tools/parameter_sweep/tests/test_differential_parameter_sweep.py +++ b/watertap/tools/parameter_sweep/tests/test_differential_parameter_sweep.py @@ -37,6 +37,8 @@ ) import watertap.tools.MPI as MPI +from watertap.tools.parameter_sweep.model_manager import ModelManager + def build_none_outputs(model): return None @@ -72,11 +74,8 @@ def model(): @pytest.mark.component def test_check_differential_sweep_key_validity(model): m = model - A = m.fs.input["a"] B = m.fs.input["b"] - sweep_params = {A.name: (A, 0.1, 0.9, 3), B.name: (B, 0.0, 0.5, 3)} - differential_sweep_specs = { A.name: { "diff_mode": "sum", @@ -92,11 +91,14 @@ def test_check_differential_sweep_key_validity(model): "pyomo_object": m.fs.input["b"], }, } + build_spec = lambda model: differential_sweep_specs + sweep_params = {A.name: (A, 0.1, 0.9, 3), B.name: (B, 0.0, 0.5, 3)} + + ps = DifferentialParameterSweep(build_differential_sweep_specs=build_spec) - ps = DifferentialParameterSweep(differential_sweep_specs=differential_sweep_specs) sweep_params, _ = ps._process_sweep_params(sweep_params) ps.outputs = None - ps._check_differential_sweep_key_validity(sweep_params) + ps._check_differential_sweep_key_validity(differential_sweep_specs, sweep_params) assert ps.diff_spec_index == [0, 1] @@ -104,7 +106,6 @@ def test_check_differential_sweep_key_validity(model): @pytest.mark.component def test_create_differential_sweep_params_normal(model): m = model - differential_sweep_specs = { "fs.a": { "diff_sample_type": NormalSample, @@ -117,9 +118,12 @@ def test_create_differential_sweep_params_normal(model): "pyomo_object": m.fs.input["b"], }, } + build_spec = lambda model: differential_sweep_specs + ps = DifferentialParameterSweep(build_differential_sweep_specs=build_spec) + ps.model_manager = ModelManager(ps) + ps.model_manager.model = m - ps = DifferentialParameterSweep(differential_sweep_specs=differential_sweep_specs) - local_values = np.array([0.0, 1.0, 2.0]) + local_values = np.array([0.0, 0.0, 1.0, 2.0]) ps.diff_spec_index = [0, 1] diff_sweep_param_dict = ps._create_differential_sweep_params(local_values) @@ -137,7 +141,6 @@ def test_create_differential_sweep_params_normal(model): @pytest.mark.component def test_create_differential_sweep_params_sum_prod(model): m = model - differential_sweep_specs = { "fs.a": { "diff_mode": "sum", @@ -154,9 +157,11 @@ def test_create_differential_sweep_params_sum_prod(model): "pyomo_object": m.fs.input["b"], }, } - - ps = DifferentialParameterSweep(differential_sweep_specs=differential_sweep_specs) - local_values = np.array([0.1, 1.0, 2.0]) + build_spec = lambda model: differential_sweep_specs + ps = DifferentialParameterSweep(build_differential_sweep_specs=build_spec) + ps.model_manager = ModelManager(ps) + ps.model_manager.model = m + local_values = np.array([0.0, 0.1, 1.0, 2.0]) ps.diff_spec_index = [0, 1] diff_sweep_param_dict = ps._create_differential_sweep_params(local_values) @@ -174,7 +179,6 @@ def test_create_differential_sweep_params_sum_prod(model): @pytest.mark.component def test_create_differential_sweep_params_percentile(model): m = model - differential_sweep_specs = { "fs.b": { "diff_mode": "percentile", @@ -186,10 +190,11 @@ def test_create_differential_sweep_params_percentile(model): "pyomo_object": m.fs.input["b"], }, } - - ps = DifferentialParameterSweep(differential_sweep_specs=differential_sweep_specs) - local_values = np.array([0.1, 1.0, 2.0]) - + build_spec = lambda model: differential_sweep_specs + ps = DifferentialParameterSweep(build_differential_sweep_specs=build_spec) + ps.model_manager = ModelManager(ps) + ps.model_manager.model = m + local_values = np.array([0.0, 0.1, 1.0, 2.0]) ps.diff_spec_index = [0, 1] diff_sweep_param_dict = ps._create_differential_sweep_params(local_values) @@ -205,25 +210,21 @@ def test_create_differential_sweep_params_percentile(model): @pytest.mark.component def test_bad_differential_sweep_specs(model, tmp_path): m = model - differential_sweep_specs = { - "fs.a": { - "diff_mode": "sum", - "diff_sample_type": GeomSample, - "relative_lb": 0.01, - "relative_ub": 10.0, - "pyomo_object": m.fs.input["a"], - }, "fs.b": { - "diff_mode": "product", + "diff_mode": "percentile", "diff_sample_type": UniformSample, "relative_lb": 0.01, "relative_ub": 0.1, + "nominal_lb": 0.0, + "nominal_ub": 1.0, "pyomo_object": m.fs.input["b"], }, } - - ps = DifferentialParameterSweep(differential_sweep_specs=differential_sweep_specs) + build_spec = lambda model: differential_sweep_specs + ps = DifferentialParameterSweep(build_differential_sweep_specs=build_spec) + ps.model_manager = ModelManager(ps) + ps.model_manager.model = m with pytest.raises(ValueError): ps.parameter_sweep( build_model, @@ -254,18 +255,19 @@ def test_differential_sweep_outputs(model): } outputs = {"fs.output[c]": m.fs.output["c"]} - + build_spec = lambda model: differential_sweep_specs ps = DifferentialParameterSweep( optimize_function=_optimization, reinitialize_function=_reinitialize, reinitialize_kwargs={"slack_penalty": 10.0}, - differential_sweep_specs=differential_sweep_specs, + build_differential_sweep_specs=build_spec, ) - + ps.model_manager = ModelManager(ps) + ps.model_manager.model = m sweep_params, _ = ps._process_sweep_params(sweep_params) - - ps.outputs = outputs - ps._define_differential_sweep_outputs(sweep_params) + ps.config.differential_sweep_specs = build_spec(m) + ps.config.build_outputs = lambda model: outputs + ps._define_differential_sweep_outputs(model, sweep_params) # Finally test for the keys expected_keys = ["fs.output[c]", "fs.input[a]"] @@ -306,10 +308,10 @@ def test_differential_parameter_sweep(model, tmp_path): ps = DifferentialParameterSweep( csv_results_file_name=csv_results_file_name, h5_results_file_name=h5_results_file_name, - debugging_data_dir=tmp_path, + debugging_data_dir=None, # Does not work at the moment interpolate_nan_outputs=True, optimize_function=_optimization, - differential_sweep_specs=differential_sweep_specs, + build_differential_sweep_specs=lambda model: differential_sweep_specs, initialize_function=_reinitialize, number_of_subprocesses=1, ) @@ -740,14 +742,14 @@ def test_differential_parameter_sweep_selective(model, tmp_path): "pyomo_object": m.fs.input["b"], }, } - + build_spec = lambda model: differential_sweep_specs ps = DifferentialParameterSweep( csv_results_file_name=csv_results_file_name, h5_results_file_name=h5_results_file_name, - debugging_data_dir=tmp_path, + debugging_data_dir=None, # Does not work at the moment interpolate_nan_outputs=True, optimize_function=_optimization, - differential_sweep_specs=differential_sweep_specs, + build_differential_sweep_specs=build_spec, num_diff_samples=2, ) @@ -763,10 +765,11 @@ def test_differential_parameter_sweep_selective(model, tmp_path): "fs.slack[cd_slack]": m.fs.slack["cd_slack"], "fs.slack_penalty": m.fs.slack_penalty, } + build_outputs = lambda model: outputs _, global_results_dict = ps.parameter_sweep( m, sweep_params, - build_outputs=outputs, + build_outputs=build_outputs, seed=0, ) @@ -1329,7 +1332,7 @@ def test_differential_parameter_sweep_function(model, tmp_path): build_outputs=None, csv_results_file_name=csv_results_file_name, h5_results_file_name=h5_results_file_name, - debugging_data_dir=tmp_path, + debugging_data_dir=None, # Does not work at the moment interpolate_nan_outputs=True, optimize_function=_optimization, initialize_function=_reinitialize,