
Commit

Try completely different approach
Uri Granta committed Feb 2, 2024
1 parent 876fcda commit 05d80e8
Showing 1 changed file with 29 additions and 110 deletions.
139 changes: 29 additions & 110 deletions trieste/acquisition/optimizer.py
@@ -19,9 +19,7 @@

from __future__ import annotations

import itertools
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Union, cast

import greenlet as gr
import numpy as np
@@ -170,102 +168,35 @@ def optimize_discrete(
return _get_max_discrete_points(points, target_func)


InitialPointSampler = Callable[[SearchSpace, int, int], TensorType]
"""
Type alias for a function that generates initial samples for an optimizer.
Takes a search space, the number of samples to generate, and an offset, and returns
that many samples (at the offset).
"""


class InitialPointSamplerClass(ABC):
"""An :class:`InitialPointSamplerClass` is an initial point sampler represented using a class
rather than as a standalone function. Using a class to represent an initial point sampler
makes it easier to update internal state between calls.
"""

@abstractmethod
def __call__(self, space: SearchSpace, num_samples: int, offset: int) -> TensorType:
"""Return initial points."""

InitialPointSampler = Callable[[SearchSpace], Iterable[TensorType]]
"""Type alias for a function that returns initial points for an optimization."""

def sample_from_space(space: SearchSpace, num_samples: int, offset: int) -> TensorType:
"""Default initial point sampler that samples randomly from the space."""
return space.sample(num_samples)


def sample_from_sequence(
sequence: Sequence[TensorType],
additional_sampler: Optional[InitialPointSampler] = None,
) -> InitialPointSampler:
def sample_from_space(num_samples: int, batch_size: Optional[int] = None) -> InitialPointSampler:
"""
Initial point sampler that returns points from a prebuilt sequence.
:param sequence: Sequence of initial points (e.g. a list or Tensor).
:param additional_sampler: Sampler to use if there aren't enough points to return.
If unspecified, this scenario raises a :exc:`ValueError`.
An initial point sampler that returns `num_samples` points. If `batch_size` is specified,
then these are returned in batches of that size, to preserve memory usage.
"""
if num_samples <= 0:
raise ValueError(f"num_samples must be positive, got {num_samples}")

def _sampler(space: SearchSpace, num_samples: int, offset: int) -> TensorType:
samples = sequence[offset : offset + num_samples]
if not isinstance(samples, tf.Tensor):
samples = tf.concat(samples, axis=0)
if len(samples) < num_samples:
if additional_sampler is None:
raise ValueError(
"Insufficient samples "
f"({offset + num_samples} required, {len(sequence)} available)"
)
additional_samples = additional_sampler(
space, num_samples - len(samples), max(offset - len(sequence), 0)
)
samples = tf.concat([samples, additional_samples], axis=0)
return samples

return _sampler
if isinstance(batch_size, int) and batch_size <= 0:
raise ValueError(f"batch_size must be positive, got {batch_size}")

batch_size_int = batch_size or num_samples

class sample_from_iterator(InitialPointSamplerClass):
def __init__(
self,
iterator: Iterator[TensorType],
additional_sampler: Optional[InitialPointSampler] = None,
) -> None:
"""
Initial point sampler that returns points from a prebuilt iterator (e.g. a generator).
def sampler(space: SearchSpace) -> Iterable[TensorType]:
for offset in range(0, num_samples, batch_size_int):
yield space.sample(min(num_samples, offset + batch_size_int) - offset)

:param iterator: Iterator returning initial points.
:param additional_sampler: Sampler to use if the iterator is exhausted without enough points.
If unspecified, this scenario raises a :exc:`ValueError`.
"""
self._iterator = iterator
self._additional_sampler = additional_sampler
self._iterator_length = 0

def __call__(self, space: SearchSpace, num_samples: int, offset: int) -> TensorType:
slice = itertools.islice(self._iterator, num_samples)
samples = tf.concat(list(slice), axis=0)
self._iterator_length += len(samples)
if len(samples) < num_samples:
if self._additional_sampler is None:
raise ValueError(
"Insufficient samples "
f"({offset + num_samples} required, {self._iterator_length} available)"
)
additional_samples = self._additional_sampler(
space, num_samples - len(samples), max(offset - self._iterator_length, 0)
)
samples = tf.concat([samples, additional_samples], axis=0)
return samples
return sampler
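
For illustration, a minimal sketch of how the batched sampler behaves (the `Box` space and the sizes are arbitrary examples, not part of this commit):

from trieste.space import Box
from trieste.acquisition.optimizer import sample_from_space

space = Box([0.0, 0.0], [1.0, 1.0])  # toy 2D search space

# Requesting 250 samples in batches of 100 yields tensors of 100, 100 and 50 points.
sampler = sample_from_space(num_samples=250, batch_size=100)
for batch in sampler(space):
    print(batch.shape)  # (100, 2), (100, 2), then (50, 2)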


def generate_continuous_optimizer(
num_initial_samples: int = NUM_SAMPLES_MIN,
num_initial_samples: int | InitialPointSampler = NUM_SAMPLES_MIN,
num_optimization_runs: int = 10,
num_recovery_runs: int = 10,
optimizer_args: Optional[dict[str, Any]] = None,
split_initial_samples: Optional[int] = 100_000,
initial_sampler: InitialPointSampler = sample_from_space,
) -> AcquisitionOptimizer[Box | CollectionSearchSpace]:
"""
Generate a gradient-based optimizer for :class:`Box` and :class:`CollectionSearchSpace`
@@ -288,43 +219,39 @@ def generate_continuous_optimizer(
**Note:** using a large number of `num_initial_samples` and `num_optimization_runs` with a
high-dimensional search space can consume a large amount of CPU memory (RAM).
:param num_initial_samples: The size of the random sample used to find the starting point(s) of
the optimization.
:param num_initial_samples: The starting point(s) of the optimization. This can be either
the number of random samples to use, or a function that given the search space returns
the points to use (possibly in multiple batches to help reduce memory usage).
:param num_optimization_runs: The number of separate optimizations to run.
:param num_recovery_runs: The maximum number of recovery optimization runs in case of failure.
:param optimizer_args: The keyword arguments to pass to the Scipy L-BFGS-B optimizer.
Check `minimize` method of :class:`~scipy.optimize` for details of which arguments
can be passed. Note that method, jac and bounds cannot/should not be changed.
:param split_initial_samples: Maximum number of initial samples to process at a time.
Decreasing this can reduce memory usage at the start, at the slight cost of performance.
:param initial_sampler: Function for generating initial samples. This should accept
a search space, the number of samples to generate and an offset. It may be called multiple
times (with different offsets) if `split_initial_samples` is specified. By default,
samples are generated by calling `space.sample(num_samples)`.
:return: The acquisition optimizer.
"""
if num_initial_samples <= 0:
if isinstance(num_initial_samples, int) and num_initial_samples <= 0:
raise ValueError(f"num_initial_samples must be positive, got {num_initial_samples}")

if split_initial_samples is not None and split_initial_samples <= 0:
raise ValueError(
f"split_initial_samples must be positive or None, got {num_initial_samples}"
)

if num_optimization_runs <= 0:
raise ValueError(f"num_optimization_runs must be positive, got {num_optimization_runs}")

if num_initial_samples < num_optimization_runs:
if isinstance(num_initial_samples, int) and num_initial_samples < num_optimization_runs:
raise ValueError(
f"""
num_initial_samples {num_initial_samples} must be at
least num_optimization_runs {num_optimization_runs}
"""
)

if num_recovery_runs <= -1:
if num_recovery_runs < 0:
raise ValueError(f"num_recovery_runs must be zero or greater, got {num_recovery_runs}")

initial_sampler = (
sample_from_space(num_initial_samples)
if isinstance(num_initial_samples, int)
else num_initial_samples
)
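
Since `num_initial_samples` now accepts either form, both of the following call styles should work (the counts here are illustrative, not taken from this commit):

generate_continuous_optimizer(num_initial_samples=1000)  # a plain count, wrapped via sample_from_space
generate_continuous_optimizer(
    num_initial_samples=sample_from_space(1000, batch_size=100)  # any InitialPointSampler
)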

def optimize_continuous(
space: Box | CollectionSearchSpace,
target_func: Union[AcquisitionFunction, Tuple[AcquisitionFunction, int]],
@@ -356,18 +283,10 @@ def optimize_continuous(
if V <= 0:
raise ValueError(f"vectorization must be positive, got {V}")

samples_left = num_initial_samples
top_fun_values: Optional[TensorType] = None # [V, num_optimization_runs]
top_candidates: Optional[TensorType] = None # [V, num_optimization_runs, D]

while samples_left > 0:
if split_initial_samples is None:
samples = samples_left
else:
samples = max(min(samples_left, split_initial_samples // V), 1)
samples_left -= samples

candidates = initial_sampler(space, samples, num_initial_samples - samples_left)
for candidates in initial_sampler(space):
if tf.rank(candidates) == 3:
# If samples is a tensor of rank 3, then it is a batch of samples. In this case
# the vectorization of the target function must be a multiple of the length of the
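The loop above replaces the old offset bookkeeping with plain iteration over sampler batches. A standalone sketch of that pattern, using a toy scoring function rather than trieste's actual acquisition machinery:

import tensorflow as tf
from trieste.space import Box
from trieste.acquisition.optimizer import sample_from_space

space = Box([0.0], [1.0])

def score(x):  # toy objective, maximised at x = 0.5
    return -tf.square(x - 0.5)

# Track the best candidate seen so far, one batch at a time, instead of
# materialising all initial samples in memory at once.
best_point, best_value = None, -float("inf")
for candidates in sample_from_space(1000, batch_size=100)(space):
    values = tf.squeeze(score(candidates), axis=-1)  # [batch]
    top = tf.argmax(values)
    if values[top] > best_value:
        best_point, best_value = candidates[top], values[top]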
