diff --git a/tuner/tuner/candidate_gen.py b/tuner/tuner/candidate_gen.py
index f0494c2e5..fd6fcdcc4 100644
--- a/tuner/tuner/candidate_gen.py
+++ b/tuner/tuner/candidate_gen.py
@@ -25,7 +25,6 @@ from abc import abstractmethod
 
 from iree.compiler import ir  # type: ignore
-
 from iree.compiler.dialects import iree_codegen  # type: ignore
 
 from .common import *
diff --git a/tuner/tuner/libtuner.py b/tuner/tuner/libtuner.py
index 580e0d30e..f3a73bb3d 100644
--- a/tuner/tuner/libtuner.py
+++ b/tuner/tuner/libtuner.py
@@ -22,6 +22,7 @@ import shutil
 import logging
 import argparse
+from collections import defaultdict
 from datetime import datetime
 from enum import Enum
 from pathlib import Path
@@ -143,6 +144,12 @@ class BenchmarkResult:
     time: float
     device_id: str
 
+    def is_valid(self) -> bool:
+        return math.isfinite(self.time)
+
+    def __iter__(self):
+        return iter((self.candidate_id, self.time, self.device_id))
+
 
 def unit_to_microseconds(real_time: float, time_unit: str) -> float:
     unit_conversions = {
@@ -429,7 +436,7 @@ def init_worker_context(queue: multiprocessing.Queue) -> None:
     worker_id, device_id = queue.get()
 
 
-def create_worker_context_queue(device_ids: list[int]) -> queue.Queue[tuple[int, int]]:
+def create_worker_context_queue(device_ids: list[str]) -> queue.Queue[tuple[int, str]]:
     """Create queue contains Worker ID and Device ID for worker initialization"""
     worker_contexts_queue = multiprocessing.Manager().Queue()
    for worker_id, device_id in enumerate(device_ids):
@@ -564,7 +571,7 @@ def run_iree_benchmark_module_command(benchmark_pack: BenchmarkPack):
     mean_benchmark_time = sum(times) / float(len(times))
 
     logging.debug(
-        f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f}"
+        f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f} ms"
     )
     return BenchmarkResult(
         candidate_id=candidate_id,
@@ -749,6 +756,170 @@ def collision_handler(index_hash_list: list[tuple[int, str]]) -> tuple[bool, lis
 
     return collision_detected, unique_indexes
 
 
+def benchmark_candidates(
+    candidate_indices: list[int],
+    devices: list[str],
+    tuning_client: TuningClient,
+    candidate_trackers: list[CandidateTracker],
+) -> list[BenchmarkResult]:
+    """
+    Runs the benchmarking for a given list of candidate indices.
+    """
+    worker_context_queue = create_worker_context_queue(devices)
+
+    task_list = [
+        BenchmarkPack(
+            iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
+            benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
+            candidate_tracker=candidate_trackers[idx],
+        )
+        for idx in candidate_indices
+    ]
+
+    # Perform benchmarking.
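+    # Workers pop (worker_id, device_id) pairs from worker_context_queue during
+    # initialization, so the tasks run in parallel with one worker per device.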
+    return multiprocess_progress_wrapper(
+        num_worker=len(devices),
+        task_list=task_list,
+        function=run_iree_benchmark_module_command,
+        initializer=init_worker_context,
+        initializer_inputs=(worker_context_queue,),
+    )
+
+
+def benchmark_baseline(
+    devices: list[str],
+    tuning_client: TuningClient,
+    candidate_tracker: CandidateTracker,
+) -> list[BenchmarkResult]:
+    task_list = [
+        BenchmarkPack(
+            iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
+            benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
+            candidate_tracker=candidate_tracker,
+        )
+    ] * len(devices)
+
+    worker_context_queue = create_worker_context_queue(devices)
+    baseline_results = multiprocess_progress_wrapper(
+        num_worker=len(devices),
+        task_list=task_list,
+        function=run_iree_benchmark_module_command,
+        initializer=init_worker_context,
+        initializer_inputs=(worker_context_queue,),
+    )
+    return baseline_results
+
+
+class BaselineResultHandler:
+    def __init__(self) -> None:
+        # Maps device IDs to a list of `BenchmarkResult`.
+        self.device_baseline_results: dict[str, list[BenchmarkResult]] = defaultdict(
+            list
+        )
+
+    def add_run(self, results: list[BenchmarkResult]) -> None:
+        if not BaselineResultHandler.are_baseline_devices_unique(results):
+            logging.warning("Duplicate device IDs detected in the baseline results.")
+        for result in results:
+            self.device_baseline_results[result.device_id].append(result)
+
+    @staticmethod
+    def are_baseline_devices_unique(results: list[BenchmarkResult]) -> bool:
+        return len(results) == len(set(result.device_id for result in results))
+
+    def get_valid_time_ms(self, device_id: str) -> list[float]:
+        return [
+            result.time
+            for result in self.device_baseline_results.get(device_id, [])
+            if result.is_valid()
+        ]
+
+    def get_average_result_ms(self, device_id: str) -> Optional[float]:
+        valid_times = self.get_valid_time_ms(device_id)
+        if valid_times:
+            return sum(valid_times) / len(valid_times)
+        return None
+
+    def detect_regressions(
+        self,
+        baseline_results: list[BenchmarkResult],
+        threshold: float = 1.03,
+    ) -> list[str]:
+        """
+        Returns a list of device IDs where performance regressions were detected.
+        A performance regression is defined as a baseline time from 'baseline_results'
+        for a device that exceeds the stored average baseline time for that device by
+        a factor greater than the specified 'threshold'.
+        """
+        regressions = []
+        for result in baseline_results:
+            if not result.is_valid():
+                continue
+
+            baseline_avg = self.get_average_result_ms(result.device_id)
+            if baseline_avg is not None and result.time > baseline_avg * threshold:
+                regressions.append(result.device_id)
+
+        return regressions
+
+    def is_valid(self) -> bool:
+        """
+        Returns True iff at least one valid (finite) baseline time has been recorded.
+        """
+        return any(
+            self.get_valid_time_ms(device_id)
+            for device_id in self.device_baseline_results
+        )
+
+    def is_valid_for_device(self, device_id: str) -> bool:
+        return len(self.get_valid_time_ms(device_id)) != 0
+
+    def get_candidates_ordered_by_speedup(
+        self, candidate_results: list[BenchmarkResult]
+    ) -> list[tuple[BenchmarkResult, float]]:
+        """
+        Returns a list of (BenchmarkResult, speedup) tuples sorted in ascending
+        order of speedup (or of raw runtime when no valid baseline exists).
+
+        If no valid baseline times are available across all devices, candidates
+        are sorted by their raw runtime, and a placeholder speedup value of 1.0
+        is assigned to each candidate.
+
+        If valid baseline times exist, speedup is defined as the ratio of the
+        candidate's runtime to the average baseline time for the corresponding
+        device:
+
+            speedup = candidate_runtime / avg_baseline_time (or fallback_baseline)
+
+        If no valid baseline times are available for a specific device, the
+        fallback baseline is used: the average of all valid baseline times
+        across devices.
+        """
+        if not self.is_valid():
+            logging.warning("No valid baseline times available.")
+            # Use the candidate time directly when no baselines are available.
+            return sorted(
+                [(candidate, 1.0) for candidate in candidate_results],
+                key=lambda x: x[0].time,
+            )
+
+        # Calculate the fallback baseline as the average of all valid times across devices.
+        valid_baseline_times = [
+            result.time
+            for device_id in self.device_baseline_results
+            for result in self.device_baseline_results[device_id]
+            if result.is_valid()
+        ]
+
+        fallback_baseline = sum(valid_baseline_times) / len(valid_baseline_times)
+
+        candidates_with_speedup = []
+        for candidate in candidate_results:
+            baseline_avg_ms = self.get_average_result_ms(candidate.device_id)
+            if baseline_avg_ms is None:
+                baseline_avg_ms = fallback_baseline
+            speedup = candidate.time / baseline_avg_ms
+            candidates_with_speedup.append((candidate, speedup))
+        return sorted(candidates_with_speedup, key=lambda x: x[1])
+
+
 def compile(
     args: argparse.Namespace,
     path_config: PathConfig,
@@ -832,64 +1003,6 @@ def compile(
     return compiled_candidates
 
 
-def select_best_benchmark_results(
-    candidate_results: list[BenchmarkResult],
-    baseline_results: list[BenchmarkResult],
-    num_candidates: Optional[int],
-) -> list[BenchmarkResult]:
-    filtered_candidate_results = [r for r in candidate_results if math.isfinite(r.time)]
-    if len(filtered_candidate_results) == 0:
-        logging.error("No successful candidate benchmarks.")
-        return []
-    fallback_baseline_time: Optional[float] = None
-    filtered_baseline_results: list[BenchmarkResult] = []
-    for r in baseline_results:
-        if math.isfinite(r.time):
-            filtered_baseline_results.append(r)
-            fallback_baseline_time = r.time
-        else:
-            logging.warning(f"Baseline on device {r.device_id} failed.")
-    if fallback_baseline_time is None:
-        logging.warning(
-            f"All baseline benchmarks failed. Baselines will not be used to select top candidates"
-        )
-    baseline_times_by_device = {}
-    for r in filtered_baseline_results:
-        baseline_times_by_device[r.device_id] = r.time
-
-    # Select top candidates
-    def get_speedup(result: BenchmarkResult) -> float:
-        if result.device_id in baseline_times_by_device:
-            return result.time / baseline_times_by_device[result.device_id]
-        assert fallback_baseline_time is not None, "expected fallback_baseline_time"
-        return result.time / fallback_baseline_time
-
-    num_top_candidates = len(filtered_candidate_results)
-    if num_candidates is not None:
-        num_top_candidates = num_candidates
-
-    # Sort by the speedup over baseline on the same device. If a device failed
-    # the baseline benchmark, then use the fallback baseline. If there is no
-    # successful baseline, then the best we can do is to sort by the actual
-    # time.
-    sorting_key = get_speedup
-    if fallback_baseline_time is None:
-        sorting_key = lambda result: result.time
-    best_results = sorted(filtered_candidate_results, key=sorting_key)[
-        :num_top_candidates
-    ]
-    logging.info(f"Selected top[{len(best_results)}]:")
-
-    for r in best_results:
-        if fallback_baseline_time is not None:
-            speedup = f"{round(get_speedup(r) * 100, 2)}% of baseline"
-        else:
-            speedup = "baseline unavailable"
-        result = f"Candidate {r.candidate_id} time: {r.time:.2f} ms ({speedup})"
-        logging.info(result)
-    return best_results
-
-
 def benchmark(
     args: argparse.Namespace,
     compiled_candidates: list[int],
@@ -902,46 +1015,61 @@
         logging.warning("No candidates to benchmark.")
         return []
 
-    task_list = [
-        BenchmarkPack(
-            iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
-            benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
-            candidate_tracker=candidate_trackers[i],
-        )
-        for i in compiled_candidates
-        if i != 0
-    ]
-    worker_context_queue = create_worker_context_queue(args.devices)
-    candidate_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
-        num_worker=len(args.devices),
-        task_list=task_list,
-        function=run_iree_benchmark_module_command,
-        initializer=init_worker_context,
-        initializer_inputs=(worker_context_queue,),
-    )
-
-    # Benchmarking baselines on each involved device.
-    worker_context_queue = create_worker_context_queue(args.devices)
-    baseline_task_list = [
-        BenchmarkPack(
-            iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
-            benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
-            candidate_tracker=candidate_trackers[0],
-        )
-    ] * len(args.devices)
-    baseline_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
-        num_worker=len(args.devices),
-        task_list=baseline_task_list,
-        function=run_iree_benchmark_module_command,
-        initializer=init_worker_context,
-        initializer_inputs=(worker_context_queue,),
+    baseline_tracker = candidate_trackers[0]
+    first_baseline_result = benchmark_baseline(
+        devices=args.devices,
+        tuning_client=tuning_client,
+        candidate_tracker=baseline_tracker,
+    )
+    baseline_handler = BaselineResultHandler()
+    baseline_handler.add_run(first_baseline_result)
+    if not baseline_handler.is_valid():
+        logging.warning("Baseline run failed.")
+
+    candidate_indices = [i for i in compiled_candidates if i != 0]
+    candidate_results = benchmark_candidates(
+        candidate_indices=candidate_indices,
+        devices=args.devices,
+        tuning_client=tuning_client,
+        candidate_trackers=candidate_trackers,
     )
 
-    best_results: list[BenchmarkResult] = select_best_benchmark_results(
-        candidate_results=candidate_results,
-        baseline_results=baseline_results,
-        num_candidates=num_candidates,
+    second_baseline_result = benchmark_baseline(
+        devices=args.devices,
+        tuning_client=tuning_client,
+        candidate_tracker=baseline_tracker,
     )
-    top_candidates = [result.candidate_id for result in best_results]
-    return top_candidates
+
+    regression_devices = baseline_handler.detect_regressions(second_baseline_result)
+    if regression_devices:
+        logging.warning(
+            f"Performance regressions detected for the following devices: {', '.join(regression_devices)}."
+        )
+    baseline_handler.add_run(second_baseline_result)
+
+    if not baseline_handler.is_valid():
+        logging.warning("All baseline runs failed.")
+
+    all_candidates_with_speedup = baseline_handler.get_candidates_ordered_by_speedup(
+        candidate_results
+    )
+    top_candidates_with_speedup = all_candidates_with_speedup[:num_candidates]
+    top_candidate_ids = []
+    if baseline_handler.is_valid():
+        for candidate, speedup in top_candidates_with_speedup:
+            time_ms = candidate.time
+            candidate_id = candidate.candidate_id
+            percentage_of_baseline = speedup * 100
+            top_candidate_ids.append(candidate_id)
+            logging.info(
+                f"Candidate {candidate_id} time: {time_ms:.2f} ms "
+                f"({percentage_of_baseline:.1f}% of baseline)"
+            )
+    else:
+        for candidate, _ in top_candidates_with_speedup:
+            time_ms = candidate.time
+            candidate_id = candidate.candidate_id
+            top_candidate_ids.append(candidate_id)
+            logging.info(f"Candidate {candidate_id} time: {time_ms:.2f} ms")
+    return top_candidate_ids
diff --git a/tuner/tuner/libtuner_test.py b/tuner/tuner/libtuner_test.py
index f7e189637..bbca2d370 100644
--- a/tuner/tuner/libtuner_test.py
+++ b/tuner/tuner/libtuner_test.py
@@ -187,60 +187,117 @@ def test_get_compilation_success_rate():
     assert libtuner.get_compilation_success_rate(compiled_candidates) == 0.0
 
 
-def test_select_best_benchmark_results() -> None:
-    candidate_results = [
-        libtuner.BenchmarkResult(1, 0.5, "hip://0"),
-        libtuner.BenchmarkResult(2, 0.3, "hip://1"),
-        libtuner.BenchmarkResult(3, 0.2, "hip://2"),
-        libtuner.BenchmarkResult(4, 0.1, "hip://3"),
+def test_enum_collision():
+    from iree.compiler.dialects import linalg, vector, iree_gpu, iree_codegen, iree_input  # type: ignore
+
+
+def test_baseline_result_handler_valid():
+    handler = libtuner.BaselineResultHandler()
+    assert not handler.is_valid()
+    baseline = [
+        libtuner.BenchmarkResult(0, 0.5, "hip://0"),
+        libtuner.BenchmarkResult(0, math.inf, "hip://1"),
+        libtuner.BenchmarkResult(0, 0.7, "hip://0"),
     ]
-    baseline_results = [
-        libtuner.BenchmarkResult(0, 1.0, "hip://0"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://1"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://2"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://3"),
+    assert libtuner.BaselineResultHandler.are_baseline_devices_unique([])
+    assert not libtuner.BaselineResultHandler.are_baseline_devices_unique(baseline)
+    handler.add_run(baseline)
+    assert handler.is_valid()
+    assert handler.is_valid_for_device("hip://0")
+    assert not handler.is_valid_for_device("hip://1")
+
+    assert handler.device_baseline_results["hip://0"] == [
+        libtuner.BenchmarkResult(0, 0.5, "hip://0"),
+        libtuner.BenchmarkResult(0, 0.7, "hip://0"),
     ]
-    best_results: list[
-        libtuner.BenchmarkResult
-    ] = libtuner.select_best_benchmark_results(
-        candidate_results=candidate_results,
-        baseline_results=baseline_results,
-        num_candidates=3,
-    )
-    assert best_results[0].candidate_id == 1
-    assert best_results[1].candidate_id == 4
-    assert best_results[2].candidate_id == 3
-
-    baseline_results = [
-        libtuner.BenchmarkResult(0, math.inf, "hip://0"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://1"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://2"),
-        libtuner.BenchmarkResult(0, 0.1, "hip://3"),
+    assert handler.device_baseline_results["hip://1"] == [
+        libtuner.BenchmarkResult(0, math.inf, "hip://1"),
     ]
-    best_results = libtuner.select_best_benchmark_results(
-        candidate_results=candidate_results,
-        baseline_results=baseline_results,
-        num_candidates=3,
-    )
-    assert best_results[0].candidate_id == 4
-    assert best_results[1].candidate_id == 3
-    assert best_results[2].candidate_id == 2
 
-    baseline_results = [
-        libtuner.BenchmarkResult(0, math.inf, "hip://0"),
+    assert handler.get_valid_time_ms("hip://0") == [0.5, 0.7]
+    assert handler.get_valid_time_ms("hip://1") == []
+    assert handler.get_valid_time_ms("hip://2") == []
+
+    additional_baseline = [
         libtuner.BenchmarkResult(0, math.inf, "hip://1"),
-        libtuner.BenchmarkResult(0, math.inf, "hip://2"),
-        libtuner.BenchmarkResult(0, math.inf, "hip://3"),
+        libtuner.BenchmarkResult(0, math.nan, "hip://1"),
+        libtuner.BenchmarkResult(0, 1.2, "hip://1"),
+        libtuner.BenchmarkResult(0, 0.8, "hip://1"),
     ]
-    best_results = libtuner.select_best_benchmark_results(
-        candidate_results=candidate_results,
-        baseline_results=baseline_results,
-        num_candidates=3,
+    assert not libtuner.BaselineResultHandler.are_baseline_devices_unique(
+        additional_baseline
     )
-    assert best_results[0].candidate_id == 4
-    assert best_results[1].candidate_id == 3
-    assert best_results[2].candidate_id == 2
+    handler.add_run(additional_baseline)
+    assert handler.get_valid_time_ms("hip://0") == [0.5, 0.7]
+    assert handler.get_valid_time_ms("hip://1") == [1.2, 0.8]
+    assert handler.is_valid_for_device("hip://1")
+    assert handler.get_average_result_ms("hip://0") == 0.6
+    assert handler.get_average_result_ms("hip://1") == 1.0
 
-def test_enum_collision():
-    from iree.compiler.dialects import linalg, vector, iree_gpu, iree_codegen, iree_input  # type: ignore
+
+def test_baseline_result_handler_speedup():
+    handler = libtuner.BaselineResultHandler()
+    first_run_baseline = [
+        libtuner.BenchmarkResult(0, 1.0, "hip://0"),
+        libtuner.BenchmarkResult(0, 0.5, "hip://1"),
+    ]
+    assert handler.detect_regressions(first_run_baseline) == []
+    handler.add_run(first_run_baseline)
+
+    second_run_baseline = [
+        libtuner.BenchmarkResult(0, 0.8, "hip://0"),
+        libtuner.BenchmarkResult(0, math.inf, "hip://1"),
+        libtuner.BenchmarkResult(0, 1.2, "hip://2"),
+    ]
+    # The infinite time on hip://1 is skipped as invalid and hip://2 has no
+    # stored baseline yet, so no regressions are reported.
+    assert handler.detect_regressions(second_run_baseline) == []
+    handler.add_run(second_run_baseline)
+
+    candidates = [
+        libtuner.BenchmarkResult(1, 0.4, "hip://0"),
+        libtuner.BenchmarkResult(2, 0.3, "hip://1"),
+        libtuner.BenchmarkResult(3, 1.0, "hip://2"),
+        libtuner.BenchmarkResult(4, 0.2, "hip://3"),
+    ]
+    all_candidates_with_speedup = handler.get_candidates_ordered_by_speedup(candidates)
+    assert all_candidates_with_speedup == [
+        (candidates[3], 0.2 / 0.875),
+        (candidates[0], 0.4 / 0.9),
+        (candidates[1], 0.3 / 0.5),
+        (candidates[2], 1.0 / 1.2),
+    ]
+    top_candidates_with_speedup = all_candidates_with_speedup[:2]
+    assert [candidate.candidate_id for candidate, _ in top_candidates_with_speedup] == [
+        4,
+        1,
+    ]
+
+    candidates = [
+        libtuner.BenchmarkResult(5, 0.6, "hip://0"),
+        libtuner.BenchmarkResult(6, 0.4, "hip://1"),
+        libtuner.BenchmarkResult(7, 0.8, "hip://2"),
+    ]
+    all_candidates_with_speedup = handler.get_candidates_ordered_by_speedup(candidates)
+    assert all_candidates_with_speedup == [
+        (candidates[0], 0.6 / 0.9),
+        (candidates[2], 0.8 / 1.2),
+        (candidates[1], 0.4 / 0.5),
+    ]
+    top_candidates_with_speedup = all_candidates_with_speedup[:2]
+    assert [candidate.candidate_id for candidate, _ in top_candidates_with_speedup] == [
+        5,
+        7,
+    ]
+
+    handler = libtuner.BaselineResultHandler()
+    all_candidates_with_speedup = handler.get_candidates_ordered_by_speedup(candidates)
+    assert all_candidates_with_speedup == [
+        (candidates[1], 1.0),
+        (candidates[0], 1.0),
+        (candidates[2], 1.0),
+    ]
+    top_candidates_with_speedup = all_candidates_with_speedup[:2]
+    assert [candidate.candidate_id for candidate, _ in top_candidates_with_speedup] == [
+        6,
+        5,
+    ]
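
Below is a minimal usage sketch, not part of the patch, showing how BaselineResultHandler ranks candidates. The `from tuner import libtuner` import path is an assumption based on the repository layout, and all timing values are invented for illustration.

# Hypothetical usage sketch; import path and timings are assumptions.
from tuner import libtuner

handler = libtuner.BaselineResultHandler()
# One baseline run: hip://0 takes 2.0 ms, hip://1 takes 4.0 ms.
handler.add_run(
    [
        libtuner.BenchmarkResult(0, 2.0, "hip://0"),
        libtuner.BenchmarkResult(0, 4.0, "hip://1"),
    ]
)

candidates = [
    libtuner.BenchmarkResult(1, 1.0, "hip://0"),  # speedup = 1.0 / 2.0 = 0.50
    libtuner.BenchmarkResult(2, 3.0, "hip://1"),  # speedup = 3.0 / 4.0 = 0.75
    # hip://2 has no baseline, so the fallback applies:
    # mean of all valid baseline times = (2.0 + 4.0) / 2 = 3.0.
    libtuner.BenchmarkResult(3, 1.5, "hip://2"),  # speedup = 1.5 / 3.0 = 0.50
]

ranked = handler.get_candidates_ordered_by_speedup(candidates)
# Ascending by speedup; sorted() is stable, so the 0.50 tie keeps input order.
assert [c.candidate_id for c, _ in ranked] == [1, 3, 2]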