Skip to content

Commit

Permalink
add more functions to handler class and format the tests
Browse files Browse the repository at this point in the history
Signed-off-by: Bangtian Liu <[email protected]>
  • Loading branch information
bangtianliu committed Jan 17, 2025
1 parent fce1eb5 commit 6b4b292
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 214 deletions.
194 changes: 99 additions & 95 deletions tuner/tuner/libtuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,37 +263,6 @@ def map_baseline_by_device(
return average_device_times


def detect_baseline_regression(
first_baseline_results: list[BenchmarkResult],
second_baseline_results: list[BenchmarkResult],
) -> list[str]:
"""
Detects performance regressions between two sets of baseline results.
Returns a list of device IDs where performance regressions are detected.
"""
regression_device_ids = []
first_baseline_by_device = map_baseline_by_device(first_baseline_results)
second_baseline_by_device = map_baseline_by_device(second_baseline_results)
for device_id in first_baseline_by_device:
if device_id not in second_baseline_by_device:
continue
first_baseline_ms = first_baseline_by_device[device_id]
second_baseline_ms = second_baseline_by_device[device_id]

if second_baseline_ms > first_baseline_ms * 1.03:
percentage_slower = (
(second_baseline_ms - first_baseline_ms) / first_baseline_ms
) * 100
logging.warning(
f"Performance regression detected on device {device_id}: "
f"First baseline time = {first_baseline_ms} ms, Second baseline time = {second_baseline_ms} ms, "
f"Slower by {percentage_slower:.3f}%"
)
regression_device_ids.append(device_id)

return regression_device_ids


class ExecutionPhases(str, Enum):
dont_stop = ""
generate_candidates = "generate-candidates"
Expand Down Expand Up @@ -873,61 +842,110 @@ def benchmark_baseline(

class BaselineResultHandler:
def __init__(self) -> None:
self.runs: dict[str, list[float]] = defaultdict(list)
# Maps device IDs to a list of baseline run times (in milliseconds).
self.device_baseline_times: dict[str, list[float]] = defaultdict(list)

def add_run(self, results: list[BenchmarkResult]) -> None:
for result in results:
if math.isfinite(result.time):
self.runs[result.device_id].append(result.time)
self.device_baseline_times[result.device_id].append(result.time)

def get_valid_time_ms(self, device_id: str) -> list[float]:
return [
time
for time in self.device_baseline_times.get(device_id, [])
if math.isfinite(time)
]

def num_successful_runs(self, device_id: str) -> int:
return len(self.runs[device_id])
return len(self.get_valid_time_ms(device_id))

def get_average_result(self, device_id: str) -> Optional[float]:
times = self.runs.get(device_id, [])
if times:
return sum(times) / len(times)
def get_average_result_ms(self, device_id: str) -> Optional[float]:
valid_times = self.get_valid_time_ms(device_id)
if valid_times:
return sum(valid_times) / len(valid_times)
return None

def detect_regressions(
self,
baseline_results: list[BenchmarkResult],
threshold: float = 1.03,
) -> list[str]:
regressions = []
for result in baseline_results:
if not math.isfinite(result.time):
continue

baseline_avg = self.get_average_result_ms(result.device_id)
if baseline_avg is not None and result.time > baseline_avg * threshold:
regressions.append(result.device_id)
logging.warning(
f"Performance regression detected on device {result.device_id}: "
f"Stored average baseline time = {baseline_avg:.2f} ms, "
f"New baseline time = {result.time:.2f} ms, "
f"Slower by {((result.time - baseline_avg) / baseline_avg) * 100:.2f}%"
)

return regressions

def is_valid(self) -> bool:
return any(self.runs.values())
"""
Check if there are any valid finite baseline time recorded.
Return True if at least a valid (finite) baseline time recorded,
otherwise False.
This method determines whether the baseline data is available for computations
such as calculating speedup.
"""
return any(
self.get_valid_time_ms(device_id)
for device_id in self.device_baseline_times
)

def is_valid_for_device(self, device_id: str) -> bool:
return bool(self.runs.get(device_id))
return bool(self.get_valid_time_ms(device_id))

def calculate_speedup(
self, candidate_results: list[BenchmarkResult]
) -> dict[int, float]:
"""
Calculate the speedup for a list of candidate results compared to the baseline.
Return A dictionary where the key is the candidate_id and the value is the speedup ratio.
Returns a map from candidate_id to its speedup ratio.
"""
baseline_times = [
if not self.is_valid():
logging.warning("No valid baseline times available.")
# Use the candidate time directly when no baselines are available
return {
candidate.candidate_id: candidate.time
for candidate in candidate_results
}

# Calculate the fallback baseline as the average of all valid times across devices
valid_baseline_times = [
time
for times in self.runs.values()
for time in times
if math.isfinite(time)
for device_id in self.device_baseline_times
for time in self.get_valid_time_ms(device_id)
]
fallback_baseline = (
sum(baseline_times) / len(baseline_times) if baseline_times else None
)

fallback_baseline = sum(valid_baseline_times) / len(valid_baseline_times)

speedup_by_candidate = {}
if fallback_baseline is None:
logging.warning("No valid baseline times available. ")
# Use the candidate time directly when no baselines are available
for candidate in candidate_results:
speedup_by_candidate[candidate.candidate_id] = candidate.time
else:
for candidate in candidate_results:
baseline_avg = self.get_average_result(candidate.device_id)
if baseline_avg is None or not math.isfinite(baseline_avg):
baseline_avg = fallback_baseline
speedup_by_candidate[candidate.candidate_id] = (
candidate.time / baseline_avg
)
for candidate in candidate_results:
baseline_avg = self.get_average_result_ms(candidate.device_id)
if baseline_avg is None or not math.isfinite(baseline_avg):
baseline_avg = fallback_baseline
speedup_by_candidate[candidate.candidate_id] = candidate.time / baseline_avg
return speedup_by_candidate

def get_top_candidates(
self,
speedup_by_candidate: dict[int, float],
num_candidates: Optional[int] = None,
) -> list[int]:
sorted_candidates = sorted(speedup_by_candidate.items(), key=lambda x: x[1])
if num_candidates is not None:
sorted_candidates = sorted_candidates[:num_candidates]

return [candidate_id for candidate_id, _ in sorted_candidates]


def compile(
args: argparse.Namespace,
Expand Down Expand Up @@ -1012,28 +1030,6 @@ def compile(
return compiled_candidates


def get_top_candidates(
speedup_by_candidate: dict[int, float],
num_candidates: Optional[int] = None,
use_speedup: bool = True,
) -> list[int]:
sorted_candidates = sorted(speedup_by_candidate.items(), key=lambda x: x[1])

if num_candidates is not None:
sorted_candidates = sorted_candidates[:num_candidates]

for candidate_id, metric_value in sorted_candidates:
if use_speedup:
percentage_of_baseline = metric_value * 100
logging.info(
f"Candidate {candidate_id} time: {metric_value:.2f} ms "
f"({percentage_of_baseline:.1f}% of baseline)"
)
else:
logging.info(f"Candidate {candidate_id} time: {metric_value:.2f} ms")
return [candidate_id for candidate_id, _ in sorted_candidates]


def benchmark(
args: argparse.Namespace,
compiled_candidates: list[int],
Expand Down Expand Up @@ -1074,6 +1070,12 @@ def benchmark(
tuning_client=tuning_client,
candidate_tracker=baseline_tracker,
)

regression_devices = baseline_handler.detect_regressions(second_baseline_result)
if regression_devices:
logging.warning(
f"Performance regressions detected for the following devices: {', '.join(regression_devices)}"
)
baseline_handler.add_run(second_baseline_result)

if not baseline_handler.is_valid():
Expand All @@ -1082,18 +1084,20 @@ def benchmark(
if not are_baseline_devices_unique(second_baseline_result):
logging.warning("Duplicate device IDs detected in the second baseline results.")

regression_devices = detect_baseline_regression(
first_baseline_result, second_baseline_result
)
if regression_devices:
logging.warning(
f"Performance regressions detected for the following devices: {', '.join(regression_devices)}"
)

speedup_result = baseline_handler.calculate_speedup(candidate_results)
# If the baseline is valid (`baseline_handler.is_valid()`), `speedup_result` represents the speedup values.
# Otherwise, `speedup_result` contains the raw time values.
top_candidates = get_top_candidates(
speedup_result, num_candidates, baseline_handler.is_valid()
)
top_candidates = baseline_handler.get_top_candidates(speedup_result, num_candidates)
if baseline_handler.is_valid():
for candidate_id in top_candidates:
speedup_value = speedup_result[candidate_id]
percentage_of_baseline = speedup_value * 100
logging.info(
f"Candidate {candidate_id} time: {speedup_value:.2f} ms "
f"({percentage_of_baseline:.1f}% of baseline)"
)
else:
for candidate_id in top_candidates:
raw_time = speedup_result[candidate_id]
logging.info(f"Candidate {candidate_id} time: {raw_time:.2f} ms")
return top_candidates
Loading

0 comments on commit 6b4b292

Please sign in to comment.