
Can TensorRT-LLM Handle High Levels of Concurrent Requests? #2514

Open
1 of 4 tasks
Rumeysakeskin opened this issue Nov 29, 2024 · 2 comments
Labels
bug (Something isn't working), Investigating, triaged (Issue has been triaged by maintainers), Triton Backend

Comments

@Rumeysakeskin

System Info

NVIDIA A100 80GB
Running on nvcr.io/nvidia/tritonserver:23.10-trtllm-python-py3 image

Who can help?

import asyncio
from threading import Lock
from transformers import AutoTokenizer
from tensorrt_llm.runtime import ModelRunner
from pydantic import BaseModel, Extra, Field
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from typing import Any, List, Mapping, Optional

# Added: imports that are used further down but were missing from the snippet.
# The langchain paths assume the same legacy layout as the imports above.
import random
import time
from langchain.document_loaders import PyPDFDirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.retrievers import EnsembleRetriever
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.memory import ConversationBufferMemory
from ragatouille import RAGPretrainedModel


N_GPUS = 1
MAX_OUTPUT_LEN = 512
MAX_BATCH_SIZE = 16
MODEL_ID = "meta-llama/Meta-Llama-3-8B-Instruct"
ENGINE_DIR = "/llama3-8b-it/engine/"

class Model:
    """Manages the TensorRT-based model."""
    def __init__(self):
        self.runner_pool = []
        self.lock = Lock()
        self.load()

    def load(self):
        """Initializes the TRT-LLM model and the tokenizer."""
        print("🥶 Cold boot: starting the TRT-LLM engine.")
        self.init_start = time.monotonic_ns()

        # Load the tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        self.tokenizer.add_special_tokens({"pad_token": self.tokenizer.eos_token})
        self.tokenizer.padding_side = "left"
        self.pad_id = self.tokenizer.pad_token_id
        self.end_id = self.tokenizer.eos_token_id

        # Initialize the ModelRunners and distribute them across GPUs
        for gpu_id in range(N_GPUS):
            runner_kwargs = dict(engine_dir=ENGINE_DIR, lora_dir=None, rank=gpu_id)  # only valid parameters
            runner = ModelRunner.from_dir(**runner_kwargs)
            self.runner_pool.append(runner)

        self.init_duration_s = (time.monotonic_ns() - self.init_start) / 1e9
        print(f"🚀 Cold boot finished in {self.init_duration_s:.2f} seconds.")


    def get_runner(self):
        """Gets a runner in a thread-safe way (round-robin over the pool)."""
        with self.lock:
            runner = self.runner_pool.pop(0)
            self.runner_pool.append(runner)
        return runner

    def generate(self, prompts, settings=None):
        """Generates responses for the prompts (batched)."""
        if settings is None:
            settings = {
                "temperature": 0.1,
                "top_k": 1,
                "repetition_penalty": 1.1,
                "max_new_tokens": MAX_OUTPUT_LEN,
                "end_id": self.end_id,
                "pad_id": self.pad_id,
            }

        # Enforce the batch size limit
        if len(prompts) > MAX_BATCH_SIZE:
            raise ValueError(f"Batch size {len(prompts)} exceeds the maximum of {MAX_BATCH_SIZE}.")

        runner = self.get_runner()
        parsed_prompts = [
            self.tokenizer.apply_chat_template(
                [{"role": "user", "content": prompt}], add_generation_prompt=True, tokenize=False
            ) for prompt in prompts
        ]

        inputs_t = self.tokenizer(parsed_prompts, return_tensors="pt", padding=True, truncation=True)["input_ids"]

        # Run the batch through the model runner
        outputs_t = runner.generate(inputs_t, batch_size=len(prompts), **settings)
        outputs_text = self.tokenizer.batch_decode(outputs_t[:, 0])

        return [self.extract_assistant_response(output_text) for output_text in outputs_text]

    async def generate_async(self, prompts, settings=None):
        """Generates responses for the prompts (asynchronously)."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.generate, prompts, settings)

    def extract_assistant_response(self, output_text):
        """Extracts the assistant's reply from the decoded output text."""
        first_end_header_index = output_text.find("<|end_header_id|>")
        second_end_header_index = output_text.find("<|end_header_id|>", first_end_header_index + 1)

        if second_end_header_index != -1:
            start_index = second_end_header_index + len("<|end_header_id|>")
            end_index = output_text.find("<|eot_id|>", start_index)
            if end_index != -1:
                return output_text[start_index:end_index].strip()
        return "Error: unexpected output format."


class TRTLLM(LLM):
    """LangChain LLM wrapper around the TensorRT-LLM Model above."""

    model: Any = Field(..., exclude=True)  # the Model instance
    lock: Lock = Field(default_factory=Lock, exclude=True)  # lock for thread-safe access

    class Config:
        extra = Extra.forbid
        arbitrary_types_allowed = True  # added so pydantic accepts the non-pydantic Lock field

    def __init__(self, model: Any, **kwargs):
        """Initialize with a TensorRT model instance."""
        super().__init__(model=model, **kwargs)
        self.lock = Lock()

    @property
    def _llm_type(self) -> str:
        """Name of the LLM type."""
        return "TRT-LLM"

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Asenkron prompt işleme."""
        if stop is not None:
            raise ValueError("stop kwargs are not supported.")

        responses = await self.model.generate_async([prompt])
        return responses[0] if responses else "Error: no response received."

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Prompta yanıt üretir (senkron)."""
        if stop is not None:
            raise ValueError("stop kwargs are not supported.")

        with self.lock:
            responses = self.model.generate([prompt])
        return responses[0] if responses else "Error: no response received."

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Modeli tanımlayan parametreler."""
        return {"model_name": "TRT-LLM"}


llm = TRTLLM(model=Model())

loader = PyPDFDirectoryLoader("data/")
docs = loader.load_and_split()

# RAG model
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
ragatouille_docs = [str(doc) for doc in docs]
RAG.index(
    collection=ragatouille_docs,
    index_name="langchain-index",
    max_document_length=512,
    split_documents=True,
)

text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
splits = text_splitter.split_documents(docs)
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5")
vectordb = Chroma.from_documents(splits, embeddings)
chroma_retriever = vectordb.as_retriever(
    search_type="mmr", search_kwargs={"k": 4, "fetch_k": 10}
)

# Ensemble retriever
ragatouille_retriever = RAG.as_langchain_retriever(k=10)
retriever = EnsembleRetriever(retrievers=[chroma_retriever, ragatouille_retriever], weights=[0.5, 0.5])
prompt_template = """...
{context}

Question: {question}
Answer:"""

prompt = PromptTemplate(template=prompt_template, input_variables=["context", "chat_history", "question"])


def create_qa_chain():
    return RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=False,
        chain_type_kwargs={
            "verbose": False,
            "prompt": prompt,
            "memory": ConversationBufferMemory(
                memory_key="chat_history", input_key="question"
            ),
        },
    )

async def run_concurrent_queries_async(queries, num_users):
    async def invoke_async(query):
        qa_chain = create_qa_chain()
        # Run the synchronous invoke in an executor so it can be awaited
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, qa_chain.invoke, {"query": query})

    tasks = [
        invoke_async(random.choice(queries))  # pick a query for each simulated user
        for _ in range(num_users)
    ]
    results = await asyncio.gather(*tasks)
    return results

import nest_asyncio  # asyncio and time are already imported above

nest_asyncio.apply()


if __name__ == "__main__":
    queries = [
        "question?",
        "question2?",
        "question3?",
    ]
    num_users = 10

    print("Starting concurrent queries...")
    start = time.time()

    # Run the asynchronous queries
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(run_concurrent_queries_async(queries, num_users))

    total_time = time.time() - start
    print(f"Total time: {total_time:.2f} seconds")

    # Print the results
    for idx, response in enumerate(results):
        print(f"User {idx + 1} response: {response}")

Information

  • The official example scripts
  • My own modified scripts

Tasks

  • An officially supported task in the examples folder (such as GLUE/SQuAD, ...)
  • My own task or dataset (give details below)

Reproduction

When I send 1 request, inference takes about 3 seconds, but when I send 10 requests it takes 30 seconds.
I've worked on parallelizing the processing, but I don't know where the issue lies. Could you help me?

Expected behavior

Concurrent requests should be processed in parallel (batched), so 10 simultaneous requests should take far less than 10 times the single-request latency.

Actual behavior

When I send 1 request, inference takes about 3 seconds, but when I send 10 requests it takes 30 seconds.

Additional notes

.

@Rumeysakeskin added the bug (Something isn't working) label on Nov 29, 2024
@hello-11 added the triaged (Issue has been triaged by maintainers) and Triton Backend labels on Dec 10, 2024
@hello-11 assigned and then unassigned hello-11 on Dec 10, 2024
@pcastonguay
Collaborator

I'm not sure what the issue with your code is, but I would recommend trying the LLM API. See https://nvidia.github.io/TensorRT-LLM/llm-api-examples/llm_inference_async.html for an example.
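
For reference, the pattern from that page looks roughly like the sketch below. This is a minimal, hedged sketch rather than a drop-in fix: the model path and sampling values are placeholders borrowed from the code above, and the exact LLM / SamplingParams / generate_async surface can vary between TensorRT-LLM versions. The key difference from the ModelRunner approach is that requests submitted concurrently through generate_async are batched in flight by the runtime instead of being serialized one at a time.

import asyncio
from tensorrt_llm import LLM, SamplingParams  # high-level LLM API

# Placeholder values taken from the snippet above; adjust to your setup.
prompts = ["question?", "question2?", "question3?"]
llm = LLM(model="meta-llama/Meta-Llama-3-8B-Instruct")
sampling_params = SamplingParams(temperature=0.1, max_tokens=512)

async def generate_one(prompt: str) -> str:
    # Each call is submitted to the runtime, which batches concurrent requests in flight.
    output = await llm.generate_async(prompt, sampling_params)
    return output.outputs[0].text

async def run_all():
    results = await asyncio.gather(*(generate_one(p) for p in prompts))
    for prompt, text in zip(prompts, results):
        print(f"{prompt!r} -> {text!r}")

asyncio.run(run_all())

With this pattern there is no need to group prompts into fixed-size batches or pool ModelRunner instances manually; the runtime schedules the concurrent requests itself.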

@Rumeysakeskin
Author

> I'm not sure what the issue with your code is, but I would recommend trying the LLM API. See https://nvidia.github.io/TensorRT-LLM/llm-api-examples/llm_inference_async.html for an example.

Thanks!
