Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tuner] Add BaselineResultHandler class #789

Merged
merged 17 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tuner/tuner/candidate_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from abc import abstractmethod

from iree.compiler import ir # type: ignore

from iree.compiler.dialects import iree_codegen # type: ignore

from .common import *
Expand Down
331 changes: 233 additions & 98 deletions tuner/tuner/libtuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import shutil
import logging
import argparse
from collections import defaultdict
from datetime import datetime
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -143,6 +144,9 @@ class BenchmarkResult:
time: float
device_id: str

def is_valid(self) -> bool:
return math.isfinite(self.time)


def unit_to_microseconds(real_time: float, time_unit: str) -> float:
unit_conversions = {
Expand Down Expand Up @@ -429,7 +433,7 @@ def init_worker_context(queue: multiprocessing.Queue) -> None:
worker_id, device_id = queue.get()


def create_worker_context_queue(device_ids: list[int]) -> queue.Queue[tuple[int, int]]:
def create_worker_context_queue(device_ids: list[str]) -> queue.Queue[tuple[int, int]]:
"""Create queue contains Worker ID and Device ID for worker initialization"""
worker_contexts_queue = multiprocessing.Manager().Queue()
for worker_id, device_id in enumerate(device_ids):
Expand Down Expand Up @@ -564,7 +568,7 @@ def run_iree_benchmark_module_command(benchmark_pack: BenchmarkPack):

mean_benchmark_time = sum(times) / float(len(times))
logging.debug(
f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f}"
f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f} ms"
)
return BenchmarkResult(
candidate_id=candidate_id,
Expand Down Expand Up @@ -749,6 +753,181 @@ def collision_handler(index_hash_list: list[tuple[int, str]]) -> tuple[bool, lis
return collision_detected, unique_indexes


def benchmark_candidates(
candidate_indices, devices, tuning_client, candidate_trackers
) -> list[BenchmarkResult]:
"""
Runs the benchmarking for a given list of candidate indices.
"""
worker_context_queue = create_worker_context_queue(devices)

task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[idx],
)
for idx in candidate_indices
]

# Perform benchmarking.
return multiprocess_progress_wrapper(
num_worker=len(devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)


def benchmark_baseline(
devices: list[str],
tuning_client: TuningClient,
candidate_tracker: CandidateTracker,
) -> list[BenchmarkResult]:
task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_tracker,
)
] * len(devices)

