Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic KV cache allocation #1364

Merged
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a8c290a
Dynamic KV-cache allocation.
popovaan Dec 11, 2024
8b68108
Minor corrections.
popovaan Dec 11, 2024
5c384a0
Fixed cpp tests, added tests of dynamic allocation.
popovaan Dec 12, 2024
4e97cf9
Merge remote-tracking branch 'upstream/master' into dynamic_kv_cache_…
popovaan Dec 12, 2024
0f32f1f
Fixed typo.
popovaan Dec 12, 2024
d3f15fa
Test corrected.
popovaan Dec 12, 2024
ec7ca26
Minor corrections.
popovaan Dec 12, 2024
543cbd6
Merge remote-tracking branch 'upstream/master' into dynamic_kv_cache_…
popovaan Dec 13, 2024
175f241
Minor corrections.
popovaan Dec 13, 2024
a6facc5
Minor correction.
popovaan Dec 13, 2024
34f6d27
Removed not used code.
popovaan Dec 13, 2024
0f50cb7
Merge branch 'master' into dynamic_kv_cache_allocation
popovaan Dec 16, 2024
0c3bb28
Code optimizations.
popovaan Dec 17, 2024
a105a9f
Code corrections.
popovaan Dec 17, 2024
7537997
Merge upsteam/master.
popovaan Dec 18, 2024
a8531a5
Added available memory check for GPU.
popovaan Dec 19, 2024
9043ba3
Minor correction.
popovaan Dec 19, 2024
9256f15
Code corrections.
popovaan Dec 19, 2024
d926303
Minor correction.
popovaan Dec 19, 2024
eb4d110
Used correct core instance.
popovaan Dec 19, 2024
f94929c
Moved increasing of cache logic to scheduler.
popovaan Dec 20, 2024
a1e4973
Merge upstream/master.
popovaan Dec 20, 2024
38a42d6
Made sheduler config not needed for prompt lookup.
popovaan Dec 20, 2024
c7d54dd
Minor correction.
popovaan Dec 20, 2024
51cb0a8
Fixed error.
popovaan Dec 20, 2024
c4c8c25
Removed wrong changes.
popovaan Dec 20, 2024
bfcf9ff
Fixed error.
popovaan Dec 20, 2024
11b5e33
Minor correction.
popovaan Dec 20, 2024
64dab76
Removed wrong changes.
popovaan Dec 20, 2024
0d71053
Merge branch 'master' into dynamic_kv_cache_allocation
ilya-lavrenov Dec 20, 2024
bb24a36
Fix.
popovaan Dec 23, 2024
13f9f08
Fix of cache increasing for gpu.
popovaan Dec 23, 2024
f04c06d
Merge branch 'master' into dynamic_kv_cache_allocation
popovaan Dec 24, 2024
eebac1f
Applied comments.
popovaan Dec 24, 2024
2715110
Update src/cpp/src/scheduler.hpp
popovaan Dec 24, 2024
1d3f85b
Update src/cpp/src/scheduler.hpp
popovaan Dec 24, 2024
3fa02d0
Update src/cpp/src/scheduler.hpp
popovaan Dec 24, 2024
a0456d8
Update scheduler.hpp
ilya-lavrenov Dec 24, 2024
e393d3d
Merge branch 'master' into dynamic_kv_cache_allocation
ilya-lavrenov Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@ int main(int argc, char* argv[]) try {

std::string device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;

ov::genai::LLMPipeline pipe(
model_path,
device,
ov::genai::prompt_lookup(true),
ov::genai::scheduler_config(scheduler_config));
ov::genai::prompt_lookup(true));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ int main(int argc, char* argv[]) try {
// Please, set device for main model in `LLMPipeline` constructor and in in `ov::genai::draft_model` for draft.
std::string main_device = "CPU", draft_device = "CPU";

ov::genai::SchedulerConfig scheduler_config;
scheduler_config.cache_size = 5;
popovaan marked this conversation as resolved.
Show resolved Hide resolved

ov::genai::LLMPipeline pipe(
main_model_path,
main_device,
ov::genai::draft_model(draft_model_path, draft_device),
ov::genai::scheduler_config(scheduler_config));
ov::genai::draft_model(draft_model_path, draft_device));

