From ba65aeae32086f3ecc4b3a6bf7fdd9f9046df8af Mon Sep 17 00:00:00 2001 From: Alessandro Vullo Date: Wed, 18 Sep 2024 09:03:53 +0100 Subject: [PATCH] Encoded DeepGPs. (#873) --- tests/unit/models/gpflux/test_interface.py | 19 +++++++++++--- trieste/models/gpflux/interface.py | 29 +++++++++++++++------- trieste/models/gpflux/models.py | 18 ++++++++------ trieste/models/gpflux/sampler.py | 8 +++++- 4 files changed, 52 insertions(+), 22 deletions(-) diff --git a/tests/unit/models/gpflux/test_interface.py b/tests/unit/models/gpflux/test_interface.py index 976203517..f7c1c0383 100644 --- a/tests/unit/models/gpflux/test_interface.py +++ b/tests/unit/models/gpflux/test_interface.py @@ -20,7 +20,6 @@ import numpy.testing as npt import pytest import tensorflow as tf -from check_shapes import inherit_check_shapes from gpflow.conditionals.util import sample_mvn from gpflow.keras import tf_keras from gpflux.helpers import construct_basic_inducing_variables, construct_basic_kernel @@ -30,6 +29,7 @@ from tests.util.misc import random_seed from trieste.data import Dataset from trieste.models.gpflux import GPfluxPredictor +from trieste.space import CategoricalSearchSpace, EncoderFunction, one_hot_encoder from trieste.types import TensorType @@ -38,8 +38,9 @@ def __init__( self, optimizer: tf_keras.optimizers.Optimizer | None = None, likelihood: gpflow.likelihoods.Likelihood = gpflow.likelihoods.Gaussian(0.01), + encoder: EncoderFunction | None = None, ): - super().__init__(optimizer=optimizer) + super().__init__(optimizer=optimizer, encoder=encoder) if optimizer is None: self._optimizer = tf_keras.optimizers.Adam() @@ -61,8 +62,7 @@ def model_keras(self) -> tf_keras.Model: def optimizer(self) -> tf_keras.optimizers.Optimizer: return self._optimizer - @inherit_check_shapes - def sample(self, query_points: TensorType, num_samples: int) -> TensorType: + def sample_encoded(self, query_points: TensorType, num_samples: int) -> TensorType: # Taken from GPflow implementation of `GPModel.predict_f_samples` in gpflow.models.model mean, cov = self._model_gpflux.predict_f(query_points, full_cov=True) mean_for_sample = tf.linalg.adjoint(mean) @@ -152,3 +152,14 @@ def test_gpflux_predictor_get_observation_noise_raises_for_non_gaussian_likeliho with pytest.raises(NotImplementedError): model.get_observation_noise() + + +def test_gpflux_categorical_predict() -> None: + search_space = CategoricalSearchSpace(["Red", "Green", "Blue"]) + query_points = search_space.sample(10) + model = _QuadraticPredictor(encoder=one_hot_encoder(search_space)) + mean, variance = model.predict(query_points) + assert mean.shape == [10, 1] + assert variance.shape == [10, 1] + npt.assert_allclose(mean, [[1.0]] * 10, rtol=0.01) + npt.assert_allclose(variance, [[1.0]] * 10, rtol=0.01) diff --git a/trieste/models/gpflux/interface.py b/trieste/models/gpflux/interface.py index 16fd6fe44..79aa1b420 100644 --- a/trieste/models/gpflux/interface.py +++ b/trieste/models/gpflux/interface.py @@ -16,33 +16,46 @@ from abc import ABC, abstractmethod -from check_shapes import inherit_check_shapes from gpflow.base import Module from gpflow.keras import tf_keras +from ...space import EncoderFunction from ...types import TensorType -from ..interfaces import SupportsGetObservationNoise, SupportsPredictY +from ..interfaces import EncodedSupportsPredictY, SupportsGetObservationNoise from ..optimizer import KerasOptimizer -class GPfluxPredictor(SupportsGetObservationNoise, SupportsPredictY, ABC): +class GPfluxPredictor(SupportsGetObservationNoise, EncodedSupportsPredictY, ABC): """ A trainable wrapper for a GPflux deep Gaussian process model. The code assumes subclasses will use the Keras `fit` method for training, and so they should provide access to both a `model_keras` and `model_gpflux`. """ - def __init__(self, optimizer: KerasOptimizer | None = None): + def __init__( + self, optimizer: KerasOptimizer | None = None, encoder: EncoderFunction | None = None + ): """ :param optimizer: The optimizer wrapper containing the optimizer with which to train the model and arguments for the wrapper and the optimizer. The optimizer must be an instance of a :class:`~tf.optimizers.Optimizer`. Defaults to :class:`~tf.optimizers.Adam` optimizer with 0.01 learning rate. + :param encoder: Optional encoder with which to transform query points before + generating predictions. """ if optimizer is None: optimizer = KerasOptimizer(tf_keras.optimizers.Adam(0.01)) self._optimizer = optimizer + self._encoder = encoder + + @property + def encoder(self) -> EncoderFunction | None: + return self._encoder + + @encoder.setter + def encoder(self, encoder: EncoderFunction | None) -> None: + self._encoder = encoder @property @abstractmethod @@ -59,18 +72,16 @@ def optimizer(self) -> KerasOptimizer: """The optimizer wrapper for training the model.""" return self._optimizer - @inherit_check_shapes - def predict(self, query_points: TensorType) -> tuple[TensorType, TensorType]: + def predict_encoded(self, query_points: TensorType) -> tuple[TensorType, TensorType]: """Note: unless otherwise noted, this returns the mean and variance of the last layer conditioned on one sample from the previous layers.""" return self.model_gpflux.predict_f(query_points) @abstractmethod - def sample(self, query_points: TensorType, num_samples: int) -> TensorType: + def sample_encoded(self, query_points: TensorType, num_samples: int) -> TensorType: raise NotImplementedError - @inherit_check_shapes - def predict_y(self, query_points: TensorType) -> tuple[TensorType, TensorType]: + def predict_y_encoded(self, query_points: TensorType) -> tuple[TensorType, TensorType]: """Note: unless otherwise noted, this will return the prediction conditioned on one sample from the lower layers.""" f_mean, f_var = self.model_gpflux.predict_f(query_points) diff --git a/trieste/models/gpflux/models.py b/trieste/models/gpflux/models.py index 2feb6d54a..b30b14c0c 100644 --- a/trieste/models/gpflux/models.py +++ b/trieste/models/gpflux/models.py @@ -19,7 +19,6 @@ import dill import gpflow import tensorflow as tf -from check_shapes import inherit_check_shapes from gpflow.inducing_variables import InducingPoints from gpflow.keras import tf_keras from gpflux.layers import GPLayer, LatentVariableLayer @@ -28,12 +27,13 @@ from ... import logging from ...data import Dataset +from ...space import EncoderFunction from ...types import TensorType from ..interfaces import ( + EncodedTrainableProbabilisticModel, HasReparamSampler, HasTrajectorySampler, ReparametrizationSampler, - TrainableProbabilisticModel, TrajectorySampler, ) from ..optimizer import KerasOptimizer @@ -50,7 +50,7 @@ class DeepGaussianProcess( - GPfluxPredictor, TrainableProbabilisticModel, HasReparamSampler, HasTrajectorySampler + GPfluxPredictor, EncodedTrainableProbabilisticModel, HasReparamSampler, HasTrajectorySampler ): """ A :class:`TrainableProbabilisticModel` wrapper for a GPflux :class:`~gpflux.models.DeepGP` with @@ -65,6 +65,7 @@ def __init__( num_rff_features: int = 1000, continuous_optimisation: bool = True, compile_args: Optional[Mapping[str, Any]] = None, + encoder: EncoderFunction | None = None, ): """ :param model: The underlying GPflux deep Gaussian process model. Passing in a named closure @@ -88,6 +89,8 @@ def __init__( See https://keras.io/api/models/model_training_apis/#compile-method for a list of possible arguments. The ``optimizer`` and ``metrics`` arguments must not be included. + :param encoder: Optional encoder with which to transform query points before + generating predictions. :raise ValueError: If ``model`` has unsupported layers, ``num_rff_features`` is less than 0, if the ``optimizer`` is not of a supported type, or `compile_args` contains disallowed arguments. @@ -113,7 +116,7 @@ def __init__( f"`LatentVariableLayer`, received {type(layer)} instead." ) - super().__init__(optimizer) + super().__init__(optimizer, encoder) if num_rff_features <= 0: raise ValueError( @@ -304,8 +307,7 @@ def model_gpflux(self) -> DeepGP: def model_keras(self) -> tf_keras.Model: return self._model_keras - @inherit_check_shapes - def sample(self, query_points: TensorType, num_samples: int) -> TensorType: + def sample_encoded(self, query_points: TensorType, num_samples: int) -> TensorType: trajectory = self.trajectory_sampler().get_trajectory() expanded_query_points = tf.expand_dims(query_points, -2) # [N, 1, D] tiled_query_points = tf.tile(expanded_query_points, [1, num_samples, 1]) # [N, S, D] @@ -329,7 +331,7 @@ def trajectory_sampler(self) -> TrajectorySampler[GPfluxPredictor]: """ return DeepGaussianProcessDecoupledTrajectorySampler(self, self._num_rff_features) - def update(self, dataset: Dataset) -> None: + def update_encoded(self, dataset: Dataset) -> None: inputs = dataset.query_points new_num_data = inputs.shape[0] self.model_gpflux.num_data = new_num_data @@ -366,7 +368,7 @@ def update(self, dataset: Dataset) -> None: inputs = layer(inputs) - def optimize(self, dataset: Dataset) -> tf_keras.callbacks.History: + def optimize_encoded(self, dataset: Dataset) -> tf_keras.callbacks.History: """ Optimize the model with the specified `dataset`. :param dataset: The data with which to optimize the `model`. diff --git a/trieste/models/gpflux/sampler.py b/trieste/models/gpflux/sampler.py index 02b83753f..4aef9937a 100644 --- a/trieste/models/gpflux/sampler.py +++ b/trieste/models/gpflux/sampler.py @@ -72,6 +72,7 @@ def __init__(self, sample_size: int, model: GPfluxPredictor): ) for _ in range(len(self._model_gpflux.f_layers)) ] + self._encode = lambda x: model.encode(x) @property def _model_gpflux(self) -> tf.Module: @@ -96,7 +97,9 @@ def sample(self, at: TensorType, *, jitter: float = DEFAULTS.JITTER) -> TensorTy tf.debugging.assert_shapes([(at, [..., 1, None])]) tf.debugging.assert_greater_equal(jitter, 0.0) - samples = tf.repeat(at[..., None, :, :], self._sample_size, axis=-3) # [..., S, 1, D] + samples = tf.repeat( + self._encode(at[..., None, :, :]), self._sample_size, axis=-3 + ) # [..., S, 1, D] for i, layer in enumerate(self._model_gpflux.f_layers): if isinstance(layer, LatentVariableLayer): if not self._initialized: @@ -477,6 +480,8 @@ def __init__(self, model: GPfluxPredictor, num_features: int): for i in range(len(model.model_gpflux.f_layers)) ] + self._encode = lambda x: model.encode(x) + @tf.function def __call__(self, x: TensorType) -> TensorType: """ @@ -486,6 +491,7 @@ def __call__(self, x: TensorType) -> TensorType: the batch dimension, and `D` is the input dimensionality. :return: Trajectory samples with shape `[N, B, L]`, where `L` is the number of outputs. """ + x = self._encode(x) for layer in self._sampling_layers: x = layer(x) return x