Profile with kineto and warmup for more accurate benchmarking (#3585)
Summary:
Pull Request resolved: #3585

X-link: facebookresearch/FBGEMM#667

**Summary**
This PR introduces:
- A new warm-up method to ensure sufficient GPU preparation before benchmarking.
- Benchmark time calculation using the Kineto profiler for measuring the time and bandwidth of the inference forward kernels.

**Motivation**
In small benchmark cases, kernel launch and synchronization overheads can be significant compared to the actual kernel runtime. By leveraging the Kineto profiler:
- These overheads are excluded from the measured kernel time.
- Users get a more accurate estimate of the execution time and bandwidth of the forward kernel.
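
As an illustration of how the Kineto-based measurement works (a minimal, self-contained sketch rather than the code added in this PR; `run_forward`, the kernel-name substring, and the iteration count are placeholders, and a CUDA device is assumed):

```python
import torch
from torch.profiler import ProfilerActivity, profile


def forward_kernel_time_us(run_forward, kernel_name_substr: str, iters: int = 100) -> float:
    # Profile the forward calls with the Kineto-backed torch.profiler.
    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
        for _ in range(iters):
            run_forward()
        torch.cuda.synchronize()
    # key_averages() aggregates events by name; device_time is the average
    # device-side time per call, i.e. per benchmark iteration here.
    kernel_time_us = 0.0
    for event in prof.key_averages():
        if kernel_name_substr in event.key:
            kernel_time_us += event.device_time
    return kernel_time_us
```

With the per-iteration kernel time in microseconds and the per-iteration bytes read and written, the benchmark reports bandwidth as `read_write_bytes / kernel_time / 1.0e3` GB/s.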

For small kernels, the iteration-based warm-up might not be sufficient. By leveraging the time-based warm-up:
- Users can be confident that the GPU has been warmed up long enough.
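
The time-based warm-up itself boils down to re-running the op until a wall-clock budget is spent (a simplified sketch; the helper added to bench_utils.py additionally unpacks a TBERequest and supports backward-only runs):

```python
import time
from typing import Callable


def warmup_by_time(run_once: Callable[[], None], warmup_ms: int) -> None:
    # Keep launching work until at least warmup_ms milliseconds have elapsed,
    # so that clocks and caches reach a steady state before timing starts.
    start_ms = time.time() * 1000
    while time.time() * 1000 - start_ms < warmup_ms:
        run_once()
```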

**Test instruction**
The command below shows how to use these features:
python bench/split_table_batched_embeddings_benchmark.py nbit-device-with-spec --export-trace --warmup-ms 50
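
The same flags also apply to the `nbit-device` command; for example (hypothetical invocation, assuming the defaults added in this PR), `--warmup-ms` takes precedence over `--warmup-runs` when both are given:

python bench/split_table_batched_embeddings_benchmark.py nbit-device --export-trace --warmup-ms 50 --warmup-runs 2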

Pull Request resolved: #3580

Reviewed By: leitian

Differential Revision: D68292871

Pulled By: q10

fbshipit-source-id: 0a90cddcf07780164e38ac1b945717d8456947c0
amirakb89 authored and facebook-github-bot committed Jan 17, 2025
1 parent 21d1260 commit 379db5f
Showing 2 changed files with 203 additions and 27 deletions.
53 changes: 43 additions & 10 deletions fbgemm_gpu/bench/bench_utils.py
@@ -30,6 +30,28 @@
logging.basicConfig(level=logging.DEBUG)


def warmup(
    request: TBERequest,
    warmup_ms: int,
    warmup_runs: int,
    func: Callable[[torch.Tensor, torch.Tensor, Optional[torch.Tensor]], torch.Tensor],
    bwd_only: bool = False,
    grad: Optional[torch.Tensor] = None,
) -> None:
    indices, offsets, weights = request.unpack_3()
    if warmup_ms:
        # Time-based warm-up: re-run until warmup_ms of wall-clock time has elapsed
        start_time_ms = time.time() * 1000
        while time.time() * 1000 - start_time_ms < warmup_ms:
            out = func(indices, offsets, weights)
            if bwd_only:
                out.backward(grad)
    else:
        # Iteration-based warm-up: run a fixed number of iterations
        for _ in range(warmup_runs):
            out = func(indices, offsets, weights)
            if bwd_only:
                out.backward(grad)


def benchmark_torch_function( # noqa: C901
    # pyre-fixme[2]: Parameter must be annotated.
    f,
@@ -159,19 +181,30 @@ def benchmark_requests(
    # Can be used to clear model's stats after warmup for example.
    callback_after_warmup: Optional[Callable[[], None]] = None,
    periodic_logs: bool = False,
    warmup_ms: Optional[int] = None,
) -> float:
    times = []

    # Run at least one warmup iteration to avoid the long cudaLaunchKernel time
    # for the first kernel
    num_warmups = num_warmups + 1 if num_warmups >= 0 else 1

    if num_warmups > 0:
        indices, offsets, weights = requests[0].unpack_3()
        for _ in range(num_warmups):
            out = func(indices, offsets, weights)
            if bwd_only:
                out.backward(grad)
    # for the first kernel if warmup_ms > 0
    # warmup_ms is prioritized over num_warmups

    if warmup_ms is None:
        num_warmups = num_warmups + 1 if num_warmups >= 0 else 1

    # warm-up the GPU before profiling
    warmup(
        requests[0],
        # pyre-ignore[6]
        warmup_ms,
        num_warmups,
        lambda indices, offsets, per_sample_weights: func(
            indices,
            offsets,
            per_sample_weights,
        ),
        bwd_only=bwd_only,
        grad=grad,
    )

    if callback_after_warmup is not None:
        callback_after_warmup()
177 changes: 160 additions & 17 deletions fbgemm_gpu/bench/split_table_batched_embeddings_benchmark.py
@@ -77,6 +77,7 @@
        benchmark_torch_function,
        benchmark_vbe,
        fill_random_scale_bias,
        warmup,
    )
else:
    from fbgemm_gpu.bench.bench_utils import (
@@ -87,12 +88,28 @@
        benchmark_torch_function,
        benchmark_vbe,
        fill_random_scale_bias,
        warmup,
    )


logging.basicConfig(level=logging.DEBUG)


def kineto_trace_profiler(p: profile, trace_info: tuple[str, str, str, str]) -> float:
    phase, trace_url, tbe_type, kern_name = trace_info
    p.export_chrome_trace(
        trace_url.format(tbe_type=tbe_type, phase=phase, ospid=os.getpid())
    )
    kernel_time = 0
    for event in p.key_averages():
        # Sum the total time of forward kernel runs
        if kern_name in event.key:
            kernel_time += event.device_time
    assert kernel_time > 0
    print(f"Total CUDA time: {kernel_time:.2f} ")
    return kernel_time


@click.group()
def cli() -> None:
    pass
@@ -323,7 +340,6 @@ def device( # noqa C901
    logging.info(
        f"Accessed weights per batch: {B * sum(Ds) * L * param_size_multiplier / 1.0e9: .2f} GB"
    )

    requests = generate_requests(
        iters,
        B,
@@ -1135,6 +1151,7 @@ def nbit_cpu( # noqa C901
@click.option("--iters", default=100)
@click.option("--runs-of-iters", default=5)
@click.option("--warmup-runs", default=2)
@click.option("--warmup-ms", type=int, default=None)
@click.option("--output-dtype", type=SparseType, default=SparseType.FP16)
@click.option("--report-aibench", is_flag=True)
@click.option("--run-reference", is_flag=True, default=False)
@@ -1148,6 +1165,17 @@ def nbit_cpu( # noqa C901
    type=str,
    default="{tbe_type}_tbe_{phase}_trace_{ospid}.json",
)
@click.option(
    "--warmup-runs",
    default=2,
    help="Number of warmup runs. Ignored if --warmup-ms is set.",
)
@click.option(
    "--warmup-ms",
    type=int,
    default=None,
    help="Warmup duration in milliseconds. Disables the --run-nums option.",
)
def nbit_device( # noqa C901
    alpha: float,
    bag_size: int,
@@ -1168,7 +1196,6 @@ def nbit_device( # noqa C901
    check_median: bool,
    iters: int,
    runs_of_iters: int,
    warmup_runs: int,
    output_dtype: SparseType,
    report_aibench: bool,
    run_reference: bool,
@@ -1178,6 +1205,8 @@ def nbit_device( # noqa C901
    fp8_exponent_bias: Optional[int],
    export_trace: bool,
    trace_url: str,
    warmup_runs: int,
    warmup_ms: Optional[int],
) -> None:
    np.random.seed(42)
    torch.manual_seed(42)
@@ -1295,6 +1324,7 @@ def nbit_device( # noqa C901
                per_sample_weights,
            ),
            check_median=check_median,
            warmup_ms=warmup_ms,
        )

        # free up GPU memory
@@ -1324,18 +1354,6 @@ def nbit_device( # noqa C901
f"Memory Usage For Pruning: {mem_for_pruning / 1.0e9:.0f} GB"
)

    # Get trace for one run of iter
    tbe_type: str = "split"

    def _kineto_trace_handler(p: profile, phase: str) -> None:
        p.export_chrome_trace(
            trace_url.format(tbe_type=tbe_type, phase=phase, ospid=os.getpid())
        )

    # pyre-ignore[3]
    def context_factory(on_trace_ready: Callable[[profile], None]):
        return profile(on_trace_ready=on_trace_ready) if export_trace else nullcontext()

    requests = generate_requests(
        iters,
        B,
@@ -1353,7 +1371,35 @@ def context_factory(on_trace_ready: Callable[[profile], None]):
        for req in requests
    ]

    with context_factory(lambda p: _kineto_trace_handler(p, "fwd")):
    # pyre-ignore[3]
    def context_factory(on_trace_ready: Callable[[profile], None]):
        return profile(on_trace_ready=on_trace_ready) if export_trace else nullcontext()

    # Get trace for one run of iter
    tbe_type: str = "split"
    # input of the kineto_trace_profiler
    trace_info = ("fwd", trace_url, tbe_type, "embedding_codegen_forward")
    time_dict = {"kernel_time": None}  # dict to hold the kernel time

    # warm-up right before profiling
    # warmup_ms prioritized over warmup_runs
    if warmup_ms or warmup_runs:
        warmup(
            requests[0],
            # pyre-ignore[6]
            warmup_ms,
            warmup_runs,
            lambda indices, offsets, per_sample_weights: emb.forward(
                indices.int(),
                offsets.int(),
                per_sample_weights,
            ),
        )

    with context_factory(
        # pyre-ignore[6]
        lambda p: time_dict.update(kernel_time=kineto_trace_profiler(p, trace_info))
    ):
        # forward
        time_per_iter = benchmark_requests(
            requests,
@@ -1364,6 +1410,21 @@ def context_factory(on_trace_ready: Callable[[profile], None]):
),
check_median=check_median,
)

if export_trace:
kernel_time = time_dict["kernel_time"]
# pyre-ignore[58]
bandwidth = read_write_bytes / kernel_time / 1.0e3

logging.info(
f"kineto profiled stats: "
f"{weights_precision} Forward, B: {B}, "
f"E: {E}, T: {T}, D: {D}, L: {L}, W: {weighted}, "
f"BW: {bandwidth: .2f} GB/s, " # noqa: B950
f"Time: {kernel_time:.0f}us, "
f"Memory Usage For Pruning: {mem_for_pruning / 1.0e9:.0f} GB"
)

# free up GPU memory
del requests

@@ -1465,12 +1526,28 @@ def context_factory(on_trace_ready: Callable[[profile], None]):
@click.option("--check-median", is_flag=True, default=True)
@click.option("--iters", default=100)
@click.option("--runs-of-iters", default=5)
@click.option("--warmup-runs", default=2)
@click.option("--output-dtype", type=SparseType, default=SparseType.FP16)
@click.option("--report-aibench", is_flag=True)
@click.option("--fp8-exponent-bits", type=int, default=None)
@click.option("--fp8-exponent-bias", type=int, default=None)
@click.option("--use-cpu", is_flag=True, default=False)
@click.option("--export-trace", is_flag=True, default=False)
@click.option(
    "--trace-url",
    type=str,
    default="{tbe_type}_tbe_spec_{phase}_trace_{ospid}.json",
)
@click.option(
    "--warmup-runs",
    default=2,
    help="Number of warmup runs. Ignored if --warmup-ms is set.",
)
@click.option(
    "--warmup-ms",
    type=int,
    default=None,
    help="Warmup duration in milliseconds. Disables the --run-nums option.",
)
def nbit_device_with_spec( # noqa C901
    alpha: float,
    bag_size_list: str,
@@ -1490,12 +1567,15 @@ def nbit_device_with_spec( # noqa C901
    check_median: bool,
    iters: int,
    runs_of_iters: int,
    warmup_runs: int,
    output_dtype: SparseType,
    report_aibench: bool,
    fp8_exponent_bits: Optional[int],
    fp8_exponent_bias: Optional[int],
    use_cpu: bool,
    export_trace: bool,
    trace_url: str,
    warmup_runs: int,
    warmup_ms: Optional[int],
) -> None:
    np.random.seed(42)
    torch.manual_seed(42)
@@ -1607,6 +1687,7 @@ def nbit_device_with_spec( # noqa C901
    )

    times = []
    kineto_request = []
    for i in range(runs_of_iters):
        # Generate a request for each table then combine
        all_requests = {
@@ -1683,8 +1764,13 @@ def nbit_device_with_spec( # noqa C901
                per_sample_weights,
            ),
            check_median=check_median,
            warmup_ms=warmup_ms,
        )

        # copy the request of last iteration for kineto profile benchmark
        if i == runs_of_iters - 1:
            kineto_request = requests

        # free up memory
        del requests

@@ -1712,6 +1798,63 @@ def nbit_device_with_spec( # noqa C901
f"Memory Usage For Pruning: {mem_for_pruning / 1.0e9:.0f} GB"
)

# pyre-ignore[3]
def context_factory(on_trace_ready: Callable[[profile], None]):
return profile(on_trace_ready=on_trace_ready) if export_trace else nullcontext()

if not use_cpu:
# profile with kineto
tbe_type: str = "split"
time_dict = {"kernel_time": None} # Shared variable to hold the kernel time
trace_info = ("fwd", trace_url, tbe_type, "embedding_codegen_forward")

# warm-up right before profiling
# warmup_ms prioritized over warmup_runs
if warmup_ms or warmup_runs:
warmup(
kineto_request[0],
# pyre-ignore[6]
warmup_ms,
warmup_runs,
lambda indices, offsets, per_sample_weights: emb.forward(
indices.int(),
offsets.int(),
per_sample_weights,
),
)

with context_factory(
# pyre-ignore[6]
lambda p: time_dict.update(kernel_time=kineto_trace_profiler(p, trace_info))
):
# forward
time_per_iter = benchmark_requests(
kineto_request,
lambda indices, offsets, per_sample_weights: emb.forward(
indices.int(),
offsets.int(),
per_sample_weights,
),
check_median=check_median,
)

if export_trace:
kernel_time = time_dict["kernel_time"]
# pyre-ignore[6]
bandwidth = read_write_bytes / kernel_time / 1.0e3

logging.info(
f"kineto profiled stats: "
f"{weights_precision} Forward, B: {B}, "
f"E: {E}, T: {T}, D: {D}, L: {L}, W: {weighted}, "
f"BW: {bandwidth: .2f} GB/s, " # noqa: B950
f"Time: {kernel_time:.0f}us, "
f"Memory Usage For Pruning: {mem_for_pruning / 1.0e9:.0f} GB"
)

# free up memory
del kineto_request

if report_aibench and haveAIBench:
print(
emitMetric(