auto streamer = [](std::string subword) {
std::cout << subword << std::flush;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ def main():
args = parser.parse_args()

device = 'CPU'
scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

pipe = openvino_genai.LLMPipeline(args.model_dir, device, scheduler_config=scheduler_config, prompt_lookup=True)
pipe = openvino_genai.LLMPipeline(args.model_dir, device, prompt_lookup=True)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ def main():
main_device = 'CPU' # GPU can be used as well
draft_device = 'CPU'

scheduler_config = openvino_genai.SchedulerConfig()
# cache params
scheduler_config.cache_size = 2

draft_model = openvino_genai.draft_model(args.draft_model_dir, draft_device)

pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, scheduler_config=scheduler_config, draft_model=draft_model)
pipe = openvino_genai.LLMPipeline(args.model_dir, main_device, draft_model=draft_model)

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
Expand Down
51 changes: 47 additions & 4 deletions src/cpp/src/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,42 @@ class BlockAllocator {
* Blocks returned will be vectors with this size, each vector entry to be associated with a separate layer's KV cache.
*/
BlockAllocator(size_t num_blocks, bool enable_prefix_caching, size_t num_layers = 1) :
m_free_blocks_num(num_layers, num_blocks), m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
m_total_num_blocks(num_blocks), m_num_layers(num_layers), m_enable_prefix_caching(enable_prefix_caching), m_overwriteable_blocks(num_layers) {
OPENVINO_ASSERT(num_layers != 0, "num_layers must be non-zero");
m_free_blocks.resize(m_num_layers);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
if (num_blocks > 0) {
m_free_blocks_num = std::vector<size_t>(num_layers, num_blocks);
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = 0; block_id < m_total_num_blocks; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
}
else {
m_free_blocks_num = std::vector<size_t>(m_num_layers, 0);
}
}

~BlockAllocator() {
// sanity check to validate that all blocks are freed
// OPENVINO_ASSERT(m_total_num_blocks == m_free_blocks.size());
}

void increase_kv_blocks_number(size_t new_kv_blocks_count) {
OPENVINO_ASSERT(new_kv_blocks_count > m_total_num_blocks, "New blocks number should be more than previous blocks number.");
size_t added_blocks = new_kv_blocks_count - m_total_num_blocks;
for (auto idx = 0; idx < m_free_blocks_num.size(); idx++) {
m_free_blocks_num[idx] += added_blocks;
}
for (auto& per_layer_block_list : m_free_blocks) {
for (int block_id = m_total_num_blocks; block_id < new_kv_blocks_count; ++block_id) {
per_layer_block_list.push_back(std::make_shared<KVCacheBlock>(block_id));
}
}
m_total_num_blocks = new_kv_blocks_count;
}


/**
* Returns the number of free blocks for a given layer.
* @param layer_idx Index of the layer.
Expand Down Expand Up @@ -459,6 +480,13 @@ class BlockAllocator {
for (size_t layer_idx = 0; layer_idx < m_num_layers; layer_idx++) sum += num_free_blocks(layer_idx);
return static_cast<float>(m_num_layers * m_total_num_blocks - sum) / (m_num_layers * m_total_num_blocks) * 100;
}

/**
* @return The total number of KV blocks .
*/
size_t get_total_number_of_kv_blocks() const {
return m_total_num_blocks;
}
};

/**
Expand Down Expand Up @@ -713,6 +741,21 @@ class BlockManager {
return m_allocator.get_used_percentage();
}

/**
* Increases the number of KV blocks.
* @param num_blocks The new number of KV-blocks.
*/
void increase_kv_blocks_number(size_t num_blocks) {
m_allocator.increase_kv_blocks_number(num_blocks);
}

/**
* @return The total number of KV blocks .
*/
size_t get_total_number_of_kv_blocks() const {
return m_allocator.get_total_number_of_kv_blocks();
}

