Skip to content

Commit

Permalink
Merge branch 'master' into use-continuos-batching-by-default
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-lavrenov authored Dec 24, 2024
2 parents 44a9249 + 021d880 commit f7b2ac8
Show file tree
Hide file tree
Showing 30 changed files with 632 additions and 167 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/llm_bench-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ jobs:
rm -rf ./ov_models/internvl2-1B
- name: WWB Tests
run: |
pip install git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
pip install git+https://github.com/huggingface/optimum-intel.git
GIT_CLONE_PROTECTION_ACTIVE=false PIP_PRE=1 PIP_EXTRA_INDEX_URL=https://storage.openvinotoolkit.org/simple/wheels/nightly pip install ${{ env.WWB_PATH }}
python -m pytest -v ${{ env.WWB_PATH }}/tests
stateful:
Expand Down Expand Up @@ -190,7 +190,7 @@ jobs:
- name: WWB Tests
run: |
pip install pytest
pip install git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
pip install git+https://github.com/huggingface/optimum-intel.git
GIT_CLONE_PROTECTION_ACTIVE=false PIP_PRE=1 PIP_EXTRA_INDEX_URL=https://storage.openvinotoolkit.org/simple/wheels/nightly pip install ${{ env.WWB_PATH }}
python -m pytest -v ${{ env.WWB_PATH }}/tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ int main(int argc, char* argv[]) try {

std::string device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;

ov::genai::LLMPipeline pipe(
model_path,
device,
ov::genai::prompt_lookup(true),
ov::genai::scheduler_config(scheduler_config));
ov::genai::prompt_lookup(true));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ int main(int argc, char* argv[]) try {
// Please, set device for main model in `LLMPipeline` constructor and in `ov::genai::draft_model` for draft.
std::string main_device = "CPU", draft_device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;

ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::draft_model(draft_model_path, draft_device),
ov::genai::scheduler_config(scheduler_config));
ov::genai::draft_model(draft_model_path, draft_device));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
2 changes: 1 addition & 1 deletion samples/export-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
--extra-index-url https://storage.openvinotoolkit.org/simple/wheels/pre-release
--extra-index-url https://storage.openvinotoolkit.org/simple/wheels/nightly
openvino-tokenizers~=2025.0.0.0.dev
optimum-intel @ git+https://github.com/huggingface/optimum-intel.git@420fa87d039425a906b7f755e4562b65947f016a
optimum-intel @ git+https://github.com/huggingface/optimum-intel.git
numpy<2.0.0; sys_platform == 'darwin'
einops==0.8.0 # For Qwen
transformers_stream_generator==0.0.5 # For Qwen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ def main():
args = parser.parse_args()

device = 'CPU'
scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

pipe = openvino_genai.LLMPipeline(args.model_dir, device, scheduler_config=scheduler_config, prompt_lookup=True)
pipe = openvino_genai.LLMPipeline(args.model_dir, device, prompt_lookup=True)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ def main():
main_device = 'CPU' # GPU can be used as well
draft_device = 'CPU'

scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

draft_model = openvino_genai.draft_model(args.draft_model_dir, draft_device)

pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, scheduler_config=scheduler_config, draft_model=draft_model)
pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, draft_model=draft_model)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
51 changes: 47 additions & 4 deletions src/cpp/src/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,42 @@ class BlockAllocator {
* Blocks returned will be vectors with this size, each vector entry to be associated with a separate layer's KV cache.
*/
BlockAllocator(size_t num_blocks, bool enable_prefix_caching, size_t num_layers = 1) :
m_free_blocks_num(num_layers, num_blocks), m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
OPENVINO_ASSERT(num_layers != 0, "num_layers must be non-zero");
m_free_blocks.resize(m_num_layers);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
if (num_blocks > 0) {
m_free_blocks_num = std::vector<size_t>(num_layers, num_blocks);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
}
else {
m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
}
}

~BlockAllocator() {
// sanity check to validate that all blocks are freed
// OPENVINO_ASSERT(m_total_num_blocks == m_free_blocks.size());
}

/**
 * Grows the pool of KV blocks to a new, larger total.
 * Every per-layer free-block counter is raised by the number of added blocks, and each
 * per-layer free list receives freshly created blocks whose ids continue from the
 * previous total up to (but not including) the new total.
 * @param new_kv_blocks_count The new total number of KV blocks; must be strictly greater
 * than the current total, otherwise the assertion below fails.
 */
