Skip to content

Commit

Permalink
Improve the query API of llm cache and use vector<uint8_t> as payload…
Browse files Browse the repository at this point in the history
… object.

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Mar 4, 2024
1 parent 3e4738f commit 1edb37c
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 198 deletions.
3 changes: 3 additions & 0 deletions modules/basic/ds/dataframe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const std::shared_ptr<arrow::RecordBatch> DataFrame::AsBatch(bool copy) const {
} else if (auto tensor =
std::dynamic_pointer_cast<Tensor<std::string>>(df_col)) {
num_rows = tensor->shape()[0];
} else if (auto tensor =
std::dynamic_pointer_cast<Tensor<uint8_t>>(df_col)) {
num_rows = tensor->shape()[0];
}

std::vector<std::shared_ptr<arrow::Buffer>> buffer{
Expand Down
41 changes: 20 additions & 21 deletions modules/llm-cache/ds/kv_state_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,24 @@ void KVStateCache::Resolve() {
}

// 3. construct the member field
this->dimension = this->meta_.GetKeyValue<int>("dimension");
this->tensorBytes = this->meta_.GetKeyValue<int>("tensorBytes");
this->version = this->meta_.GetKeyValue<uint64_t>("version");
this->layer = this->meta_.GetKeyValue<int>("layer");
VLOG(100) << "construct the member field success, with dimension:"
<< this->dimension << " version:" << this->version
VLOG(100) << "construct the member field success, with tensorBytes:"
<< this->tensorBytes << " version:" << this->version
<< " layer:" << this->layer;
}

// Destructor: performs no explicit cleanup of its own.
KVStateCache::~KVStateCache() = default;

