diff --git a/benchmark/profile_pipeline_api.py b/benchmark/profile_pipeline_api.py index be06d32ee2..39e8ba1942 100644 --- a/benchmark/profile_pipeline_api.py +++ b/benchmark/profile_pipeline_api.py @@ -61,7 +61,9 @@ def sample_requests(dataset_path: str, num_requests: int, class Engine: def __init__(self, model_path: str, engine_config, csv: str): - self.pipe = pipeline(model_path, backend_config=engine_config) + self.pipe = pipeline(model_path, + backend_config=engine_config, + log_level='ERROR') self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) diff --git a/lmdeploy/logger.py b/lmdeploy/logger.py index ce2680e66b..701611e4b5 100644 --- a/lmdeploy/logger.py +++ b/lmdeploy/logger.py @@ -21,16 +21,21 @@ def __init__(self, max_log_len: Optional[int]) -> None: self.max_log_len = max_log_len def log_prompt(self, session_id: int, prompt: str) -> None: + if not isinstance(prompt, str): + # Prompt may be a GPT4V message with base64 images; + # logging might be impractical due to length + return if self.max_log_len is not None: if prompt is not None: prompt = prompt[:self.max_log_len] - logger.info(f'session_id={session_id}, ' + logger.info(f'session={session_id}, ' f'prompt={prompt!r}') def log_inputs(self, session_id: int, prompt: Optional[str], prompt_token_ids: Optional[List[int]], gen_config: GenerationConfig, adapter_name: str) -> None: max_log_len = self.max_log_len + input_tokens = len(prompt_token_ids) if max_log_len is not None: if prompt is not None: prompt = prompt[:max_log_len] @@ -38,8 +43,9 @@ def log_inputs(self, session_id: int, prompt: Optional[str], if prompt_token_ids is not None: prompt_token_ids = prompt_token_ids[:max_log_len] - logger.info(f'session_id={session_id}, ' - f'prompt={prompt!r}, ' + logger.info(f'session={session_id}, ' + f'adapter_name={adapter_name}, ' + f'input_tokens={input_tokens}, ' f'gen_config={gen_config}, ' - f'prompt_token_id={prompt_token_ids}, ' - f'adapter_name={adapter_name}.') + f'prompt={prompt!r}, ' + f'prompt_token_id={prompt_token_ids}') diff --git a/lmdeploy/model.py b/lmdeploy/model.py index f7b80ed102..64cc79a198 100644 --- a/lmdeploy/model.py +++ b/lmdeploy/model.py @@ -443,11 +443,12 @@ def match(cls, model_path: str) -> Optional[str]: model_path (str): the model path used for matching. 
""" path = model_path.lower() - if all([c not in path for c in ['internlm2', '8k']]) and \ + if all([c not in path for c in ['internlm3', 'internlm2', '8k']]) and \ all([c in path for c in ['internlm', 'chat']]): return 'internlm' +@MODELS.register_module(name='internlm3') @MODELS.register_module(name='internlm2') class InternLM2Chat7B(InternLMChat7B): """Chat template and generation parameters of InternLM2-Chat-7B.""" @@ -491,6 +492,9 @@ def match(cls, model_path: str) -> Optional[str]: if 'internlm2' in path and ('chat' in path or 'math' in path): return 'internlm2' + if 'internlm3' in path and ('instruct' in path): + return 'internlm3' + def messages2prompt(self, messages, sequence_start=True, diff --git a/lmdeploy/pytorch/configurations/default.py b/lmdeploy/pytorch/configurations/default.py index d1337a241e..c13517c098 100644 --- a/lmdeploy/pytorch/configurations/default.py +++ b/lmdeploy/pytorch/configurations/default.py @@ -14,7 +14,9 @@ def condition(cls, hf_config): @classmethod def build(cls, hf_config, model_path: str = None, **kwargs): """build.""" - head_dim = hf_config.hidden_size // hf_config.num_attention_heads + head_dim = getattr( + hf_config, 'head_dim', + hf_config.hidden_size // hf_config.num_attention_heads) num_attention_heads = hf_config.num_attention_heads num_key_value_heads = getattr(hf_config, 'num_key_value_heads', num_attention_heads) diff --git a/lmdeploy/pytorch/engine/engine.py b/lmdeploy/pytorch/engine/engine.py index a674d609af..1e4d70b9a1 100644 --- a/lmdeploy/pytorch/engine/engine.py +++ b/lmdeploy/pytorch/engine/engine.py @@ -172,7 +172,6 @@ def __init__(self, # create main thread self._start_loop() - self._create_buffers() self._output_stream = torch.cuda.Stream() @classmethod @@ -228,12 +227,6 @@ def _download_adapters(self, adapters: Dict[str, str], return new_adapters - def _create_buffers(self): - max_batches = self.scheduler_config.max_batches - - # buffers to create inputs - self._seq_length_buf = torch.ones(max_batches, dtype=torch.long) - def _build_adapter_manager(self, adapters): return AdapterManager(adapters) @@ -368,14 +361,16 @@ def __update_max_new_tokens(msg): session_id = req.data['session_id'] sess = self.scheduler.sessions[session_id] # TODO: support 1 session n sequence + sampling_param = req.data['sampling_param'] + return_logits = sampling_param.out_logits if len(sess.sequences) == 0: assert len( req.data['token_ids']) > 0, ('Empty input is not allowed.') sess.add_sequence( req.data['token_ids'], - sampling_param=req.data['sampling_param'], + sampling_param=sampling_param, adapter_name=req.data['adapter_name'], - return_logits=req.data.get('return_logits', False), + return_logits=return_logits, multimodals=req.data.get('input_multimodals'), input_embeddings=req.data.get('input_embeddings'), ) @@ -391,8 +386,8 @@ def __update_max_new_tokens(msg): embeddings=req.data.get('input_embeddings'), ) msg.num_new_tokens = 0 - msg.sampling_param = req.data['sampling_param'] - msg.return_logits = req.data.get('return_logits', False) + msg.sampling_param = sampling_param + msg.return_logits = return_logits msg.status = MessageStatus.WAITING __update_bad_words(msg) __update_max_new_tokens(msg) @@ -431,7 +426,7 @@ def create_model_inputs(self, messages: SeqList, is_prefill: bool): seq_length = [len(tokens) for tokens in token_ids] seq_length = torch.tensor(seq_length, dtype=torch.long) else: - seq_length = self._seq_length_buf[:batch_size] + seq_length = torch.ones(batch_size, dtype=torch.long) max_q_seq_length = seq_length.max().item() 
block_offsets = self.scheduler.get_block_tables(messages) @@ -685,6 +680,8 @@ async def __long_context_single_forward(inputs): if not return_logits and not inputs.is_decoding: last_token_loc = [-1] ret['hidden_states'] = ret['hidden_states'][:, last_token_loc] + else: + ret['hidden_states'] = ret['hidden_states'].to('cuda') hidden_states = ret.pop('hidden_states') logits = self.model_agent.get_logits(hidden_states) @@ -808,7 +805,11 @@ def __update_inputs(next_token_ids): finish = finish or _check_finish(self.scheduler, idx) event = torch.cuda.Event() event.record() - output = (next_token_ids, logits, stopped, model_metas, event) + output = dict(next_token_ids=next_token_ids, + logits=logits, + stopped=stopped, + model_metas=model_metas, + event=event) output_que.put_nowait((finish, output)) inputs.model_metas = model_metas @@ -1053,7 +1054,7 @@ async def __step(): finish = False while not finish: finish, out = await out_que.get() - step_outputs = await self._make_infer_outputs(*out) + step_outputs = await self._make_infer_outputs(**out) self._set_has_runable_event(has_runable_event) resp_que.put_nowait(step_outputs) diff --git a/lmdeploy/pytorch/engine/engine_instance.py b/lmdeploy/pytorch/engine/engine_instance.py index 5cf1366783..b532a48341 100644 --- a/lmdeploy/pytorch/engine/engine_instance.py +++ b/lmdeploy/pytorch/engine/engine_instance.py @@ -41,12 +41,6 @@ async def async_try_add_session(req_sender: RequestSender, session_id: int): f'with error: {resp.type}')) -async def async_end(req_sender: RequestSender, session_id: int): - """End the given session.""" - req_sender.send_async(RequestType.END_SESSION, - dict(session_id=session_id, response=False)) - - async def async_cancel(req_sender: RequestSender, session_id: int): """Stop current streaming inference.""" resp = await req_sender.async_send(RequestType.STOP_SESSION, @@ -158,8 +152,13 @@ async def async_stream_infer(self, token_ids = resp.data['token_ids'].tolist() yield EngineOutput(resp.type, token_ids, len(token_ids)) elif resp.type == ResponseType.FINISH: - token_ids = resp.data['token_ids'].tolist() - yield EngineOutput(resp.type, token_ids, len(token_ids)) + resp_data = resp.data + token_ids = resp_data['token_ids'].tolist() + logits = resp_data['logits'] + yield EngineOutput(resp.type, + token_ids, + len(token_ids), + logits=logits) break else: yield EngineOutput(resp.type, [], 0) @@ -183,18 +182,16 @@ async def async_infer(self, List[int]: The streaming output tokens. int: The number of the output tokens. """ - token_ids = [] async for outputs in self.async_stream_infer(session_id, input_ids, multimodal=multimodal, gen_config=gen_config, **kwargs): - status, tmp_ids = outputs.status, outputs.token_ids + status = outputs.status if status not in [ResponseType.SUCCESS, ResponseType.FINISH]: - return EngineOutput(status, token_ids, len(token_ids)) - token_ids = tmp_ids + return outputs - return EngineOutput(0, token_ids, len(token_ids)) + return outputs def stream_infer(self, session_id: int, @@ -216,9 +213,6 @@ def stream_infer(self, List[int]: The streaming output tokens. int: The number of the output tokens. """ - if len(input_ids) > self.max_input_len: - yield EngineOutput(ResponseType.INPUT_LENGTH_ERROR, [], 0) - return def __call_async(): """call async.""" @@ -255,22 +249,16 @@ def infer(self, List[int]: The streaming output tokens. int: The number of the output tokens. 
""" - token_ids = [] - for outputs in self.stream_infer(session_id, - input_ids, - multimodal=multimodal, - gen_config=gen_config, - **kwargs): - status, tmp_ids = outputs.status, outputs.token_ids - if status not in [ResponseType.SUCCESS, ResponseType.FINISH]: - return EngineOutput(status, token_ids, len(token_ids)) - token_ids = tmp_ids - - return EngineOutput(0, token_ids, len(token_ids)) + return self.req_sender.run_until_complete( + self.async_infer(session_id, + input_ids, + multimodal=multimodal, + gen_config=gen_config, + **kwargs)) async def async_end(self, session_id: int): """End the given session.""" - return await async_end(self.req_sender, session_id) + return end(self.req_sender, session_id) def end(self, session_id: int): """End the given session.""" @@ -283,83 +271,3 @@ async def async_cancel(self, session_id: int): def cancel(self, session_id: int): """Stop current streaming inference.""" return cancel(self.req_sender, session_id) - - def decode(self, - input_ids, - multimodal: List[InputMultiModalType] = None, - steps: List[int] = None, - sequence_start: bool = True, - sequence_end: bool = True, - adapter_names: List[str] = None): - """Perform context decode on input tokens. - - Args: - input_ids (numpy.ndarray): the batch of input token ids - steps (List[int]): the offset of the k/v cache - multimodal (List[InputMultiModalType]): - multimodals inputs. - sequence_start (bool): indicator for starting a sequence - sequence_end (bool): indicator for ending a sequence - adapter_names (List[str]): The name of the adapters. - """ - from torch.nn.utils.rnn import pad_sequence - logger.debug('Decoding logits.') - batch_size = len(input_ids) - - def __add_messages(session_ids, input_ids, adapter_names, - input_multimodals): - add_msgs = [] - sampling_param = SamplingParam(max_new_tokens=0) - batch_size = len(input_ids) - if input_multimodals is None: - input_multimodals = [None] * batch_size - for (session_id, token_id, adapter_name, - in_mm) in zip(session_ids, input_ids, adapter_names, - input_multimodals): - if len(token_id) > self.max_input_len: - raise RuntimeError( - f'Expect input length<={self.max_input_len} ' - f'but get {len(token_id)}') - msg = dict(token_ids=token_id, - session_id=session_id, - sampling_param=sampling_param, - adapter_name=adapter_name, - input_multimodals=in_mm, - return_logits=True) - add_msgs.append(msg) - req_types = [RequestType.ADD_MESSAGE] * batch_size - resps = self.req_sender.batched_send_async(req_types, - data=add_msgs) - return resps - - if steps is not None: - assert batch_size == len(steps) - - if adapter_names is not None: - assert len(adapter_names) == batch_size - else: - adapter_names = [None] * batch_size - - session_ids = tuple(range(batch_size)) - if sequence_start: - for sid in session_ids: - self.req_sender.send(RequestType.END_SESSION, - dict(session_id=sid)) - self._try_add_session(sid) - - resps = __add_messages(session_ids, input_ids, adapter_names, - multimodal) - - ret = [] - for resp in resps: - resp = self.req_sender.recv(resp) - assert resp.type == ResponseType.FINISH - ret.append(resp.data['logits']) - - ret = pad_sequence(ret, True) - - if sequence_end: - for sid in session_ids: - self.end(sid) - - return ret diff --git a/lmdeploy/pytorch/messages.py b/lmdeploy/pytorch/messages.py index 968b71fee1..82bd93c52a 100644 --- a/lmdeploy/pytorch/messages.py +++ b/lmdeploy/pytorch/messages.py @@ -50,6 +50,8 @@ class SamplingParam: min_new_tokens: int = 0 response_format: Optional[str] = None logits_processors: 
Optional[List[LogitsProcessor]] = None + out_logits: bool = False + out_last_hidden_states: bool = False @classmethod def from_gen_config(self, gen_config: GenerationConfig): @@ -70,6 +72,16 @@ def from_gen_config(self, gen_config: GenerationConfig): max_new_tokens = gen_config.max_new_tokens response_format = gen_config.response_format + output_logits = gen_config.output_logits + if output_logits: + if (output_logits != 'all' or gen_config.max_new_tokens > 0): + output_logits = None + logger.warning( + 'PyTorch engine only supports output_logits="all"' + ' with max_new_tokens=0') + if gen_config.output_last_hidden_state is not None: + logger.warning( + 'PyTorch engine does not support outputting the last hidden states.') if top_p < 0 or top_p > 1.0: logger.warning('`top_p` has to be a float > 0 and < 1' f' but is {top_p}') @@ -110,7 +122,8 @@ def from_gen_config(self, gen_config: GenerationConfig): response_format=response_format, max_new_tokens=max_new_tokens, min_new_tokens=min_new_tokens, - logits_processors=gen_config.logits_processors) + logits_processors=gen_config.logits_processors, + out_logits=(output_logits is not None)) class MessageStatus(enum.Enum): diff --git a/lmdeploy/pytorch/models/internlm3.py b/lmdeploy/pytorch/models/internlm3.py new file mode 100644 index 0000000000..623483ebfe --- /dev/null +++ b/lmdeploy/pytorch/models/internlm3.py @@ -0,0 +1,448 @@ +# Copyright (c) OpenMMLab. All rights reserved. +from typing import Any, Iterable, List, Optional, Tuple + +import torch +from torch import nn +from transformers.configuration_utils import PretrainedConfig + +from lmdeploy.pytorch.model_inputs import StepContext, StepContextManager +from lmdeploy.pytorch.nn import (ApplyRotaryEmb, Attention, RMSNorm, RopeType, + SiluAndMul, build_rotary_embedding) +from lmdeploy.pytorch.nn.linear import (build_merged_colwise_linear, + build_qkv_proj, build_rowwise_linear) +from lmdeploy.pytorch.weight_loader.model_weight_loader import load_weight + +from .utils.cudagraph import CudaGraphMixin + + +class InternLM3Attention(nn.Module): + """Rewrite module of InternLM3Attention.""" + + def __init__(self, + config: PretrainedConfig, + dtype: torch.dtype = None, + device: torch.device = None): + super().__init__() + quantization_config = getattr(config, 'quantization_config', None) + num_heads = config.num_attention_heads + num_key_value_heads = config.num_key_value_heads + hidden_size = config.hidden_size + head_dim = getattr(config, 'head_dim', hidden_size // num_heads) + num_replicate_kv_heads = getattr(config, + 'num_replicate_key_value_heads', 1) + # packed qkv + self.qkv_proj = build_qkv_proj( + hidden_size, + num_q_heads=num_heads, + num_kv_heads=num_key_value_heads, + head_size=head_dim, + bias=config.qkv_bias, + quant_config=quantization_config, + dtype=dtype, + device=device, + num_replicate_kv_heads=num_replicate_kv_heads, + ) + + # rotary embedding + self.apply_rotary_pos_emb = ApplyRotaryEmb() + + # attention + self.attn_fwd = Attention( + num_heads, + head_dim, + num_kv_heads=num_key_value_heads, + v_head_size=head_dim, + ) + + # o_proj + self.o_proj = build_rowwise_linear(num_heads * head_dim, + hidden_size, + bias=config.bias, + quant_config=quantization_config, + dtype=dtype, + device=device, + is_tp=True) + + def forward( + self, + hidden_states: torch.Tensor, + rotary_pos_emb: Tuple[torch.FloatTensor, torch.FloatTensor], + past_key_value: Optional[Tuple[torch.Tensor]] = None, + attn_metadata: Any = None, + ): + """Rewrite of InternLM3Attention.forward.""" + # qkv proj + qkv_states
= self.qkv_proj(hidden_states) + # (-1, heads, head_dim) + qkv_states = qkv_states.flatten(0, -2) + query_states, key_states, value_states = self.qkv_proj.split_qkv( + qkv_states) + + # apply rotary embedding + cos, sin = rotary_pos_emb + query_states, key_states = self.apply_rotary_pos_emb( + query_states, + key_states, + cos, + sin, + inplace=True, + ) + + # attention + attn_output = self.attn_fwd( + query_states, + key_states, + value_states, + past_key_value[0], + past_key_value[1], + attn_metadata, + k_scales_zeros=None + if len(past_key_value) == 2 else past_key_value[2], + v_scales_zeros=None + if len(past_key_value) == 2 else past_key_value[3], + inplace=True, + ) + attn_output = attn_output.reshape(*hidden_states.shape[:-1], -1) + + # o proj + attn_output = self.o_proj(attn_output) + return attn_output + + +class InternLM3MLP(nn.Module): + """internlm3 mlp.""" + + def __init__(self, + config: PretrainedConfig, + dtype: torch.dtype = None, + device: torch.device = None): + super().__init__() + quantization_config = getattr(config, 'quantization_config', None) + # gate up + mlp_bias = getattr(config, 'bias', False) + self.gate_up_proj = build_merged_colwise_linear( + config.hidden_size, + [config.intermediate_size, config.intermediate_size], + bias=mlp_bias, + dtype=dtype, + device=device, + quant_config=quantization_config, + is_tp=True, + ) + + # silu and mul + self.act_fn = SiluAndMul(inplace=True) + + # down + self.down_proj = build_rowwise_linear(config.intermediate_size, + config.hidden_size, + bias=mlp_bias, + quant_config=quantization_config, + dtype=dtype, + device=device, + is_tp=True) + + def forward(self, x): + """forward.""" + gate_up = self.gate_up_proj(x) + act = self.act_fn(gate_up) + return self.down_proj(act) + + +class InternLM3DecoderLayer(nn.Module): + """llama decoder layer.""" + + def __init__(self, + config: PretrainedConfig, + layer_idx: int, + dtype: torch.dtype = None, + device: torch.device = None): + super().__init__() + self.layer_idx = layer_idx + quantization_config = getattr(config, 'quantization_config', None) + + # build attention layer + self.self_attn = InternLM3Attention(config, dtype=dtype, device=device) + + # build MLP + self.mlp = InternLM3MLP(config, dtype=dtype, device=device) + + # build input layer norm + self.input_layernorm = RMSNorm(config.hidden_size, + config.rms_norm_eps, + quant_config=quantization_config, + dtype=dtype, + device=device) + + # build attention layer norm + self.post_attention_layernorm = RMSNorm( + config.hidden_size, + config.rms_norm_eps, + quant_config=quantization_config, + dtype=dtype, + device=device) + + def forward( + self, + hidden_states: torch.Tensor, + rotary_pos_emb: Tuple[torch.FloatTensor, torch.FloatTensor], + past_key_value: Optional[List[torch.FloatTensor]], + residual: Optional[torch.Tensor] = None, + attn_metadata: Any = None, + ): + + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + hidden_states, residual = self.input_layernorm( + hidden_states, residual) + + # Self Attention + hidden_states = self.self_attn( + hidden_states=hidden_states, + rotary_pos_emb=rotary_pos_emb, + past_key_value=past_key_value, + attn_metadata=attn_metadata, + ) + + # Fully Connected + hidden_states, residual = self.post_attention_layernorm( + hidden_states, residual) + hidden_states = self.mlp(hidden_states) + + outputs = (hidden_states, residual) + return outputs + + +class InternLM3Model(nn.Module): + """internlm3 model.""" + + def __init__(self, + 
config: PretrainedConfig, + dtype: torch.dtype = None, + device: torch.device = None): + super().__init__() + self.padding_idx = config.pad_token_id + self.vocab_size = config.vocab_size + + self.embed_tokens = nn.Embedding(config.vocab_size, + config.hidden_size, + self.padding_idx, + dtype=dtype, + device=device) + + # build all decode layers + self.layers = nn.ModuleList([ + InternLM3DecoderLayer(config, + layer_idx, + dtype=dtype, + device=device) + for layer_idx in range(config.num_hidden_layers) + ]) + + # build norm + self.norm = RMSNorm(config.hidden_size, + config.rms_norm_eps, + dtype=dtype, + device=device) + + # build rotary embedding + rope_dim = config.hidden_size // config.num_attention_heads + rope_max_pos_emb = config.max_position_embeddings + rope_base = config.rope_theta + scaling_factor = 1.0 + rope_scaling = config.rope_scaling + if rope_scaling is None: + emb_type = RopeType.LinearScaling + else: + if 'scaling_factor' in rope_scaling: + scaling_factor = rope_scaling['scaling_factor'] + elif 'factor' in rope_scaling: + scaling_factor = rope_scaling['factor'] + + rope_type = rope_scaling['rope_type'] + if rope_type == 'dynamic': + emb_type = RopeType.DynamicNTKScaling + elif rope_type == 'linear': + emb_type = RopeType.LinearScaling + else: + raise RuntimeError(f'Unsupported rope type: {rope_type}') + + self.rotary_emb = build_rotary_embedding( + rope_dim, + rope_max_pos_emb, + rope_base, + scaling_factor, + emb_type=emb_type, + ) + + def forward( + self, + input_ids: torch.LongTensor = None, + position_ids: Optional[torch.LongTensor] = None, + past_key_values: Optional[List[torch.FloatTensor]] = None, + attn_metadata: Any = None, + inputs_embeds: Optional[torch.FloatTensor] = None, + ): + """Rewrite of InternLM3Model.forward.""" + + # token embedding + if inputs_embeds is None: + inputs_embeds = self.embed_tokens(input_ids) + + hidden_states = inputs_embeds + + # rotary embedding + cos, sin = self.rotary_emb(hidden_states, position_ids) + cos, sin = cos[0], sin[0] + rotary_pos_emb = (cos, sin) + + # decoding + residual = None + for idx, decoder_layer in enumerate(self.layers): + past_key_value = past_key_values[idx] + hidden_states, residual = decoder_layer( + hidden_states, + rotary_pos_emb=rotary_pos_emb, + past_key_value=past_key_value, + residual=residual, + attn_metadata=attn_metadata, + ) + + # norm + hidden_states, _ = self.norm(hidden_states, residual) + + return hidden_states + + def get_input_embeddings(self): + """get input embeddings.""" + return self.embed_tokens + + +class InternLM3ForCausalLM(nn.Module, CudaGraphMixin): + """rewrote model of InternLM3ForCausalLM.""" + + packed_modules_mapping = { + 'qkv_proj': [ + 'q_proj', + 'k_proj', + 'v_proj', + ], + 'gate_up_proj': [ + 'gate_proj', + 'up_proj', + ], + } + + def __init__(self, + config: PretrainedConfig, + ctx_mgr: StepContextManager, + dtype: torch.dtype = None, + device: torch.device = None): + super().__init__() + self.config = config + self.ctx_mgr = ctx_mgr + # build InternLM3Model + self.model = InternLM3Model(config, dtype=dtype, device=device) + # build lm_head + self.lm_head = build_rowwise_linear(config.hidden_size, + config.vocab_size, + bias=False, + dtype=dtype, + device=device) + + def forward( + self, + input_ids: torch.Tensor, + position_ids: torch.Tensor, + past_key_values: List[List[torch.Tensor]], + attn_metadata: Any = None, + inputs_embeds: torch.Tensor = None, + **kwargs, + ): + """model forward, return logits.""" + hidden_states = self.model( + input_ids=input_ids, + 
position_ids=position_ids, + past_key_values=past_key_values, + attn_metadata=attn_metadata, + inputs_embeds=inputs_embeds, + ) + return hidden_states + + def update_weights(self): + """update weights.""" + if self.config.tie_word_embeddings: + self.lm_head.weight = self.model.embed_tokens.weight + + def get_logits(self, hidden_states: torch.Tensor): + """compute logits of the model output.""" + return self.lm_head(hidden_states) + + def get_input_embeddings(self): + """get input embeddings.""" + return self.model.get_input_embeddings() + + def prepare_inputs_for_generation( + self, + past_key_values: List[List[torch.Tensor]], + inputs_embeds: Optional[torch.Tensor] = None, + context: StepContext = None, + ): + """prepare input.""" + # get input_ids, position_ids and attention metadatas + input_ids = context.input_ids + position_ids = context.position_ids + attn_metadata = context.attn_metadata + + # process vision embeddings + vision_embeddings = context.input_embeddings + vision_embedding_indexing = context.input_embedding_indexing + if vision_embeddings is not None and len(vision_embeddings) > 0: + if inputs_embeds is None: + inputs_embeds = self.get_input_embeddings()(input_ids) + inputs_embeds[:, + vision_embedding_indexing, :] = vision_embeddings.to( + inputs_embeds) + + # inputs of forward + return dict( + input_ids=input_ids, + position_ids=position_ids, + past_key_values=past_key_values, + attn_metadata=attn_metadata, + inputs_embeds=inputs_embeds, + ) + + def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]): + """load weights.""" + # modify from vllm + stacked_params_mapping = [ + # (param_name, shard_name, shard_id) + ('.qkv_proj', '.q_proj', 'q'), + ('.qkv_proj', '.k_proj', 'k'), + ('.qkv_proj', '.v_proj', 'v'), + ('.gate_up_proj', '.gate_proj', 0), + ('.gate_up_proj', '.up_proj', 1), + ] + + params_dict = dict(self.named_parameters()) + for name, loaded_weight in weights: + if 'rotary_emb.inv_freq' in name: + continue + if ('rotary_emb.cos_cached' in name + or 'rotary_emb.sin_cached' in name): + continue + if self.config.tie_word_embeddings and 'lm_head.weight' in name: + continue + for (param_name, weight_name, shard_id) in stacked_params_mapping: + if weight_name not in name: + continue + name = name.replace(weight_name, param_name) + param = params_dict[name] + load_weight(param, loaded_weight, shard_id=shard_id) + break + else: + param = params_dict[name] + load_weight(param, loaded_weight) diff --git a/lmdeploy/pytorch/models/module_map.py b/lmdeploy/pytorch/models/module_map.py index c01a166b94..2503500eed 100644 --- a/lmdeploy/pytorch/models/module_map.py +++ b/lmdeploy/pytorch/models/module_map.py @@ -189,4 +189,10 @@ f'{LMDEPLOY_PYTORCH_MODEL_PATH}.mllama.MllamaForConditionalGeneration', }) +# internlm3 +MODULE_MAP.update({ + 'InternLM3ForCausalLM': + f'{LMDEPLOY_PYTORCH_MODEL_PATH}.internlm3.InternLM3ForCausalLM', +}) + CUSTOM_MODULE_MAP = dict() diff --git a/lmdeploy/pytorch/supported_models.py b/lmdeploy/pytorch/supported_models.py index 67452f78e3..cf5862763f 100644 --- a/lmdeploy/pytorch/supported_models.py +++ b/lmdeploy/pytorch/supported_models.py @@ -72,6 +72,8 @@ MllamaForConditionalGeneration=True, # MiniCPM-V-2_6 MiniCPMVForCausalLM=True, + # internlm3 + InternLM3ForCausalLM=True, ) diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index 797397e660..0f2ea66367 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -382,6 +382,7 @@ def __call__(self, async def stop_session(self, 
session_id: int): """Stop a session by a session_id.""" + logger.info(f'stop session {session_id}') generator = self.id2inst.get(session_id) if generator: await generator.async_cancel(session_id) @@ -389,6 +390,7 @@ async def stop_session(self, session_id: int): async def end_session(self, session_id: int): """For ending a session that is not running.""" + logger.info(f'end session {session_id}') inst = self.id2inst.get(session_id) if inst: await inst._active.wait() @@ -700,7 +702,7 @@ async def generate( prompt_token_ids=input_ids, gen_config=gen_config, adapter_name=adapter_name) - logger.info(f'session_id={session_id}, ' + logger.info(f'session={session_id}, ' f'history_tokens={self.id2step[session_id]}, ' f'input_tokens={len(input_ids)}, ' f'max_new_tokens={gen_config.max_new_tokens}, ' @@ -724,7 +726,7 @@ async def generate( f'Truncate max_new_tokens to {gen_config.max_new_tokens}') if self.id2step[session_id] + len( input_ids) + gen_config.max_new_tokens > self.session_len: - logger.error(f'run out of tokens. session_id={session_id}.') + logger.error(f'run out of tokens. session={session_id}.') yield GenOut('', self.id2step[session_id], len(input_ids), 0, 'length') if sequence_end is True and sequence_start is False: @@ -822,9 +824,14 @@ def is_error(status): if not response.endswith('�'): # avoid returning the last response twice response = '' + logger.info(f'session {session_id} finished, reason ' + f'"{finish_reason}", input_tokens ' + f'{len(input_ids)}, output_tokens {gen_len}') yield GenOut(response, self.id2step[session_id], len(input_ids), gen_len, finish_reason) else: + logger.error(f'session {session_id} finished, ' + 'reason "error"') yield GenOut(response='internal error happened', history_token_len=self.id2step[session_id], input_token_len=len(input_ids), diff --git a/lmdeploy/serve/openai/protocol.py b/lmdeploy/serve/openai/protocol.py index a6f945ac13..bc119e3c63 100644 --- a/lmdeploy/serve/openai/protocol.py +++ b/lmdeploy/serve/openai/protocol.py @@ -191,7 +191,8 @@ class ChatCompletionResponseChoice(BaseModel): index: int message: ChatMessage logprobs: Optional[ChoiceLogprobs] = None - finish_reason: Optional[Literal['stop', 'length', 'tool_calls']] = None + finish_reason: Optional[Literal['stop', 'length', 'tool_calls', + 'error']] = None class ChatCompletionResponse(BaseModel): diff --git a/lmdeploy/serve/utils.py b/lmdeploy/serve/utils.py index afcec3d4ab..7e62b8a9c4 100644 --- a/lmdeploy/serve/utils.py +++ b/lmdeploy/serve/utils.py @@ -31,25 +31,25 @@ async def _async_get_logits( logits = [None] * len(input_ids) async def _proc(i): - async for out in self.generate( - messages=None, - input_ids=input_ids[i], - step=0 if steps is None else steps[i], - session_id=i, - # `max_new_tokens=0` means we don't need engine to - # generate tokens and `output_logits=all` requests engine - # to output logits of all input tokens - gen_config=GenerationConfig(max_new_tokens=0, - output_logits='all'), - stream_response=False, - sequence_start=sequence_start, - sequence_end=sequence_end): - # In the last iteration, the yielded `out` is an empty response - # indicating the finish_reason, which should be ignored here - if out.finish_reason is None: - # Try not to return in async for loop.
Otherwise, there - # will be `GeneratorExit` exception - logits[i] = out.logits + async with self.model_inst(session_id=i) as inst: + input_len = len(input_ids[i]) + # TODO(lvhan): Fix the ugly code later on + max_new_tokens = 1 if self.backend == 'turbomind' else 0 + gen_config = GenerationConfig(max_new_tokens=max_new_tokens, + output_logits='all') + async with self.safe_run(inst, + session_id=i, + input_ids=input_ids[i], + gen_config=gen_config, + stream_output=False, + sequence_start=sequence_start, + sequence_end=sequence_end, + step=steps[i] if steps else 0) as gen: + async for outputs in gen: + pass + logits[i] = outputs.logits[:input_len, :] + if sequence_end and self.backend == 'pytorch': + await inst.async_end(session_id=i) tasks = [_proc(i) for i in range(len(input_ids))] await asyncio.gather(*tasks) @@ -211,4 +211,5 @@ def _get_ppl(self, loss = flat_loss_matrix.sum() target_count = target_mask.sum() result.append(loss.item() / target_count.item()) + logger.info(f'ppl result: {result}') return result diff --git a/lmdeploy/turbomind/deploy/source_model/llama.py b/lmdeploy/turbomind/deploy/source_model/llama.py index 0c702d6588..aa7f98b41f 100644 --- a/lmdeploy/turbomind/deploy/source_model/llama.py +++ b/lmdeploy/turbomind/deploy/source_model/llama.py @@ -191,7 +191,7 @@ def model_info(self): return dict( size_per_head=head_dim, - rotary_embedding=hidden_units // attn_head_num, + rotary_embedding=head_dim, num_layer=num_layer, norm_eps=norm_eps, head_num=attn_head_num, diff --git a/lmdeploy/turbomind/supported_models.py b/lmdeploy/turbomind/supported_models.py index 2b9c5156ed..59ddae2114 100644 --- a/lmdeploy/turbomind/supported_models.py +++ b/lmdeploy/turbomind/supported_models.py @@ -13,6 +13,8 @@ InternLMForCausalLM='llama', # internlm2 InternLM2ForCausalLM='internlm2', + # internlm3 + InternLM3ForCausalLM='llama', # llama, llama2, alpaca, vicuna, codellama, ultracm, yi, # deepseek-coder, deepseek-llm LlamaForCausalLM='llama', diff --git a/src/turbomind/models/llama/LlamaBatch.cc b/src/turbomind/models/llama/LlamaBatch.cc index e37af1bb76..8b1d0ebd2c 100644 --- a/src/turbomind/models/llama/LlamaBatch.cc +++ b/src/turbomind/models/llama/LlamaBatch.cc @@ -1503,7 +1503,11 @@ template auto LlamaBatch::Interrupt(int index, bool force_stop, bool force_end) -> Signal { if (rank_ == 0) { - TM_LOG_INFO("[Interrupt] slot = %d, id = %lu", index, (long)state_->requests[index]->id); + TM_LOG_INFO("[Interrupt] slot %d, request %lu, stop %d, end %d", + index, + (long)state_->requests[index]->id, + force_stop, + force_end); } if (debug_ && rank_ == 0) { diff --git a/tests/test_lmdeploy/test_auto_backend.py b/tests/test_lmdeploy/test_auto_backend.py index 5db727f17f..a8e7aa2ab9 100644 --- a/tests/test_lmdeploy/test_auto_backend.py +++ b/tests/test_lmdeploy/test_auto_backend.py @@ -33,8 +33,6 @@ def models(self): ('tiiuae/falcon-7b-instruct', True, False), ('01-ai/Yi-34B-Chat', True, True), ('codellama/CodeLlama-7b-Instruct-hf', True, True), - ('mistralai/Mistral-7B-Instruct-v0.1', True, True), - ('mistralai/Mixtral-8x7B-Instruct-v0.1', True, True), ('Qwen/Qwen-7B-Chat', True, True), ('Qwen/Qwen-VL-Chat', False, True), ('Qwen/Qwen1.5-4B-Chat', True, True),
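
Usage sketch (illustrative, not part of the patch): the benchmark change passes log_level='ERROR' to pipeline(), and the new SamplingParam.out_logits path means the PyTorch engine honors output_logits='all' only when max_new_tokens=0. The snippet below is a minimal sketch under those assumptions; the model path, the PytorchEngineConfig defaults, and the logits/get_ppl attributes on the pipeline response come from the existing lmdeploy API rather than this diff.

# Minimal sketch, not part of the patch. The model path is a placeholder and the
# response attributes are assumed from the existing lmdeploy pipeline API.
from lmdeploy import GenerationConfig, PytorchEngineConfig, pipeline

pipe = pipeline('internlm/internlm3-8b-instruct',      # placeholder model path
                backend_config=PytorchEngineConfig(),
                log_level='ERROR')                     # quieter per-request logging, as in the benchmark change

# Per the new warning in messages.py, the PyTorch engine only returns logits
# when output_logits='all' is combined with max_new_tokens=0.
gen_config = GenerationConfig(max_new_tokens=0, output_logits='all')
outputs = pipe(['An example prompt'], gen_config=gen_config)
print(outputs[0].logits.shape)

# get_ppl exercises the reworked _async_get_logits path shown above.
print(pipe.get_ppl(['An example prompt']))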