void increase_kv_blocks_number(size_t new_kv_blocks_count) {
    OPENVINO_ASSERT(new_kv_blocks_count > m_total_num_blocks, "New blocks number should be more than previous blocks number.");

    // How many blocks each layer gains.
    const size_t extra_blocks = new_kv_blocks_count - m_total_num_blocks;
    for (auto& free_count : m_free_blocks_num) {
        free_count += extra_blocks;
    }

    // Append the new blocks to every layer's free list; ids start right after the old range.
    for (auto& layer_free_list : m_free_blocks) {
        int next_block_id = m_total_num_blocks;
        while (next_block_id < new_kv_blocks_count) {
            layer_free_list.push_back(std::make_shared<KVCacheBlock>(next_block_id));
            ++next_block_id;
        }
    }

    m_total_num_blocks = new_kv_blocks_count;
}


/**
* Returns the number of free blocks for a given layer.
* @param layer_idx Index of the layer.
Expand Down Expand Up @@ -459,6 +480,13 @@ class BlockAllocator {
for (size_t layer_idx = 0; layer_idx < m_num_layers; layer_idx++) sum += num_free_blocks(layer_idx);
return static_cast<float>(m_num_layers * m_total_num_blocks - sum) / (m_num_layers * m_total_num_blocks) * 100;
}

/**
 * Reports the allocator's current total block count.
 * @return The value of m_total_num_blocks, i.e. the block count given at construction or
 * the latest total set through increase_kv_blocks_number().
 */
size_t get_total_number_of_kv_blocks() const {
return m_total_num_blocks;
}
};

/**
Expand Down Expand Up @@ -713,6 +741,21 @@ class BlockManager {
return m_allocator.get_used_percentage();
}

/**
 * Increases the number of KV blocks by forwarding the request to the underlying allocator.
 * @param num_blocks The new total number of KV blocks (not an increment). The allocator
 * asserts that this value is greater than its current total.
 */
void increase_kv_blocks_number(size_t num_blocks) {
m_allocator.increase_kv_blocks_number(num_blocks);
}

/**
 * Reports the total block count as tracked by the underlying allocator.
 * @return The total number of KV blocks.
 */
size_t get_total_number_of_kv_blocks() const {
return m_allocator.get_total_number_of_kv_blocks();
}

/**
* @brief Forks a sequence, establishing a new sequence from an existing one, reusing
* currently allocated blocks of the existing sequence.
Expand Down
124 changes: 106 additions & 18 deletions src/cpp/src/cache_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,118 @@ class CacheManager {
DeviceConfig m_device_config;
std::vector<ov::Tensor> m_key_cache;
std::vector<ov::Tensor> m_value_cache;
size_t m_num_allocated_kv_blocks = 0;
ov::Core m_core;
ov::InferRequest m_request;

/**
 * Produces a static shape from a partial shape by overwriting its first dimension.
 * @param shape Partial shape to start from; it is copied, not modified.
 * @param dim Value to store into dimension 0 of the copy.
 * @return The copy converted to ov::Shape. The assertion fails if the copy is still not
 * fully static after dimension 0 has been set.
 */
ov::Shape set_first_dim_and_make_static(const ov::PartialShape& shape, size_t dim) {
    ov::PartialShape resized = shape;
    resized[0] = dim;
    OPENVINO_ASSERT(resized.is_static());
    return resized.to_shape();
}

/**
 * Binds the current key and value cache tensors of one decoder layer to the infer request.
 * The tensors are set under the names "key_cache.<id>" and "value_cache.<id>".
 * @param decoder_layer_id Index of the layer; used both to build the tensor names and to
 * index m_key_cache / m_value_cache.
 */
void update_request_tensor(size_t decoder_layer_id) {
    const std::string layer_suffix = std::to_string(decoder_layer_id);
    m_request.set_tensor("key_cache." + layer_suffix, m_key_cache[decoder_layer_id]);
    m_request.set_tensor("value_cache." + layer_suffix, m_value_cache[decoder_layer_id]);
}

public:
explicit CacheManager(const DeviceConfig &device_config, ov::Core core) :
explicit CacheManager(const DeviceConfig &device_config, ov::InferRequest request, ov::Core core) :
m_device_config(device_config),
m_request(request),
m_core(core) {
m_key_cache.reserve(m_device_config.get_num_layers());
m_value_cache.reserve(m_device_config.get_num_layers());
}

void allocate_cache_if_needed(size_t num_kv_blocks) {
if (m_num_allocated_kv_blocks >= num_kv_blocks) {
return;
}
OPENVINO_ASSERT(m_key_cache.size() == m_value_cache.size());
m_num_allocated_kv_blocks = num_kv_blocks;
ov::Shape value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
ov::Shape key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);

const std::string device_name = m_device_config.get_device();

ov::Coordinate start_key{0,0,0,0};
ov::Coordinate start_value{0,0,0,0};

