From e39bfdb6645fc1ec02d4122e8adfcd10defe32f4 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Wed, 4 Oct 2023 11:56:24 -0700 Subject: [PATCH 1/3] exo data data structure classes --- sup3r/models/abstract.py | 107 +++------ sup3r/models/multi_step.py | 224 ++---------------- sup3r/pipeline/forward_pass.py | 22 +- .../data_handling/exogenous_data_handling.py | 175 +++++++++++++- tests/data_handling/test_exo_data_handling.py | 38 ++- tests/forward_pass/test_forward_pass_exo.py | 18 +- 6 files changed, 265 insertions(+), 319 deletions(-) diff --git a/sup3r/models/abstract.py b/sup3r/models/abstract.py index a2dc0264e..be744b452 100644 --- a/sup3r/models/abstract.py +++ b/sup3r/models/abstract.py @@ -21,6 +21,7 @@ from tensorflow.keras import optimizers import sup3r.utilities.loss_metrics +from sup3r.preprocessing.data_handling.exogenous_data_handling import ExoData from sup3r.utilities import VERSION_RECORD logger = logging.getLogger(__name__) @@ -223,14 +224,11 @@ def _combine_fwp_input(self, low_res, exogenous_data=None): Low-resolution input data, usually a 4D or 5D array of shape: (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) - exogenous_data : dict | None - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. This doesn't have to include the 'model' key since - this data is for a single step model. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'data': ..., 'resolution': ...}, - {'combine_type': 'layer', 'data': ..., 'resolution': ...}]}} + exogenous_data : ExoData | None + Special dictionary (class:`ExoData`) of exogenous feature data with + entries describing whether features should be combined at input, a + mid network layer, or with output. This doesn't have to include + the 'model' key since this data is for a single step model. Returns ------- @@ -253,14 +251,10 @@ def _combine_fwp_input(self, low_res, exogenous_data=None): assert all(feature in exogenous_data for feature in exo_feats), msg if exogenous_data is not None and fnum_diff > 0: for feature in exo_feats: - entry = exogenous_data[feature] - combine_types = [step['combine_type'] - for step in entry['steps']] - if 'input' in combine_types: - idx = combine_types.index('input') - low_res = np.concatenate((low_res, - entry['steps'][idx]['data']), - axis=-1) + exo_input = exogenous_data.get_combine_type_data( + feature, 'input') + if exo_input is not None: + low_res = np.concatenate((low_res, exo_input), axis=-1) return low_res def _combine_fwp_output(self, hi_res, exogenous_data=None): @@ -274,13 +268,10 @@ def _combine_fwp_output(self, hi_res, exogenous_data=None): (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) exogenous_data : dict | None - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. This doesn't have to include the 'model' key since - this data is for a single step model. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'data': ..., 'resolution': ...}, - {'combine_type': 'layer', 'data': ..., 'resolution': ...}]}} + Special dictionary (class:`ExoData`) of exogenous feature data with + entries describing whether features should be combined at input, a + mid network layer, or with output. This doesn't have to include + the 'model' key since this data is for a single step model. Returns ------- @@ -303,14 +294,10 @@ def _combine_fwp_output(self, hi_res, exogenous_data=None): assert all(feature in exogenous_data for feature in exo_feats), msg if exogenous_data is not None and fnum_diff > 0: for feature in exo_feats: - entry = exogenous_data[feature] - combine_types = [step['combine_type'] - for step in entry['steps']] - if 'output' in combine_types: - idx = combine_types.index('output') - hi_res = np.concatenate((hi_res, - entry['steps'][idx]['data']), - axis=-1) + exo_output = exogenous_data.get_combine_type_data( + feature, 'output') + if exo_output is not None: + hi_res = np.concatenate((hi_res, exo_output), axis=-1) return hi_res def _combine_loss_input(self, high_res_true, high_res_gen): @@ -1237,39 +1224,6 @@ def _reshape_norm_exo(self, hi_res, hi_res_exo, exo_name, norm_in=True): return hi_res_exo - def _get_layer_exo_input(self, layer_name, exogenous_data): - """Get the high-resolution exo data for the given layer name from the - full exogenous_data dictionary. - - Parameters - ---------- - layer_name : str - Name of Sup3rAdder or Sup3rConcat layer. This should match a - feature key in exogenous_data - exogenous_data : dict | None - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. This doesn't have to include the 'model' key since - this data is for a single step model. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'data': ..., 'resolution': ...}, - {'combine_type': 'layer', 'data': ..., 'resolution': ...}]}} - - """ - msg = (f'layer.name = {layer_name} does not match any ' - 'features in exogenous_data ' - f'({list(exogenous_data)})') - assert layer_name in exogenous_data, msg - steps = exogenous_data[layer_name]['steps'] - combine_types = [step['combine_type'] for step in steps] - msg = ('Received exogenous_data without any combine_type ' - '= "layer" steps, for a model with an Adder/Concat ' - 'layer.') - assert 'layer' in combine_types, msg - idx = combine_types.index('layer') - hi_res_exo = steps[idx]['data'] - return hi_res_exo - def generate(self, low_res, norm_in=True, @@ -1292,14 +1246,11 @@ def generate(self, un_norm_out : bool Flag to un-normalize synthetically generated output data to physical units - exogenous_data : dict | None - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. This doesn't have to include the 'model' key since - this data is for a single step model. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'data': ..., 'resolution': ...}, - {'combine_type': 'layer', 'data': ..., 'resolution': ...}]}} + exogenous_data : dict | ExoData | None + Special dictionary (class:`ExoData`) of exogenous feature data with + entries describing whether features should be combined at input, a + mid network layer, or with output. This doesn't have to include + the 'model' key since this data is for a single step model. Returns ------- @@ -1309,6 +1260,10 @@ def generate(self, (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) """ + if (isinstance(exogenous_data, dict) + and not isinstance(exogenous_data, ExoData)): + exogenous_data = ExoData(exogenous_data) + low_res = self._combine_fwp_input(low_res, exogenous_data) if norm_in and self._means is not None: low_res = self.norm_input(low_res) @@ -1317,8 +1272,12 @@ def generate(self, for i, layer in enumerate(self.generator.layers[1:]): try: if isinstance(layer, (Sup3rAdder, Sup3rConcat)): - hi_res_exo = self._get_layer_exo_input(layer.name, - exogenous_data) + msg = (f'layer.name = {layer.name} does not match any ' + 'features in exogenous_data ' + f'({list(exogenous_data)})') + assert layer.name in exogenous_data, msg + hi_res_exo = exogenous_data.get_combine_type_data( + layer.name, 'layer') hi_res_exo = self._reshape_norm_exo(hi_res, hi_res_exo, layer.name, diff --git a/sup3r/models/multi_step.py b/sup3r/models/multi_step.py index c76fd47df..28c0ffdcd 100644 --- a/sup3r/models/multi_step.py +++ b/sup3r/models/multi_step.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """Sup3r multi step model frameworks""" -import copy import json import logging import os @@ -11,6 +10,7 @@ import sup3r.models from sup3r.models.abstract import AbstractInterface from sup3r.models.base import Sup3rGan +from sup3r.preprocessing.data_handling.exogenous_data_handling import ExoData logger = logging.getLogger(__name__) @@ -112,43 +112,6 @@ def seed(s=0): """ Sup3rGan.seed(s=s) - def _get_model_step_exo(self, model_step, exogenous_data=None): - """Get the exogenous data for the given model_step from the full - exogenous data dictionary - - Parameters - ---------- - model_step : int - Index of the model to get exogenous data for. - exogenous_data : dict - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'model': 0, 'data': ..., - 'resolution': ...}, - {'combine_type': 'layer', 'model': 0, 'data': ..., - 'resolution': ...}]}} - Each array in in 'data' key has 3D or 4D shape: - (spatial_1, spatial_2, 1) - (spatial_1, spatial_2, n_temporal, 1) - - Returns - ------- - exogenous_data : dict - Same as input dictionary but with only entries with 'model': - model_step - """ - model_step_exo = None - if exogenous_data is not None: - model_step_exo = {} - for feature in exogenous_data: - steps = [step for step in exogenous_data[feature]['steps'] - if step['model'] == model_step] - if steps: - model_step_exo[feature] = {'steps': steps} - return model_step_exo - def _transpose_model_input(self, model, hi_res): """Transpose input data according to mdel input dimensions. @@ -206,18 +169,10 @@ def generate(self, low_res, norm_in=True, un_norm_out=True, un_norm_out : bool Flag to un-normalize synthetically generated output data to physical units - exogenous_data : dict - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'model': 0, 'data': ..., - 'resolution': ...}, - {'combine_type': 'layer', 'model': 0, 'data': ..., - 'resolution': ...}]}} - Each array in in 'data' key has 3D or 4D shape: - (spatial_1, spatial_2, 1) - (spatial_1, spatial_2, n_temporal, 1) + exogenous_data : ExoData + class:`ExoData` object, which is a special dictionary containing + exogenous data for each model step and info about how to use the + data at each step. Returns ------- @@ -227,16 +182,20 @@ def generate(self, low_res, norm_in=True, un_norm_out=True, (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) """ + if (isinstance(exogenous_data, dict) + and not isinstance(exogenous_data, ExoData)): + exogenous_data = ExoData(exogenous_data) + hi_res = low_res.copy() for i, model in enumerate(self.models): - # pylint: disable=R1719 i_norm_in = False if (i == 0 and not norm_in) else True i_un_norm_out = (False if (i + 1 == len(self.models) and not un_norm_out) else True) - i_exo_data = self._get_model_step_exo(i, exogenous_data) + i_exo_data = (None if exogenous_data is None + else exogenous_data.get_model_step_exo(i)) try: hi_res = self._transpose_model_input(model, hi_res) @@ -299,133 +258,6 @@ def model_params(self): return tuple(model.model_params for model in self.models) -class SpatialThenTemporalBase(MultiStepGan): - """A base class for spatial-then-temporal or temporal-then-spatial multi - step GANs - """ - - def __init__(self, spatial_models, temporal_models): - """ - Parameters - ---------- - spatial_models : MultiStepGan - A loaded MultiStepGan object representing the one or more spatial - super resolution steps in this composite SpatialThenTemporal model - temporal_models : MultiStepGan - A loaded MultiStepGan object representing the single temporal - enhancement model in this composite SpatialThenTemporal model - """ - self._spatial_models = spatial_models - self._temporal_models = temporal_models - - @property - def spatial_models(self): - """Get the MultiStepGan object for the spatial-only model(s) - - Returns - ------- - MultiStepGan - """ - return self._spatial_models - - @property - def temporal_models(self): - """Get the MultiStepGan object for the (spatio)temporal model(s) - - Returns - ------- - MultiStepGan - """ - return self._temporal_models - - @classmethod - def load(cls, spatial_model_dirs, temporal_model_dirs, verbose=True): - """Load the GANs with its sub-networks from a previously saved-to - output directory. - - Parameters - ---------- - spatial_model_dirs : str | list | tuple - An ordered list/tuple of one or more directories containing trained - + saved Sup3rGan models created using the Sup3rGan.save() method. - This must contain only spatial models that input/output 4D - tensors. - temporal_model_dirs : str | list | tuple - An ordered list/tuple of one or more directories containing trained - + saved Sup3rGan models created using the Sup3rGan.save() method. - This must contain only (spatio)temporal models that input/output 5D - tensors. - verbose : bool - Flag to log information about the loaded model. - - Returns - ------- - out : MultiStepGan - Returns a pretrained gan model that was previously saved to - model_dirs - """ - if isinstance(spatial_model_dirs, str): - spatial_model_dirs = [spatial_model_dirs] - if isinstance(temporal_model_dirs, str): - temporal_model_dirs = [temporal_model_dirs] - - s_models = MultiStepGan.load(spatial_model_dirs, verbose=verbose) - t_models = MultiStepGan.load(temporal_model_dirs, verbose=verbose) - - return cls(s_models, t_models) - - def _split_exo_dict(self, split_step, exogenous_data=None): - """Split exogenous_data into two dicts based on split_step. The first - dict has only model steps less than split_step. The second dict has - only model steps greater than or equal to split_step. - - Parameters - ---------- - split_step : int - Step index to use for splitting. If this is for a - SpatialThenTemporal model split_step should be len(spatial_models). - If this is for a TemporalThenSpatial model split_step should be - len(temporal_models). - exogenous_data : dict - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'model': 0, 'data': ..., - 'resolution': ...}, - {'combine_type': 'layer', 'model': 0, 'data': ..., - 'resolution': ...}]}} - Each array in in 'data' key has 3D or 4D shape: - (spatial_1, spatial_2, 1) - (spatial_1, spatial_2, n_temporal, 1) - - Returns - ------- - split_exo_1 : dict - Same as input dictionary but with only entries with 'model': - model_step where model_step is less than split_step - split_exo_2 : dict - Same as input dictionary but with only entries with 'model': - model_step where model_step is greater than or equal to split_step - """ - split_exo_1 = {} - split_exo_2 = {} - if exogenous_data is not None: - exo_data = copy.deepcopy(exogenous_data) - for feature in exo_data: - steps = [step for step in exo_data[feature]['steps'] - if step['model'] < split_step] - if steps: - split_exo_1[feature] = {'steps': steps} - steps = [step for step in exo_data[feature]['steps'] - if step['model'] >= split_step] - for step in steps: - step.update({'model': step['model'] - split_step}) - if steps: - split_exo_2[feature] = {'steps': steps} - return split_exo_1, split_exo_2 - - class MultiStepSurfaceMetGan(MultiStepGan): """A two-step GAN where the first step is a spatial-only enhancement on a 4D tensor of near-surface temperature and relative humidity data, and the @@ -545,7 +377,7 @@ def load(cls, surface_model_class='SurfaceSpatialMetModel', return cls([*s_models, *t_models]) -class SolarMultiStepGan(SpatialThenTemporalBase): +class SolarMultiStepGan(MultiStepGan): """Special multi step model for solar clearsky ratio super resolution. This model takes in two parallel models for wind-only and solar-only @@ -632,18 +464,6 @@ def preflight(self): .format(missing, spatial_out_features)) assert not any(missing), msg - @property - def spatial_models(self): - """Alias for spatial_solar_models to preserve MultiStepGan - interface.""" - return self.spatial_solar_models - - @property - def temporal_models(self): - """Alias for temporal_solar_models to preserve MultiStepGan - interface.""" - return self.temporal_solar_models - @property def spatial_solar_models(self): """Get the MultiStepGan object for the spatial-only solar model(s) @@ -748,14 +568,9 @@ def generate(self, low_res, norm_in=True, un_norm_out=True, un_norm_out : bool Flag to un-normalize synthetically generated output data to physical units - exogenous_data : list - List of arrays of exogenous_data with length equal to the - number of model steps. e.g. If we want to include topography as - an exogenous feature in a spatial + temporal multistep model then - we need to provide a list of length=2 with topography at the low - spatial resolution and at the high resolution. If we include more - than one exogenous feature the ordering must be consistent. - Each array in the list has 3D or 4D shape: + exogenous_data : ExoData + class:`ExoData` object with data arrays for each exogenous data + step. Each array has 3D or 4D shape: (spatial_1, spatial_2, n_features) (temporal, spatial_1, spatial_2, n_features) It's assumed that the spatial_solar_models do not require @@ -772,9 +587,12 @@ def generate(self, low_res, norm_in=True, un_norm_out=True, logger.debug('Data input to the SolarMultiStepGan has shape {} which ' 'will be split up for solar- and wind-only features.' .format(low_res.shape)) - s_exo, t_exo = self._split_exo_dict( - split_step=len(self.spatial_models), - exogenous_data=exogenous_data) + if exogenous_data is not None: + s_exo, t_exo = exogenous_data.split_exo_dict( + split_step=len(self.spatial_solar_models)) + else: + s_exo = t_exo = None + try: hi_res_wind = self.spatial_wind_models.generate( low_res[..., self.idf_wind], diff --git a/sup3r/pipeline/forward_pass.py b/sup3r/pipeline/forward_pass.py index 1cd24aa96..bcd5149fd 100644 --- a/sup3r/pipeline/forward_pass.py +++ b/sup3r/pipeline/forward_pass.py @@ -25,8 +25,11 @@ OutputHandlerH5, OutputHandlerNC, ) -from sup3r.preprocessing.data_handling import ExogenousDataHandler from sup3r.preprocessing.data_handling.base import InputMixIn +from sup3r.preprocessing.data_handling.exogenous_data_handling import ( + ExoData, + ExogenousDataHandler, +) from sup3r.utilities import ModuleName from sup3r.utilities.cli import BaseCLI from sup3r.utilities.execution import DistributedProcess @@ -1121,13 +1124,13 @@ def load_exo_data(self): Returns ------- - exo_data : dict - Same as exo_kwargs dictionary with data arrays added to a 'data' - key for each feature + exo_data : ExoData + class:`ExoData` object composed of multiple + class:`SingleExoDataStep` objects. """ + data = [] exo_data = None if self.exo_kwargs: - exo_data = self.exo_kwargs.copy() self.features = [f for f in self.features if f not in self.exo_features] for feature in self.exo_features: @@ -1141,13 +1144,8 @@ def load_exo_data(self): sig = signature(ExogenousDataHandler) exo_kwargs = {k: v for k, v in exo_kwargs.items() if k in sig.parameters} - data = ExogenousDataHandler(**exo_kwargs).data - for i, _ in enumerate(exo_kwargs['steps']): - exo_data[feature]['steps'][i]['data'] = data[i] - shapes = [None if d is None else d.shape for d in data] - logger.info( - 'Got exogenous_data of length {} with shapes: {}'.format( - len(data), shapes)) + data += ExogenousDataHandler(**exo_kwargs).data + exo_data = ExoData(data) return exo_data def update_input_handler_kwargs(self, strategy): diff --git a/sup3r/preprocessing/data_handling/exogenous_data_handling.py b/sup3r/preprocessing/data_handling/exogenous_data_handling.py index 41791006a..4fa676356 100644 --- a/sup3r/preprocessing/data_handling/exogenous_data_handling.py +++ b/sup3r/preprocessing/data_handling/exogenous_data_handling.py @@ -19,6 +19,172 @@ logger = logging.getLogger(__name__) +class SingleExoDataStep(dict): + """Special dictionary class for exogenous_data step""" + + def __init__(self, feature, combine_type, model, data): + """exogenous_data step dictionary for a given model step + + Parameters + ---------- + feature : str + Name of feature corresponding to `data`. + combine_type : str + Specifies how the exogenous_data should be used for this step. e.g. + "input", "layer", "output". For example, if tis equals "input" the + `data` will be used as input to the forward pass for the model step + given by `model` + model : int + Specifies the model index which will use the `data`. For example, + if `model` == 1 then the `data` will be used according to + `combine_type` in the 2nd model step in a MultiStepGan. + data : tf.Tensor | np.ndarray + The data to be used for the given model step. + """ + step = {'model': model, 'combine_type': combine_type, 'data': data} + for k, v in step.items(): + self.__setitem__(k, v) + self.feature = feature + + @property + def shape(self): + """Shape of data array for this model step.""" + return self['data'].shape + + +class ExoData(dict): + """Special dictionary class for multiple exogenous_data steps""" + + def __init__(self, steps): + """Combine multiple SingleExoDataStep objects + + Parameters + ---------- + steps : list | dict + List of SingleExoDataStep objects or a feature dictionary with list + of steps for each feature + """ + if isinstance(steps, list): + for step in steps: + self.append(step.feature, step) + elif isinstance(steps, dict): + for k, v in steps.items(): + self.__setitem__(k, v) + else: + msg = ('ExoData must be initialized with a dictionary of features ' + 'or list of SingleExoDataStep objects.') + logger.error(msg) + raise ValueError(msg) + + def append(self, feature, step): + """Append steps list for given feature""" + tmp = self.get(feature, {'steps': []}) + tmp['steps'].append(step) + self[feature] = tmp + + def get_model_step_exo(self, model_step): + """Get the exogenous data for the given model_step from the full list + of steps + + Parameters + ---------- + model_step : int + Index of the model to get exogenous data for. + + Returns + ------- + model_step_exo : dict + Dictionary of features each with list of steps which match the + given model_step + """ + model_step_exo = {} + for feature, entry in self.items(): + steps = [step for step in entry['steps'] + if step['model'] == model_step] + if steps: + model_step_exo[feature] = {'steps': steps} + return ExoData(model_step_exo) + + def split_exo_dict(self, split_step): + """Split exogenous_data into two dicts based on split_step. The first + dict has only model steps less than split_step. The second dict has + only model steps greater than or equal to split_step. + + Parameters + ---------- + split_step : int + Step index to use for splitting. To split this into exo data for + spatial models and temporal models split_step should be + len(spatial_models). If this is for a TemporalThenSpatial model + split_step should be len(temporal_models). + exogenous_data : dict + Dictionary of exogenous feature data with entries describing + whether features should be combined at input, a mid network layer, + or with output. e.g. + {'topography': {'steps': [ + {'combine_type': 'input', 'model': 0, 'data': ..., + 'resolution': ...}, + {'combine_type': 'layer', 'model': 0, 'data': ..., + 'resolution': ...}]}} + Each array in in 'data' key has 3D or 4D shape: + (spatial_1, spatial_2, 1) + (spatial_1, spatial_2, n_temporal, 1) + + Returns + ------- + split_exo_1 : dict + Same as input dictionary but with only entries with 'model': + model_step where model_step is less than split_step + split_exo_2 : dict + Same as input dictionary but with only entries with 'model': + model_step where model_step is greater than or equal to split_step + """ + split_exo_1 = {} + split_exo_2 = {} + for feature, entry in self.items(): + steps = [step for step in entry['steps'] + if step['model'] < split_step] + if steps: + split_exo_1[feature] = {'steps': steps} + steps = [step for step in entry['steps'] + if step['model'] >= split_step] + for step in steps: + step.update({'model': step['model'] - split_step}) + if steps: + split_exo_2[feature] = {'steps': steps} + return ExoData(split_exo_1), ExoData(split_exo_2) + + def get_combine_type_data(self, feature, combine_type, model_step=None): + """Get exogenous data for given feature which is used according to the + given combine_type (input/output/layer) for this model_step. + + Parameters + ---------- + feature : str + Name of exogenous feature to get data for + combine_type : str + Usage type for requested data. e.g input/output/layer + model_step : int | None + Model step the data will be used for. If this is not None then + only steps with self[feature]['steps'][:]['model'] == model_step + will be searched for data. + + Returns + ------- + data : tf.Tensor | np.ndarray + Exogenous data for given parameters + """ + tmp = self[feature] + if model_step is not None: + tmp = {k: v for k, v in tmp.items() if v['model'] == model_step} + combine_types = [step['combine_type'] for step in tmp['steps']] + msg = ('Received exogenous_data without any combine_type ' + f'= "{combine_type}" steps') + assert combine_type in combine_types, msg + idx = combine_types.index(combine_type) + return tmp['steps'][idx]['data'] + + class ExogenousDataHandler: """Class to extract exogenous features for multistep forward passes. e.g. Multiple topography arrays at different resolutions for multiple spatial @@ -173,11 +339,18 @@ def __init__(self, t_enhance=t_enhance, s_agg_factor=s_agg_factor, t_agg_factor=t_agg_factor) - self.data.append(data) + step = SingleExoDataStep(feature, steps[i]['combine_type'], + steps[i]['model'], data) + self.data.append(step) else: msg = (f"Can only extract {list(self.AVAILABLE_HANDLERS)}." f" Received {feature}.") raise NotImplementedError(msg) + shapes = [None if d is None else d['data'].shape + for d in self.data] + logger.info( + 'Got exogenous_data of length {} with shapes: {}'.format( + len(self.data), shapes)) def input_check(self): """Make sure agg factors are provided or exo_resolution and models are diff --git a/tests/data_handling/test_exo_data_handling.py b/tests/data_handling/test_exo_data_handling.py index 93091733c..2dda35883 100644 --- a/tests/data_handling/test_exo_data_handling.py +++ b/tests/data_handling/test_exo_data_handling.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """pytests for exogenous data handling""" import os -import shutil +from tempfile import TemporaryDirectory import numpy as np import pytest @@ -33,38 +33,30 @@ def test_exo_cache(feature): steps.append({'s_enhance': s_en, 't_enhance': t_en, 's_agg_factor': s_agg, - 't_agg_factor': t_agg}) - try: + 't_agg_factor': t_agg, + 'combine_type': 'input', + 'model': 0}) + with TemporaryDirectory() as td: base = ExogenousDataHandler(FILE_PATHS, feature, source_file=FP_WTK, steps=steps, target=TARGET, shape=SHAPE, - input_handler='DataHandlerNCforCC') + input_handler='DataHandlerNCforCC', + cache_dir=os.path.join(td, 'exo_cache')) for i, arr in enumerate(base.data): assert arr.shape[0] == SHAPE[0] * S_ENHANCE[i] assert arr.shape[1] == SHAPE[1] * S_ENHANCE[i] - except Exception as e: - if os.path.exists('./exo_cache/'): - shutil.rmtree('./exo_cache/') - raise e - else: - assert os.path.exists('./exo_cache/') - assert len(os.listdir('./exo_cache')) == 2 - # load cached data - try: + assert len(os.listdir(f'{td}/exo_cache')) == 2 + + # load cached data cache = ExogenousDataHandler(FILE_PATHS, feature, source_file=FP_WTK, steps=steps, target=TARGET, shape=SHAPE, - input_handler='DataHandlerNCforCC') - except Exception as e: - if os.path.exists('./exo_cache/'): - shutil.rmtree('./exo_cache/') - raise e - else: - assert os.path.exists('./exo_cache/') - assert len(os.listdir('./exo_cache')) == 2 + input_handler='DataHandlerNCforCC', + cache_dir=os.path.join(td, 'exo_cache')) + assert len(os.listdir(f'{td}/exo_cache')) == 2 + for arr1, arr2 in zip(base.data, cache.data): - assert np.allclose(arr1, arr2) - shutil.rmtree('./exo_cache/') + assert np.allclose(arr1['data'], arr2['data']) diff --git a/tests/forward_pass/test_forward_pass_exo.py b/tests/forward_pass/test_forward_pass_exo.py index 1bf24b286..a282ccd28 100644 --- a/tests/forward_pass/test_forward_pass_exo.py +++ b/tests/forward_pass/test_forward_pass_exo.py @@ -449,7 +449,8 @@ def test_fwp_single_step_wind_hi_res_topo(plot=False): 'steps': [ {'model': 0, 'combine_type': 'layer', 'data': np.random.rand(4, 20, 20, 12, 1)}]}} - _ = model.generate(np.random.rand(4, 10, 10, 6, 3), exogenous_data=exo_tmp) + _ = model.generate(np.random.rand(4, 10, 10, 6, 3), + exogenous_data=exo_tmp) with tempfile.TemporaryDirectory() as td: input_files = make_fake_nc_files(td, INPUT_FILE, 8) @@ -592,7 +593,8 @@ def test_fwp_multi_step_wind_hi_res_topo(): 'steps': [ {'model': 0, 'combine_type': 'layer', 'data': np.random.rand(4, 20, 20, 1)}]}} - _ = s1_model.generate(np.ones((4, 10, 10, 3)), exogenous_data=exo_tmp) + _ = s1_model.generate(np.ones((4, 10, 10, 3)), + exogenous_data=exo_tmp) s2_model = Sup3rGan(gen_model, fp_disc, learning_rate=1e-4) s2_model.meta['training_features'] = ['U_100m', 'V_100m', 'topography'] @@ -601,7 +603,8 @@ def test_fwp_multi_step_wind_hi_res_topo(): s2_model.meta['t_enhance'] = 1 s2_model.meta['input_resolution'] = {'spatial': '24km', 'temporal': '60min'} - _ = s2_model.generate(np.ones((4, 10, 10, 3)), exogenous_data=exo_tmp) + _ = s2_model.generate(np.ones((4, 10, 10, 3)), + exogenous_data=exo_tmp) fp_gen = os.path.join(CONFIG_DIR, 'spatiotemporal/gen_3x_4x_2f.json') fp_disc = os.path.join(CONFIG_DIR, 'spatiotemporal/disc.json') @@ -1095,7 +1098,8 @@ def test_fwp_multi_step_exo_hi_res_topo_and_sza(): 'steps': [{'model': 0, 'combine_type': 'layer', 'data': np.ones((4, 20, 20, 1))}]} } - _ = s1_model.generate(np.ones((4, 10, 10, 4)), exogenous_data=exo_tmp) + _ = s1_model.generate(np.ones((4, 10, 10, 4)), + exogenous_data=exo_tmp) s2_model = Sup3rGan(gen_s_model, fp_disc, learning_rate=1e-4) s2_model.meta['training_features'] = [ @@ -1106,7 +1110,8 @@ def test_fwp_multi_step_exo_hi_res_topo_and_sza(): s2_model.meta['t_enhance'] = 1 s2_model.meta['input_resolution'] = {'spatial': '24km', 'temporal': '60min'} - _ = s2_model.generate(np.ones((4, 10, 10, 4)), exogenous_data=exo_tmp) + _ = s2_model.generate(np.ones((4, 10, 10, 4)), + exogenous_data=exo_tmp) fp_disc = os.path.join(CONFIG_DIR, 'spatiotemporal/disc.json') st_model = Sup3rGan(gen_t_model, fp_disc, learning_rate=1e-4) @@ -1123,7 +1128,8 @@ def test_fwp_multi_step_exo_hi_res_topo_and_sza(): 'steps': [{'model': 0, 'combine_type': 'layer', 'data': np.ones((4, 30, 30, 12, 1))}]} } - _ = st_model.generate(np.ones((4, 10, 10, 6, 3)), exogenous_data=exo_tmp) + _ = st_model.generate(np.ones((4, 10, 10, 6, 3)), + exogenous_data=exo_tmp) with tempfile.TemporaryDirectory() as td: input_files = make_fake_nc_files(td, INPUT_FILE, 8) From 40155ae70d78a23b5c6f697514967b9e4698139c Mon Sep 17 00:00:00 2001 From: bnb32 Date: Tue, 10 Oct 2023 08:56:37 -0600 Subject: [PATCH 2/3] log=False for tests --- tests/data_handling/test_dual_data_handling.py | 2 +- tests/pipeline/test_cli.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/data_handling/test_dual_data_handling.py b/tests/data_handling/test_dual_data_handling.py index fe42324a7..248954673 100644 --- a/tests/data_handling/test_dual_data_handling.py +++ b/tests/data_handling/test_dual_data_handling.py @@ -132,7 +132,7 @@ def test_regrid_caching(log=False, assert np.array_equal(old_dh.hr_data, new_dh.hr_data) -def test_regrid_caching_in_steps(log=True, +def test_regrid_caching_in_steps(log=False, full_shape=(20, 20), sample_shape=(10, 10, 1)): """Test caching and loading of regridded data""" diff --git a/tests/pipeline/test_cli.py b/tests/pipeline/test_cli.py index 547ae1c74..29e9fbc95 100644 --- a/tests/pipeline/test_cli.py +++ b/tests/pipeline/test_cli.py @@ -288,7 +288,7 @@ def test_data_extract_cli(runner): assert len(glob.glob(f'{log_file}')) == 1 -def test_pipeline_fwp_qa(runner, log=True): +def test_pipeline_fwp_qa(runner, log=False): """Test the sup3r pipeline with Forward Pass and QA modules via pipeline cli""" From 53d8bc7f337ebc6b5208e344c15c90bd8edf54b7 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Thu, 12 Oct 2023 10:09:35 -0600 Subject: [PATCH 3/3] PR updates --- sup3r/models/abstract.py | 16 +++++-- sup3r/pipeline/forward_pass.py | 4 +- .../data_handling/exogenous_data_handling.py | 45 ++++++++----------- tests/data_handling/test_exo_data_handling.py | 5 ++- 4 files changed, 36 insertions(+), 34 deletions(-) diff --git a/sup3r/models/abstract.py b/sup3r/models/abstract.py index be744b452..de3f7807a 100644 --- a/sup3r/models/abstract.py +++ b/sup3r/models/abstract.py @@ -224,7 +224,7 @@ def _combine_fwp_input(self, low_res, exogenous_data=None): Low-resolution input data, usually a 4D or 5D array of shape: (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) - exogenous_data : ExoData | None + exogenous_data : dict | ExoData | None Special dictionary (class:`ExoData`) of exogenous feature data with entries describing whether features should be combined at input, a mid network layer, or with output. This doesn't have to include @@ -241,6 +241,10 @@ def _combine_fwp_input(self, low_res, exogenous_data=None): if exogenous_data is None: return low_res + if (not isinstance(exogenous_data, ExoData) + and exogenous_data is not None): + exogenous_data = ExoData(exogenous_data) + training_features = ([] if self.training_features is None else self.training_features) fnum_diff = len(training_features) - low_res.shape[-1] @@ -267,7 +271,7 @@ def _combine_fwp_output(self, hi_res, exogenous_data=None): High-resolution output data, usually a 4D or 5D array of shape: (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) - exogenous_data : dict | None + exogenous_data : dict | ExoData | None Special dictionary (class:`ExoData`) of exogenous feature data with entries describing whether features should be combined at input, a mid network layer, or with output. This doesn't have to include @@ -284,6 +288,10 @@ def _combine_fwp_output(self, hi_res, exogenous_data=None): if exogenous_data is None: return hi_res + if (not isinstance(exogenous_data, ExoData) + and exogenous_data is not None): + exogenous_data = ExoData(exogenous_data) + output_features = ([] if self.output_features is None else self.output_features) fnum_diff = len(output_features) - hi_res.shape[-1] @@ -1260,8 +1268,8 @@ def generate(self, (n_obs, spatial_1, spatial_2, n_features) (n_obs, spatial_1, spatial_2, n_temporal, n_features) """ - if (isinstance(exogenous_data, dict) - and not isinstance(exogenous_data, ExoData)): + if (not isinstance(exogenous_data, ExoData) + and exogenous_data is not None): exogenous_data = ExoData(exogenous_data) low_res = self._combine_fwp_input(low_res, exogenous_data) diff --git a/sup3r/pipeline/forward_pass.py b/sup3r/pipeline/forward_pass.py index bcd5149fd..63dec38fa 100644 --- a/sup3r/pipeline/forward_pass.py +++ b/sup3r/pipeline/forward_pass.py @@ -1128,7 +1128,7 @@ def load_exo_data(self): class:`ExoData` object composed of multiple class:`SingleExoDataStep` objects. """ - data = [] + data = {} exo_data = None if self.exo_kwargs: self.features = [f for f in self.features @@ -1144,7 +1144,7 @@ def load_exo_data(self): sig = signature(ExogenousDataHandler) exo_kwargs = {k: v for k, v in exo_kwargs.items() if k in sig.parameters} - data += ExogenousDataHandler(**exo_kwargs).data + data.update(ExogenousDataHandler(**exo_kwargs).data) exo_data = ExoData(data) return exo_data diff --git a/sup3r/preprocessing/data_handling/exogenous_data_handling.py b/sup3r/preprocessing/data_handling/exogenous_data_handling.py index 4fa676356..5ff81b831 100644 --- a/sup3r/preprocessing/data_handling/exogenous_data_handling.py +++ b/sup3r/preprocessing/data_handling/exogenous_data_handling.py @@ -60,19 +60,24 @@ def __init__(self, steps): Parameters ---------- - steps : list | dict - List of SingleExoDataStep objects or a feature dictionary with list - of steps for each feature + steps : dict + Dictionary with feature keys each with entries describing whether + features should be combined at input, a mid network layer, or with + output. e.g. + {'topography': {'steps': [ + {'combine_type': 'input', 'model': 0, 'data': ..., + 'resolution': ...}, + {'combine_type': 'layer', 'model': 0, 'data': ..., + 'resolution': ...}]}} + Each array in in 'data' key has 3D or 4D shape: + (spatial_1, spatial_2, 1) + (spatial_1, spatial_2, n_temporal, 1) """ - if isinstance(steps, list): - for step in steps: - self.append(step.feature, step) - elif isinstance(steps, dict): + if isinstance(steps, dict): for k, v in steps.items(): self.__setitem__(k, v) else: - msg = ('ExoData must be initialized with a dictionary of features ' - 'or list of SingleExoDataStep objects.') + msg = 'ExoData must be initialized with a dictionary of features.' logger.error(msg) raise ValueError(msg) @@ -117,18 +122,6 @@ def split_exo_dict(self, split_step): spatial models and temporal models split_step should be len(spatial_models). If this is for a TemporalThenSpatial model split_step should be len(temporal_models). - exogenous_data : dict - Dictionary of exogenous feature data with entries describing - whether features should be combined at input, a mid network layer, - or with output. e.g. - {'topography': {'steps': [ - {'combine_type': 'input', 'model': 0, 'data': ..., - 'resolution': ...}, - {'combine_type': 'layer', 'model': 0, 'data': ..., - 'resolution': ...}]}} - Each array in in 'data' key has 3D or 4D shape: - (spatial_1, spatial_2, 1) - (spatial_1, spatial_2, n_temporal, 1) Returns ------- @@ -306,7 +299,7 @@ def __init__(self, self.input_handler = input_handler self.cache_data = cache_data self.cache_dir = cache_dir - self.data = [] + self.data = {feature: {'steps': []}} self.input_check() agg_enhance = self._get_all_agg_and_enhancement() @@ -341,16 +334,16 @@ def __init__(self, t_agg_factor=t_agg_factor) step = SingleExoDataStep(feature, steps[i]['combine_type'], steps[i]['model'], data) - self.data.append(step) + self.data[feature]['steps'].append(step) else: msg = (f"Can only extract {list(self.AVAILABLE_HANDLERS)}." f" Received {feature}.") raise NotImplementedError(msg) - shapes = [None if d is None else d['data'].shape - for d in self.data] + shapes = [None if step is None else step.shape + for step in self.data[feature]['steps']] logger.info( 'Got exogenous_data of length {} with shapes: {}'.format( - len(self.data), shapes)) + len(self.data[feature]['steps']), shapes)) def input_check(self): """Make sure agg factors are provided or exo_resolution and models are diff --git a/tests/data_handling/test_exo_data_handling.py b/tests/data_handling/test_exo_data_handling.py index 2dda35883..601d547b4 100644 --- a/tests/data_handling/test_exo_data_handling.py +++ b/tests/data_handling/test_exo_data_handling.py @@ -43,7 +43,7 @@ def test_exo_cache(feature): target=TARGET, shape=SHAPE, input_handler='DataHandlerNCforCC', cache_dir=os.path.join(td, 'exo_cache')) - for i, arr in enumerate(base.data): + for i, arr in enumerate(base.data[feature]['steps']): assert arr.shape[0] == SHAPE[0] * S_ENHANCE[i] assert arr.shape[1] == SHAPE[1] * S_ENHANCE[i] @@ -58,5 +58,6 @@ def test_exo_cache(feature): cache_dir=os.path.join(td, 'exo_cache')) assert len(os.listdir(f'{td}/exo_cache')) == 2 - for arr1, arr2 in zip(base.data, cache.data): + for arr1, arr2 in zip(base.data[feature]['steps'], + cache.data[feature]['steps']): assert np.allclose(arr1['data'], arr2['data'])