Fixes for perf stat
ilya-lavrenov committed Jan 14, 2025
1 parent 5d98114 commit 6f36295
Showing 20 changed files with 82 additions and 57 deletions.
6 changes: 3 additions & 3 deletions samples/cpp/text_generation/benchmark_genai.cpp
@@ -35,15 +35,15 @@ int main(int argc, char* argv[]) try {
std::string device = result["device"].as<std::string>();
size_t num_warmup = result["num_warmup"].as<size_t>();
size_t num_iter = result["num_iter"].as<size_t>();

ov::genai::GenerationConfig config;
config.max_new_tokens = result["max_new_tokens"].as<size_t>();

ov::genai::LLMPipeline pipe(models_path, device);

for (size_t i = 0; i < num_warmup; i++)
pipe.generate(prompt, config);

ov::genai::DecodedResults res = pipe.generate(prompt, config);
ov::genai::PerfMetrics metrics = res.perf_metrics;
for (size_t i = 0; i < num_iter - 1; i++) {
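The diff truncates the rest of this sample. For context, a minimal sketch of how the remaining measurement loop and reporting typically look, assuming ov::genai::PerfMetrics supports operator+ for accumulation and exposes get_load_time()/get_ttft()/get_tpot()/get_throughput() accessors (treat those names as assumptions here):

    for (size_t i = 0; i < num_iter - 1; i++) {
        res = pipe.generate(prompt, config);
        metrics = metrics + res.perf_metrics;  // accumulate raw timings across iterations
    }

    std::cout << "Load time: " << metrics.get_load_time() << " ms\n";
    std::cout << "TTFT: " << metrics.get_ttft().mean << " ± " << metrics.get_ttft().std << " ms\n";
    std::cout << "TPOT: " << metrics.get_tpot().mean << " ± " << metrics.get_tpot().std << " ms/token\n";
    std::cout << "Throughput: " << metrics.get_throughput().mean << " ± " << metrics.get_throughput().std << " tokens/s\n";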
@@ -48,11 +48,6 @@ struct PipelineMetrics {
* Running average of the KV cache usage during the lifetime of the pipeline, with max window size of 1000 steps
*/
float avg_cache_usage = 0.0;

/**
* Number of tokens scheduled for processing at the previous step of the pipeline.
*/
size_t total_num_scheduled_tokens;
};

class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
Expand Down
12 changes: 2 additions & 10 deletions src/cpp/include/openvino/genai/llm_pipeline.hpp
@@ -121,10 +121,6 @@ class OPENVINO_GENAI_EXPORTS LLMPipeline {
const ov::genai::GenerationConfig& generation_config = {}
);

OPENVINO_DEPRECATED("Please, specify device explicitly when create LLMPipeline. This overload will be removed in 2025.0.0 release")
explicit LLMPipeline(const std::filesystem::path& path) :
LLMPipeline(path, "CPU") { }

/**
* @brief Constructs an LLMPipeline from xml/bin files, tokenizers and configuration in the same dir.
* Accepts arbitrary list of optional properties.
@@ -153,7 +149,7 @@ class OPENVINO_GENAI_EXPORTS LLMPipeline {
LLMPipeline(
const ov::InferRequest& request,
const ov::genai::Tokenizer& tokenizer,
OptionalGenerationConfig generation_config=std::nullopt
OptionalGenerationConfig generation_config = std::nullopt
);

/**
@@ -172,10 +168,6 @@ class OPENVINO_GENAI_EXPORTS LLMPipeline {
const ov::AnyMap& properties = {}
);

OPENVINO_DEPRECATED("Please, specify device explicitly when create LLMPipeline. This overload will be removed in 2025.0.0 release")
LLMPipeline(const std::filesystem::path& models_path, const ov::genai::Tokenizer& tokenizer) :
LLMPipeline(models_path, tokenizer, "CPU") { }

~LLMPipeline();

/**
@@ -211,7 +203,7 @@ class OPENVINO_GENAI_EXPORTS LLMPipeline {

DecodedResults operator()(
StringInputs inputs,
OptionalGenerationConfig generation_config=std::nullopt,
OptionalGenerationConfig generation_config = std::nullopt,
StreamerVariant streamer=std::monostate()
) {
return generate(inputs, generation_config, streamer);
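With the CPU-defaulting overloads removed, the device is always passed explicitly at construction. A minimal usage sketch (the model path and prompt are placeholders), mirroring the benchmark sample above:

    #include "openvino/genai/llm_pipeline.hpp"

    int main() {
        ov::genai::LLMPipeline pipe("path/to/model_dir", "CPU");  // device is now mandatory

        ov::genai::GenerationConfig config;
        config.max_new_tokens = 32;

        ov::genai::DecodedResults res = pipe.generate("The Sun is yellow because", config);
        return 0;
    }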
3 changes: 2 additions & 1 deletion src/cpp/src/continuous_batching_adapter.hpp
@@ -101,13 +101,14 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
}

PerfMetrics perf_metrics;
// For GenerationResults, all perf_metrics are the same except tokenization and detokenization durations.
// Since we return here only one perf_metrics, we should accumulate all tokenization and detokenization times.
if (generated.size() > 0) {
perf_metrics = generated[0].perf_metrics;
}

// Tokenization and detokenization times are dispersed across the GenerationResult vector.
// Need to collect them into a single perf_metric for the DecodedResults.
auto& raw_metrics = perf_metrics.raw_metrics;
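The accumulation the comment describes falls outside this truncated hunk. An illustrative sketch of what it amounts to, with the raw-metric container names assumed rather than taken from the diff:

    // Merge per-result tokenization/detokenization timings into the single PerfMetrics
    // attached to the returned DecodedResults (container names are assumptions).
    for (const auto& generation_result : generated) {
        const auto& src = generation_result.perf_metrics.raw_metrics;
        raw_metrics.tokenization_durations.insert(raw_metrics.tokenization_durations.end(),
                                                  src.tokenization_durations.begin(),
                                                  src.tokenization_durations.end());
        raw_metrics.detokenization_durations.insert(raw_metrics.detokenization_durations.end(),
                                                    src.detokenization_durations.begin(),
                                                    src.detokenization_durations.end());
    }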
19 changes: 13 additions & 6 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -151,13 +151,20 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
_register_step_cache_usage(scheduler_output.m_cache_usage);
m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens;

m_batch_size = 0; // total number of running sequences
for (size_t i = 0; i < scheduler_output.m_scheduled_sequence_groups_ids.size(); ++i) {
size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i];
SequenceGroup::CPtr sequence_group = m_requests[seq_group_id];
m_batch_size += sequence_group->num_running_seqs();
}

static ManualTimer copy_blocks_timer("scheduling");
copy_blocks_timer.start();
m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
copy_blocks_timer.end();
}

// if no tokens were scheduled, we are out of memory => free all requests and return
if (scheduler_output.m_total_num_scheduled_tokens == 0) {
for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -297,14 +304,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
auto num_generated_tokens = get_metrics().total_num_scheduled_tokens;
if (num_generated_tokens > 0) {
if (m_batch_size > 0) {
const auto infer_end = std::chrono::steady_clock::now();
const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
const auto infer_ms = PerfMetrics::get_microsec(std::chrono::steady_clock::now() - infer_start);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
}
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
@@ -360,10 +366,11 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}

result.m_status = generations[request_id]->get_status();

// The same perf metrics for each sequence, only tokenization/detokenization will differ.
perf_metrics.raw_metrics.generate_durations.clear();
perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
perf_metrics.num_input_tokens = request->get_prompt_len();
perf_metrics.evaluate_statistics(start_time);

result.perf_metrics = perf_metrics;
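Because removed and added lines are interleaved in the view above, here is a consolidated sketch of the per-step timing logic as it reads after the change (reconstructed from the hunk, not copied verbatim): the timer wraps step(), and a sample is recorded only when at least one running sequence was scheduled, with m_batch_size approximating the number of tokens produced in that step.

    const auto infer_start = std::chrono::steady_clock::now();
    step();
    if (m_batch_size > 0) {  // something was actually scheduled and inferred this step
        const auto infer_end = std::chrono::steady_clock::now();
        const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
        raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
        raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
        raw_perf_counters.m_new_token_times.emplace_back(infer_end);
        raw_perf_counters.m_batch_sizes.emplace_back(m_batch_size);
    }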
6 changes: 5 additions & 1 deletion src/cpp/src/continuous_batching_impl.hpp
@@ -30,6 +30,10 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
static const size_t AVG_CACHE_USAGE_WINDOW_SIZE_IN_STEPS = 1000;
std::deque<float> m_previous_step_cache_usages;

// for perf metrics
float m_load_time_ms = 0.0f;
size_t m_batch_size = 0; // stored number of scheduled sequences on last step

// flag to enable validation mode for sampler
bool m_is_validation_mode_enabled = false;

@@ -75,7 +79,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
void _register_step_cache_usage(float step_cache_usage);
float _get_current_running_average_cache_usage() const;

void drop_requests() override;
virtual void drop_requests();

public:
ContinuousBatchingImpl(const std::shared_ptr<ov::Model>& model,
33 changes: 31 additions & 2 deletions src/cpp/src/continuous_batching_pipeline.cpp
@@ -39,11 +39,18 @@ extract_prompt_lookup_from_config(ov::AnyMap& config) {
return res;
}

inline float get_load_time(std::chrono::steady_clock::time_point start_time) {
auto stop_time = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(stop_time - start_time).count();
}

ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::path& models_path,
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties,
const ov::AnyMap& tokenizer_properties) {
auto start_time = std::chrono::steady_clock::now();

auto properties_without_draft_model = properties;
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
@@ -61,6 +68,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p
} else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}

m_impl->m_load_time_ms = get_load_time(start_time);
}

ContinuousBatchingPipeline::ContinuousBatchingPipeline(
@@ -69,6 +78,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties) {
auto start_time = std::chrono::steady_clock::now();

auto properties_without_draft_model = properties;
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
Expand All @@ -85,6 +96,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
} else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}

m_impl->m_load_time_ms = get_load_time(start_time);
}

ContinuousBatchingPipeline::ContinuousBatchingPipeline(
@@ -95,6 +108,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
const std::string& device,
const ov::AnyMap& properties,
const ov::genai::GenerationConfig& generation_config) {
auto start_time = std::chrono::steady_clock::now();

auto properties_without_draft_model = properties;
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
Expand All @@ -109,6 +124,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
} else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}

m_impl->m_load_time_ms = get_load_time(start_time);
}

ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() {
@@ -140,11 +157,23 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() {
}

std::vector<EncodedGenerationResult> ContinuousBatchingPipeline::generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
return m_impl->generate(input_ids, sampling_params, streamer);
auto encoded_results = m_impl->generate(input_ids, sampling_params, streamer);

for (auto& encoded_result : encoded_results) {
encoded_result.perf_metrics.load_time = m_impl->m_load_time_ms;
}

return encoded_results;
}

std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
return m_impl->generate(prompts, sampling_params, streamer);
auto decoded_results = m_impl->generate(prompts, sampling_params, streamer);

for (auto& decoded_result : decoded_results) {
decoded_result.perf_metrics.load_time = m_impl->m_load_time_ms;
}

return decoded_results;
}

void ContinuousBatchingPipeline::start_chat(const std::string& system_message) {
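Since every constructor now stamps m_impl->m_load_time_ms and both generate() overloads copy it into each result, callers can read the model load time straight off the results. A small caller-side sketch (the header path and the explicit streamer argument are assumptions):

    #include <iostream>
    #include <variant>
    #include <vector>
    #include "openvino/genai/continuous_batching_pipeline.hpp"

    void report_load_time(ov::genai::ContinuousBatchingPipeline& pipe,
                          const std::vector<std::string>& prompts,
                          const std::vector<ov::genai::GenerationConfig>& sampling_params) {
        auto results = pipe.generate(prompts, sampling_params, std::monostate{});
        for (const auto& r : results) {
            std::cout << "load time, ms: " << r.perf_metrics.load_time << "\n";
        }
    }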
1 change: 1 addition & 0 deletions src/cpp/src/icontinuous_batching.cpp
@@ -62,6 +62,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
}

std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);

std::vector<GenerationResult> decoded;
decoded.reserve(encoded.size());
for (size_t i = 0; i < encoded.size(); ++i) {
16 changes: 9 additions & 7 deletions src/cpp/src/icontinuous_batching.hpp
@@ -21,7 +21,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {

// TODO (mzegla): GenerationConfig is request specific object
// and pipeline only uses default rng_seed and some special tokens.
ov::genai::GenerationConfig m_generation_config;
GenerationConfig m_generation_config;

PipelineMetrics m_pipeline_metrics;

@@ -42,27 +42,29 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
bool m_is_chat_conversation = false;
ChatHistory m_history;

virtual void drop_requests() = 0;
float m_load_time_ms = 0.0f;
// to access m_load_time_ms
friend class ContinuousBatchingPipeline;

public:
ov::genai::GenerationConfig get_config() const;
GenerationConfig get_config() const;
PipelineMetrics get_metrics() const;
ov::genai::Tokenizer get_tokenizer();
Tokenizer get_tokenizer();

/**
* Adds requests to awaiting queue using encoded inputs
*/
virtual GenerationHandle add_request(uint64_t request_id,
const ov::Tensor& input_ids,
ov::genai::GenerationConfig sampling_params) = 0;
GenerationConfig sampling_params) = 0;

/**
* Adds request to running queue based on string input
* This step also performs tokenization's encode
*/
virtual GenerationHandle add_request(uint64_t request_id,
const std::string& prompt,
ov::genai::GenerationConfig sampling_params) = 0;
GenerationConfig sampling_params) = 0;

/**
* Checks whether server (pipeline) has non-finished requests and step() should be called within a loop
@@ -87,7 +89,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
*/
std::vector<GenerationResult>
generate(const std::vector<std::string>& prompts,
std::vector<ov::genai::GenerationConfig> sampling_params,
std::vector<GenerationConfig> sampling_params,
const StreamerVariant& streamer);

/**
2 changes: 1 addition & 1 deletion src/cpp/src/llm_pipeline_base.hpp
@@ -62,7 +62,7 @@ class LLMPipelineImplBase {
GenerationConfig m_generation_config;
std::optional<AdapterController> m_adapter_controller;

float m_load_time_ms = 0;
float m_load_time_ms = 0.0f;
};

} // namespace genai
2 changes: 1 addition & 1 deletion src/cpp/src/llm_pipeline_stateful.cpp
@@ -361,7 +361,7 @@ void StatefulLLMPipeline::start_chat(const std::string& system_message) {
if (!m_tokenized_chat_history.empty()) {
reset_kv_state();
m_history = {};
m_templated_chat_history = "";
m_templated_chat_history.clear();
m_tokenized_chat_history.clear();
}
if (system_message.empty())
6 changes: 5 additions & 1 deletion src/cpp/src/lm_encoding.cpp
@@ -152,6 +152,8 @@ std::pair<EncodedResults, std::optional<int64_t>> get_lm_encoded_results(
int64_t * input_ids_data = new_input_ids.data<int64_t>();

std::vector<int32_t> next_beams;
size_t current_batch_size = 0;

for (auto& sequence_group : active_sequence_groups) {
std::vector<Sequence::Ptr> running_sequences = sequence_group->get_running_sequences();
size_t num_running_sequences = running_sequences.size();
@@ -176,6 +178,8 @@ std::pair<EncodedResults, std::optional<int64_t>> get_lm_encoded_results(
// for different sequences iteration of beams started from 0, but we collect it to one input_ids
next_beams.push_back(beam_idxs[sequence->get_id()] + beam_offets.at(sequence_group->get_request_id()));
}

current_batch_size += num_running_sequences;
}

for (size_t i = 0; i < active_sequence_groups.size(); i++) {
@@ -209,7 +213,7 @@ std::pair<EncodedResults, std::optional<int64_t>> get_lm_encoded_results(
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(batch_size);
raw_perf_counters.m_batch_sizes.emplace_back(current_batch_size);

sampler_output = sampler.sample(active_sequence_groups, m_llm.get_tensor("logits"));
free_non_running_requests();
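A toy illustration of why the per-step count matters: with beam search, the number of tokens produced in a step equals the total number of running sequences across groups, not the prompt batch size, so that is the value that belongs in m_batch_sizes. For example:

    // Two active sequence groups running 1 and 3 beams produce 4 tokens this step,
    // so 4 is the value pushed into raw_perf_counters.m_batch_sizes.
    size_t current_batch_size = 0;
    for (size_t num_running_sequences : {1, 3})  // stand-ins for running_sequences.size()
        current_batch_size += num_running_sequences;
    // current_batch_size == 4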
6 changes: 3 additions & 3 deletions src/cpp/src/perf_metrics.cpp
@@ -100,10 +100,10 @@ void PerfMetrics::evaluate_statistics(std::optional<TimePoint> start_time) {
raw_metrics.m_durations.reserve(tok_times.size());

auto ttft = tok_times[0] - start_time_val;
raw_metrics.m_times_to_first_token = std::vector<MicroSeconds>();
raw_metrics.m_times_to_first_token.clear();
raw_metrics.m_times_to_first_token.emplace_back(ttft);
num_generated_tokens = batch_sizes[0];

// The very first infer request (prefill stage) is slower than subsequent ones since we process a sequence of tokens.
// To have a clearer TPOT number, the time taken to generate the very first token at the prefill stage
// must not be included in the TPOT calculation. The first duration used for TPOT is from the first token
@@ -114,7 +114,7 @@ void PerfMetrics::evaluate_statistics(std::optional<TimePoint> start_time) {
num_generated_tokens += batch_sizes[i];
}
}

// calc_mean_and_std will convert microseconds to milliseconds.
tpot = calc_mean_and_std(raw_metrics.m_durations);
ipot = calc_mean_and_std(raw_metrics.m_token_infer_durations);
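Taken together, m_new_token_times and m_batch_sizes fully determine TTFT and the per-step durations used for TPOT. A self-contained sketch of that derivation as described in the comment above (not the library implementation), with the prefill step excluded from the inter-step durations:

    #include <chrono>
    #include <cstddef>
    #include <numeric>
    #include <vector>

    using TimePoint = std::chrono::steady_clock::time_point;

    struct BasicStats {
        double ttft_ms = 0.0;            // time from start to the first token batch
        double mean_step_ms = 0.0;       // mean inter-step duration, prefill excluded (TPOT in the single-sequence case)
        size_t num_generated_tokens = 0; // sum of per-step batch sizes
    };

    BasicStats summarize(TimePoint start,
                         const std::vector<TimePoint>& token_times,
                         const std::vector<size_t>& batch_sizes) {
        using ms = std::chrono::duration<double, std::milli>;
        BasicStats s;
        if (token_times.empty())
            return s;
        s.ttft_ms = ms(token_times.front() - start).count();
        s.num_generated_tokens = batch_sizes.front();
        std::vector<double> step_durations;  // prefill (index 0) intentionally skipped
        for (size_t i = 1; i < token_times.size(); ++i) {
            step_durations.push_back(ms(token_times[i] - token_times[i - 1]).count());
            s.num_generated_tokens += batch_sizes[i];
        }
        if (!step_durations.empty())
            s.mean_step_ms = std::accumulate(step_durations.begin(), step_durations.end(), 0.0) /
                             step_durations.size();
        return s;
    }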
2 changes: 1 addition & 1 deletion src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -16,7 +16,7 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPi
std::shared_ptr<ContinuousBatchingForPromptLookupImpl> m_pipeline;
SpeculativeDecodingMetrics m_sd_metrics;

void drop_requests() override;
void drop_requests();

public:
PromptLookupImpl(const std::shared_ptr<ov::Model>& model,
2 changes: 1 addition & 1 deletion src/cpp/src/sampler.cpp
@@ -129,7 +129,7 @@ MatchStopStringResult match_stop_string(Tokenizer& tokenizer,
}

// find token cnt to be removed from sequence by decoding token by token
std::string decoded_partially_string = "";
std::string decoded_partially_string;
for (size_t i = 0; i < buffer.size(); ++i) {
decoded_partially_string += tokenizer.decode(TokenIds{buffer[i]});
if (decoded_partially_string.find(decoded_buffer) != std::string::npos) {