Enable batched benchmarks (with BatchTrial) #2331

Closed (wants to merge 1 commit)
ax/benchmark/benchmark_method.py (26 additions, 3 deletions)
@@ -10,7 +10,7 @@
from typing import Any

from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.service.utils.scheduler_options import SchedulerOptions, TrialType
from ax.utils.common.base import Base
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import not_none
@@ -75,25 +75,48 @@ def __post_init__(self) -> None:
_assign_frozen_attr(self, name="generation_strategy", value=gs_cloned)


def get_sequential_optimization_scheduler_options(
def get_benchmark_scheduler_options(
timeout_hours: int = 4,
batch_size: int = 1,
) -> SchedulerOptions:
"""The typical SchedulerOptions used in benchmarking.
Currently, regardless of batch size, all pending trials must complete before
new ones are generated. That is, when batch_size > 1, the design is "batch
sequential", and when batch_size = 1, the design is "fully sequential."
Args:
timeout_hours: The maximum amount of time (in hours) to run each
benchmark replication. Defaults to 4 hours.
batch_size: Number of trials to generate at once.
"""

return SchedulerOptions(
# Enforce sequential trials by default
# No new candidates can be generated while any are pending.
# If batched, an entire batch must finish before the next can be
# generated.
max_pending_trials=1,
# Do not throttle, as is often necessary when polling real endpoints
init_seconds_between_polls=0,
min_seconds_before_poll=0,
timeout_hours=timeout_hours,
trial_type=TrialType.TRIAL if batch_size == 1 else TrialType.BATCH_TRIAL,
batch_size=batch_size,
)


def get_sequential_optimization_scheduler_options(
timeout_hours: int = 4,
) -> SchedulerOptions:
"""The typical SchedulerOptions used in benchmarking.
Args:
timeout_hours: The maximum amount of time (in hours) to run each
benchmark replication. Defaults to 4 hours.
"""
return get_benchmark_scheduler_options(timeout_hours=timeout_hours)


def _assign_frozen_attr(obj: Any, name: str, value: Any) -> None: # pyre-ignore [2]
"""Assign a new value to an attribute of a frozen dataclass.
This is an ugly hack and shouldn't be used broadly.
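
A minimal usage sketch of the new helper (based only on the signature and defaults shown in the diff above; the assertions restate what the constructed SchedulerOptions is given):

from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.service.utils.scheduler_options import TrialType

# Fully sequential benchmarking: one single-arm trial at a time.
seq_opts = get_benchmark_scheduler_options()
assert seq_opts.trial_type == TrialType.TRIAL
assert seq_opts.batch_size == 1

# Batch-sequential benchmarking: each trial is a BatchTrial with 4 arms, and
# it must complete before the next batch is generated (max_pending_trials=1).
batch_opts = get_benchmark_scheduler_options(batch_size=4)
assert batch_opts.trial_type == TrialType.BATCH_TRIAL
assert batch_opts.batch_size == 4
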
ax/benchmark/methods/modular_botorch.py (11 additions, 1 deletion)
@@ -58,7 +58,17 @@ def get_sobol_botorch_modular_acquisition(
acqf_name = acqf_name_abbreviations.get(
acquisition_cls.__name__, acquisition_cls.__name__
)
name = name or f"MBM::{model_name}_{acqf_name}"
# Historically all benchmarks were sequential, so sequential benchmarks
# don't get anything added to their name, for continuity
batch_suffix = ""
if (
scheduler_options is not None
and (batch_size := scheduler_options.batch_size) is not None
):
if batch_size > 1:
batch_suffix = f"_q{batch_size}"

name = name or f"MBM::{model_name}_{acqf_name}{batch_suffix}"

generation_strategy = GenerationStrategy(
name=name,
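
A sketch of how the batch suffix shows up in auto-generated method names (arguments mirror the tests below; the expected names are taken from those tests):

from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.benchmark.methods.modular_botorch import get_sobol_botorch_modular_acquisition
from botorch.acquisition.knowledge_gradient import qKnowledgeGradient
from botorch.models.gp_regression import SingleTaskGP

# With a batch size of 2, "_q2" is appended to the default name.
batched_method = get_sobol_botorch_modular_acquisition(
    model_cls=SingleTaskGP,
    acquisition_cls=qKnowledgeGradient,
    scheduler_options=get_benchmark_scheduler_options(batch_size=2),
    distribute_replications=False,
)
assert batched_method.name == "MBM::SingleTaskGP_qKnowledgeGradient_q2"

# The sequential default (batch_size=1) keeps the historical name unchanged.
sequential_method = get_sobol_botorch_modular_acquisition(
    model_cls=SingleTaskGP,
    acquisition_cls=qKnowledgeGradient,
    scheduler_options=get_benchmark_scheduler_options(),
    distribute_replications=False,
)
assert sequential_method.name == "MBM::SingleTaskGP_qKnowledgeGradient"
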
ax/benchmark/tests/methods/test_methods.py (65 additions, 11 deletions)
@@ -5,49 +5,76 @@

# pyre-strict

from typing import Dict, Type

import numpy as np

from ax.benchmark.benchmark import benchmark_replication
from ax.benchmark.benchmark_method import get_sequential_optimization_scheduler_options
from ax.benchmark.benchmark_method import get_benchmark_scheduler_options
from ax.benchmark.methods.modular_botorch import get_sobol_botorch_modular_acquisition
from ax.benchmark.methods.sobol import get_sobol_benchmark_method
from ax.benchmark.problems.registry import get_problem
from ax.modelbridge.registry import Models
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.utils.common.testutils import TestCase
from ax.utils.common.typeutils import not_none
from ax.utils.testing.mock import fast_botorch_optimize
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.analytic import LogExpectedImprovement
from botorch.acquisition.knowledge_gradient import qKnowledgeGradient
from botorch.acquisition.logei import qLogExpectedImprovement
from botorch.models.gp_regression import SingleTaskGP
from pyre_extensions import none_throws


class TestMethods(TestCase):
def test_mbm_acquisition(self) -> None:
def setUp(self) -> None:
super().setUp()
self.batch_size = 2
self.scheduler_options_dict: Dict[str, SchedulerOptions] = {
"sequential": get_benchmark_scheduler_options(),
"batch": get_benchmark_scheduler_options(batch_size=self.batch_size),
}

def _test_mbm_acquisition(self, scheduler_options: SchedulerOptions) -> None:
method = get_sobol_botorch_modular_acquisition(
model_cls=SingleTaskGP,
acquisition_cls=qKnowledgeGradient,
scheduler_options=get_sequential_optimization_scheduler_options(),
scheduler_options=scheduler_options,
distribute_replications=False,
)
self.assertEqual(method.name, "MBM::SingleTaskGP_qKnowledgeGradient")
is_batched = (
scheduler_options.batch_size is not None
and scheduler_options.batch_size > 1
)
expected_name = "MBM::SingleTaskGP_qKnowledgeGradient" + (
f"_q{self.batch_size}" if is_batched else ""
)
self.assertEqual(method.name, expected_name)
gs = method.generation_strategy
sobol, kg = gs._steps
self.assertEqual(kg.model, Models.BOTORCH_MODULAR)
model_kwargs = not_none(kg.model_kwargs)
model_kwargs = none_throws(kg.model_kwargs)
self.assertEqual(model_kwargs["botorch_acqf_class"], qKnowledgeGradient)
surrogate_spec = next(iter(model_kwargs["surrogate_specs"].values()))
self.assertEqual(
surrogate_spec.botorch_model_class.__name__,
"SingleTaskGP",
)

def test_mbm_acquisition(self) -> None:
for name, scheduler_options in self.scheduler_options_dict.items():
with self.subTest(name=name):
self._test_mbm_acquisition(scheduler_options=scheduler_options)

@fast_botorch_optimize
def test_benchmark_replication_runs(self) -> None:
def _test_benchmark_replication_runs(
self, scheduler_options: SchedulerOptions, acqf_cls: Type[AcquisitionFunction]
) -> None:
problem = get_problem(problem_name="ackley4")
method = get_sobol_botorch_modular_acquisition(
model_cls=SingleTaskGP,
scheduler_options=get_sequential_optimization_scheduler_options(),
acquisition_cls=LogExpectedImprovement,
scheduler_options=scheduler_options,
acquisition_cls=acqf_cls,
num_sobol_trials=2,
name="test",
distribute_replications=False,
@@ -56,13 +83,40 @@ def test_benchmark_replication_runs(self) -> None:
self.assertEqual(n_sobol_trials, 2)
self.assertEqual(method.name, "test")
# Only run one non-Sobol trial
problem = get_problem(problem_name="ackley4", num_trials=n_sobol_trials + 1)
n_total_trials = n_sobol_trials + 1
problem = get_problem(problem_name="ackley4", num_trials=n_total_trials)
result = benchmark_replication(problem=problem, method=method, seed=0)
self.assertTrue(np.isfinite(result.score_trace).all())
self.assertEqual(result.optimization_trace.shape, (n_total_trials,))

expected_n_arms_per_batch = (
1 if (batch_size := scheduler_options.batch_size) is None else batch_size
)
self.assertEqual(
len(none_throws(result.experiment).arms_by_name),
n_total_trials * expected_n_arms_per_batch,
)

def test_benchmark_replication_runs(self) -> None:
with self.subTest(name="sequential LogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["sequential"],
acqf_cls=LogExpectedImprovement,
)
with self.subTest(name="sequential qLogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["sequential"],
acqf_cls=qLogExpectedImprovement,
)
with self.subTest(name="batch qLogEI"):
self._test_benchmark_replication_runs(
scheduler_options=self.scheduler_options_dict["batch"],
acqf_cls=qLogExpectedImprovement,
)

def test_sobol(self) -> None:
method = get_sobol_benchmark_method(
scheduler_options=get_sequential_optimization_scheduler_options(),
scheduler_options=get_benchmark_scheduler_options(),
distribute_replications=False,
)
self.assertEqual(method.name, "Sobol")
ax/benchmark/tests/problems/test_surrogate_problems.py (8 additions, 0 deletions)
@@ -8,12 +8,20 @@

import numpy as np
from ax.benchmark.benchmark import compute_score_trace
from ax.benchmark.benchmark_problem import BenchmarkProblemProtocol
from ax.core.runner import Runner
from ax.utils.common.testutils import TestCase
from ax.utils.testing.benchmark_stubs import get_moo_surrogate, get_soo_surrogate


class TestSurrogateProblems(TestCase):
def test_conforms_to_protocol(self) -> None:
sbp = get_soo_surrogate()
self.assertIsInstance(sbp, BenchmarkProblemProtocol)

mbp = get_moo_surrogate()
self.assertIsInstance(mbp, BenchmarkProblemProtocol)

def test_lazy_instantiation(self) -> None:

# test instantiation from init