const std::string device_name = device_config.get_device();
if (device_name.find("GPU") == std::string::npos) {// Allocate KV caches
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache(device_config.get_cache_precision(), device_config.get_key_cache_shape());
ov::Tensor value_cache(device_config.get_cache_precision(), device_config.get_value_cache_shape());
ov::Tensor key_cache(m_device_config.get_cache_precision(), key_cache_shape);
ov::Tensor value_cache(m_device_config.get_cache_precision(), value_cache_shape);

auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data());
auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data());
size_t key_roi_size_byte = 0;
size_t value_roi_size_byte = 0;

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

key_roi_size_byte = m_key_cache[decoder_layer_id].get_byte_size();
value_roi_size_byte = m_value_cache[decoder_layer_id].get_byte_size();
key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + key_roi_size_byte;
value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + value_roi_size_byte;

// copy current cache data
ov::Tensor dst_key_roi(key_cache, start_key, end_key);
ov::Tensor dst_value_roi(value_cache, start_value, end_value);

m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
m_value_cache[decoder_layer_id].copy_to(dst_value_roi);

}

// force allocation
std::memset(key_cache.data(), 0, key_cache.get_byte_size());
std::memset(value_cache.data(), 0, value_cache.get_byte_size());
// Some optimizations like AVX2, AVX512, AMX require a minimal shape and
// perform multiplying by zero on the excess data. Uninitialized tensor data contain NAN's,
// so NAN * 0 returns non-zero invalid data.
// So we need to set zeros to all newly allocated tensors data.
std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - key_roi_size_byte);
std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - value_roi_size_byte);

// set new cache tensors
if (m_key_cache.size() > decoder_layer_id) {
m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
update_request_tensor(decoder_layer_id);
}
} else {
auto remote_context = m_core.get_default_context(device_name);
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_key_cache_shape());
ov::Tensor value_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_value_cache_shape());

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
key_cache_shape);
ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
value_cache_shape);

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

// copy current cache data
ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);

m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}
update_request_tensor(decoder_layer_id);
}
}
}
Expand All @@ -62,8 +142,8 @@ class CacheManager {
}

void copy_blocks(const std::map<size_t, std::list<size_t>>& block_copy_map) {
ov::Shape key_shape = m_device_config.get_key_cache_shape();
ov::Shape value_shape = m_device_config.get_value_cache_shape();
ov::Shape key_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), m_num_allocated_kv_blocks);
ov::Shape value_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), m_num_allocated_kv_blocks);

ov::Coordinate key_src_start_roi(key_shape.size(), 0);
ov::Coordinate key_src_end_roi = key_shape;
Expand Down Expand Up @@ -98,5 +178,13 @@ class CacheManager {
}
}
}

/**
 * @return A newly allocated shared_ptr holding an object copy-constructed from m_core.
 * Each call creates a new object; the returned pointer does not point at m_core itself.
 * NOTE(review): whether a copied Core shares state with the original is not visible here - confirm.
 */
std::shared_ptr<Core> get_core() {
return std::make_shared<Core>(m_core);
}

/**
 * @return A newly allocated shared_ptr holding a copy of m_device_config.
 * Each call creates a new object, so changes made through the returned pointer
 * are applied to that copy and not to m_device_config.
 */
std::shared_ptr<DeviceConfig> get_device_config() {
return std::make_shared<DeviceConfig>(m_device_config);
}
};
}
10 changes: 2 additions & 8 deletions src/cpp/src/continuous_batching_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
ov::InferRequest infer_request = compiled_model.create_infer_request();

// setup KV caches
m_cache_manager = std::make_shared<CacheManager>(device_config, core);
for (size_t decoder_layer_id = 0; decoder_layer_id < device_config.get_num_layers(); ++decoder_layer_id) {
infer_request.set_tensor(std::string("key_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_key_cache(decoder_layer_id));
infer_request.set_tensor(std::string("value_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_value_cache(decoder_layer_id));
}
m_cache_manager = std::make_shared<CacheManager>(device_config, infer_request, core);

SchedulerConfig updated_config = scheduler_config;
// update KV blocks number in scheduler config
Expand All @@ -71,8 +67,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
// as it may lead to performance slowdown
can_use_partial_preemption = false;
}

m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), updated_config, device_config.get_num_layers(), can_use_partial_preemption);
m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), m_cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);
// and finally create model runner
bool is_use_cache_eviction = m_scheduler->get_config().use_cache_eviction;
m_model_runner = std::make_shared<ModelRunner>(infer_request, m_scheduler->get_block_size(), device_config.get_num_layers(), is_use_cache_eviction);
Expand Down Expand Up @@ -133,7 +128,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
_pull_awaiting_requests();

m_pipeline_metrics.requests = m_requests.size();

Scheduler::Output scheduler_output;
{
static ManualTimer timer("scheduling");
Expand Down
Loading

0 comments on commit f7b2ac8

Please sign in to comment.