Add PosteriorTransform to get_optimal_samples and optimize_posterior_samples (pytorch#2576)

Summary:

Added a `posterior_transform` argument to `get_optimal_samples` to enable posterior sampling-based acquisition functions (xES, TestSet IG) for minimization problems. Intended for use in one-shot settings.
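
For illustration, a minimal sketch of the new minimization path (not part of the commit; it assumes a fitted single-output `model` and a `2 x d` `bounds` tensor, both illustrative stand-ins):

import torch

from botorch.acquisition.objective import ScalarizedPosteriorTransform
from botorch.acquisition.utils import get_optimal_samples

# A negative unit weight scalarizes the single output and flips its sign, so
# maximizing the transformed sample paths minimizes the original objective.
neg_transform = ScalarizedPosteriorTransform(weights=-torch.ones(1))
X_min, f_min = get_optimal_samples(
    model=model,  # assumed: a fitted SingleTaskGP or similar
    bounds=bounds,  # assumed: a `2 x d` tensor of search-space bounds
    num_optima=8,
    posterior_transform=neg_transform,
)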

Differential Revision: D64266499
Carl Hvarfner authored and facebook-github-bot committed Oct 22, 2024
1 parent b9d863d commit 8b81462
Showing 4 changed files with 274 additions and 70 deletions.
56 changes: 42 additions & 14 deletions botorch/acquisition/utils.py
@@ -18,6 +18,7 @@
     IdentityMCObjective,
     MCAcquisitionObjective,
     PosteriorTransform,
+    ScalarizedPosteriorTransform,
 )
 from botorch.exceptions.errors import (
     BotorchTensorDimensionError,
@@ -28,10 +29,11 @@
 from botorch.models.model import Model
 from botorch.sampling.base import MCSampler
 from botorch.sampling.get_sampler import get_sampler
-from botorch.sampling.pathwise import draw_matheron_paths
+from botorch.sampling.pathwise.posterior_samplers import get_matheron_path_model
 from botorch.utils.objective import compute_feasibility_indicator
 from botorch.utils.sampling import optimize_posterior_samples
 from botorch.utils.transforms import is_ensemble, normalize_indices
+from gpytorch.models import GP
 from torch import Tensor


@@ -486,36 +488,62 @@ def project_to_sample_points(X: Tensor, sample_points: Tensor) -> Tensor:


 def get_optimal_samples(
-    model: Model,
+    model: GP,
     bounds: Tensor,
     num_optima: int,
     raw_samples: int = 1024,
     num_restarts: int = 20,
-    maximize: bool = True,
+    posterior_transform: ScalarizedPosteriorTransform | None = None,
+    objective: MCAcquisitionObjective | None = None,
+    return_transformed: bool = False,
 ) -> tuple[Tensor, Tensor]:
     """Draws sample paths from the posterior and maximizes the samples using GD.

     Args:
-        model (Model): The model from which samples are drawn.
-        bounds: (Tensor): Bounds of the search space. If the model inputs are
+        model: The model from which samples are drawn.
+        bounds: Bounds of the search space. If the model inputs are
             normalized, the bounds should be normalized as well.
-        num_optima (int): The number of paths to be drawn and optimized.
-        raw_samples (int, optional): The number of candidates randomly sample.
+        num_optima: The number of paths to be drawn and optimized.
+        raw_samples: The number of candidates randomly sampled.
            Defaults to 1024.
-        num_restarts (int, optional): The number of candidates to do gradient-based
+        num_restarts: The number of candidates to do gradient-based
            optimization on. Defaults to 20.
-        maximize: Whether to maximize or minimize the samples.
+        posterior_transform: A ScalarizedPosteriorTransform (may e.g. be used to
+            scalarize multi-output models or negate the objective).
+        objective: An MCAcquisitionObjective, used to negate the objective or
+            otherwise transform sample outputs. Cannot be combined with
+            `posterior_transform`.
+        return_transformed: If True, return the transformed samples.

     Returns:
-        Tuple[Tensor, Tensor]: The optimal input locations and corresponding
-        outputs, x* and f*.
+        The optimal input locations and corresponding outputs, x* and f*.
     """
-    paths = draw_matheron_paths(model, sample_shape=torch.Size([num_optima]))
+    if posterior_transform and not isinstance(
+        posterior_transform, ScalarizedPosteriorTransform
+    ):
+        raise ValueError(
+            "Only the ScalarizedPosteriorTransform is supported for "
+            "get_optimal_samples."
+        )
+    if posterior_transform and objective:
+        raise ValueError(
+            "Only one of `posterior_transform` and `objective` can be specified."
+        )
+
+    if posterior_transform:
+        sample_transform = posterior_transform.evaluate
+    elif objective:
+        sample_transform = objective
+    else:
+        sample_transform = None
+
+    paths = get_matheron_path_model(model=model, sample_shape=torch.Size([num_optima]))
     optimal_inputs, optimal_outputs = optimize_posterior_samples(
-        paths,
+        paths=paths,
         bounds=bounds,
         raw_samples=raw_samples,
         num_restarts=num_restarts,
-        maximize=maximize,
+        sample_transform=sample_transform,
+        return_transformed=return_transformed,
     )
     return optimal_inputs, optimal_outputs
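
As a usage note (a sketch under the same assumptions as above, not part of the diff): `objective` is the MC-objective alternative to `posterior_transform` (the two cannot be combined), and `return_transformed=True` skips the final re-evaluation of the paths and returns the transformed, here negated, optimal values directly:

from botorch.acquisition.objective import LinearMCObjective
from botorch.acquisition.utils import get_optimal_samples

X_min, f_min_neg = get_optimal_samples(
    model=model,  # assumed fitted model, as above
    bounds=bounds,
    num_optima=8,
    objective=LinearMCObjective(weights=-torch.ones(1)),
    return_transformed=True,  # f_min_neg holds the negated sample values
)
# Passing `posterior_transform` and `objective` together raises:
# ValueError("Only one of `posterior_transform` and `objective` can be specified.")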
68 changes: 39 additions & 29 deletions botorch/utils/sampling.py
@@ -19,9 +19,9 @@
 import warnings

 from abc import ABC, abstractmethod
-from collections.abc import Generator, Iterable
+from collections.abc import Callable, Generator, Iterable
 from contextlib import contextmanager
-from typing import Any, TYPE_CHECKING
+from typing import TYPE_CHECKING

 import numpy as np
 import numpy.typing as npt
@@ -37,7 +39,9 @@
 if TYPE_CHECKING:
-    from botorch.sampling.pathwise.paths import SamplePath  # pragma: no cover
+    from botorch.models.deterministic import (  # pragma: no cover
+        GenericDeterministicModel,
+    )


@contextmanager
@@ -989,68 +991,76 @@ def sparse_to_dense_constraints(


 def optimize_posterior_samples(
-    paths: SamplePath,
+    paths: GenericDeterministicModel,
     bounds: Tensor,
-    candidates: Tensor | None = None,
-    raw_samples: int | None = 1024,
+    raw_samples: int = 1024,
     num_restarts: int = 20,
-    maximize: bool = True,
-    **kwargs: Any,
+    sample_transform: Callable[[Tensor], Tensor] | None = None,
+    return_transformed: bool = False,
 ) -> tuple[Tensor, Tensor]:
-    r"""Cheaply maximizes posterior samples by random querying followed by vanilla
-    gradient descent on the best num_restarts points.
+    r"""Cheaply maximizes posterior samples by random querying followed by
+    gradient-based optimization using SciPy's L-BFGS-B routine.

     Args:
         paths: Random Fourier Feature-based sample paths from the GP
         bounds: The bounds on the search space.
-        candidates: A priori good candidates (typically previous design points)
-            which acts as extra initial guesses for the optimization routine.
         raw_samples: The number of samples with which to query the samples initially.
         num_restarts: The number of points selected for gradient-based optimization.
-        maximize: Boolean indicating whether to maximize or minimize.
+        sample_transform: A callable transform of the sample outputs (e.g.
+            MCAcquisitionObjective or ScalarizedPosteriorTransform.evaluate) used to
+            negate the objective or otherwise transform the output.
+        return_transformed: A boolean indicating whether to return the transformed
+            or non-transformed samples.

     Returns:
         A two-element tuple containing:
             - X_opt: A `num_optima x [batch_size] x d`-dim tensor of optimal inputs x*.
-            - f_opt: A `num_optima x [batch_size] x 1`-dim tensor of optimal outputs f*.
+            - f_opt: A `num_optima x [batch_size] x m`-dim, optionally
+                `num_optima x [batch_size] x 1`-dim, tensor of optimal outputs f*.
     """
-    if maximize:
-
-        def path_func(x):
-            return paths(x)
-
-    else:
-
-        def path_func(x):
-            return -paths(x)
+    def path_func(x) -> Tensor:
+        res = paths(x)
+        if sample_transform:
+            res = sample_transform(res)
+
+        return res.squeeze(-1)

     candidate_set = unnormalize(
-        SobolEngine(dimension=bounds.shape[1], scramble=True).draw(raw_samples), bounds
+        SobolEngine(dimension=bounds.shape[1], scramble=True).draw(n=raw_samples),
+        bounds=bounds,
     )

     # queries all samples on all candidates - output shape
     # raw_samples * num_optima * num_models
     candidate_queries = path_func(candidate_set)
     argtop_k = torch.topk(candidate_queries, num_restarts, dim=-1).indices
     X_top_k = candidate_set[argtop_k, :]

     # to avoid circular import, the import occurs here
-    from botorch.generation.gen import gen_candidates_torch
+    from botorch.generation.gen import gen_candidates_scipy

-    X_top_k, f_top_k = gen_candidates_torch(
-        X_top_k, path_func, lower_bounds=bounds[0], upper_bounds=bounds[1], **kwargs
+    X_top_k, f_top_k = gen_candidates_scipy(
+        X_top_k,
+        path_func,
+        lower_bounds=bounds[0],
+        upper_bounds=bounds[1],
     )
     f_opt, arg_opt = f_top_k.max(dim=-1, keepdim=True)

     # For each sample (and possibly for every model in the batch of models), this
     # retrieves the argmax. We flatten, pick out the indices and then reshape to
     # the original batch shapes (so instead of picking out the argmax of a
     # (3, 7, num_restarts, D)) along the num_restarts dim, we pick it out of a
-    # (21 , num_restarts, D)
+    # (21, num_restarts, D)
     final_shape = candidate_queries.shape[:-1]
     X_opt = X_top_k.reshape(final_shape.numel(), num_restarts, -1)[
         torch.arange(final_shape.numel()), arg_opt.flatten()
     ].reshape(*final_shape, -1)
-    if not maximize:
-        f_opt = -f_opt
+
+    # if we return transformed, we do not need to pass the samples through paths
+    # a second time but rather just return the transformed optimal values
+    if return_transformed:
+        return X_opt, f_opt
+
+    f_opt = paths(X_opt.unsqueeze(-2)).squeeze(-2)
     return X_opt, f_opt
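
At the lower level, the same mechanics can be exercised directly: `optimize_posterior_samples` now takes a path model from `get_matheron_path_model` plus an optional `sample_transform` callable. A minimal sketch, again with the illustrative `model` and `bounds` placeholders from above:

import torch

from botorch.sampling.pathwise.posterior_samplers import get_matheron_path_model
from botorch.utils.sampling import optimize_posterior_samples

paths = get_matheron_path_model(model=model, sample_shape=torch.Size([16]))
X_opt, f_opt = optimize_posterior_samples(
    paths=paths,
    bounds=bounds,
    sample_transform=lambda y: -y,  # negate the sample outputs to minimize
)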
118 changes: 100 additions & 18 deletions test/acquisition/test_utils.py
@@ -10,7 +10,13 @@

 import torch

-from botorch.acquisition.objective import GenericMCObjective, LearnedObjective
+from botorch.acquisition.objective import (
+    ExpectationPosteriorTransform,
+    GenericMCObjective,
+    LearnedObjective,
+    LinearMCObjective,
+    ScalarizedPosteriorTransform,
+)
 from botorch.acquisition.utils import (
     compute_best_feasible_objective,
     expand_trace_observations,
@@ -412,32 +418,108 @@ class TestGetOptimalSamples(BotorchTestCase):
     def test_get_optimal_samples(self):
         dims = 3
         dtype = torch.float64
-        for_testing_speed_kwargs = {"raw_samples": 50, "num_restarts": 3}
+        for_testing_speed_kwargs = {"raw_samples": 20, "num_restarts": 2}
         num_optima = 7
         batch_shape = (3,)

         bounds = torch.tensor([[0, 1]] * dims, dtype=dtype).T
         X = torch.rand(*batch_shape, 4, dims, dtype=dtype)
-        Y = torch.sin(X).sum(dim=-1, keepdim=True).to(dtype)
-        model = SingleTaskGP(X, Y)
-        X_opt, f_opt = get_optimal_samples(
-            model, bounds, num_optima=num_optima, **for_testing_speed_kwargs
+        Y = torch.sin(2 * 3.1415 * X).sum(dim=-1, keepdim=True).to(dtype)
+        model = SingleTaskGP(train_X=X, train_Y=Y)
+        posterior_transform = ScalarizedPosteriorTransform(
+            weights=torch.ones(1, dtype=dtype)
         )
-        X_opt, f_opt_min = get_optimal_samples(
-            model,
-            bounds,
-            num_optima=num_optima,
-            maximize=False,
-            **for_testing_speed_kwargs,
+        posterior_transform_neg = ScalarizedPosteriorTransform(
+            weights=-torch.ones(1, dtype=dtype)
         )
+
+        with torch.random.fork_rng():
+            torch.manual_seed(0)
+            X_opt_def, f_opt_def = get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                **for_testing_speed_kwargs,
+            )
         correct_X_shape = (num_optima,) + batch_shape + (dims,)
         correct_f_shape = (num_optima,) + batch_shape + (1,)
-        self.assertEqual(X_opt.shape, correct_X_shape)
-        self.assertEqual(f_opt.shape, correct_f_shape)
-        # asserting that the solutions found by minimizing the samples are smaller
-        # than those found by maximization
-        self.assertTrue(torch.all(f_opt_min < f_opt))
+        self.assertEqual(X_opt_def.shape, correct_X_shape)
+        self.assertEqual(f_opt_def.shape, correct_f_shape)
+        with torch.random.fork_rng():
+            torch.manual_seed(0)
+            X_opt_ps, f_opt_ps = get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                posterior_transform=posterior_transform,
+                **for_testing_speed_kwargs,
+            )
+        self.assertAllClose(X_opt_def, X_opt_ps)
+
+        with torch.random.fork_rng():
+            torch.manual_seed(0)
+            X_opt_ps_neg, f_opt_ps_neg = get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                posterior_transform=posterior_transform_neg,
+                **for_testing_speed_kwargs,
+            )
+        # maxima are larger than minima when the seed is fixed
+        self.assertTrue(torch.all(f_opt_ps_neg < f_opt_ps))
+
+        obj = LinearMCObjective(weights=-torch.ones(1, dtype=dtype))
+        with torch.random.fork_rng():
+            torch.manual_seed(0)
+            X_opt_obj_neg, f_opt_obj_neg = get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                objective=obj,
+                **for_testing_speed_kwargs,
+            )
+        # check that the minimum is the same for negative objective and
+        # negative posterior transform
+        self.assertAllClose(X_opt_ps_neg, X_opt_obj_neg)
+
+        obj = LinearMCObjective(weights=-torch.ones(1, dtype=dtype))
+        with torch.random.fork_rng():
+            torch.manual_seed(0)
+            _, f_opt_obj_pos = get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                objective=obj,
+                return_transformed=True,
+                **for_testing_speed_kwargs,
+            )
+        # check that the transformed return value is the negation of the
+        # non-transformed return value
+        self.assertAllClose(f_opt_obj_pos, -f_opt_obj_neg)
+
+        with self.assertRaisesRegex(
+            ValueError,
+            "Only the ScalarizedPosteriorTransform is supported for "
+            "get_optimal_samples.",
+        ):
+            get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                posterior_transform=ExpectationPosteriorTransform(n_w=5),
+                **for_testing_speed_kwargs,
+            )
+        with self.assertRaisesRegex(
+            ValueError,
+            "Only one of `posterior_transform` and `objective` can be specified.",
+        ):
+            get_optimal_samples(
+                model=model,
+                bounds=bounds,
+                num_optima=num_optima,
+                posterior_transform=posterior_transform,
+                objective=obj,
+                **for_testing_speed_kwargs,
+            )


 class TestPreferenceUtils(BotorchTestCase):
