Enable batched benchmarks (with BatchTrial) (#2331)
Summary:

- Add scheduler options for running benchmarks with BatchTrial
- Append a batch-size suffix ("_q{batch_size}") to the names of batched MBM methods
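
As a rough sketch of the intended end-to-end usage (mirroring the tests in this diff; the helper signatures below are the ones that appear in the changed files and are used here purely for illustration):

```python
from ax.benchmark.benchmark import benchmark_replication
from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.benchmark.methods.modular_botorch import get_sobol_botorch_modular_acquisition
from ax.benchmark.problems.registry import get_problem
from botorch.acquisition.knowledge_gradient import qKnowledgeGradient
from botorch.models.gp_regression import SingleTaskGP

# Batched scheduler options: every trial is a BatchTrial with 2 arms.
scheduler_options = get_benchmark_scheduler_options(batch_size=2)

method = get_sobol_botorch_modular_acquisition(
    model_cls=SingleTaskGP,
    acquisition_cls=qKnowledgeGradient,
    scheduler_options=scheduler_options,
    distribute_replications=False,
)

problem = get_problem(problem_name="ackley4")
result = benchmark_replication(problem=problem, method=method, seed=0)
```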

Reviewed By: Balandat

Differential Revision: D55750921
esantorella authored and facebook-github-bot committed Apr 8, 2024
1 parent 484ff7c commit e94e825
Showing 4 changed files with 110 additions and 15 deletions.
29 changes: 26 additions & 3 deletions ax/benchmark/benchmark_method.py
@@ -10,7 +10,7 @@
from typing import Any

from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.service.utils.scheduler_options import SchedulerOptions, TrialType
from ax.utils.common.base import Base
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import not_none
@@ -75,25 +75,48 @@ def __post_init__(self) -> None:
_assign_frozen_attr(self, name="generation_strategy", value=gs_cloned)


def get_sequential_optimization_scheduler_options(
def get_benchmark_scheduler_options(
timeout_hours: int = 4,
batch_size: int = 1,
) -> SchedulerOptions:
"""The typical SchedulerOptions used in benchmarking.
Currently, regardless of batch size, all pending trials must complete before
new ones are generated. That is, when batch_size > 1, the design is "batch
sequential", and when batch_size = 1, the design is "fully sequential."
Args:
timeout_hours: The maximum amount of time (in hours) to run each
benchmark replication. Defaults to 4 hours.
batch_size: Number of trials to generate at once.
"""

return SchedulerOptions(
# Enforce sequential trials by default
# No new candidates can be generated while any are pending.
# If batched, an entire batch must finish before the next can be
# generated.
max_pending_trials=1,
# Do not throttle, as is often necessary when polling real endpoints
init_seconds_between_polls=0,
min_seconds_before_poll=0,
timeout_hours=timeout_hours,
trial_type=TrialType.TRIAL if batch_size == 1 else TrialType.BATCH_TRIAL,
batch_size=batch_size,
)


def get_sequential_optimization_scheduler_options(
timeout_hours: int = 4,
) -> SchedulerOptions:
"""The typical SchedulerOptions used in benchmarking.
Args:
timeout_hours: The maximum amount of time (in hours) to run each
benchmark replication. Defaults to 4 hours.
"""
return get_benchmark_scheduler_options(timeout_hours=timeout_hours)


def _assign_frozen_attr(obj: Any, name: str, value: Any) -> None: # pyre-ignore [2]
"""Assign a new value to an attribute of a frozen dataclass.
This is an ugly hack and shouldn't be used broadly.
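A small sketch (not part of the diff) of what the new helper returns, following the branches above:

```python
from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.service.utils.scheduler_options import TrialType

sequential = get_benchmark_scheduler_options()  # batch_size defaults to 1
batched = get_benchmark_scheduler_options(batch_size=4)

assert sequential.trial_type == TrialType.TRIAL
assert batched.trial_type == TrialType.BATCH_TRIAL
# Both keep max_pending_trials=1: a trial (or an entire batch) must complete
# before the next one is generated, i.e. "batch sequential" when batch_size > 1
# and "fully sequential" when batch_size == 1.
assert sequential.max_pending_trials == 1 and batched.max_pending_trials == 1
```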
12 changes: 11 additions & 1 deletion ax/benchmark/methods/modular_botorch.py
@@ -58,7 +58,17 @@ def get_sobol_botorch_modular_acquisition(
acqf_name = acqf_name_abbreviations.get(
acquisition_cls.__name__, acquisition_cls.__name__
)
name = name or f"MBM::{model_name}_{acqf_name}"
# Historically all benchmarks were sequential, so sequential benchmarks
# don't get anything added to their name, for continuity
batch_suffix = ""
if (
scheduler_options is not None
and (batch_size := scheduler_options.batch_size) is not None
):
if batch_size > 1:
batch_suffix = f"_q{batch_size}"

name = name or f"MBM::{model_name}_{acqf_name}{batch_suffix}"

generation_strategy = GenerationStrategy(
name=name,
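Concretely, the suffix logic above produces names like the following (a sketch consistent with the expectations in the tests below):

```python
from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.benchmark.methods.modular_botorch import get_sobol_botorch_modular_acquisition
from botorch.acquisition.knowledge_gradient import qKnowledgeGradient
from botorch.models.gp_regression import SingleTaskGP

kwargs = dict(
    model_cls=SingleTaskGP,
    acquisition_cls=qKnowledgeGradient,
    distribute_replications=False,
)
sequential = get_sobol_botorch_modular_acquisition(
    scheduler_options=get_benchmark_scheduler_options(), **kwargs
)
batched = get_sobol_botorch_modular_acquisition(
    scheduler_options=get_benchmark_scheduler_options(batch_size=2), **kwargs
)

# Sequential names are unchanged for continuity; batched ones get "_q{batch_size}".
assert sequential.name == "MBM::SingleTaskGP_qKnowledgeGradient"
assert batched.name == "MBM::SingleTaskGP_qKnowledgeGradient_q2"
```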
76 changes: 65 additions & 11 deletions ax/benchmark/tests/methods/test_methods.py
@@ -5,49 +5,76 @@

# pyre-strict

from typing import Dict, Type

import numpy as np

from ax.benchmark.benchmark import benchmark_replication
from ax.benchmark.benchmark_method import get_sequential_optimization_scheduler_options
from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.benchmark.methods.modular_botorch import get_sobol_botorch_modular_acquisition
from ax.benchmark.methods.sobol import get_sobol_benchmark_method
from ax.benchmark.problems.registry import get_problem
from ax.modelbridge.registry import Models
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.utils.common.testutils import TestCase
from ax.utils.common.typeutils import not_none
from ax.utils.testing.mock import fast_botorch_optimize
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.analytic import LogExpectedImprovement
from botorch.acquisition.knowledge_gradient import qKnowledgeGradient
from botorch.acquisition.logei import qLogExpectedImprovement
from botorch.models.gp_regression import SingleTaskGP
from pyre_extensions import none_throws


class TestMethods(TestCase):
def test_mbm_acquisition(self) -> None:
def setUp(self) -> None:
super().setUp()
self.batch_size = 2
self.scheduler_options_dict: Dict[str, SchedulerOptions] = {
"sequential": get_benchmark_scheduler_options(),
"batch": get_benchmark_scheduler_options(batch_size=self.batch_size),
}

def _test_mbm_acquisition(self, scheduler_options: SchedulerOptions) -> None:
method = get_sobol_botorch_modular_acquisition(
model_cls=SingleTaskGP,
acquisition_cls=qKnowledgeGradient,
scheduler_options=get_sequential_optimization_scheduler_options(),
scheduler_options=scheduler_options,
distribute_replications=False,
)
self.assertEqual(method.name, "MBM::SingleTaskGP_qKnowledgeGradient")
is_batched = (
scheduler_options.batch_size is not None
and scheduler_options.batch_size > 1
)
expected_name = "MBM::SingleTaskGP_qKnowledgeGradient" + (
f"_q{self.batch_size}" if is_batched else ""
)
self.assertEqual(method.name, expected_name)
gs = method.generation_strategy
sobol, kg = gs._steps
self.assertEqual(kg.model, Models.BOTORCH_MODULAR)
model_kwargs = not_none(kg.model_kwargs)
model_kwargs = none_throws(kg.model_kwargs)
self.assertEqual(model_kwargs["botorch_acqf_class"], qKnowledgeGradient)
surrogate_spec = next(iter(model_kwargs["surrogate_specs"].values()))
self.assertEqual(
surrogate_spec.botorch_model_class.__name__,
"SingleTaskGP",
)

def test_mbm_acquisition(self) -> None:
for name, scheduler_options in self.scheduler_options_dict.items():
with self.subTest(name=name):
self._test_mbm_acquisition(scheduler_options=scheduler_options)

@fast_botorch_optimize
def test_benchmark_replication_runs(self) -> None:
def _test_benchmark_replication_runs(
self, scheduler_options: SchedulerOptions, acqf_cls: Type[AcquisitionFunction]
) -> None:
problem = get_problem(problem_name="ackley4")
method = get_sobol_botorch_modular_acquisition(
model_cls=SingleTaskGP,
scheduler_options=get_sequential_optimization_scheduler_options(),
acquisition_cls=LogExpectedImprovement,
scheduler_options=scheduler_options,
acquisition_cls=acqf_cls,
num_sobol_trials=2,
name="test",
distribute_replications=False,
@@ -56,13 +83,40 @@ def test_benchmark_replication_runs(self) -> None:
self.assertEqual(n_sobol_trials, 2)
self.assertEqual(method.name, "test")
# Only run one non-Sobol trial
problem = get_problem(problem_name="ackley4", num_trials=n_sobol_trials + 1)
n_total_trials = n_sobol_trials + 1
problem = get_problem(problem_name="ackley4", num_trials=n_total_trials)
result = benchmark_replication(problem=problem, method=method, seed=0)
self.assertTrue(np.isfinite(result.score_trace).all())
self.assertEqual(result.optimization_trace.shape, (n_total_trials,))

expected_n_arms_per_batch = (
1 if (batch_size := scheduler_options.batch_size) is None else batch_size
)
self.assertEqual(
len(none_throws(result.experiment).arms_by_name),
n_total_trials * expected_n_arms_per_batch,
)

def test_benchmark_replication_runs(self) -> None:
with self.subTest(name="sequential LogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["sequential"],
acqf_cls=LogExpectedImprovement,
)
with self.subTest(name="sequential qLogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["sequential"],
acqf_cls=qLogExpectedImprovement,
)
with self.subTest(name="batch qLogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["batch"],
acqf_cls=qLogExpectedImprovement,
)

def test_sobol(self) -> None:
method = get_sobol_benchmark_method(
scheduler_options=get_sequential_optimization_scheduler_options(),
scheduler_options=get_benchmark_scheduler_options(),
distribute_replications=False,
)
self.assertEqual(method.name, "Sobol")
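The arm-count assertion added above is just this arithmetic: each BatchTrial contributes batch_size arms, so with the settings used in the test,

```python
n_sobol_trials = 2
n_total_trials = n_sobol_trials + 1  # one additional non-Sobol trial

# Sequential: one arm per trial -> 3 arms on the experiment.
assert n_total_trials * 1 == 3
# Batched with batch_size=2: two arms per BatchTrial -> 6 arms.
assert n_total_trials * 2 == 6
```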
8 changes: 8 additions & 0 deletions ax/benchmark/tests/problems/test_surrogate_problems.py
@@ -8,12 +8,20 @@

import numpy as np
from ax.benchmark.benchmark import compute_score_trace
from ax.benchmark.benchmark_problem import BenchmarkProblemProtocol
from ax.core.runner import Runner
from ax.utils.common.testutils import TestCase
from ax.utils.testing.benchmark_stubs import get_moo_surrogate, get_soo_surrogate


class TestSurrogateProblems(TestCase):
def test_conforms_to_protocol(self) -> None:
sbp = get_soo_surrogate()
self.assertIsInstance(sbp, BenchmarkProblemProtocol)

mbp = get_moo_surrogate()
self.assertIsInstance(mbp, BenchmarkProblemProtocol)

def test_lazy_instantiation(self) -> None:

# test instantiation from init
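The assertIsInstance checks above rely on BenchmarkProblemProtocol being a runtime-checkable typing Protocol (an assumption based on these tests passing); a generic sketch of that pattern with a toy protocol:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Evaluatable(Protocol):
    """Toy stand-in for a runtime-checkable protocol like BenchmarkProblemProtocol."""

    def evaluate(self, x: float) -> float: ...


class Quadratic:
    def evaluate(self, x: float) -> float:
        return x * x


# isinstance() against a @runtime_checkable Protocol is structural: it checks
# that the required members exist, not that the class inherits from the protocol.
assert isinstance(Quadratic(), Evaluatable)
assert not isinstance(object(), Evaluatable)
```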
