Re-sync with internal repository
The internal and external repositories are out of sync. This Pull Request attempts to bring them back in sync by patching the GitHub repository. Please carefully review this patch. You must disable ShipIt for your project in order to merge this pull request. DO NOT IMPORT this pull request. Instead, merge it directly on GitHub using the MERGE BUTTON. Re-enable ShipIt after merging.
facebook-github-bot committed Mar 18, 2024
1 parent 639b731 commit 5df07ff
Showing 49 changed files with 736 additions and 2,244 deletions.
196 changes: 11 additions & 185 deletions ax/benchmark/benchmark.py
@@ -22,29 +22,20 @@
from itertools import product
from logging import Logger
from time import time
from typing import Dict, Iterable, List
from typing import Iterable, List

import numpy as np

from ax.benchmark.benchmark_method import BenchmarkMethod
from ax.benchmark.benchmark_problem import (
BenchmarkProblemProtocol,
BenchmarkProblemBase,
BenchmarkProblemWithKnownOptimum,
)
from ax.benchmark.benchmark_result import AggregatedBenchmarkResult, BenchmarkResult
from ax.benchmark.metrics.base import BenchmarkMetricBase, GroundTruthMetricMixin
from ax.core.experiment import Experiment
from ax.core.metric import Metric
from ax.core.objective import MultiObjective, Objective
from ax.core.optimization_config import (
MultiObjectiveOptimizationConfig,
OptimizationConfig,
)
from ax.core.outcome_constraint import ObjectiveThreshold, OutcomeConstraint
from ax.core.utils import get_model_times
from ax.service.scheduler import Scheduler
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import checked_cast, not_none
from botorch.utils.sampling import manual_seed

logger: Logger = get_logger(__name__)
@@ -53,7 +44,7 @@
def compute_score_trace(
optimization_trace: np.ndarray,
num_baseline_trials: int,
problem: BenchmarkProblemProtocol,
problem: BenchmarkProblemBase,
) -> np.ndarray:
"""Computes a score trace from the optimization trace."""

@@ -77,47 +68,20 @@ def compute_score_trace(


def _create_benchmark_experiment(
problem: BenchmarkProblemProtocol, method_name: str
problem: BenchmarkProblemBase, method_name: str
) -> Experiment:
"""Creates an empty experiment for the given problem and method.
If the problem is "noiseless" (i.e. evaluations prior to adding artificial
noise to the outcomes are deterministic), also adds the respective ground
truth metrics as tracking metrics for each metric defined on the problem
(including tracking metrics and the metrics in the OptimizationConfig).
Args:
problem: The BenchmarkProblem to test against (can be synthetic or real).
method_name: Name of the method being tested.
Returns:
The Experiment object to be used for benchmarking.
"""
tracking_metrics = problem.tracking_metrics
if not problem.is_noiseless and problem.has_ground_truth:
# Make the ground truth counterparts for each metric defined on the problem,
# which will be added as tracking metrics on the Experiment object below.
# In the analysis, a modified OptimizationConfig referencing those metrics
# will be passed to the `Scheduler.get_trace()` method, which allows extracting
# the optimization trace based on the ground truth outcomes (without noise).
# If the problem is known to be noiseless, this is unnecessary and we can just
# use the observations made during the optimization loop directly.
gt_metric_dict = make_ground_truth_metrics(problem=problem)
tracking_metrics = tracking_metrics + list(gt_metric_dict.values())
"""Creates an empty experiment for the given problem and method."""
return Experiment(
name=f"{problem.name}|{method_name}_{int(time())}",
search_space=problem.search_space,
optimization_config=problem.optimization_config,
tracking_metrics=tracking_metrics, # pyre-ignore [6]: Incompatible
# parameter type: In call `Experiment.__init__`, for argument
# `tracking_metrics`, expected `Optional[List[Metric]]` but got
# `Union[List[Union[BenchmarkMetricBase, Metric]], List[BenchmarkMetricBase]]`.
tracking_metrics=problem.tracking_metrics,
runner=problem.runner,
)


def benchmark_replication(
problem: BenchmarkProblemProtocol,
problem: BenchmarkProblemBase,
method: BenchmarkMethod,
seed: int,
) -> BenchmarkResult:
@@ -141,22 +105,7 @@ def benchmark_replication(
with manual_seed(seed=seed):
scheduler.run_n_trials(max_trials=problem.num_trials)

if not problem.is_noiseless and problem.has_ground_truth:
# We modify the optimization config so we can use `Scheduler.get_trace()`
# to use the true (not corrupted by noise) observations that were logged
# as tracking metrics on the Experiment object. If the problem is known to
# be noiseless, this is unnecessary and we can just use the observations
# made during the optimization loop directly.
analysis_opt_config = make_ground_truth_optimization_config(
experiment=experiment
)
else:
analysis_opt_config = experiment.optimization_config

optimization_trace = np.asarray(
scheduler.get_trace(optimization_config=analysis_opt_config)
)

optimization_trace = np.array(scheduler.get_trace())
try:
# Catch any errors that may occur during score computation, such as errors
# while accessing "steps" in node based generation strategies. The error
@@ -167,7 +116,6 @@
num_baseline_trials = scheduler.standard_generation_strategy._steps[
0
].num_trials

score_trace = compute_score_trace(
optimization_trace=optimization_trace,
num_baseline_trials=num_baseline_trials,
@@ -179,7 +127,7 @@
)
score_trace = np.full(len(optimization_trace), np.nan)

fit_time, gen_time = get_model_times(experiment=experiment)
fit_time, gen_time = get_model_times(experiment=scheduler.experiment)

return BenchmarkResult(
name=scheduler.experiment.name,
@@ -193,7 +141,7 @@


def benchmark_one_method_problem(
problem: BenchmarkProblemProtocol,
problem: BenchmarkProblemBase,
method: BenchmarkMethod,
seeds: Iterable[int],
) -> AggregatedBenchmarkResult:
@@ -206,7 +154,7 @@


def benchmark_multiple_problems_methods(
problems: Iterable[BenchmarkProblemProtocol],
problems: Iterable[BenchmarkProblemBase],
methods: Iterable[BenchmarkMethod],
seeds: Iterable[int],
) -> List[AggregatedBenchmarkResult]:
@@ -220,125 +168,3 @@ def benchmark_multiple_problems_methods(
benchmark_one_method_problem(problem=p, method=m, seeds=seeds)
for p, m in product(problems, methods)
]


def make_ground_truth_metrics(
problem: BenchmarkProblemProtocol,
include_tracking_metrics: bool = True,
) -> Dict[str, Metric]:
"""Makes a ground truth version for each metric defined on the problem.
Args:
problem: The BenchmarkProblem to test against (can be synthetic or real).
include_tracking_metrics: Whether or not to include tracking metrics.
Returns:
A dict mapping (original) metric names to their respective ground truth metric.
"""
if not problem.has_ground_truth:
raise ValueError(
"Cannot create ground truth metrics for problems that "
"do not have a ground truth."
)
metrics: List[BenchmarkMetricBase] = [
checked_cast(BenchmarkMetricBase, metric)
for metric in problem.optimization_config.metrics.values()
]
if include_tracking_metrics:
metrics = metrics + problem.tracking_metrics
return {metric.name: metric.make_ground_truth_metric() for metric in metrics}


def make_ground_truth_optimization_config(
experiment: Experiment,
) -> OptimizationConfig:
"""Makes a clone of the OptimizationConfig on the experiment in which each metric
is replaced by its respective "ground truth" counterpart, which has been added to
the experiment's tracking metrics in `_create_benchmark_experiment` and which
returns the ground truth (i.e., uncorrupted by noise) observations.
"""
optimization_config = not_none(experiment.optimization_config)

if optimization_config.risk_measure is not None:
raise NotImplementedError("Support for risk measures is not yet implemented.")

# dict for caching metric lookup
gt_metric_dict: Dict[str, BenchmarkMetricBase] = {}

def get_gt_metric(metric: Metric) -> BenchmarkMetricBase:
"""Look up corresponding ground truth metric of the experiment. Will error
out if no corresponding ground truth metric exists."""
if not isinstance(metric, BenchmarkMetricBase):
raise ValueError(
"Only BenchmarkMetricBase metrics are supported for ground truth "
f"metrics. Got {type(metric)}."
)

if metric.name in gt_metric_dict:
return gt_metric_dict[metric.name]

for tracking_metric in experiment.tracking_metrics:
if getattr(tracking_metric, "is_ground_truth", False):
# TODO: Figure out if there is a better way to match the ground truth
# metric and the original metric.
ground_truth_name = tracking_metric.name
orig_name = checked_cast(
GroundTruthMetricMixin, tracking_metric
).get_original_name(ground_truth_name)
if orig_name == metric.name:
tracking_metric = checked_cast(BenchmarkMetricBase, tracking_metric)
gt_metric_dict[metric.name] = tracking_metric
return tracking_metric
raise ValueError(f"Ground truth metric for metric {metric.name} not found!")

# convert outcome constraints
if optimization_config.outcome_constraints is not None:
gt_outcome_constraints = [
OutcomeConstraint(
metric=get_gt_metric(oc.metric),
op=oc.op,
bound=oc.bound,
relative=oc.relative,
)
for oc in optimization_config.outcome_constraints
]
else:
gt_outcome_constraints = None

# we need to distinguish MOO and non-MOO problems
if not optimization_config.is_moo_problem:
gt_objective = Objective(
metric=get_gt_metric(optimization_config.objective.metric)
)

return OptimizationConfig(
objective=gt_objective, outcome_constraints=gt_outcome_constraints
)

gt_objective = MultiObjective(
metrics=[
get_gt_metric(metric) for metric in optimization_config.objective.metrics
]
)
# there may be objective thresholds to also convert
objective_thresholds = checked_cast(
MultiObjectiveOptimizationConfig, optimization_config
).objective_thresholds
if objective_thresholds is not None:
gt_objective_thresholds = [
ObjectiveThreshold(
metric=get_gt_metric(ot.metric),
bound=ot.bound,
relative=ot.relative,
op=ot.op,
)
for ot in objective_thresholds
]
else:
gt_objective_thresholds = None

return MultiObjectiveOptimizationConfig(
objective=gt_objective,
outcome_constraints=gt_outcome_constraints,
objective_thresholds=gt_objective_thresholds,
)
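
With the ground-truth machinery removed, the public entry points left in benchmark.py are benchmark_replication, benchmark_one_method_problem, and benchmark_multiple_problems_methods. A minimal usage sketch (not part of this commit), where `problem` and `method` stand in for an already-constructed BenchmarkProblemBase and BenchmarkMethod:

from ax.benchmark.benchmark import (
    benchmark_one_method_problem,
    benchmark_replication,
)

# Single replication with a fixed seed; returns a BenchmarkResult whose name
# combines the problem name, method name, and start time (see
# _create_benchmark_experiment above).
result = benchmark_replication(problem=problem, method=method, seed=0)

# Several replications of the same problem/method pair, aggregated over seeds.
aggregated = benchmark_one_method_problem(
    problem=problem, method=method, seeds=range(5)
)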
64 changes: 5 additions & 59 deletions ax/benchmark/benchmark_method.py
@@ -5,75 +5,28 @@

# pyre-strict

import logging
from dataclasses import dataclass
from typing import Any

from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.utils.common.base import Base
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import not_none


logger: logging.Logger = get_logger("BenchmarkMethod")


@dataclass(frozen=True)
class BenchmarkMethod(Base):
"""Benchmark method, represented in terms of Ax generation strategy (which tells us
which models to use when) and scheduler options (which tell us extra execution
information like maximum parallelism, early stopping configuration, etc.).
Note: If `BenchmarkMethod.scheduler_options.total_trials` is less than
`BenchmarkProblem.num_trials` then only the number of trials specified in the
former will be run.
Note: The `generation_strategy` passed in is assumed to be in its "base state",
as it will be cloned and reset.
information like maximum parallelism, early stopping configuration, etc.). Note:
if BenchmarkMethod.scheduler_options.total_trials is lower than
BenchmarkProblem.num_trials, only the number of trials specified in the former will
be run.
"""

name: str
generation_strategy: GenerationStrategy
scheduler_options: SchedulerOptions
distribute_replications: bool = False

def __post_init__(self) -> None:
# We (I think?) in general don't want to fit tracking metrics during our
# benchmarks. Further, not setting `fit_tracking_metrics=False` causes
# issues with the ground truth metrics created automatically when running
# the benchmark - in fact, things will error out deep inside the modeling
# stack since the model gets both noisy (benchmark) and noiseless (ground
# truth) observations. While support for this is something we should add
# for models, in the context of benchmarking we actually want to avoid
# fitting the ground truth metrics at all.

# Clone the GS so as to not modify the original one in-place below.
# Note that this assumes that the GS passed in is in its base state.
gs_cloned = self.generation_strategy.clone_reset()

for node in gs_cloned._nodes:
if isinstance(node, GenerationStep):
if node.model_kwargs is None:
node.model_kwargs = {}
if node.model_kwargs.get("fit_tracking_metrics", True):
logger.warning(
"Setting `fit_tracking_metrics` in a GenerationStep to False.",
)
not_none(node.model_kwargs)["fit_tracking_metrics"] = False
for model_spec in node.model_specs:
if model_spec.model_kwargs is None:
model_spec.model_kwargs = {}
elif model_spec.model_kwargs.get("fit_tracking_metrics", True):
logger.warning(
"Setting `fit_tracking_metrics` in a GenerationNode's "
"model_spec to False."
)
not_none(model_spec.model_kwargs)["fit_tracking_metrics"] = False

# hack around not being able to update frozen attribute of a dataclass
_assign_frozen_attr(self, name="generation_strategy", value=gs_cloned)


def get_sequential_optimization_scheduler_options(
timeout_hours: int = 4,
@@ -92,10 +45,3 @@ def get_sequential_optimization_scheduler_options(
min_seconds_before_poll=0,
timeout_hours=timeout_hours,
)


def _assign_frozen_attr(obj: Any, name: str, value: Any) -> None: # pyre-ignore [2]
"""Assign a new value to an attribute of a frozen dataclass.
This is an ugly hack and shouldn't be used broadly.
"""
object.__setattr__(obj, name, value)
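
For reference, a minimal construction sketch for a BenchmarkMethod under the simplified dataclass above (not part of this commit). The Sobol-only GenerationStrategy and the names sobol_strategy / "sobol_baseline" are illustrative assumptions; only the BenchmarkMethod fields and get_sequential_optimization_scheduler_options come from this file:

from ax.benchmark.benchmark_method import (
    BenchmarkMethod,
    get_sequential_optimization_scheduler_options,
)
from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy
from ax.modelbridge.registry import Models

# Quasi-random Sobol search only; num_trials=-1 lets the single step run for
# as many trials as the scheduler requests.
sobol_strategy = GenerationStrategy(
    name="Sobol",
    steps=[GenerationStep(model=Models.SOBOL, num_trials=-1)],
)

method = BenchmarkMethod(
    name="sobol_baseline",
    generation_strategy=sobol_strategy,
    scheduler_options=get_sequential_optimization_scheduler_options(timeout_hours=1),
)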