worker_context_queue = create_worker_context_queue(devices)
baseline_results = multiprocess_progress_wrapper(
num_worker=len(devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)
return baseline_results


class BaselineResultHandler:
def __init__(self) -> None:
# Maps device IDs to a list of `BenchmarkResult`.
self.device_baseline_results: dict[str, list[BenchmarkResult]] = defaultdict(
list
)

def add_run(self, results: list[BenchmarkResult]) -> None:
if not BaselineResultHandler.are_baseline_devices_unique(results):
logging.warning(
"Duplicate device IDs detected in the first baseline results."
)
for result in results:
self.device_baseline_results[result.device_id].append(result)

@staticmethod
def are_baseline_devices_unique(results: list[BenchmarkResult]) -> bool:
return len(results) == len(set(result.device_id for result in results))

def get_valid_time_ms(self, device_id: str) -> list[float]:
return [
result.time
for result in self.device_baseline_results.get(device_id, [])
if result.is_valid()
]

def get_average_result_ms(self, device_id: str) -> Optional[float]:
valid_times = self.get_valid_time_ms(device_id)
if valid_times:
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
return sum(valid_times) / len(valid_times)
return None

def detect_regressions(
self,
baseline_results: list[BenchmarkResult],
threshold: float = 1.03,
) -> list[str]:
kuhar marked this conversation as resolved.
Show resolved Hide resolved
"""
Returns a list of device IDs where performance regressions were detected.
A performance regression is defined as a baseline time from 'baseline_results'
for a device that exceeds the stored average baseline time for that device by
a factor greater than the specified 'threshold'.
"""
regressions = []
for result in baseline_results:
if not result.is_valid():
continue

baseline_avg = self.get_average_result_ms(result.device_id)
if baseline_avg is not None and result.time > baseline_avg * threshold:
regressions.append(result.device_id)

return regressions

def is_valid(self) -> bool:
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
"""
Check if there are any valid finite baseline time recorded.
Returns True iff at least one valid (finite) baseline time was recorded.
"""
return any(
self.get_valid_time_ms(device_id)
for device_id in self.device_baseline_results
)

def is_valid_for_device(self, device_id: str) -> bool:
return len(self.get_valid_time_ms(device_id)) != 0

def calculate_speedup(
self, candidate_results: list[BenchmarkResult]
) -> dict[int, float]:
"""
Calculate the speedup for a list of candidate results compared to the baseline.
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
Returns a map from candidate_id to its speedup ratio.

Speedup is defined as the ratio of the candidate's runtime to the average baseline time
for the corresponding device as:

speedup = candidate_runtime / avg_baseline_time (or fallback_baseline)

If no valid baseline times are available for a specific device, the fallback baseline is used.
The fallback baseline is calculated as the average of all valid baseline times across devices.

If no valid baseline times are available across all devices, the candidate's runtime is
used directly as:

speedup = candidate_runtime
kuhar marked this conversation as resolved.
Show resolved Hide resolved

The speedup values are sorted in ascending order to select the top-performing candidates.
"""
if not self.is_valid():
logging.warning("No valid baseline times available.")
# Use the candidate time directly when no baselines are available.
return {
candidate.candidate_id: candidate.time
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
for candidate in candidate_results
}

# Calculate the fallback baseline as the average of all valid times across devices.
valid_baseline_times = [
result.time
for device_id in self.device_baseline_results
for result in self.device_baseline_results[device_id]
if result.is_valid()
]

fallback_baseline = sum(valid_baseline_times) / len(valid_baseline_times)

speedup_by_candidate = {}
for candidate in candidate_results:
baseline_avg_ms = self.get_average_result_ms(candidate.device_id)
if baseline_avg_ms is None:
baseline_avg_ms = fallback_baseline
speedup_by_candidate[candidate.candidate_id] = (
candidate.time / baseline_avg_ms
)
return speedup_by_candidate

def sort_candidates_with_speedup(
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
self,
speedup_by_candidate: dict[int, float],
) -> list[tuple[int, float]]:
return sorted(speedup_by_candidate.items(), key=lambda x: x[1])


def compile(
args: argparse.Namespace,
path_config: PathConfig,
Expand Down Expand Up @@ -832,64 +1011,6 @@ def compile(
return compiled_candidates


def select_best_benchmark_results(
candidate_results: list[BenchmarkResult],
baseline_results: list[BenchmarkResult],
num_candidates: Optional[int],
) -> list[BenchmarkResult]:
filtered_candidate_results = [r for r in candidate_results if math.isfinite(r.time)]
if len(filtered_candidate_results) == 0:
logging.error("No successful candidate benchmarks.")
return []
fallback_baseline_time: Optional[float] = None
filtered_baseline_results: list[BenchmarkResult] = []
for r in baseline_results:
if math.isfinite(r.time):
filtered_baseline_results.append(r)
fallback_baseline_time = r.time
else:
logging.warning(f"Baseline on device {r.device_id} failed.")
if fallback_baseline_time is None:
logging.warning(
f"All baseline benchmarks failed. Baselines will not be used to select top candidates"
)
baseline_times_by_device = {}
for r in filtered_baseline_results:
baseline_times_by_device[r.device_id] = r.time

# Select top candidates
def get_speedup(result: BenchmarkResult) -> float:
if result.device_id in baseline_times_by_device:
return result.time / baseline_times_by_device[result.device_id]
assert fallback_baseline_time is not None, "expected fallback_baseline_time"
return result.time / fallback_baseline_time

num_top_candidates = len(filtered_candidate_results)
if num_candidates is not None:
num_top_candidates = num_candidates

# Sort by the speedup over baseline on the same device. If a device failed
# the baseline benchmark, then use the fallback baseline. If there is no
# successful baseline, then the best we can do is to sort by the actual
# time.
sorting_key = get_speedup
if fallback_baseline_time is None:
sorting_key = lambda result: result.time
best_results = sorted(filtered_candidate_results, key=sorting_key)[
:num_top_candidates
]
logging.info(f"Selected top[{len(best_results)}]:")

for r in best_results:
if fallback_baseline_time is not None:
speedup = f"{round(get_speedup(r) * 100, 2)}% of baseline"
else:
speedup = "baseline unavailable"
result = f"Candidate {r.candidate_id} time: {r.time:.2f} ms ({speedup})"
logging.info(result)
return best_results


def benchmark(
args: argparse.Namespace,
compiled_candidates: list[int],
Expand All @@ -902,46 +1023,60 @@ def benchmark(
logging.warning("No candidates to benchmark.")
return []

task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[i],
)
for i in compiled_candidates
if i != 0
]
worker_context_queue = create_worker_context_queue(args.devices)
candidate_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
num_worker=len(args.devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)

# Benchmarking baselines on each involved device.
worker_context_queue = create_worker_context_queue(args.devices)
baseline_task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[0],
)
] * len(args.devices)
baseline_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
num_worker=len(args.devices),
task_list=baseline_task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
baseline_tracker = candidate_trackers[0]
first_baseline_result = benchmark_baseline(
devices=args.devices,
tuning_client=tuning_client,
candidate_tracker=baseline_tracker,
)
baseline_handler = BaselineResultHandler()
baseline_handler.add_run(first_baseline_result)
if not baseline_handler.is_valid():
logging.warning("Baseline run failed.")

candidate_indices = [i for i in compiled_candidates if i != 0]
candidate_results = benchmark_candidates(
candidate_indices=candidate_indices,
devices=args.devices,
tuning_client=tuning_client,
candidate_trackers=candidate_trackers,
)

best_results: list[BenchmarkResult] = select_best_benchmark_results(
candidate_results=candidate_results,
baseline_results=baseline_results,
num_candidates=num_candidates,
second_baseline_result = benchmark_baseline(
devices=args.devices,
tuning_client=tuning_client,
candidate_tracker=baseline_tracker,
)

top_candidates = [result.candidate_id for result in best_results]
return top_candidates
regression_devices = baseline_handler.detect_regressions(second_baseline_result)
if regression_devices:
logging.warning(
f"Performance regressions detected for the following devices: {', '.join(regression_devices)}."
)
baseline_handler.add_run(second_baseline_result)

if not baseline_handler.is_valid():
logging.warning("Baseline run failed.")

speedup_result = baseline_handler.calculate_speedup(candidate_results)
all_candidates_with_speedup = baseline_handler.sort_candidates_with_speedup(
speedup_result
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
)
top_candidates_with_speedup = all_candidates_with_speedup[:num_candidates]

if baseline_handler.is_valid():
candidate_time_map = {
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
result.candidate_id: result.time for result in candidate_results
}
for candidate_id, speedup in top_candidates_with_speedup:
actual_time = candidate_time_map[candidate_id]
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
percentage_of_baseline = speedup * 100
logging.info(
bangtianliu marked this conversation as resolved.
Show resolved Hide resolved
f"Candidate {candidate_id} time: {actual_time:.2f} ms "
f"({percentage_of_baseline:.1f}% of baseline)"
)
else:
for candidate_id, time in top_candidates_with_speedup:
logging.info(f"Candidate {candidate_id} time: {time:.2f} ms")
return [candidate_id for candidate_id, _ in top_candidates_with_speedup]
Loading
Loading