Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial point samplers #808

Merged
merged 28 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion tests/unit/acquisition/test_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import unittest
import unittest.mock
from math import ceil
from typing import Any, Callable, Optional, Tuple, TypeVar, Union
from typing import Any, Callable, Iterable, Optional, Tuple, TypeVar, Union
from unittest.mock import MagicMock

import numpy.testing as npt
Expand All @@ -33,9 +33,11 @@
batchify_joint,
batchify_vectorize,
generate_continuous_optimizer,
generate_initial_points,
generate_random_search_optimizer,
get_bounds_of_box_relaxation_around_point,
optimize_discrete,
sample_from_space,
)
from trieste.acquisition.utils import split_acquisition_function_calls
from trieste.logging import tensorboard_writer
Expand Down Expand Up @@ -824,3 +826,87 @@ def side_effect(*args: Any, **kwargs: Any) -> spo.OptimizeResult:
else:
received_constraints = None
assert received_constraints == expected_constraints


@pytest.mark.parametrize("num_initial_points", [0, 1, 2, 3, 4])
def test_generate_initial_points(num_initial_points: int) -> None:
    """
    Check that `generate_initial_points` returns the requested number of best candidates,
    best first, when the sampler yields a single batch.
    """
    # Define the space once so the sampler's check and the call below cannot drift apart.
    search_space = Box([-1], [2])

    def sampler(space: SearchSpace) -> Iterable[TensorType]:
        assert space == search_space
        yield tf.range(-1, 2, 0.1)[:, None]

    # Expected ranking of the candidates; presumably `_quadratic_sum([1.0])` peaks at x=1
    # (helper defined elsewhere in this test module -- TODO confirm).
    best_four_samples = tf.constant([1.0, 0.9, 1.1, 0.8])
    points = generate_initial_points(
        num_initial_points, sampler, search_space, _quadratic_sum([1.0])
    )
    assert points.shape == [num_initial_points, 1, 1]
    npt.assert_allclose(points, best_four_samples[:num_initial_points, None, None], atol=1e-6)
uri-granta marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize("num_initial_points", [0, 1, 2, 3, 6, 10])
def test_generate_initial_points_batched_sampler(num_initial_points: int) -> None:
    """
    Check that the best candidates are selected across several batches, and that at most
    the six generated candidates are returned when more are requested.
    """
    # Define the space once so the sampler's check and the call below cannot drift apart.
    search_space = Box([-1], [2])

    def sampler(space: SearchSpace) -> Iterable[TensorType]:
        assert space == search_space
        yield tf.constant([[0.8], [0.9]])
        yield tf.constant([[1.0], [1.1]])
        yield tf.constant([[1.2], [1.3]])

    # Expected ranking of all six candidates, best first.
    best_samples = tf.constant([1.0, 0.9, 1.1, 0.8, 1.2, 1.3])
    points = generate_initial_points(
        num_initial_points, sampler, search_space, _quadratic_sum([1.0])
    )
    assert points.shape == [min(num_initial_points, 6), 1, 1]
    npt.assert_allclose(points, best_samples[:num_initial_points, None, None], atol=1e-6)


@pytest.mark.parametrize("num_initial_points", [0, 1, 2, 10])
@pytest.mark.parametrize("vectorization", [1, 3, 4])
def test_generate_initial_points_vectorized(num_initial_points: int, vectorization: int) -> None:
    """
    Check that, for a vectorized target function, the candidates are ranked independently
    for each of the individual functions.
    """
    search_space = Box([-1, -2], [1.5, 2.5])

    def sampler(space: SearchSpace) -> Iterable[TensorType]:
        assert space == search_space
        yield tf.constant([[0], [0.5], [1.0]])

    def vectorized_target(x: TensorType) -> TensorType:  # [N, V, D] -> [N,V]
        # one differently shifted quadratic per vectorized slot
        shifts = [[0.0], [0.2], [0.5], [1.0]]
        per_slot_values = []
        for slot, shift in enumerate(shifts[:vectorization]):
            per_slot_values.append(_quadratic_sum(shift)(x[:, slot : slot + 1, :]))
        return tf.concat(per_slot_values, axis=-1)

    # expected ranking per slot: first axis is the rank, second axis is the slot
    best_samples = tf.constant(
        [
            [[0.0], [0.0], [0.5], [1.0]],
            [[0.5], [0.5], [0.0], [0.5]],
            [[1.0], [1.0], [1.0], [0.0]],
        ]
    )
    points = generate_initial_points(
        num_initial_points, sampler, search_space, vectorized_target, vectorization
    )
    assert points.shape == [min(num_initial_points, 3), vectorization, 1]
    npt.assert_allclose(points, best_samples[:num_initial_points, :vectorization], atol=1e-6)