/**
* @brief Forks a sequence, establishing a new sequence from an existing one, reusing
* currently allocated blocks of the existing sequence.
Expand Down
124 changes: 106 additions & 18 deletions src/cpp/src/cache_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,118 @@ class CacheManager {
DeviceConfig m_device_config;
std::vector<ov::Tensor> m_key_cache;
std::vector<ov::Tensor> m_value_cache;
size_t m_num_allocated_kv_blocks = 0;
ov::Core m_core;
ov::InferRequest m_request;

ov::Shape set_first_dim_and_make_static(const ov::PartialShape& shape, size_t dim) {
ov::PartialShape res_shape = shape;
res_shape[0] = dim;
OPENVINO_ASSERT(res_shape.is_static());
return res_shape.to_shape();
}

void update_request_tensor(size_t decoder_layer_id) {
m_request.set_tensor(std::string("key_cache.") + std::to_string(decoder_layer_id), m_key_cache[decoder_layer_id]);
m_request.set_tensor(std::string("value_cache.") + std::to_string(decoder_layer_id), m_value_cache[decoder_layer_id]);
}

public:
explicit CacheManager(const DeviceConfig &device_config, ov::Core core) :
explicit CacheManager(const DeviceConfig &device_config, ov::InferRequest request, ov::Core core) :
m_device_config(device_config),
m_request(request),
m_core(core) {
m_key_cache.reserve(m_device_config.get_num_layers());
m_value_cache.reserve(m_device_config.get_num_layers());
}

void allocate_cache_if_needed(size_t num_kv_blocks) {
if (m_num_allocated_kv_blocks >= num_kv_blocks) {
return;
}
OPENVINO_ASSERT(m_key_cache.size() == m_value_cache.size());
m_num_allocated_kv_blocks = num_kv_blocks;
ov::Shape value_cache_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), num_kv_blocks);
ov::Shape key_cache_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), num_kv_blocks);

const std::string device_name = m_device_config.get_device();

ov::Coordinate start_key{0,0,0,0};
ov::Coordinate start_value{0,0,0,0};

const std::string device_name = device_config.get_device();
if (device_name.find("GPU") == std::string::npos) {// Allocate KV caches
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache(device_config.get_cache_precision(), device_config.get_key_cache_shape());
ov::Tensor value_cache(device_config.get_cache_precision(), device_config.get_value_cache_shape());
ov::Tensor key_cache(m_device_config.get_cache_precision(), key_cache_shape);
ov::Tensor value_cache(m_device_config.get_cache_precision(), value_cache_shape);

auto key_cache_roi_end = static_cast<unsigned char*>(key_cache.data());
auto value_cache_roi_end = static_cast<unsigned char*>(value_cache.data());
size_t key_roi_size_byte = 0;
size_t value_roi_size_byte = 0;

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

key_roi_size_byte = m_key_cache[decoder_layer_id].get_byte_size();
value_roi_size_byte = m_value_cache[decoder_layer_id].get_byte_size();
key_cache_roi_end = static_cast<unsigned char*>(key_cache.data()) + key_roi_size_byte;
value_cache_roi_end = static_cast<unsigned char*>(value_cache.data()) + value_roi_size_byte;

// copy current cache data
ov::Tensor dst_key_roi(key_cache, start_key, end_key);
ov::Tensor dst_value_roi(value_cache, start_value, end_value);

m_key_cache[decoder_layer_id].copy_to(dst_key_roi);
m_value_cache[decoder_layer_id].copy_to(dst_value_roi);

}

ilya-lavrenov marked this conversation as resolved.
Show resolved Hide resolved
// force allocation
std::memset(key_cache.data(), 0, key_cache.get_byte_size());
std::memset(value_cache.data(), 0, value_cache.get_byte_size());
// Some optimizations like AVX2, AVX512, AMX require a minimal shape and
// perform multiplying by zero on the excess data. Uninitialized tensor data contain NAN's,
// so NAN * 0 returns non-zero invalid data.
// So we need to set zeros to all newly allocated tensors data.
std::memset(key_cache_roi_end, 0, key_cache.get_byte_size() - key_roi_size_byte);
std::memset(value_cache_roi_end, 0, value_cache.get_byte_size() - value_roi_size_byte);

// set new cache tensors
if (m_key_cache.size() > decoder_layer_id) {
m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
update_request_tensor(decoder_layer_id);
}
} else {
auto remote_context = m_core.get_default_context(device_name);
for (size_t decoder_layer_id = 0; decoder_layer_id < m_device_config.get_num_layers(); ++decoder_layer_id) {
ov::Tensor key_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_key_cache_shape());
ov::Tensor value_cache = remote_context.create_tensor(device_config.get_cache_precision(),
device_config.get_value_cache_shape());

m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
ov::Tensor key_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
key_cache_shape);
ov::Tensor value_cache = remote_context.create_tensor(m_device_config.get_cache_precision(),
value_cache_shape);

if (m_key_cache.size() > decoder_layer_id) {
ov::Coordinate end_key = m_key_cache[decoder_layer_id].get_shape();
ov::Coordinate end_value = m_value_cache[decoder_layer_id].get_shape();

// copy current cache data
ov::RemoteTensor dst_key_roi(key_cache, start_key, end_key);
ov::RemoteTensor dst_value_roi(value_cache, start_value, end_value);
dst_key_roi.copy_from(m_key_cache[decoder_layer_id]);
dst_value_roi.copy_from(m_value_cache[decoder_layer_id]);

m_key_cache[decoder_layer_id] = key_cache;
m_value_cache[decoder_layer_id] = value_cache;
}
else {
m_key_cache.emplace_back(key_cache);
m_value_cache.emplace_back(value_cache);
}
update_request_tensor(decoder_layer_id);
}
}
}
Expand All @@ -62,8 +142,8 @@ class CacheManager {
}

void copy_blocks(const std::map<size_t, std::list<size_t>>& block_copy_map) {
ov::Shape key_shape = m_device_config.get_key_cache_shape();
ov::Shape value_shape = m_device_config.get_value_cache_shape();
ov::Shape key_shape = set_first_dim_and_make_static(m_device_config.get_key_cache_shape(), m_num_allocated_kv_blocks);
ov::Shape value_shape = set_first_dim_and_make_static(m_device_config.get_value_cache_shape(), m_num_allocated_kv_blocks);

ov::Coordinate key_src_start_roi(key_shape.size(), 0);
ov::Coordinate key_src_end_roi = key_shape;
Expand Down Expand Up @@ -98,5 +178,13 @@ class CacheManager {
}
}
}

std::shared_ptr<Core> get_core() {
return std::make_shared<Core>(m_core);
}

std::shared_ptr<DeviceConfig> get_device_config() {
return std::make_shared<DeviceConfig>(m_device_config);
}
};
}
10 changes: 2 additions & 8 deletions src/cpp/src/continuous_batching_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
ov::InferRequest infer_request = compiled_model.create_infer_request();

// setup KV caches
m_cache_manager = std::make_shared<CacheManager>(device_config, core);
for (size_t decoder_layer_id = 0; decoder_layer_id < device_config.get_num_layers(); ++decoder_layer_id) {
infer_request.set_tensor(std::string("key_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_key_cache(decoder_layer_id));
infer_request.set_tensor(std::string("value_cache.") + std::to_string(decoder_layer_id), m_cache_manager->get_value_cache(decoder_layer_id));
}
m_cache_manager = std::make_shared<CacheManager>(device_config, infer_request, core);

SchedulerConfig updated_config = scheduler_config;
// update KV blocks number in scheduler config
Expand All @@ -71,8 +67,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
// as it may lead to performance slowdown
can_use_partial_preemption = false;
}

m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), updated_config, device_config.get_num_layers(), can_use_partial_preemption);
m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), m_cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);
// and finally create model runner
bool is_use_cache_eviction = m_scheduler->get_config().use_cache_eviction;
m_model_runner = std::make_shared<ModelRunner>(infer_request, m_scheduler->get_block_size(), device_config.get_num_layers(), is_use_cache_eviction);
Expand Down Expand Up @@ -133,7 +128,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
_pull_awaiting_requests();

m_pipeline_metrics.requests = m_requests.size();

Scheduler::Output scheduler_output;
{
static ManualTimer timer("scheduling");
Expand Down
Loading
Loading