KVStateCacheBuilder::KVStateCacheBuilder(Client& client, int dimension,
KVStateCacheBuilder::KVStateCacheBuilder(Client& client, int tensorBytes,
int cacheCapacity, int layer,
int blockSize) {
this->dimension = dimension;
this->tensorBytes = tensorBytes;
this->version = 0;
this->layer = layer;
KVStateCacheBlockBuilder* builder =
new KVStateCacheBlockBuilder(client, this->dimension, layer, blockSize);
new KVStateCacheBlockBuilder(client, this->tensorBytes, layer, blockSize);

this->rootTree = std::make_shared<RadixTree>(cacheCapacity);

Expand All @@ -90,7 +90,7 @@ KVStateCacheBuilder::KVStateCacheBuilder(Client& client, int dimension,

KVStateCacheBuilder::KVStateCacheBuilder(Client& client,
std::shared_ptr<KVStateCache> cache) {
this->dimension = cache->GetDimension();
this->tensorBytes = cache->GetTensorBytes();
this->version = cache->GetVersion();
this->layer = cache->GetLayer();
// 1. create block builder from block
Expand Down Expand Up @@ -118,7 +118,7 @@ KVStateCacheBlockBuilder* KVStateCacheBuilder::Split(
// Split the tree if the list of kvState is full.
VINEYARD_ASSERT(nodeDataList.size() > 0);
KVStateCacheBlockBuilder* childKVStateCacheBlockBuilder =
new KVStateCacheBlockBuilder(client, this->dimension, this->layer,
new KVStateCacheBlockBuilder(client, this->tensorBytes, this->layer,
kvStateCacheBlockBuilder->GetBlockSize());
for (size_t i = 0; i < nodeDataList.size(); i++) {
OffsetData* data =
Expand All @@ -138,10 +138,9 @@ KVStateCacheBlockBuilder* KVStateCacheBuilder::Split(
return childKVStateCacheBlockBuilder;
}

void KVStateCacheBuilder::Update(Client& client,
const std::vector<int>& tokenList,
int nextToken,
const KV_STATE_WITH_LAYER& kvState) {
void KVStateCacheBuilder::Update(
Client& client, const std::vector<int>& tokenList, int nextToken,
const std::map<int, std::pair<K_STATE, V_STATE>>& kvState) {
std::vector<int> tokenListCopy = tokenList;
tokenListCopy.push_back(nextToken);

Expand Down Expand Up @@ -199,9 +198,9 @@ void KVStateCacheBuilder::Update(Client& client,
<< " bitmap:" << kvStateCacheBlockBuilder->GetBitmapStr();
}

int KVStateCacheBuilder::Query(Client& client,
const std::vector<int>& tokenList, int token,
KV_STATE_WITH_LAYER& kvState) {
int KVStateCacheBuilder::Query(
Client& client, const std::vector<int>& tokenList, int token,
std::map<int, std::pair<K_STATE, V_STATE>>& kvState) {
std::vector<int> tokenListCopy = tokenList;
tokenListCopy.push_back(token);

Expand Down Expand Up @@ -275,14 +274,14 @@ void KVStateCacheBuilder::Merge(Client& client,
for (auto it = insertTokenList.begin(); it != insertTokenList.end(); ++it) {
std::vector<int> tokenList =
std::vector<int>((*it).begin(), (*it).end() - 1);
KV_STATE_WITH_LAYER kvState;
std::map<int, std::pair<K_STATE, V_STATE>> kvState;
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
K_STATE key_state;
V_STATE value_state;
key_state.data = malloc(this->dimension * sizeof(double));
key_state.length = this->dimension * sizeof(double);
value_state.data = malloc(this->dimension * sizeof(double));
value_state.length = this->dimension * sizeof(double);
key_state.data = malloc(this->tensorBytes);
key_state.length = this->tensorBytes;
value_state.data = malloc(this->tensorBytes);
value_state.length = this->tensorBytes;

kvState.insert(
std::make_pair(currentLayer, std::make_pair(key_state, value_state)));
Expand All @@ -309,7 +308,7 @@ std::shared_ptr<Object> KVStateCacheBuilder::_Seal(Client& client) {
std::shared_ptr<KVStateCache> kvStateCache = std::make_shared<KVStateCache>();

// 1. store the member variables to cache object meta
kvStateCache->meta_.AddKeyValue("dimension", this->dimension);
kvStateCache->meta_.AddKeyValue("tensorBytes", this->tensorBytes);
kvStateCache->meta_.AddKeyValue("version", this->version);
kvStateCache->meta_.AddKeyValue("layer", this->layer);

Expand Down
16 changes: 9 additions & 7 deletions modules/llm-cache/ds/kv_state_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
#include <map>
#include <memory>
#include <vector>
#include <utility>

#include "client/client.h"
#include "common/util/logging.h"
Expand All @@ -40,7 +41,7 @@ class KVStateCache : public vineyard::Registered<KVStateCache> {
private:
std::vector<std::shared_ptr<KVStateCacheBlock>> kvStateCacheBlockList;
std::shared_ptr<RadixTree> rootTree;
int dimension;
int tensorBytes;
int cacheCapacity;
int layer;
uint64_t version;
Expand All @@ -60,7 +61,7 @@ class KVStateCache : public vineyard::Registered<KVStateCache> {
return this->kvStateCacheBlockList;
}

int GetDimension() { return this->dimension; }
int GetTensorBytes() { return this->tensorBytes; }

int GetCacheCapacity() { return this->cacheCapacity; }

Expand All @@ -77,12 +78,12 @@ class KVStateCache : public vineyard::Registered<KVStateCache> {

class KVStateCacheBuilder : public vineyard::ObjectBuilder {
std::shared_ptr<RadixTree> rootTree;
int dimension;
int tensorBytes;
int layer;
uint64_t version;

public:
KVStateCacheBuilder(Client& client, int dimension, int cacheCapacity,
KVStateCacheBuilder(Client& client, int tensorBytes, int cacheCapacity,
int layer, int blockSize = DEFAULT_BLOCK_SIZE);

KVStateCacheBuilder(Client& client, std::shared_ptr<KVStateCache> cache);
Expand All @@ -92,10 +93,11 @@ class KVStateCacheBuilder : public vineyard::ObjectBuilder {
std::vector<std::shared_ptr<NodeData>> nodeDataList);

void Update(Client& client, const std::vector<int>& token_list,
int next_token, const KV_STATE_WITH_LAYER& kv_state);
int next_token,
const std::map<int, std::pair<K_STATE, V_STATE>>& kv_state);

int Query(Client& client, const std::vector<int>& token_list, int token,
KV_STATE_WITH_LAYER& kv_state);
std::map<int, std::pair<K_STATE, V_STATE>>& kv_state);

void Delete(std::shared_ptr<NodeData> evicted_node);

Expand All @@ -109,7 +111,7 @@ class KVStateCacheBuilder : public vineyard::ObjectBuilder {

std::shared_ptr<Object> _Seal(Client& client) override;

uint64_t GetDimension() { return this->dimension; }
uint64_t GetTensorBytes() { return this->tensorBytes; }

std::shared_ptr<RadixTree> GetRootTree() { return this->rootTree; }

Expand Down
111 changes: 54 additions & 57 deletions modules/llm-cache/ds/kv_state_cache_block.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ void KVStateCacheBlock::Construct(const ObjectMeta& meta) {
this->layer = this->meta_.GetKeyValue<int>("layer");
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
this->keyStateTensorList.push_back(
std::dynamic_pointer_cast<Tensor<double>>(this->meta_.GetMember(
std::dynamic_pointer_cast<Tensor<uint8_t>>(this->meta_.GetMember(
"keyStateTensorBuilder_" + std::to_string(currentLayer))));
this->valueStateTensorList.push_back(
std::dynamic_pointer_cast<Tensor<double>>(this->meta_.GetMember(
std::dynamic_pointer_cast<Tensor<uint8_t>>(this->meta_.GetMember(
"valueStateTensorBuilder_" + std::to_string(currentLayer))));
}
// 2. construct the member field
Expand All @@ -74,27 +74,27 @@ void KVStateCacheBlock::Construct(const ObjectMeta& meta) {
this->bitmap[i] =
this->meta_.GetKeyValue<uint64_t>("bitmap_" + std::to_string(i));
}
this->dimension = this->meta_.GetKeyValue<int>("dimension");
this->tensorBytes = this->meta_.GetKeyValue<int>("tensorBytes");
this->blockSize = this->meta_.GetKeyValue<int>("block_size");
}

// Releases the slot bitmap owned by this block.
// The bitmap is indexed element by element in Construct(), i.e. it is an
// array, so it has to be released with the array form of delete.
// NOTE(review): this assumes Construct() allocates it with new[] (the
// allocation itself is outside the visible hunk) - confirm.
KVStateCacheBlock::~KVStateCacheBlock() { delete[] this->bitmap; }

// Creates an empty block builder with `blockSize` slots and one key tensor
// builder plus one value tensor builder per layer.
// NOTE(review): this span comes from a diff view, so several lines appear in
// two variants (dimension/double vs. tensorBytes/uint8_t); the second variant
// of each pair looks like the post-commit one - confirm against the repository.
KVStateCacheBlockBuilder::KVStateCacheBlockBuilder(Client& client,
int dimension, int layer,
int tensorBytes, int layer,
int blockSize) {
this->blockSize = blockSize;
// One bit per slot, rounded up to a whole number of 64-bit words.
this->bitmapSize = (blockSize + 63) / 64;
this->bitmap = new uint64_t[this->bitmapSize];
// Filling with UINT8_MAX sets every bit of the bitmap.
// Presumably a set bit marks a free slot - confirm against FindEmptySlot().
memset(this->bitmap, UINT8_MAX, this->bitmapSize * sizeof(uint64_t));
// Two-dimensional tensor shape: one row per slot, second extent is the
// per-slot payload size.
std::vector<int64_t> shape = {(int64_t)(blockSize), dimension};
std::vector<int64_t> shape = {(int64_t)(blockSize), tensorBytes};
// Allocate the key and value tensor builders for every layer.
for (int i = 0; i < layer; i++) {
this->keyStateTensorBuilderList.push_back(
std::make_shared<TensorBuilder<double>>(client, shape));
std::make_shared<TensorBuilder<uint8_t>>(client, shape));
this->valueStateTensorBuilderList.push_back(
std::make_shared<TensorBuilder<double>>(client, shape));
std::make_shared<TensorBuilder<uint8_t>>(client, shape));
}
this->dimension = dimension;
this->tensorBytes = tensorBytes;
this->layer = layer;
}

Expand All @@ -108,37 +108,38 @@ KVStateCacheBlockBuilder::KVStateCacheBlockBuilder(
for (int i = 0; i < this->bitmapSize; i++) {
this->bitmap[i] = kvStateCacheBlock->bitmap[i];
}
this->dimension = kvStateCacheBlock->dimension;
this->tensorBytes = kvStateCacheBlock->tensorBytes;
this->layer = kvStateCacheBlock->layer;
std::vector<int64_t> shape = {(int64_t)(blockSize), dimension};
std::vector<int64_t> shape = {(int64_t)(blockSize), tensorBytes};
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
this->keyStateTensorBuilderList.push_back(
std::make_shared<TensorBuilder<double>>(client, shape));
std::make_shared<TensorBuilder<uint8_t>>(client, shape));
this->valueStateTensorBuilderList.push_back(
std::make_shared<TensorBuilder<double>>(client, shape));
std::make_shared<TensorBuilder<uint8_t>>(client, shape));
}

for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
memcpy(this->keyStateTensorBuilderList[currentLayer]->data(),
kvStateCacheBlock->keyStateTensorList[currentLayer]->data(),
(int64_t)(blockSize) * this->dimension * sizeof(double));
(int64_t)(blockSize) * this->tensorBytes);
memcpy(this->valueStateTensorBuilderList[currentLayer]->data(),
kvStateCacheBlock->valueStateTensorList[currentLayer]->data(),
(int64_t)(blockSize) * this->dimension * sizeof(double));
(int64_t)(blockSize) * this->tensorBytes);
}
}

// current we do not consider the layer.
int KVStateCacheBlockBuilder::Query(Client& client, int index,
KV_STATE_WITH_LAYER& kvState) {
int KVStateCacheBlockBuilder::Query(
Client& client, int index,
std::map<int, std::pair<K_STATE, V_STATE>>& kvState) {
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
memcpy((kvState.find(currentLayer)->second).first.data,
keyStateTensorBuilderList[currentLayer]->data() + index * dimension,
dimension * sizeof(double));
memcpy(
(kvState.find(currentLayer)->second).second.data,
valueStateTensorBuilderList[currentLayer]->data() + index * dimension,
dimension * sizeof(double));
K_STATE keyState = (kvState.find(currentLayer)->second).first;
V_STATE valueState = (kvState.find(currentLayer)->second).second;
keyState.data = keyStateTensorBuilderList[currentLayer]->data() + index;
keyState.length = tensorBytes;
valueState.data = valueStateTensorBuilderList[currentLayer]->data() + index;
valueState.length = tensorBytes;
kvState.emplace(currentLayer, std::make_pair(keyState, valueState));
}
return 0;
}
Expand All @@ -164,23 +165,20 @@ bool KVStateCacheBlockBuilder::IsFull() {
return true;
}

void KVStateCacheBlockBuilder::Update(const KV_STATE_WITH_LAYER& kvState,
OffsetData* data) {
void KVStateCacheBlockBuilder::Update(
const std::map<int, std::pair<K_STATE, V_STATE>>& kvState,
OffsetData* data) {
int index = this->FindEmptySlot();
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
K_STATE keyState = (kvState.find(currentLayer)->second).first;
V_STATE valueState = (kvState.find(currentLayer)->second).second;
VINEYARD_ASSERT(keyState.length ==
(size_t) this->dimension * sizeof(double));
VINEYARD_ASSERT(valueState.length ==
(size_t) this->dimension * sizeof(double));

double* keyData = keyStateTensorBuilderList[currentLayer]->data();
double* valueData = valueStateTensorBuilderList[currentLayer]->data();
memcpy(keyData + index * this->dimension, keyState.data,
this->dimension * sizeof(double));
memcpy(valueData + index * this->dimension, valueState.data,
this->dimension * sizeof(double));
VINEYARD_ASSERT(keyState.length == (size_t) this->tensorBytes);
VINEYARD_ASSERT(valueState.length == (size_t) this->tensorBytes);

uint8_t* keyData = keyStateTensorBuilderList[currentLayer]->data();
uint8_t* valueData = valueStateTensorBuilderList[currentLayer]->data();
memcpy(keyData + index, keyState.data, this->tensorBytes);
memcpy(valueData + index, valueState.data, this->tensorBytes);
}
data->offset = index;

Expand All @@ -193,25 +191,23 @@ int16_t KVStateCacheBlockBuilder::Split(KVStateCacheBlockBuilder* child,
VINEYARD_ASSERT(this->layer == child->layer);
int childIndex = child->FindEmptySlot();
for (int currentLayer = 0; currentLayer < this->layer; currentLayer++) {
std::shared_ptr<TensorBuilder<double>> keyStateTensorBuilder =
std::shared_ptr<TensorBuilder<uint8_t>> keyStateTensorBuilder =
keyStateTensorBuilderList[currentLayer];
std::shared_ptr<TensorBuilder<double>> valueStateTensorBuilder =
std::shared_ptr<TensorBuilder<uint8_t>> valueStateTensorBuilder =
valueStateTensorBuilderList[currentLayer];
std::shared_ptr<TensorBuilder<double>> childKeyStateTensorBuilder =
std::shared_ptr<TensorBuilder<uint8_t>> childKeyStateTensorBuilder =
child->keyStateTensorBuilderList[currentLayer];
std::shared_ptr<TensorBuilder<double>> childValueStateTensorBuilder =
std::shared_ptr<TensorBuilder<uint8_t>> childValueStateTensorBuilder =
child->valueStateTensorBuilderList[currentLayer];

double* keyState = keyStateTensorBuilder->data() + index * this->dimension;
double* valueState =
valueStateTensorBuilder->data() + index * this->dimension;
double* childKeyState =
childKeyStateTensorBuilder->data() + childIndex * this->dimension;
double* childValueState =
childValueStateTensorBuilder->data() + childIndex * this->dimension;
uint8_t* keyState = keyStateTensorBuilder->data() + index;
uint8_t* valueState = valueStateTensorBuilder->data() + index;
uint8_t* childKeyState = childKeyStateTensorBuilder->data() + childIndex;
uint8_t* childValueState =
childValueStateTensorBuilder->data() + childIndex;

memcpy(childKeyState, keyState, this->dimension * sizeof(double));
memcpy(childValueState, valueState, this->dimension * sizeof(double));
memcpy(childKeyState, keyState, this->tensorBytes);
memcpy(childValueState, valueState, this->tensorBytes);
}
ACQUIRE_BIT_RESOURCE(child->bitmap[childIndex / 64], childIndex % 64);
FREE_BIT_RESOURCE(this->bitmap[index / 64], index % 64);
Expand Down Expand Up @@ -244,7 +240,7 @@ std::shared_ptr<Object> KVStateCacheBlockBuilder::_Seal(Client& client) {
}

kvStateCacheBlock->meta_.AddKeyValue("block_size", this->blockSize);
kvStateCacheBlock->meta_.AddKeyValue("dimension", this->dimension);
kvStateCacheBlock->meta_.AddKeyValue("tensorBytes", this->tensorBytes);
kvStateCacheBlock->meta_.AddKeyValue("layer", this->layer);
// 3. set the object type to meta
kvStateCacheBlock->meta_.SetTypeName(type_name<KVStateCacheBlock>());
Expand All @@ -264,15 +260,16 @@ void KVStateCacheBlockBuilder::PrintKVStateCacheBlock() {
LOG(INFO) << "layer:" << currentLayer;
for (int i = 0; i < this->blockSize; i++) {
LOG(INFO) << "index:" << i;
uint8_t* key_state_data = keyStateTensorBuilderList[currentLayer]->data();
uint8_t* value_state_data =
valueStateTensorBuilderList[currentLayer]->data();
// print the first tensorBytes bytes
std::string keyState = "";
std::string valueState = "";
for (int j = 0; j < this->dimension; j++) {
keyState += std::to_string((keyStateTensorBuilderList[currentLayer]
->data())[i * dimension + j]) +
" ";
valueState += std::to_string((valueStateTensorBuilderList[currentLayer]
->data())[i * dimension + j]) +
" ";
for (int j = 0; j < this->tensorBytes; j++) {
keyState += std::to_string(key_state_data[i * tensorBytes + j]) + " ";
valueState +=
std::to_string(value_state_data[i * tensorBytes + j]) + " ";
}
LOG(INFO) << "keyState:" << keyState;
LOG(INFO) << "valueState:" << valueState;
Expand Down
Loading

0 comments on commit 1edb37c

Please sign in to comment.