From 050b084cda905503081fad9c877a348aaa2b0587 Mon Sep 17 00:00:00 2001 From: Keyun Tong Date: Mon, 13 Jan 2025 11:39:26 -0800 Subject: [PATCH] async --- vllm/v1/outputs.py | 2 +- vllm/v1/sample/sampler.py | 31 ++- vllm/v1/utils.py | 60 +++-- vllm/v1/worker/gpu_model_runner.py | 386 +++++++++++++++++------------ vllm/v1/worker/gpu_worker.py | 92 ++++--- 5 files changed, 336 insertions(+), 235 deletions(-) diff --git a/vllm/v1/outputs.py b/vllm/v1/outputs.py index acc3a944e21b9..32aee44e3f374 100644 --- a/vllm/v1/outputs.py +++ b/vllm/v1/outputs.py @@ -8,7 +8,7 @@ class SamplerOutput: # [num_reqs] - sampled_token_ids: List[int] + sampled_token_ids: torch.Tensor # [num_reqs, max_num_logprobs + 1] logprob_token_ids: Optional[torch.Tensor] diff --git a/vllm/v1/sample/sampler.py b/vllm/v1/sample/sampler.py index 7cd42ca211a22..504645d27c191 100644 --- a/vllm/v1/sample/sampler.py +++ b/vllm/v1/sample/sampler.py @@ -1,4 +1,5 @@ """A layer that samples the next tokens from the model's outputs.""" + from typing import Tuple import torch @@ -6,8 +7,7 @@ from vllm.v1.outputs import SamplerOutput from vllm.v1.sample.metadata import SamplingMetadata -from vllm.v1.sample.ops.penalties import (apply_all_penalties, - apply_min_token_penalties) +from vllm.v1.sample.ops.penalties import apply_all_penalties, apply_min_token_penalties from vllm.v1.sample.ops.topk_topp_sampler import TopKTopPSampler _SAMPLING_EPS = 1e-5 @@ -34,7 +34,8 @@ def forward( # modify the logits tensor in-place (and we don't want to clone # the logits tensor for memory efficiency). topk_logprobs, topk_indices = self.get_topk_logprobs( - logits, sampling_metadata) + logits, sampling_metadata + ) else: topk_logprobs = None topk_indices = None @@ -50,9 +51,8 @@ def forward( # Use int32 to reduce the tensor size. sampled = sampled.to(torch.int32) - # NOTE: CPU-GPU synchronization happens here. sampler_output = SamplerOutput( - sampled_token_ids=sampled.tolist(), + sampled_token_ids=sampled, logprob_token_ids=topk_indices, logprobs=topk_logprobs, prompt_logprob_token_ids=None, @@ -79,8 +79,7 @@ def sample( logits: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> torch.Tensor: - assert not (sampling_metadata.all_greedy - and sampling_metadata.all_random) + assert not (sampling_metadata.all_greedy and sampling_metadata.all_random) if sampling_metadata.all_greedy: return self.greedy_sample(logits) @@ -112,7 +111,8 @@ def get_topk_logprobs( # FIXME: Mask the sampled token_id, get topk logprobs, # and concatenate the topk with the sampled token_id. topk_logprobs, topk_indices = torch.topk( - logprobs, sampling_metadata.max_num_logprobs, dim=-1) + logprobs, sampling_metadata.max_num_logprobs, dim=-1 + ) # Use int32 to reduce the tensor size. 
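
Editor's note on the hunks above: `SamplerOutput.sampled_token_ids` now stays a GPU `torch.Tensor`, and the `.tolist()` call (with its implicit CPU-GPU synchronization) is removed from the sampler's forward path. A minimal sketch of that split, where `greedy_sample` is a stand-in rather than vLLM's actual sampler:

    import torch

    def greedy_sample(logits: torch.Tensor) -> torch.Tensor:
        # Stay on the device and use int32, as the patched sampler does; with no
        # .tolist() here, the sampler no longer forces a device synchronization.
        return logits.argmax(dim=-1).to(torch.int32)

    def to_host(sampled_token_ids: torch.Tensor) -> list:
        # The caller now decides when to pay the transfer/synchronization cost.
        return sampled_token_ids.tolist()
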
topk_indices = topk_indices.to(torch.int32) return topk_logprobs, topk_indices @@ -122,15 +122,20 @@ def apply_penalties( logits: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> torch.Tensor: - apply_min_token_penalties(logits, sampling_metadata.output_token_ids, - sampling_metadata.stop_token_ids, - sampling_metadata.min_tokens) + apply_min_token_penalties( + logits, + sampling_metadata.output_token_ids, + sampling_metadata.stop_token_ids, + sampling_metadata.min_tokens, + ) if not sampling_metadata.no_penalties: assert sampling_metadata.prompt_token_ids is not None logits = apply_all_penalties( - logits, sampling_metadata.prompt_token_ids, + logits, + sampling_metadata.prompt_token_ids, sampling_metadata.presence_penalties, sampling_metadata.frequency_penalties, sampling_metadata.repetition_penalties, - sampling_metadata.output_token_ids) + sampling_metadata.output_token_ids, + ) return logits diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index b0a7affbebb7e..15ce0e4560a8b 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -1,9 +1,21 @@ +import asyncio import multiprocessing import os import weakref from collections.abc import Sequence -from typing import (Any, Callable, Dict, Generic, List, Optional, TypeVar, - Union, overload) +from typing import ( + Any, + Callable, + Dict, + Generic, + List, + Optional, + overload, + TypeVar, + Union, +) + +import torch from vllm.logger import init_logger from vllm.utils import get_mp_context, kill_process_tree @@ -13,6 +25,12 @@ T = TypeVar("T") +async def cuda_stream_sync() -> None: + await asyncio.get_running_loop().run_in_executor( + executor=None, func=torch.cuda.current_stream().synchronize + ) + + class ConstantList(Generic[T], Sequence): def __init__(self, x: List[T]) -> None: @@ -36,31 +54,23 @@ def remove(self, item): def clear(self): raise Exception("Cannot clear a constant list") - def index(self, - item: T, - start: int = 0, - stop: Optional[int] = None) -> int: - return self._x.index(item, start, - stop if stop is not None else len(self._x)) + def index(self, item: T, start: int = 0, stop: Optional[int] = None) -> int: + return self._x.index(item, start, stop if stop is not None else len(self._x)) @overload - def __getitem__(self, item: int) -> T: - ... + def __getitem__(self, item: int) -> T: ... @overload - def __getitem__(self, s: slice, /) -> List[T]: - ... + def __getitem__(self, s: slice, /) -> List[T]: ... def __getitem__(self, item: Union[int, slice]) -> Union[T, List[T]]: return self._x[item] @overload - def __setitem__(self, item: int, value: T): - ... + def __setitem__(self, item: int, value: T): ... @overload - def __setitem__(self, s: slice, value: T, /): - ... + def __setitem__(self, s: slice, value: T, /): ... def __setitem__(self, item: Union[int, slice], value: Union[T, List[T]]): raise Exception("Cannot set item in a constant list") @@ -95,23 +105,27 @@ def __init__( context = get_mp_context() reader, writer = context.Pipe(duplex=False) - assert ("ready_pipe" not in process_kwargs - and "input_path" not in process_kwargs - and "output_path" not in process_kwargs) + assert ( + "ready_pipe" not in process_kwargs + and "input_path" not in process_kwargs + and "output_path" not in process_kwargs + ) process_kwargs["ready_pipe"] = writer process_kwargs["input_path"] = input_path process_kwargs["output_path"] = output_path # Run busy loop in background process. 
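
The new `cuda_stream_sync` helper added to `vllm/v1/utils.py` above offloads the blocking `torch.cuda.current_stream().synchronize` call to the default thread-pool executor, so an asyncio event loop keeps serving other tasks while the GPU drains its queue. A hedged usage sketch (the `fetch_sampled` coroutine is illustrative, not part of the patch):

    import asyncio

    import torch

    async def cuda_stream_sync() -> None:
        # Same shape as the helper in vllm/v1/utils.py.
        await asyncio.get_running_loop().run_in_executor(
            executor=None, func=torch.cuda.current_stream().synchronize
        )

    async def fetch_sampled(sampled_gpu: torch.Tensor) -> list:
        await cuda_stream_sync()       # event loop stays responsive during the wait
        return sampled_gpu.tolist()    # device-to-host copy after the explicit sync
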
self.proc = context.Process(target=target_fn, kwargs=process_kwargs) - self._finalizer = weakref.finalize(self, shutdown, self.proc, - input_path, output_path) + self._finalizer = weakref.finalize( + self, shutdown, self.proc, input_path, output_path + ) self.proc.start() # Wait for startup. if reader.recv()["status"] != "READY": - raise RuntimeError(f"{process_name} initialization failed. " - "See root cause above.") + raise RuntimeError( + f"{process_name} initialization failed. " "See root cause above." + ) def shutdown(self): self._finalizer() diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index fb87dc5a8222a..df87ded0abf6c 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -1,6 +1,6 @@ import gc import time -from typing import TYPE_CHECKING, Dict, List, Tuple, cast +from typing import cast, Dict, List, Tuple, TYPE_CHECKING import numpy as np import torch @@ -15,14 +15,22 @@ from vllm.model_executor.model_loader import get_model from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalKwargs from vllm.sampling_params import SamplingType -from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler, - LayerBlockType, bind_kv_cache, cdiv, - is_pin_memory_available) -from vllm.v1.attention.backends.flash_attn import (FlashAttentionBackend, - FlashAttentionMetadata) +from vllm.utils import ( + bind_kv_cache, + cdiv, + DeviceMemoryProfiler, + is_pin_memory_available, + LayerBlockType, + STR_DTYPE_TO_TORCH_DTYPE, +) +from vllm.v1.attention.backends.flash_attn import ( + FlashAttentionBackend, + FlashAttentionMetadata, +) from vllm.v1.engine.mm_input_mapper import MMInputMapperClient from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.sample.metadata import SamplingMetadata +from vllm.v1.utils import cuda_stream_sync from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch if TYPE_CHECKING: @@ -59,8 +67,7 @@ def __init__( if cache_config.cache_dtype == "auto": self.kv_cache_dtype = self.dtype else: - self.kv_cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[ - cache_config.cache_dtype] + self.kv_cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[cache_config.cache_dtype] self.is_multimodal_model = model_config.is_multimodal_model self.sliding_window = model_config.get_sliding_window() @@ -72,9 +79,9 @@ def __init__( # Model-related. self.num_attn_layers = model_config.get_num_layers_by_block_type( - parallel_config, LayerBlockType.attention) - self.num_query_heads = model_config.get_num_attention_heads( - parallel_config) + parallel_config, LayerBlockType.attention + ) + self.num_query_heads = model_config.get_num_attention_heads(parallel_config) self.num_kv_heads = model_config.get_num_kv_heads(parallel_config) self.head_size = model_config.get_head_size() self.hidden_size = model_config.get_hidden_size() @@ -109,63 +116,76 @@ def __init__( vocab_size=model_config.get_vocab_size(), ) - self.use_cuda_graph = (self.vllm_config.compilation_config.level - == CompilationLevel.PIECEWISE - and not self.model_config.enforce_eager) + self.use_cuda_graph = ( + self.vllm_config.compilation_config.level == CompilationLevel.PIECEWISE + and not self.model_config.enforce_eager + ) # TODO(woosuk): Provide an option to tune the max cudagraph batch size. # The convention is different. # self.cudagraph_batch_sizes sorts in ascending order. # The batch sizes in the config are in descending order. 
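
The `BackgroundProcHandle` hunks above only reformat the existing ready-pipe handshake; a condensed sketch of that pattern, with a hypothetical `worker_main` standing in for the real busy-loop target:

    import multiprocessing as mp

    def worker_main(ready_pipe) -> None:
        # ...heavy initialization would happen here...
        ready_pipe.send({"status": "READY"})   # parent unblocks once this arrives
        # ...then enter the busy loop...

    if __name__ == "__main__":
        ctx = mp.get_context("spawn")
        reader, writer = ctx.Pipe(duplex=False)
        proc = ctx.Process(target=worker_main, kwargs={"ready_pipe": writer})
        proc.start()
        if reader.recv()["status"] != "READY":
            raise RuntimeError("background process initialization failed")
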
self.cudagraph_batch_sizes = list( - reversed(self.vllm_config.compilation_config.capture_sizes)) + reversed(self.vllm_config.compilation_config.capture_sizes) + ) # Cache the device properties. self.device_properties = torch.cuda.get_device_properties(self.device) self.num_sms = self.device_properties.multi_processor_count # Persistent buffers for CUDA graphs. - self.input_ids = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device=self.device) - self.positions = torch.zeros(self.max_num_tokens, - dtype=torch.int64, - device=self.device) + self.input_ids = torch.zeros( + self.max_num_tokens, dtype=torch.int32, device=self.device + ) + self.positions = torch.zeros( + self.max_num_tokens, dtype=torch.int64, device=self.device + ) self.inputs_embeds = torch.zeros( (self.max_num_tokens, self.hidden_size), dtype=self.dtype, - device=self.device) + device=self.device, + ) # OPTIMIZATION: Cache the tensors rather than creating them every step. - self.arange_np = np.arange(max(self.max_num_reqs + 1, - self.max_model_len), - dtype=np.int32) + self.arange_np = np.arange( + max(self.max_num_reqs + 1, self.max_model_len), dtype=np.int32 + ) # NOTE(woosuk): These tensors are "stateless", i.e., they are literally # a faster version of creating a new tensor every time. Thus, we should # not make any assumptions about the values in these tensors. - self.input_ids_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) + self.input_ids_cpu = torch.zeros( + self.max_num_tokens, + dtype=torch.int32, + device="cpu", + pin_memory=self.pin_memory, + ) self.input_ids_np = self.input_ids_cpu.numpy() - self.positions_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int64, - device="cpu", - pin_memory=self.pin_memory) + self.positions_cpu = torch.zeros( + self.max_num_tokens, + dtype=torch.int64, + device="cpu", + pin_memory=self.pin_memory, + ) self.positions_np = self.positions_cpu.numpy() - self.slot_mapping_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) + self.slot_mapping_cpu = torch.zeros( + self.max_num_tokens, + dtype=torch.int32, + device="cpu", + pin_memory=self.pin_memory, + ) self.slot_mapping_np = self.slot_mapping_cpu.numpy() - self.query_start_loc_cpu = torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) + self.query_start_loc_cpu = torch.zeros( + self.max_num_reqs + 1, + dtype=torch.int32, + device="cpu", + pin_memory=self.pin_memory, + ) self.query_start_loc_np = self.query_start_loc_cpu.numpy() - self.seq_start_loc_cpu = torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) + self.seq_start_loc_cpu = torch.zeros( + self.max_num_reqs + 1, + dtype=torch.int32, + device="cpu", + pin_memory=self.pin_memory, + ) self.seq_start_loc_np = self.seq_start_loc_cpu.numpy() def _update_states(self, scheduler_output: "SchedulerOutput") -> None: @@ -203,7 +223,8 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None: # Update the num_computed_tokens. req_state.num_computed_tokens = req_data.num_computed_tokens self.input_batch.num_computed_tokens_cpu[req_index] = ( - req_data.num_computed_tokens) + req_data.num_computed_tokens + ) # Update the block table. 
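
The persistent CPU buffers reformatted above are allocated once with `pin_memory` and exposed as NumPy views, so each step fills them cheaply on the host and then issues `non_blocking` copies to the device. A small sketch of that pattern, with made-up sizes:

    import torch

    max_num_tokens = 8192                      # illustrative, not from any config
    input_ids_cpu = torch.zeros(
        max_num_tokens, dtype=torch.int32, device="cpu", pin_memory=True
    )
    input_ids_np = input_ids_cpu.numpy()       # zero-copy view for fast host writes
    input_ids = torch.zeros(max_num_tokens, dtype=torch.int32, device="cuda")

    num_scheduled = 16                         # tokens scheduled this step
    input_ids_np[:num_scheduled] = range(num_scheduled)
    input_ids[:num_scheduled].copy_(input_ids_cpu[:num_scheduled], non_blocking=True)
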
num_new_blocks = len(req_data.new_block_ids) @@ -211,8 +232,9 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None: continue start_index = len(req_state.block_ids) req_state.block_ids.extend(req_data.new_block_ids) - self.input_batch.block_table.append_row(req_index, start_index, - req_data.new_block_ids) + self.input_batch.block_table.append_row( + req_index, start_index, req_data.new_block_ids + ) req_ids_to_add: List[str] = [] # Add new requests to the cached states. @@ -283,40 +305,42 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): assert req_id is not None num_tokens = scheduler_output.num_scheduled_tokens[req_id] num_scheduled_tokens.append(num_tokens) - max_num_scheduled_tokens = max(max_num_scheduled_tokens, - num_tokens) + max_num_scheduled_tokens = max(max_num_scheduled_tokens, num_tokens) num_scheduled_tokens = np.array(num_scheduled_tokens, dtype=np.int32) assert max_num_scheduled_tokens > 0 # Get request indices. # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2] - req_indices = np.repeat(self.arange_np[:num_reqs], - num_scheduled_tokens) + req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens) # Get batched arange. # E.g., [2, 5, 3] -> [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] - arange = np.concatenate( - [self.arange_np[:n] for n in num_scheduled_tokens]) + arange = np.concatenate([self.arange_np[:n] for n in num_scheduled_tokens]) # Get positions. positions_np = self.positions_np[:total_num_scheduled_tokens] - np.add(self.input_batch.num_computed_tokens_cpu[req_indices], - arange, - out=positions_np) + np.add( + self.input_batch.num_computed_tokens_cpu[req_indices], + arange, + out=positions_np, + ) # Get token indices. # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] # -> [0, 1, M, M + 1, M + 2, M + 3, M + 4, 2 * M, 2 * M + 1, 2 * M + 2] # where M is the max_model_len. - token_indices = (positions_np + - req_indices * self.input_batch.token_ids_cpu.shape[1]) + token_indices = ( + positions_np + req_indices * self.input_batch.token_ids_cpu.shape[1] + ) # NOTE(woosuk): We use torch.index_select instead of np.take here # because torch.index_select is much faster than np.take for large # tensors. - torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(), - 0, - torch.from_numpy(token_indices), - out=self.input_ids_cpu[:total_num_scheduled_tokens]) + torch.index_select( + self.input_batch.token_ids_cpu_tensor.flatten(), + 0, + torch.from_numpy(token_indices), + out=self.input_ids_cpu[:total_num_scheduled_tokens], + ) # Calculate the slot mapping. # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] @@ -324,44 +348,53 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): # where K is the max_num_blocks_per_req and the block size is 2. # NOTE(woosuk): We can't simply use `token_indices // block_size` here # because M (max_model_len) is not necessarily divisible by block_size. - block_table_indices = (req_indices * self.max_num_blocks_per_req + - positions_np // self.block_size) + block_table_indices = ( + req_indices * self.max_num_blocks_per_req + positions_np // self.block_size + ) # NOTE(woosuk): We use torch.index_select instead of np.take here # because torch.index_select is much faster than np.take for large # tensors. 
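
The `[2, 5, 3]` examples in the comments above can be checked directly; a runnable version of the `np.repeat`/`np.concatenate` index construction (array sizes chosen only for the example):

    import numpy as np

    num_scheduled_tokens = np.array([2, 5, 3], dtype=np.int32)
    arange_np = np.arange(16, dtype=np.int32)
    num_reqs = len(num_scheduled_tokens)

    req_indices = np.repeat(arange_np[:num_reqs], num_scheduled_tokens)
    # -> [0 0 1 1 1 1 1 2 2 2]
    arange = np.concatenate([arange_np[:n] for n in num_scheduled_tokens])
    # -> [0 1 0 1 2 3 4 0 1 2]
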
block_table_cpu = self.input_batch.block_table.get_cpu_tensor() block_numbers = block_table_cpu.flatten()[block_table_indices].numpy() block_offsets = positions_np % self.block_size - np.add(block_numbers * self.block_size, - block_offsets, - out=self.slot_mapping_np[:total_num_scheduled_tokens]) + np.add( + block_numbers * self.block_size, + block_offsets, + out=self.slot_mapping_np[:total_num_scheduled_tokens], + ) # Prepare the attention metadata. self.query_start_loc_np[0] = 0 - np.cumsum(num_scheduled_tokens, - out=self.query_start_loc_np[1:num_reqs + 1]) + np.cumsum(num_scheduled_tokens, out=self.query_start_loc_np[1 : num_reqs + 1]) - seq_lens = (self.input_batch.num_computed_tokens_cpu[:num_reqs] + - num_scheduled_tokens) + seq_lens = ( + self.input_batch.num_computed_tokens_cpu[:num_reqs] + num_scheduled_tokens + ) max_seq_len = seq_lens.max() self.seq_start_loc_np[0] = 0 - np.cumsum(seq_lens, out=self.seq_start_loc_np[1:num_reqs + 1]) + np.cumsum(seq_lens, out=self.seq_start_loc_np[1 : num_reqs + 1]) # Copy the tensors to the GPU. self.input_ids[:total_num_scheduled_tokens].copy_( - self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) + self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True + ) self.positions[:total_num_scheduled_tokens].copy_( - self.positions_cpu[:total_num_scheduled_tokens], non_blocking=True) - query_start_loc = self.query_start_loc_cpu[:num_reqs + 1].to( - self.device, non_blocking=True) - seq_start_loc = self.seq_start_loc_cpu[:num_reqs + 1].to( - self.device, non_blocking=True) - slot_mapping = self.slot_mapping_cpu[:total_num_scheduled_tokens].to( - self.device, non_blocking=True).long() + self.positions_cpu[:total_num_scheduled_tokens], non_blocking=True + ) + query_start_loc = self.query_start_loc_cpu[: num_reqs + 1].to( + self.device, non_blocking=True + ) + seq_start_loc = self.seq_start_loc_cpu[: num_reqs + 1].to( + self.device, non_blocking=True + ) + slot_mapping = ( + self.slot_mapping_cpu[:total_num_scheduled_tokens] + .to(self.device, non_blocking=True) + .long() + ) # Prepare for cascade attention if needed. - common_prefix_len = (scheduler_output.num_common_prefix_blocks * - self.block_size) + common_prefix_len = scheduler_output.num_common_prefix_blocks * self.block_size if common_prefix_len == 0: # Common case. use_cascade = False @@ -407,10 +440,10 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): # this case. common_prefix_len = min( common_prefix_len, - self.input_batch.num_computed_tokens_cpu[:num_reqs].min()) + self.input_batch.num_computed_tokens_cpu[:num_reqs].min(), + ) # common_prefix_len should be a multiple of the block size. - common_prefix_len = (common_prefix_len // self.block_size * - self.block_size) + common_prefix_len = common_prefix_len // self.block_size * self.block_size use_cascade = FlashAttentionBackend.use_cascade_attention( common_prefix_len=common_prefix_len, query_lens=num_scheduled_tokens, @@ -424,17 +457,16 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): if use_cascade: # TODO: Optimize. 
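
A toy version of the slot-mapping arithmetic above: each token position is mapped to a physical KV-cache slot through its block number and in-block offset. The block table and all sizes are invented for the example:

    import numpy as np

    block_size = 2
    max_num_blocks_per_req = 4
    req_indices = np.array([0, 0, 1, 1, 1, 1, 1, 2, 2, 2])
    positions = np.array([0, 1, 0, 1, 2, 3, 4, 0, 1, 2])

    block_table = np.arange(3 * max_num_blocks_per_req).reshape(3, -1)  # fake table
    block_table_indices = (
        req_indices * max_num_blocks_per_req + positions // block_size
    )
    block_numbers = block_table.flatten()[block_table_indices]
    slot_mapping = block_numbers * block_size + positions % block_size
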
cu_prefix_query_lens = torch.tensor( - [0, total_num_scheduled_tokens], - dtype=torch.int32, - device=self.device) - cu_prefix_kv_lens = torch.tensor([0, common_prefix_len], - dtype=torch.int32, - device=self.device) + [0, total_num_scheduled_tokens], dtype=torch.int32, device=self.device + ) + cu_prefix_kv_lens = torch.tensor( + [0, common_prefix_len], dtype=torch.int32, device=self.device + ) cu_suffix_kv_lens = ( - self.seq_start_loc_np[:num_reqs + 1] - - self.arange_np[:num_reqs + 1] * common_prefix_len) - cu_suffix_kv_lens = torch.from_numpy(cu_suffix_kv_lens).to( - self.device) + self.seq_start_loc_np[: num_reqs + 1] + - self.arange_np[: num_reqs + 1] * common_prefix_len + ) + cu_suffix_kv_lens = torch.from_numpy(cu_suffix_kv_lens).to(self.device) else: cu_prefix_query_lens = None cu_prefix_kv_lens = None @@ -446,8 +478,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): query_start_loc=query_start_loc, max_seq_len=max_seq_len, seq_start_loc=seq_start_loc, - block_table=( - self.input_batch.block_table.get_device_tensor()[:num_reqs]), + block_table=(self.input_batch.block_table.get_device_tensor()[:num_reqs]), slot_mapping=slot_mapping, use_cascade=use_cascade, common_prefix_len=common_prefix_len, @@ -468,19 +499,21 @@ def _prepare_sampling( scheduler_output: "SchedulerOutput", ) -> SamplingMetadata: skip_copy = True - if (scheduler_output.finished_req_ids - or scheduler_output.preempted_req_ids): + if scheduler_output.finished_req_ids or scheduler_output.preempted_req_ids: skip_copy = False - if (scheduler_output.scheduled_new_reqs - or scheduler_output.scheduled_resumed_reqs): + if ( + scheduler_output.scheduled_new_reqs + or scheduler_output.scheduled_resumed_reqs + ): skip_copy = False # Create the sampling metadata. - req_id_output_token_ids: Dict[str, List[int]] = \ - {req_id: req.output_token_ids \ - for req_id, req in self.requests.items()} + req_id_output_token_ids: Dict[str, List[int]] = { + req_id: req.output_token_ids for req_id, req in self.requests.items() + } sampling_metadata = self.input_batch.make_sampling_metadata( - req_id_output_token_ids, skip_copy) + req_id_output_token_ids, skip_copy + ) return sampling_metadata def _execute_encoder(self, scheduler_output: "SchedulerOutput"): @@ -497,8 +530,9 @@ def _execute_encoder(self, scheduler_output: "SchedulerOutput"): mm_inputs.append(req_state.mm_inputs[input_id]) req_input_ids.append((req_id, input_id)) batched_mm_inputs = MultiModalKwargs.batch(mm_inputs) - batched_mm_inputs = MultiModalKwargs.as_kwargs(batched_mm_inputs, - device=self.device) + batched_mm_inputs = MultiModalKwargs.as_kwargs( + batched_mm_inputs, device=self.device + ) # Run the encoder. # `encoder_outputs` is either of the following: @@ -507,8 +541,7 @@ def _execute_encoder(self, scheduler_output: "SchedulerOutput"): # 2. A list (length: num_images) of tensors, each of shape # [feature_size, hidden_size] in case when the feature size is # dynamic depending on input images. - encoder_outputs = self.model.get_multimodal_embeddings( - **batched_mm_inputs) + encoder_outputs = self.model.get_multimodal_embeddings(**batched_mm_inputs) # Cache the encoder outputs. 
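
A quick numeric check of the `cu_suffix_kv_lens` formula used above for cascade attention, with made-up sequence lengths:

    import numpy as np

    common_prefix_len = 4
    seq_lens = np.array([6, 7, 9])                              # per-request KV lengths
    seq_start_loc = np.concatenate(([0], np.cumsum(seq_lens)))  # [0, 6, 13, 22]
    num_reqs = len(seq_lens)

    cu_suffix_kv_lens = seq_start_loc - np.arange(num_reqs + 1) * common_prefix_len
    # -> [0, 2, 5, 10]: cumulative KV lengths of the per-request suffixes only
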
for (req_id, input_id), output in zip(req_input_ids, encoder_outputs): @@ -524,8 +557,7 @@ def _gather_encoder_outputs( num_reqs = self.input_batch.num_reqs for req_id in self.input_batch.req_ids[:num_reqs]: assert req_id is not None - num_scheduled_tokens = scheduler_output.num_scheduled_tokens[ - req_id] + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[req_id] req_state = self.requests[req_id] num_computed_tokens = req_state.num_computed_tokens mm_positions = req_state.mm_positions @@ -548,7 +580,8 @@ def _gather_encoder_outputs( start_idx = max(num_computed_tokens - start_pos, 0) end_idx = min( num_computed_tokens - start_pos + num_scheduled_tokens, - num_encoder_tokens) + num_encoder_tokens, + ) assert start_idx < end_idx assert req_id in self.encoder_cache assert i in self.encoder_cache[req_id] @@ -556,8 +589,14 @@ def _gather_encoder_outputs( encoder_outputs.append(encoder_output[start_idx:end_idx]) return encoder_outputs - @torch.inference_mode() - def execute_model( + async def execute_model( + self, + scheduler_output: "SchedulerOutput", + ) -> ModelRunnerOutput: + with torch.inference_mode(): + return await self._execute_model(scheduler_output) + + async def _execute_model( self, scheduler_output: "SchedulerOutput", ) -> ModelRunnerOutput: @@ -573,12 +612,13 @@ def execute_model( # Prepare the decoder inputs. attn_metadata, logits_indices = self._prepare_inputs(scheduler_output) num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens - if (self.use_cuda_graph - and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1]): + if ( + self.use_cuda_graph + and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1] + ): # Use piecewise CUDA graphs. # Add padding to the batch size. - num_input_tokens = self.vllm_config.pad_for_cudagraph( - num_scheduled_tokens) + num_input_tokens = self.vllm_config.pad_for_cudagraph(num_scheduled_tokens) else: # Eager mode. num_input_tokens = num_scheduled_tokens @@ -591,7 +631,8 @@ def execute_model( input_ids = self.input_ids[:num_scheduled_tokens] if encoder_outputs: inputs_embeds = self.model.get_input_embeddings( - input_ids, encoder_outputs) + input_ids, encoder_outputs + ) else: inputs_embeds = self.model.get_input_embeddings(input_ids) # TODO(woosuk): Avoid the copy. Optimize. @@ -627,15 +668,18 @@ def execute_model( sampling_metadata=sampling_metadata, ) - sampled_token_ids = sampler_output.sampled_token_ids + await cuda_stream_sync() + sampled_token_ids: List[int] = sampler_output.sampled_token_ids.tolist() # TODO(woosuk): The following loop can be slow since it iterates over # the requests one by one. Optimize. num_reqs = self.input_batch.num_reqs for i, req_id in enumerate(self.input_batch.req_ids[:num_reqs]): assert req_id is not None req_state = self.requests[req_id] - seq_len = (req_state.num_computed_tokens + - scheduler_output.num_scheduled_tokens[req_id]) + seq_len = ( + req_state.num_computed_tokens + + scheduler_output.num_scheduled_tokens[req_id] + ) assert seq_len <= req_state.num_tokens if seq_len == req_state.num_tokens: # Append the sampled token to the output token ids. 
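
The essential control-flow change above: `execute_model` becomes a coroutine, awaits `cuda_stream_sync()` after sampling, and only then converts `sampled_token_ids` to a Python list. A condensed sketch, where `run_forward_and_sample` is a hypothetical stand-in for the runner's forward pass plus sampler:

    import torch

    from vllm.v1.utils import cuda_stream_sync   # helper added in this patch

    async def execute_step(run_forward_and_sample, scheduler_output) -> list:
        with torch.inference_mode():
            sampler_output = run_forward_and_sample(scheduler_output)
        # Wait for the GPU without blocking the asyncio event loop.
        await cuda_stream_sync()
        # The device-to-host transfer happens only after the explicit sync.
        return sampler_output.sampled_token_ids.tolist()
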
@@ -662,8 +706,8 @@ def execute_model( # num_reqs entries should be non-None assert all( - req_id is not None for req_id in - self.input_batch.req_ids[:num_reqs]), "req_ids contains None" + req_id is not None for req_id in self.input_batch.req_ids[:num_reqs] + ), "req_ids contains None" req_ids = cast(List[str], self.input_batch.req_ids[:num_reqs]) model_runner_output = ModelRunnerOutput( @@ -681,8 +725,9 @@ def load_model(self) -> None: self.model = get_model(vllm_config=self.vllm_config) self.model_memory_usage = m.consumed_memory - logger.info("Loading model weights took %.4f GB", - self.model_memory_usage / float(2**30)) + logger.info( + "Loading model weights took %.4f GB", self.model_memory_usage / float(2**30) + ) @torch.inference_mode() def _dummy_run( @@ -735,39 +780,44 @@ def profile_run(self) -> None: # modality with the max possible input tokens even when # it supports multiple. max_tokens_by_modality_dict = self.mm_registry.get_max_tokens_per_item_by_modality( # noqa: E501 - self.model_config) + self.model_config + ) dummy_data_modality, max_tokens_per_mm_item = max( - max_tokens_by_modality_dict.items(), key=lambda item: item[1]) + max_tokens_by_modality_dict.items(), key=lambda item: item[1] + ) # Check how many items of this modality can be supported by # the encoder cache budget. - encoder_cache_budget = min(self.max_num_encoder_input_tokens, - self.encoder_cache_size) - max_num_mm_items_encoder_budget = encoder_cache_budget // \ - max_tokens_per_mm_item + encoder_cache_budget = min( + self.max_num_encoder_input_tokens, self.encoder_cache_size + ) + max_num_mm_items_encoder_budget = ( + encoder_cache_budget // max_tokens_per_mm_item + ) # TODO: Allow users to set encoder_cache_budget in case this # happens. assert max_num_mm_items_encoder_budget > 0, ( f"Encoder cache budget={encoder_cache_budget} is too small to " f"support the maximum possible size of multimodal embeddings" - f"={max_tokens_per_mm_item}.") + f"={max_tokens_per_mm_item}." + ) # Check how many items of this modality can be supported by # the decoder budget. max_mm_items_per_req = max( - self.mm_registry.get_mm_limits_per_prompt( - self.model_config).values()) + self.mm_registry.get_mm_limits_per_prompt(self.model_config).values() + ) # NOTE: We do not consider max_num_batched_tokens on purpose # because the multimodal embeddings can be generated in advance # and chunked prefilled. - max_num_mm_items_decoder_budget = self.max_num_reqs * \ - max_mm_items_per_req + max_num_mm_items_decoder_budget = self.max_num_reqs * max_mm_items_per_req - max_num_mm_items = min(max_num_mm_items_encoder_budget, - max_num_mm_items_decoder_budget) + max_num_mm_items = min( + max_num_mm_items_encoder_budget, max_num_mm_items_decoder_budget + ) # Dummy data definition in V0 may contain multiple multimodal items # (e.g, multiple images) for a single request, therefore here we @@ -779,7 +829,8 @@ def profile_run(self) -> None: # `MultiModalKwargsItem` from the desired modality to profile on. 
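
A worked example of the multimodal profiling budget arithmetic above, with illustrative numbers rather than any real configuration:

    max_num_encoder_input_tokens = 8192
    encoder_cache_size = 4096
    max_tokens_per_mm_item = 576      # e.g. one image's worth of embeddings
    max_num_reqs = 256
    max_mm_items_per_req = 1

    encoder_cache_budget = min(max_num_encoder_input_tokens, encoder_cache_size)
    max_num_mm_items_encoder_budget = encoder_cache_budget // max_tokens_per_mm_item  # 7
    max_num_mm_items_decoder_budget = max_num_reqs * max_mm_items_per_req             # 256
    max_num_mm_items = min(
        max_num_mm_items_encoder_budget, max_num_mm_items_decoder_budget
    )  # profile with 7 items
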
if isinstance(dummy_mm_data, MultiModalKwargs): dummy_mm_item = dummy_mm_data.get_item( - modality=dummy_data_modality, item_index=0) + modality=dummy_data_modality, item_index=0 + ) dummy_mm_kwargs = MultiModalKwargs.from_items([dummy_mm_item]) # Case when models have dummy data explicitly defined as @@ -792,32 +843,38 @@ def profile_run(self) -> None: mm_data=dummy_mm_data, mm_hashes=None, mm_processor_kwargs=None, - precomputed_mm_inputs=None) + precomputed_mm_inputs=None, + ) dummy_mm_kwargs = mm_kwargs_list[0] batched_dummy_mm_inputs = MultiModalKwargs.batch( - [dummy_mm_kwargs] * max_num_mm_items) + [dummy_mm_kwargs] * max_num_mm_items + ) batched_dummy_mm_inputs = MultiModalKwargs.as_kwargs( - batched_dummy_mm_inputs, device=self.device) + batched_dummy_mm_inputs, device=self.device + ) # Run multimodal encoder. dummy_encoder_outputs = self.model.get_multimodal_embeddings( - **batched_dummy_mm_inputs) + **batched_dummy_mm_inputs + ) assert len(dummy_encoder_outputs) == max_num_mm_items, ( "Expected dimension 0 of encoder outputs to match the number " f"of multimodal data items: {max_num_mm_items}, got " f"{len(dummy_encoder_outputs)=} instead. This is most likely " "due to the 'get_multimodal_embeddings' method of the model " - "not implemented correctly.") + "not implemented correctly." + ) # Cache the dummy encoder outputs. self.encoder_cache["tmp"] = dict(enumerate(dummy_encoder_outputs)) # Trigger compilation for general shape. - hidden_states = self._dummy_run(self.model, self.max_num_tokens, - dummy_kv_caches) + hidden_states = self._dummy_run( + self.model, self.max_num_tokens, dummy_kv_caches + ) logits = self.model.compute_logits(hidden_states, None) - logits = logits[:self.max_num_tokens] + logits = logits[: self.max_num_tokens] # TODO(woosuk): Consider the memory usage of the sampler. torch.cuda.synchronize() del hidden_states, logits @@ -827,8 +884,9 @@ def profile_run(self) -> None: def capture_model(self) -> None: if not self.use_cuda_graph: logger.warning( - "Skipping CUDA graph capture. Please add " - "-O %s to use CUDA graphs.", CompilationLevel.PIECEWISE) + "Skipping CUDA graph capture. Please add " "-O %s to use CUDA graphs.", + CompilationLevel.PIECEWISE, + ) return start_time = time.perf_counter() @@ -839,8 +897,9 @@ def capture_model(self) -> None: # can reuse the memory pool allocated for the large shapes. with graph_capture(device=self.device): for num_tokens in reversed(self.cudagraph_batch_sizes): - for _ in range(self.vllm_config.compilation_config. - cudagraph_num_of_warmups): + for _ in range( + self.vllm_config.compilation_config.cudagraph_num_of_warmups + ): self._dummy_run(self.model, num_tokens, self.kv_caches) self._dummy_run(self.model, num_tokens, self.kv_caches) @@ -849,18 +908,23 @@ def capture_model(self) -> None: elapsed_time = end_time - start_time cuda_graph_size = start_free_gpu_memory - end_free_gpu_memory # This usually takes 5~20 seconds. 
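
The capture loop at the end of this hunk issues a few warm-up calls per batch size before the final run, walking from the largest batch size down to the smallest so smaller shapes can reuse the memory pool. A generic sketch of that call pattern only; the actual graph capture is handled inside vLLM's piecewise compilation, not by this loop:

    def warm_up_and_run(dummy_run, batch_sizes, num_warmups):
        # dummy_run stands in for GPUModelRunner._dummy_run bound to fixed caches.
        for num_tokens in sorted(batch_sizes, reverse=True):
            for _ in range(num_warmups):
                dummy_run(num_tokens)     # warm-up invocations
            dummy_run(num_tokens)         # final invocation for this batch size
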
- logger.info("Graph capturing finished in %.0f secs, took %.2f GiB", - elapsed_time, cuda_graph_size / (1 << 30)) + logger.info( + "Graph capturing finished in %.0f secs, took %.2f GiB", + elapsed_time, + cuda_graph_size / (1 << 30), + ) def initialize_kv_cache(self, num_blocks: int) -> None: assert len(self.kv_caches) == 0 kv_cache_shape = FlashAttentionBackend.get_kv_cache_shape( - num_blocks, self.block_size, self.num_kv_heads, self.head_size) + num_blocks, self.block_size, self.num_kv_heads, self.head_size + ) for _ in range(self.num_attn_layers): self.kv_caches.append( - torch.zeros(kv_cache_shape, - dtype=self.kv_cache_dtype, - device=self.device)) + torch.zeros( + kv_cache_shape, dtype=self.kv_cache_dtype, device=self.device + ) + ) bind_kv_cache( - self.vllm_config.compilation_config.static_forward_context, - [self.kv_caches]) + self.vllm_config.compilation_config.static_forward_context, [self.kv_caches] + ) diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index eb2f39e00a99e..38d903f92037c 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -1,20 +1,23 @@ """A GPU worker class.""" + import gc import os -from typing import TYPE_CHECKING, Optional, Tuple +from typing import Optional, Tuple, TYPE_CHECKING import torch import torch.distributed import vllm.envs as envs from vllm.config import CacheConfig, ModelConfig, ParallelConfig, VllmConfig -from vllm.distributed import (ensure_model_parallel_initialized, - init_distributed_environment, - set_custom_all_reduce) +from vllm.distributed import ( + ensure_model_parallel_initialized, + init_distributed_environment, + set_custom_all_reduce, +) from vllm.logger import init_logger from vllm.model_executor import set_random_seed from vllm.platforms import current_platform -from vllm.utils import STR_DTYPE_TO_TORCH_DTYPE, LayerBlockType, get_dtype_size +from vllm.utils import get_dtype_size, LayerBlockType, STR_DTYPE_TO_TORCH_DTYPE from vllm.v1.core.scheduler import SchedulerOutput from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.worker.gpu_model_runner import GPUModelRunner @@ -56,14 +59,17 @@ def __init__( if self.model_config.trust_remote_code: # note: lazy import to avoid importing torch before initializing from vllm.utils import init_cached_hf_modules + init_cached_hf_modules() # Torch profiler. Enabled and configured through env vars: # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace if envs.VLLM_TORCH_PROFILER_DIR: torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR - logger.info("Profiling enabled. Traces will be saved to: %s", - torch_profiler_trace_dir) + logger.info( + "Profiling enabled. Traces will be saved to: %s", + torch_profiler_trace_dir, + ) self.profiler = torch.profiler.profile( activities=[ torch.profiler.ProfilerActivity.CPU, @@ -71,7 +77,9 @@ def __init__( ], with_stack=True, on_trace_ready=torch.profiler.tensorboard_trace_handler( - torch_profiler_trace_dir, use_gzip=True)) + torch_profiler_trace_dir, use_gzip=True + ), + ) else: self.profiler = None @@ -95,12 +103,14 @@ def initialize(self): torch.cuda.empty_cache() self.init_gpu_memory = torch.cuda.mem_get_info()[0] else: - raise RuntimeError( - f"Not support device type: {self.device_config.device}") + raise RuntimeError(f"Not support device type: {self.device_config.device}") # Initialize the distributed environment. 
- init_worker_distributed_environment(self.parallel_config, self.rank, - self.distributed_init_method, - self.local_rank) + init_worker_distributed_environment( + self.parallel_config, + self.rank, + self.distributed_init_method, + self.local_rank, + ) # Set random seed. set_random_seed(self.model_config.seed) @@ -140,7 +150,8 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: "Error in memory profiling. " f"Initial free memory {self.init_gpu_memory}, current free memory" f" {free_gpu_memory}. This happens when the GPU memory was " - "not properly cleaned up before initializing the vLLM instance.") + "not properly cleaned up before initializing the vLLM instance." + ) # Get the peak memory allocation recorded by torch peak_memory = torch.cuda.memory_stats()["allocated_bytes.all.peak"] @@ -149,22 +160,22 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: # gpu outside of `torch`. NCCL operations, for example, can use a few # GB during a forward pass torch.cuda.empty_cache() - torch_allocated_bytes = torch.cuda.memory_stats( - )["allocated_bytes.all.current"] - total_allocated_bytes = torch.cuda.mem_get_info( - )[1] - torch.cuda.mem_get_info()[0] + torch_allocated_bytes = torch.cuda.memory_stats()["allocated_bytes.all.current"] + total_allocated_bytes = ( + torch.cuda.mem_get_info()[1] - torch.cuda.mem_get_info()[0] + ) non_torch_allocations = total_allocated_bytes - torch_allocated_bytes if non_torch_allocations > 0: peak_memory += non_torch_allocations available_kv_cache_memory = ( - total_gpu_memory * self.cache_config.gpu_memory_utilization - - peak_memory) + total_gpu_memory * self.cache_config.gpu_memory_utilization - peak_memory + ) # Calculate the number of blocks that can be allocated with the # profiled peak memory. - cache_block_size = _get_cache_block_size(self.cache_config, - self.model_config, - self.parallel_config) + cache_block_size = _get_cache_block_size( + self.cache_config, self.model_config, self.parallel_config + ) num_gpu_blocks = int(available_kv_cache_memory // cache_block_size) num_gpu_blocks = max(num_gpu_blocks, 0) return num_gpu_blocks, 0 @@ -172,9 +183,11 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: def initialize_cache(self, num_gpu_blocks: int) -> None: """Allocate GPU and CPU KV cache with the specified number of blocks.""" if num_gpu_blocks <= 0: - raise ValueError("No available memory for the cache blocks. " - "Try increasing `gpu_memory_utilization` when " - "initializing the engine.") + raise ValueError( + "No available memory for the cache blocks. " + "Try increasing `gpu_memory_utilization` when " + "initializing the engine." + ) max_seq_len = self.cache_config.block_size * num_gpu_blocks max_model_len = self.model_config.max_model_len @@ -184,7 +197,8 @@ def initialize_cache(self, num_gpu_blocks: int) -> None: "is larger than the maximum number of tokens that can be " f"stored in KV cache ({max_seq_len}). Try increasing " "`gpu_memory_utilization` or decreasing `max_model_len` when " - "initializing the engine.") + "initializing the engine." + ) self.model_runner.initialize_kv_cache(num_gpu_blocks) @@ -195,13 +209,13 @@ def compile_or_warm_up_model(self) -> None: # the model initialization and profiling. 
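
A worked example of the block-budget arithmetic in `determine_num_available_blocks` above, with invented numbers; the real `cache_block_size` comes from `_get_cache_block_size` at the end of the patch:

    gpu_memory_utilization = 0.9
    total_gpu_memory = 80 * 1024**3   # an 80 GiB card, purely illustrative
    peak_memory = 30 * 1024**3        # profiled peak, incl. non-torch allocations

    available_kv_cache_memory = (
        total_gpu_memory * gpu_memory_utilization - peak_memory
    )
    # Roughly 2 MiB per block for fp16:
    # 2 (K and V) * layers * block_size * kv_heads * head_size * 2 bytes
    cache_block_size = 2 * 32 * 16 * 8 * 128 * 2
    num_gpu_blocks = max(int(available_kv_cache_memory // cache_block_size), 0)
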
set_random_seed(self.model_config.seed) - @torch.inference_mode() - def execute_model( + async def execute_model( self, scheduler_output: "SchedulerOutput", ) -> Optional[ModelRunnerOutput]: - output = self.model_runner.execute_model(scheduler_output) - return output if self.rank == 0 else None + with torch.inference_mode(): + output = await self.model_runner.execute_model(scheduler_output) + return output if self.rank == 0 else None def profile(self, is_start: bool = True): if self.profiler is None: @@ -225,11 +239,13 @@ def init_worker_distributed_environment( """Initialize the distributed environment.""" set_custom_all_reduce(not parallel_config.disable_custom_all_reduce) - init_distributed_environment(parallel_config.world_size, rank, - distributed_init_method, local_rank) + init_distributed_environment( + parallel_config.world_size, rank, distributed_init_method, local_rank + ) - ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, - parallel_config.pipeline_parallel_size) + ensure_model_parallel_initialized( + parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size + ) def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype): @@ -249,7 +265,8 @@ def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype): "Bfloat16 is only supported on GPUs with compute capability " f"of at least 8.0. Your {gpu_name} GPU {compute_str}. " "You can use float16 instead by explicitly setting the" - "`dtype` flag in CLI, for example: --dtype=half.") + "`dtype` flag in CLI, for example: --dtype=half." + ) def _get_cache_block_size( @@ -260,7 +277,8 @@ def _get_cache_block_size( head_size = model_config.get_head_size() num_heads = model_config.get_num_kv_heads(parallel_config) num_attention_layers = model_config.get_num_layers_by_block_type( - parallel_config, LayerBlockType.attention) + parallel_config, LayerBlockType.attention + ) key_cache_block = cache_config.block_size * num_heads * head_size value_cache_block = key_cache_block
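
With `Worker.execute_model` now a coroutine, callers must await it from an event loop. A hedged usage sketch; `worker` and `scheduler_output` below stand in for objects the engine constructs:

    async def run_one_step(worker, scheduler_output):
        # Only rank 0 returns a ModelRunnerOutput; other ranks return None.
        return await worker.execute_model(scheduler_output)
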