@pytest.mark.parametrize("num_samples,batch_size", [(1, None), (5, None), (5, 2), (5, 5), (5, 10)])
def test_sample_from_space(num_samples: int, batch_size: Optional[int]) -> None:
    """
    Check that `sample_from_space` yields the expected number of batches, containing
    `num_samples` distinct points in total, all lying inside the search space.
    """
    sampler = sample_from_space(num_samples, batch_size)
    batches = list(sampler(Box([0], [1])))

    expected_num_batches = ceil(num_samples / (batch_size or num_samples))
    assert len(batches) == expected_num_batches
    assert sum(len(batch) for batch in batches) == num_samples

    for batch in batches:
        for x in batch:
            assert 0 <= x <= 1

    distinct_values = {float(x) for batch in batches for x in batch}
    assert len(distinct_values) == num_samples
uri-granta marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
    "num_samples,batch_size",
    [(0, None), (-5, None), (5, 0), (5, -5)],
)
def test_sample_from_space_raises(num_samples: int, batch_size: Optional[int]) -> None:
    """Check that non-positive sample counts and batch sizes are rejected up front."""
    invalid_args = dict(num_samples=num_samples, batch_size=batch_size)
    with pytest.raises(ValueError):
        sample_from_space(**invalid_args)


def test_optimize_continuous_raises_for_insufficient_starting_points() -> None:
    """
    Check that the continuous optimizer fails with a clear error when the sampler provides
    fewer candidates than there are optimization runs.
    """
    box = Box([-1], [2])

    def two_point_sampler(space: SearchSpace) -> Iterable[TensorType]:
        assert space == box
        yield tf.constant([[0.8], [0.9]])

    # three optimization runs requested, but only two candidates are ever generated
    optimizer = generate_continuous_optimizer(two_point_sampler, 3)
    with pytest.raises(ValueError) as exc_info:
        optimizer(box, _quadratic_sum([1.0]))

    expected_message = "Not enough initial points generated (2 for 3 optimization runs)"
    assert str(exc_info.value) == expected_message
227 changes: 162 additions & 65 deletions trieste/acquisition/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

from typing import Any, Callable, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Union, cast

