Refactor async engine & turbomind IO (#2968)
* refactor

* async interface

* update perf metrics & adaptive tokens per tick

* wait-free

* refactor gateway

* optimize throughput

* add cancel cb

* simplify async engine

* simplify async engine

* fix end session

* faster synchronization

* fix async engine

* refactor async engine

* fix semaphore

* refactor inference API

* remove turbomind sync interface

* fix msvc build

* fix msvc build

* fix msvc build

* add extra outputs

* skip stop tokens

* exit gracefully

* cancel all tasks atexit

* refactor profiler

* fix id2step for api server

* save csv

* fix interactive

* fix lint

* fix generate_token_len

* fix async_end

* update pipeline ut

* fix ignore eos

* minor

* refactor profile pipeline api

* fix stop ids

* fix duplication

* control output range of logits & last hidden states

* fix lint & typo

* fix blank response

* export batch & num prompts
lzhangzz authored Jan 10, 2025
1 parent ac509e8 commit c25381f
Showing 41 changed files with 2,683 additions and 2,019 deletions.
2 changes: 1 addition & 1 deletion autotest/interface/pipeline/test_pipeline_func.py
@@ -408,7 +408,7 @@ def run_pipeline_testcase(config, model, backend, file_name):
         result = True
         for i in range(2):
             result &= response[i].finish_reason == 'length'
-            result &= response[i].session_id == i
+            result &= response[i].index == i
         save_pipeline_common_log(config, file_name, result, response)
         del pipe
         torch.cuda.empty_cache()
4 changes: 2 additions & 2 deletions autotest/utils/pipeline_chat.py
@@ -235,7 +235,7 @@ def assert_pipeline_single_stream_return(output, logprobs_num: int = 0):

 def assert_pipeline_batch_stream_return(output, size: int = 1):
     for i in range(size):
-        output_list = [item for item in output if item.session_id == i]
+        output_list = [item for item in output if item.index == i]
         result, msg = assert_pipeline_single_stream_return(output_list)
         if not result:
             return result, msg
@@ -249,7 +249,7 @@ def assert_pipeline_single_element(output,
     result = True
     result &= output.generate_token_len > 0
     result &= output.input_token_len > 0
-    result &= output.session_id >= 0
+    result &= output.index >= 0
     if is_last:
         result &= len(output.text) >= 0
         result &= output.finish_reason in ['stop', 'length']
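Both test helpers above adapt to the same interface change: outputs from a batched pipeline call are now matched to their prompt by an `index` field instead of `session_id`. Below is a minimal sketch of consuming a batched stream with the renamed field; the model path and prompts are placeholders for illustration, not part of this commit.

from lmdeploy import GenerationConfig, pipeline

# placeholder model and prompts, only to illustrate the renamed field
pipe = pipeline('internlm/internlm2_5-7b-chat')
prompts = ['Hi, please introduce yourself', 'Shanghai is']

chunks_per_prompt = [[] for _ in prompts]
for out in pipe.stream_infer(prompts, GenerationConfig(max_new_tokens=32)):
    # each streamed chunk identifies its prompt by its position in the batch
    chunks_per_prompt[out.index].append(out.text)

for i, chunks in enumerate(chunks_per_prompt):
    print(i, ''.join(chunks))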
116 changes: 54 additions & 62 deletions benchmark/profile_pipeline_api.py
@@ -1,11 +1,8 @@
 # Copyright (c) OpenMMLab. All rights reserved.
 import argparse
-import csv
 import json
 import os
 import random
-import time
-from collections import OrderedDict
 from typing import List, Tuple
 
 from tqdm import tqdm
@@ -14,6 +11,10 @@
 from lmdeploy import (GenerationConfig, PytorchEngineConfig,
                       TurbomindEngineConfig, pipeline)
 from lmdeploy.cli.utils import ArgumentHelper, DefaultsAndTypesHelpFormatter
+from lmdeploy.profiler import Profiler, Session
+from lmdeploy.utils import get_logger
+
+logger = get_logger('lmdeploy')
 
 
 def sample_requests(dataset_path: str, num_requests: int,
@@ -66,91 +67,70 @@ def __init__(self, model_path: str, engine_config, csv: str):

         self.csv = csv
 
-    def process_request(self, requests, concurrency, temperature, top_p, top_k,
-                        stream_output):
+    def process_request(self, requests, profiler: Profiler, temperature, top_p,
+                        top_k, stream_output):
 
-        stats = OrderedDict(
-            (session_id, None) for session_id in range(len(requests)))
         prompts = [prompt for prompt, _, _ in requests]
         gen_configs = [
             GenerationConfig(temperature=temperature,
                              top_p=top_p,
                              top_k=top_k,
                              ignore_eos=True,
                              do_sample=True,
                              max_new_tokens=output_len)
             for _, _, output_len in requests
         ]
 
-        start = time.perf_counter()
+        sess: List[Session] = []
+        for _, input_len, output_len in requests:
+            sess.append(profiler.new_session(input_len, output_len))
+
+        def _to_status(finish_reason):
+            if finish_reason == 'length':
+                return Session.SUCCESS
+            else:
+                return Session.FAIL
+
+        profiler.start()
+
+        for s in sess:
+            s.tick(0)
 
         if stream_output:
             pbar = tqdm(total=len(requests))
             for output in self.pipe.stream_infer(prompts,
                                                  gen_configs,
                                                  do_preprocess=False):
-                session_id = output.session_id
+                index = output.index
                 n_token = output.generate_token_len
                 finish_reason = output.finish_reason
-                stats[session_id] = (n_token, finish_reason)
+                sess[index].tick(n_token)
                 if finish_reason is not None:
+                    sess[index].finish(_to_status(finish_reason))
                     pbar.update(1)
             pbar.close()
         else:
             for output in self.pipe(prompts,
                                     gen_configs,
                                     do_preprocess=False,
                                     use_tqdm=True):
-                session_id = output.session_id
+                index = output.index
                 n_token = output.generate_token_len
                 finish_reason = output.finish_reason
-                stats[session_id] = (n_token, finish_reason)
-
-        elapsed_time = time.perf_counter() - start
-
-        completion_tokens = 0
-        for session_id, (n_token, finish_reason) in stats.items():
-            assert finish_reason == 'length', \
-                f'unexpected finish_reason of session_id={session_id}, ' \
-                f'prompt={requests[session_id][0]}'
-            assert n_token - 1 <= requests[session_id][-1] <= n_token, \
-                f'request to generate {requests[session_id][-1]} tokens, ' \
-                f'but got {n_token} tokens'
-            completion_tokens += n_token
-
-        prompt_tokens = 0
-        for _, input_len, _ in requests:
-            prompt_tokens += input_len
-
-        completion_token_throughput = completion_tokens / elapsed_time
-        total_token_throughput = (prompt_tokens +
-                                  completion_tokens) / elapsed_time
-        rps = len(requests) / elapsed_time
-        rpm = rps * 60
-
-        print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
-              f'elapsed_time: {elapsed_time:.3f}s\n')
-
-        print(
-            f'number of prompts: {len(requests)}\n'
-            f'number of prompt tokens: {prompt_tokens:.0f}\n'
-            f'number of completion tokens: {completion_tokens:.0f}\n'
-            f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n'  # noqa
-            f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n'  # noqa
-            f'RPS (request per second): {rps:.3f} req/s\n'
-            f'RPM (request per minute): {rpm:.3f} req/min\n'
-            f'{"-" * 50}\n')
-
-        if self.csv:
-            with open(self.csv, 'w') as csvfile:
-                writer = csv.writer(csvfile)
-                writer.writerow([
-                    'batch', 'num_promts', 'RPS', 'RPM',
-                    'throughput(out tok/s)', 'throughput(total tok/s)'
-                ])
-                writer.writerow([
-                    concurrency,
-                    len(requests), f'{rps:.3f}', f'{rpm:.3f}',
-                    f'{completion_token_throughput:.3f}',
-                    f'{total_token_throughput:.3f}'
-                ])
+                sess[index].tick(n_token)
+                sess[index].finish(_to_status(finish_reason))
+
+        profiler.finish()
+
+        # report first failure
+        for i, s in enumerate(sess):
+            if s.status != Session.SUCCESS or s.ns[-1] < s.req_output_len:
+                logger.error(
+                    f'Request {i} failed with {s.ns[-1]}/{s.req_output_len} tokens generated'  # noqa: E501
+                )
+                logger.error(f'Prompt: {prompts[i]}')
+                logger.warning('Got failed requests, metrics may be invalid')
+                break
 
 
 def parse_args():
@@ -252,13 +232,25 @@ def main():
     requests = sample_requests(args.dataset, args.num_prompts,
                                engine.tokenizer)
 
+    profiler = Profiler(args.stream_output, [50, 75, 95, 99])
+
     engine.process_request(requests,
+                           profiler,
                            temperature=args.temperature,
                            top_p=args.top_p,
                            top_k=args.top_k,
-                           concurrency=args.concurrency,
                            stream_output=args.stream_output)
 
+    hyperparams = [('Concurrency', args.concurrency),
+                   ('Stream output', str(args.stream_output).lower())]
+
+    profiler.compute_metrics()
+    profiler.summarize(title='Profile Pipeline API', hyperparams=hyperparams)
+
+    if args.csv:
+        profiler.save_csv(args.csv, (('batch', args.concurrency),
+                                     ('num_prompts', args.num_prompts)))
+
 
 if __name__ == '__main__':
     main()
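The hunks above replace the hand-rolled timing, assertion, and CSV code with the new lmdeploy.profiler module. As a reading aid, here is a condensed sketch of the profiling flow pieced together from the calls that appear in this diff; the request data is made up and the argument semantics are inferred from usage in the diff rather than from library documentation.

from lmdeploy.profiler import Profiler, Session

# made-up (prompt, input_len, output_len) triples standing in for sampled requests
requests = [('hello', 8, 32), ('world', 16, 64)]

stream_output = True
profiler = Profiler(stream_output, [50, 75, 95, 99])  # percentiles to report

# one Session per request, registered before inference starts
sess = [profiler.new_session(in_len, out_len) for _, in_len, out_len in requests]

profiler.start()
for s, (_, _, out_len) in zip(sess, requests):
    s.tick(0)  # request submitted, nothing generated yet
    # in the real benchmark, tick(n_token) runs on every streamed output and
    # finish(...) runs once a finish_reason arrives; completion is faked here
    s.tick(out_len)
    s.finish(Session.SUCCESS)  # finish_reason == 'length' maps to SUCCESS
profiler.finish()

profiler.compute_metrics()
profiler.summarize(title='Profile Pipeline API',
                   hyperparams=[('Concurrency', len(requests)),
                                ('Stream output', str(stream_output).lower())])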