diff --git a/tests/entrypoints/openai/test_metrics.py b/tests/entrypoints/openai/test_metrics.py
index 469a5fb039fb6..64deaedf0f2c1 100644
--- a/tests/entrypoints/openai/test_metrics.py
+++ b/tests/entrypoints/openai/test_metrics.py
@@ -105,8 +105,6 @@ async def client(server):
 @pytest.mark.asyncio
 async def test_metrics_counts(server: RemoteOpenAIServer,
                               client: openai.AsyncClient, use_v1: bool):
-    if use_v1:
-        pytest.skip("Skipping test on vllm V1")
     for _ in range(_NUM_REQUESTS):
         # sending a request triggers the metrics to be logged.
         await client.completions.create(
@@ -120,6 +118,9 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
 
     # Loop over all expected metric_families
     for metric_family, suffix_values_list in EXPECTED_VALUES.items():
+        if use_v1 and metric_family not in EXPECTED_METRICS_V1:
+            continue
+
         found_metric = False
 
         # Check to see if the metric_family is found in the prom endpoint.
@@ -199,6 +200,8 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
 EXPECTED_METRICS_V1 = [
     "vllm:num_requests_running",
     "vllm:num_requests_waiting",
+    "vllm:prompt_tokens_total",
+    "vllm:generation_tokens_total",
 ]
 
 
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 917d52d3220b8..022b6d0668e99 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -305,7 +305,8 @@ def _log_stats(
             return
 
         for logger in self.stat_loggers:
-            logger.log(scheduler_stats=scheduler_stats)
+            logger.log(scheduler_stats=scheduler_stats,
+                       iteration_stats=iteration_stats)
 
     def encode(
         self,
diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py
index b84f03fa3267c..6a7bb423749e1 100644
--- a/vllm/v1/metrics/loggers.py
+++ b/vllm/v1/metrics/loggers.py
@@ -1,11 +1,12 @@
 import time
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, List
 
+import numpy as np
 import prometheus_client
 
 from vllm.logger import init_logger
-from vllm.v1.metrics.stats import SchedulerStats
+from vllm.v1.metrics.stats import IterationStats, SchedulerStats
 
 logger = init_logger(__name__)
 
@@ -15,27 +16,61 @@
 class StatLoggerBase(ABC):
 
     @abstractmethod
-    def log(self, scheduler_stats: SchedulerStats):
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         ...
 
 
 class LoggingStatLogger(StatLoggerBase):
 
     def __init__(self):
-        self.last_log_time = time.monotonic()
+        self._reset(time.monotonic())
 
-    def log(self, scheduler_stats: SchedulerStats):
-        """Log Stats to standard output."""
+    def _reset(self, now):
+        self.last_log_time = now
+
+        # Tracked stats over current local logging interval.
+        self.num_prompt_tokens: List[int] = []
+        self.num_generation_tokens: List[int] = []
 
+    def _local_interval_elapsed(self, now: float) -> bool:
         # Log every _LOCAL_LOGGING_INTERVAL_SEC.
+        elapsed_time = now - self.last_log_time
+        return elapsed_time > _LOCAL_LOGGING_INTERVAL_SEC
+
+    def _track_iteration_stats(self, iteration_stats: IterationStats):
+        # Save tracked stats for token counters.
+        self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens)
+        self.num_generation_tokens.append(
+            iteration_stats.num_generation_tokens)
+
+    def _get_throughput(self, tracked_stats: List[int], now: float) -> float:
+        # Compute summary metrics for tracked stats
+        return float(np.sum(tracked_stats) / (now - self.last_log_time))
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
+        """Log Stats to standard output."""
+
+        self._track_iteration_stats(iteration_stats)
+
         now = time.monotonic()
-        if now - self.last_log_time < _LOCAL_LOGGING_INTERVAL_SEC:
+        if not self._local_interval_elapsed(now):
             return
-        self.last_log_time = now
+
+        prompt_throughput = self._get_throughput(self.num_prompt_tokens, now)
+        generation_throughput = self._get_throughput(
+            self.num_generation_tokens, now)
+
+        self._reset(now)
 
         # Format and print output.
         logger.info(
+            "Avg prompt throughput: %.1f tokens/s, "
+            "Avg generation throughput: %.1f tokens/s, "
             "Running: %d reqs, Waiting: %d reqs ",
+            prompt_throughput,
+            generation_throughput,
             scheduler_stats.num_running_reqs,
             scheduler_stats.num_waiting_reqs,
         )
@@ -61,11 +96,26 @@ def __init__(self, labels: Dict[str, str]):
             documentation="Number of requests waiting to be processed.",
             labelnames=labelnames).labels(*labelvalues)
 
-    def log(self, scheduler_stats: SchedulerStats):
+        self.counter_prompt_tokens = prometheus_client.Counter(
+            name="vllm:prompt_tokens_total",
+            documentation="Number of prefill tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+        self.counter_generation_tokens = prometheus_client.Counter(
+            name="vllm:generation_tokens_total",
+            documentation="Number of generation tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         """Log to prometheus."""
         self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
         self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)
 
+        self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
+        self.counter_generation_tokens.inc(
+            iteration_stats.num_generation_tokens)
+
     @staticmethod
     def _unregister_vllm_metrics():
         # Unregister any existing vLLM collectors (for CI/CD
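
Note on the logged throughput: LoggingStatLogger._get_throughput just divides the tokens accumulated since the last local log by the elapsed interval. A minimal worked example of that arithmetic, with made-up token counts and the 5-second _LOCAL_LOGGING_INTERVAL_SEC from loggers.py:

# Worked example of the summary computed by LoggingStatLogger._get_throughput.
# The token counts below are illustrative, not taken from a real run.
import numpy as np

tracked_tokens = [128, 256, 64]  # tokens recorded per engine step over the interval
elapsed = 5.0                    # seconds since the last local log
throughput = float(np.sum(tracked_tokens) / elapsed)
assert throughput == 89.6        # (128 + 256 + 64) / 5.0 tokens/s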
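
To see the new Prometheus counters end to end, the metrics endpoint of a running OpenAI-compatible server can be scraped directly, much as the updated test does. A small sketch, assuming a server is already listening on localhost:8000 (host and port are placeholders, not part of this change):

# Sketch: scrape the metrics endpoint and print the new token counters.
# Assumes a vLLM OpenAI-compatible server is serving on localhost:8000.
import requests

text = requests.get("http://localhost:8000/metrics").text
for line in text.splitlines():
    if line.startswith(("vllm:prompt_tokens_total",
                        "vllm:generation_tokens_total")):
        print(line)  # counter name, labels, and cumulative token count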