import greenlet as gr
import numpy as np
Expand Down Expand Up @@ -168,8 +168,144 @@ def optimize_discrete(
return _get_max_discrete_points(points, target_func)


InitialPointSampler = Callable[[SearchSpace], Iterable[TensorType]]
"""
Type alias for a function that returns initial point candidates for an optimization.
Candidates are returned in one or more batches, and each batch should have the shape [N, D],
even when N=1.

For simplicity and memory usage, it is recommended to define these as generators. For example,
the following initial point sampler returns both a set of pre-optimized points and 50,000
random samples:

def sampler(space: SearchSpace) -> Iterable[TensorType]:
yield pre_optimized_points
yield space.sample(50_000)

While the following does the same but groups the random samples into batches of size 1,000
to conserve memory:

def sampler(space: SearchSpace) -> Iterable[TensorType]:
yield pre_optimized_points
yield from sample_from_space(50_000, batch_size=1_000)(space)
"""


def sample_from_space(num_samples: int, batch_size: Optional[int] = None) -> InitialPointSampler:
    """
    An initial point sampler that returns `num_samples` points. If `batch_size` is specified,
    then these are returned in batches of at most that size (the final batch may be smaller),
    to conserve memory. Otherwise all the points are returned in a single batch.

    :param num_samples: Total number of points to sample. Must be positive.
    :param batch_size: Maximum number of points per batch, or `None` to return a single batch.
        Must be positive if specified.
    :return: A sampler that, given a search space, yields the batches of sampled points.
    :raise ValueError: If `num_samples` or `batch_size` is not positive.
    """
    if num_samples <= 0:
        raise ValueError(f"num_samples must be positive, got {num_samples}")

    # Compare against None rather than checking isinstance(..., int): integer types that do
    # not subclass the builtin int (e.g. numpy integers) must be validated too, otherwise a
    # zero silently means "single batch" and a negative value yields no batches at all.
    if batch_size is not None and batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    batch_size_int = num_samples if batch_size is None else batch_size

    def sampler(space: SearchSpace) -> Iterable[TensorType]:
        for offset in range(0, num_samples, batch_size_int):
            # the final batch only holds whatever is left over
            yield space.sample(min(num_samples, offset + batch_size_int) - offset)

    return sampler


def generate_initial_points(
    num_initial_points: int,
    initial_sampler: InitialPointSampler,
    space: SearchSpace,
    target_func: AcquisitionFunction,
    vectorization: int = 1,
) -> TensorType:
    """
    Return the best starting points for an optimization from those generated by a given sampler.

    The sampler's batches are consumed one at a time: each batch is evaluated with
    `target_func` and merged with the best points found so far, and only the top
    `num_initial_points` points (per vectorized function) are carried over to the next batch.

    :param num_initial_points: Number of best starting points to return.
    :param initial_sampler: Initial point sampler. Each batch it yields must have shape [N, D],
        or [N, B, D] where `vectorization` is a multiple of B.
    :param space: Search space.
    :param target_func: Target function being optimized. Given candidates of shape [N, V, D]
        it must return values of shape [N, V].
    :param vectorization: Vectorization of the target function.
    :return: The starting points with the highest target function values, with shape
        [num_initial_points, V, D]. Fewer points are returned if the sampler generated fewer
        than `num_initial_points` candidates.
    :raise ValueError: If the sampler does not generate any batches of candidates.
    """
    top_fun_values: Optional[TensorType] = None  # [V, num_initial_points]
    top_candidates: Optional[TensorType] = None  # [V, num_initial_points, D]

    for candidates in initial_sampler(space):
        if tf.rank(candidates) == 3:
            # If candidates is a tensor of rank 3, then it is a batch of samples. In this case
            # the vectorization of the target function must be a multiple of the length of the
            # second (batch) dimension.
            remainder = vectorization % tf.shape(candidates)[1]
            tf.debugging.assert_equal(
                remainder,
                tf.cast(0, dtype=remainder.dtype),
                message=(
                    f"""
                    The vectorization of the target function {vectorization} must be a multiple of
                    the batch shape of initial samples {tf.shape(candidates)[1]}.
                    """
                ),
            )
            multiple = vectorization // tf.shape(candidates)[1]
            tiled_candidates = tf.tile(candidates, [1, multiple, 1])  # [samples, V, D]
        else:
            tf.debugging.assert_rank(
                candidates,
                2,
                message=(
                    f"""
                    The initial samples must be a tensor of rank 2, got a tensor of rank
                    {tf.rank(candidates)}.
                    """
                ),
            )
            tiled_candidates = tf.tile(
                candidates[:, None, :], [1, vectorization, 1]
            )  # [samples, V, D]

        target_func_values = target_func(tiled_candidates)  # [samples, V]
        tf.debugging.assert_shapes(
            [(target_func_values, ("_", vectorization))],
            message=(
                f"""
                The result of function target_func has shape
                {tf.shape(target_func_values)}, however, expected a trailing
                dimension of size {vectorization}.
                """
            ),
        )

        # On the first batch, start from empty "best so far" tensors with matching dtypes so
        # that the merge below treats every batch in the same way.
        if top_candidates is None:
            top_candidates = tf.zeros(
                [vectorization, 0, tf.shape(candidates)[-1]], dtype=candidates.dtype
            )
        if top_fun_values is None:
            top_fun_values = tf.zeros([vectorization, 0], dtype=target_func_values.dtype)

        # Merge the new batch with the best points so far ...
        top_candidates = tf.concat(
            [top_candidates, tf.transpose(tiled_candidates, [1, 0, 2])], 1
        )  # [V, samples+num_initial_points, D]
        top_fun_values = tf.concat(
            [top_fun_values, tf.transpose(target_func_values)], 1
        )  # [V, samples+num_initial_points]

        # ... and keep only the best of them (or all of them, if there are not enough yet).
        _, top_k_indices = tf.math.top_k(
            top_fun_values, k=min(num_initial_points, tf.shape(top_fun_values)[-1])
        )  # [V, num_initial_points]

        top_candidates = tf.gather(
            top_candidates, top_k_indices, batch_dims=1
        )  # [V, num_initial_points, D]
        top_fun_values = tf.gather(
            top_fun_values, top_k_indices, batch_dims=1
        )  # [V, num_initial_points]

    if top_candidates is None:
        # The sampler yielded nothing, so neither the dimension nor the dtype of the points is
        # known: fail with a clear message rather than passing None on to TensorFlow.
        raise ValueError("The initial point sampler did not generate any points")

    initial_points = tf.transpose(top_candidates, [1, 0, 2])  # [num_initial_points,V,D]
    return initial_points


def generate_continuous_optimizer(
num_initial_samples: int = NUM_SAMPLES_MIN,
num_initial_samples: int | InitialPointSampler = NUM_SAMPLES_MIN,
num_optimization_runs: int = 10,
num_recovery_runs: int = 10,
optimizer_args: Optional[dict[str, Any]] = None,
Expand All @@ -195,32 +331,38 @@ def generate_continuous_optimizer(
**Note:** using a large number of `num_initial_samples` and `num_optimization_runs` with a
high-dimensional search space can consume a large amount of CPU memory (RAM).

:param num_initial_samples: The size of the random sample used to find the starting point(s) of
the optimization.
:param num_initial_samples: The starting point(s) of the optimization. This can be either
the number of random samples to use, or a function that given the search space returns
the points to use. The latter can be used for example to add pre-optimized starting points
to the random points, as well as to batch point generation to reduce memory usage for
high-dimensional problems.
:param num_optimization_runs: The number of separate optimizations to run.
:param num_recovery_runs: The maximum number of recovery optimization runs in case of failure.
:param optimizer_args: The keyword arguments to pass to the Scipy L-BFGS-B optimizer.
Check `minimize` method of :class:`~scipy.optimize` for details of which arguments
can be passed. Note that method, jac and bounds cannot/should not be changed.
:return: The acquisition optimizer.
uri-granta marked this conversation as resolved.
Show resolved Hide resolved
"""
if num_initial_samples <= 0:
raise ValueError(f"num_initial_samples must be positive, got {num_initial_samples}")

if num_optimization_runs < 0:
if num_optimization_runs <= 0:
raise ValueError(f"num_optimization_runs must be positive, got {num_optimization_runs}")

if num_initial_samples < num_optimization_runs:
if not callable(num_initial_samples) and num_initial_samples < num_optimization_runs:
raise ValueError(
f"""
num_initial_samples {num_initial_samples} must be at
least num_optimization_runs {num_optimization_runs}
"""
)

if num_recovery_runs <= -1:
if num_recovery_runs < 0:
raise ValueError(f"num_recovery_runs must be zero or greater, got {num_recovery_runs}")

initial_sampler = (
sample_from_space(num_initial_samples)
if not callable(num_initial_samples)
else num_initial_samples
)

def optimize_continuous(
space: Box | CollectionSearchSpace,
target_func: Union[AcquisitionFunction, Tuple[AcquisitionFunction, int]],
Expand All @@ -232,7 +374,7 @@ def optimize_continuous(
For :class:`CollectionSearchSpace` we only apply gradient updates to
its :class:`Box` subspaces.

When this functions receives an acquisition-integer tuple as its `target_func`,it
When this function receives an acquisition-integer tuple as its `target_func`, it
optimizes each of the individual V functions making up `target_func`, i.e.
evaluating `num_initial_samples` samples, running `num_optimization_runs` runs, and
(if necessary) running `num_recovery_runs` recovery runs for each of the individual
Expand All @@ -249,63 +391,18 @@ def optimize_continuous(
else:
V = 1

if V < 0:
if V <= 0:
raise ValueError(f"vectorization must be positive, got {V}")

candidates = space.sample(num_initial_samples)
if tf.rank(candidates) == 3:
# If samples is a tensor of rank 3, then it is a batch of samples. In this case
# the vectorization of the target function must be a multiple of the length of the
# second (batch) dimension.
remainder = V % tf.shape(candidates)[1]
tf.debugging.assert_equal(
remainder,
tf.cast(0, dtype=remainder.dtype),
message=(
f"""
The vectorization of the target function {V} must be a multiple of the batch
shape of initial samples {tf.shape(candidates)[1]}.
"""
),
)
multiple = V // tf.shape(candidates)[1]
tiled_candidates = tf.tile(candidates, [1, multiple, 1]) # [num_initial_samples, V, D]
else:
tf.debugging.assert_rank(
candidates,
2,
message=(
f"""
The initial samples must be a tensor of rank 2, got a tensor of rank
{tf.rank(candidates)}.
"""
),
)
tiled_candidates = tf.tile(
candidates[:, None, :], [1, V, 1]
) # [num_initial_samples, V, D]

target_func_values = target_func(tiled_candidates) # [num_samples, V]
tf.debugging.assert_shapes(
[(target_func_values, ("_", V))],
message=(
f"""
The result of function target_func has shape
{tf.shape(target_func_values)}, however, expected a trailing
dimension of size {V}.
"""
),
)
initial_points = generate_initial_points(
num_optimization_runs, initial_sampler, space, target_func, V
) # [num_optimization_runs,V,D]

_, top_k_indices = tf.math.top_k(
tf.transpose(target_func_values), k=num_optimization_runs
) # [1, num_optimization_runs] or [V, num_optimization_runs]

tiled_candidates = tf.transpose(tiled_candidates, [1, 0, 2]) # [V, num_initial_samples, D]
top_k_points = tf.gather(
tiled_candidates, top_k_indices, batch_dims=1
) # [V, num_optimization_runs, D]
initial_points = tf.transpose(top_k_points, [1, 0, 2]) # [num_optimization_runs,V,D]
if len(initial_points) < num_optimization_runs:
raise ValueError(
f"Not enough initial points generated ({len(initial_points)} "
f"for {num_optimization_runs} optimization runs)"
)

(
successes,
Expand Down
Loading