Skip to content

Commit

Permalink
Add a benchmark test for kv state cache (#1774)
Browse files Browse the repository at this point in the history
Fixes #1770

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored and vegetableysm committed Feb 28, 2024
1 parent 44d7c00 commit 9f6a64e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 52 deletions.
2 changes: 1 addition & 1 deletion modules/kv-state-cache/ds/kv_state_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void KVStateCacheBuilder::Update(Client& client,
std::shared_ptr<NodeData> nodeData =
this->rootTree->Insert(tokenListCopy, evictedNodeData);
if (nodeData == nullptr) {
LOG(INFO) << "insert failed";
LOG(ERROR) << "insert failed";
return;
}
KVStateCacheBlockBuilder* kvStateCacheBlockBuilder =
Expand Down
84 changes: 41 additions & 43 deletions modules/kv-state-cache/radix-tree/radix-tree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@ RadixTree::RadixTree(int cacheCapacity) {
std::vector<int> rootToken = {INT32_MAX};
std::shared_ptr<NodeData> evictedNode;
this->InsertInternal(rootToken, evictedNode);
raxShow(this->tree);
// raxShow(this->tree);
raxNode* dataNode = raxFindAndReturnDataNode(this->tree, rootToken.data(),
rootToken.size(), NULL, false);
DataWrapper* data = new DataWrapper();
data->data = nullptr;
data->dataLength = 0;
dataNode->custom_data = data;
LOG(INFO) << "root data wrapper:" << data;
VLOG(100) << "root data wrapper:" << data;
dataNode->issubtree = true;
this->rootToken = rootToken;
}

RadixTree::~RadixTree() {
LOG(INFO) << "~RadixTree";
raxShow(this->tree);
VLOG(100) << "~RadixTree";
// raxShow(this->tree);

raxNode* dataNode = raxFindAndReturnDataNode(this->tree, rootToken.data(),
rootToken.size(), NULL, false);
Expand Down Expand Up @@ -105,14 +105,14 @@ std::shared_ptr<NodeData> RadixTree::InsertInternal(
return NULL;
}
if (retval == 1) {
LOG(INFO) << "node count++:" << this->nodeCount;
VLOG(100) << "node count++:" << this->nodeCount;
nodeCount++;
}

raxShow(this->tree);
// raxShow(this->tree);
if (this->nodeCount > this->cacheCapacity) {
LOG(INFO) << "cache capacity is full, evict the last recent node";
LOG(INFO) << "cache capacity:" << this->cacheCapacity
VLOG(100) << "cache capacity is full, evict the last recent node";
VLOG(100) << "cache capacity:" << this->cacheCapacity
<< " node count:" << this->nodeCount;
// evict the least recently used node (the node with the largest LRU index)
std::vector<int> evictedTokensVector;
Expand All @@ -127,13 +127,12 @@ std::shared_ptr<NodeData> RadixTree::InsertInternal(
raxNode* subTreeNode = nullptr;
dataNode = raxFindAndReturnDataNode(
this->tree, insertTokensArray, insertTokensArrayLen, &subTreeNode, false);
LOG(INFO) << "sub tree node:" << subTreeNode << " data node:" << dataNode;
VLOG(100) << "sub tree node:" << subTreeNode << " data node:" << dataNode;
/**
* if the data node is null, it means the evicted node is the same node as
* the inserted node.
*/
if (dataNode == NULL) {
LOG(INFO) << "get failed";
return NULL;
}

Expand Down Expand Up @@ -171,12 +170,12 @@ void RadixTree::DeleteInternal(std::vector<int> tokens,
evictedNode->cleanTreeData = true;
}
} else {
LOG(INFO) << "remove failed";
LOG(ERROR) << "remove failed";
}
}

std::shared_ptr<NodeData> RadixTree::QueryInternal(std::vector<int> key) {
LOG(INFO) << "Query";
VLOG(100) << "Query";
int* tokens = key.data();
size_t tokensLen = key.size();

Expand All @@ -187,9 +186,8 @@ std::shared_ptr<NodeData> RadixTree::QueryInternal(std::vector<int> key) {
raxNode* subTreeNode;
raxNode* dataNode =
raxFindAndReturnDataNode(this->tree, tokens, tokensLen, &subTreeNode);
LOG(INFO) << "query subtree node:" << subTreeNode;
VLOG(100) << "query subtree node:" << subTreeNode;
if (dataNode == NULL) {
LOG(INFO) << "get failed";
return NULL;
}

Expand All @@ -199,8 +197,8 @@ std::shared_ptr<NodeData> RadixTree::QueryInternal(std::vector<int> key) {
}

std::string RadixTree::Serialize() {
LOG(INFO) << "Serialize......";
raxShow(this->tree);
VLOG(100) << "Serialize......";
// raxShow(this->tree);
std::vector<std::vector<int>> tokenList;
std::vector<void*> dataList;
std::vector<uint64_t> timestampList;
Expand All @@ -209,7 +207,7 @@ std::string RadixTree::Serialize() {
raxSerialize(this->tree, tokenList, dataList, timestampList,
&subTreeTokenList, &subTreeDataList);

raxShow(this->tree);
// raxShow(this->tree);
std::string serializedStr;

if (tokenList.size() != dataList.size()) {
Expand Down Expand Up @@ -257,7 +255,7 @@ std::string RadixTree::Serialize() {

serializedStr += "\t\n";

LOG(INFO) << "sub tree token list size:" << subTreeTokenList.size();
VLOG(100) << "sub tree token list size:" << subTreeTokenList.size();
for (size_t index = 0; index < subTreeTokenList.size(); index++) {
for (size_t j = 0; j < subTreeTokenList[index].size(); j++) {
serializedStr += std::to_string(subTreeTokenList[index][j]);
Expand All @@ -272,7 +270,7 @@ std::string RadixTree::Serialize() {
(reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->data);
std::ostringstream dataOSS;

LOG(INFO)
VLOG(100)
<< "data length:"
<< (reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->dataLength;
for (int i = 0;
Expand All @@ -282,12 +280,12 @@ std::string RadixTree::Serialize() {
dataOSS << std::hex << std::setw(2) << std::setfill('0')
<< static_cast<int>(static_cast<unsigned char>(bytes[i]));
}
LOG(INFO) << "data:"
VLOG(100) << "data:"
<< (reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->data;
LOG(INFO) << "data oss:" << dataOSS.str();
VLOG(100) << "data oss:" << dataOSS.str();
serializedStr += dataOSS.str() + "\n";
}
LOG(INFO) << "serializedStr:" << serializedStr;
VLOG(100) << "serializedStr:" << serializedStr;

// use ZSTD to compress the serialized string
size_t srcSize = serializedStr.size();
Expand All @@ -313,7 +311,7 @@ std::string RadixTree::Serialize() {
}

std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
LOG(INFO) << "Deserialize......";
VLOG(100) << "Deserialize......";
// use ZSTD to decompress the serialized string (Serialize compresses with ZSTD)
int compressedSize = *reinterpret_cast<int*>(data.data());
data.erase(0, sizeof(int));
Expand Down Expand Up @@ -356,6 +354,7 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
line.pop_back();
continue;
}
VLOG(100) << "data line:" << line << std::endl;
std::istringstream lineStream(line);
std::string tokenListPart, timestampPart, dataPart, subTreeSizePart;

Expand Down Expand Up @@ -396,6 +395,7 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
if (!(subTreeSizeStream >> std::hex >> subTreeSize)) {
LOG(ERROR) << "Invalid sub tree size format.";
}
VLOG(100) << "Deserialize sub tree size:" << subTreeSize;
subTreeSizeList.push_back(subTreeSize);
}

Expand All @@ -410,7 +410,7 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
// is created by upper layer. Here just recover it from serialized
// string.
char* data = nullptr;
LOG(INFO) << "data size:" << dataSize;
VLOG(100) << "data size:" << dataSize;
if (dataSize != 0) {
data = new char[dataSize];
std::istringstream dataStream(dataPart);
Expand All @@ -420,16 +420,14 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
// Read two characters for one byte
if (!dataStream.read(hex, 2)) {
delete[] data;
LOG(INFO) << "Invalid data format.";
throw std::runtime_error("Invalid data format.");
LOG(ERROR) << "Invalid data format.";
}
// Convert the two hex characters to one byte
unsigned int byte;
std::istringstream hexStream(hex);
if (!(hexStream >> std::hex >> byte)) {
delete[] data;
LOG(INFO) << "Invalid data format.";
throw std::runtime_error("Invalid data format.");
LOG(ERROR) << "Invalid data format.";
}
reinterpret_cast<unsigned char*>(data)[i] =
static_cast<unsigned char>(byte);
Expand All @@ -452,13 +450,13 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
std::make_shared<RadixTree>(cacheCapacity);
radixTree->nodeCount = tokenList.size();

raxShow(radixTree->tree);
// raxShow(radixTree->tree);
for (size_t i = 0; i < tokenList.size(); i++) {
std::string token_str = "";
for (size_t j = 0; j < tokenList[i].size(); j++) {
token_str += std::to_string(tokenList[i][j]);
}
LOG(INFO) << "token:" << token_str;
VLOG(100) << "token:" << token_str;
int* insertTokensArray = tokenList[i].data();
size_t insertTokensArrayLen = tokenList[i].size();
DataWrapper* data = new DataWrapper();
Expand All @@ -481,40 +479,40 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
for (size_t i = 0; i < tokenList.size(); i++) {
raxNode* node = raxFindAndReturnDataNode(
radixTree->tree, tokenList[i].data(), tokenList[i].size(), NULL, false);
LOG(INFO) << "node:" << node << " sub tree node num:" << subTreeSizeList[i];
VLOG(100) << "node:" << node << " sub tree node num:" << subTreeSizeList[i];
node->numnodes = subTreeSizeList[i];
}
radixTree->tree->head->numnodes = rootNumNodes;
raxShow(radixTree->tree);
// raxShow(radixTree->tree);

LOG(INFO) << "start to insert sub tree token list" << std::endl;
VLOG(100) << "start to insert sub tree token list" << std::endl;
for (size_t i = 0; i < subTreeTokenList.size(); i++) {
for (size_t j = 0; j < subTreeTokenList[i].size(); j++) {
LOG(INFO) << subTreeTokenList[i][j];
VLOG(100) << subTreeTokenList[i][j];
}

raxNode* node = nullptr;
LOG(INFO) << "stage 1";
VLOG(100) << "stage 1";
VINEYARD_ASSERT(radixTree->tree != nullptr);

// TODO: refactor this code.
node = raxFindAndReturnDataNode(radixTree->tree, subTreeTokenList[i].data(),
subTreeTokenList[i].size(), NULL, false);
VINEYARD_ASSERT(node != nullptr);
LOG(INFO) << "stage 2";
VLOG(100) << "stage 2";
DataWrapper* data = new DataWrapper();
data->data = subTreeDataList[i];
LOG(INFO) << subTreeDataList[i];
VLOG(100) << subTreeDataList[i];
data->dataLength = subTreeDataSizeList[i];

LOG(INFO) << "stage 3";
VLOG(100) << "stage 3";
node->issubtree = true;
raxSetCustomData(node, data);

radixTree->SetSubtreeData(subTreeDataList[i]);
}
LOG(INFO) << "Deserialize success";
raxShow(radixTree->tree);
VLOG(100) << "Deserialize success";
// raxShow(radixTree->tree);
return radixTree;
}

Check notice on line 518 in modules/kv-state-cache/radix-tree/radix-tree.cc

View check run for this annotation

codefactor.io / CodeFactor

modules/kv-state-cache/radix-tree/radix-tree.cc#L313-L518

Complex Method
Expand All @@ -524,7 +522,7 @@ std::vector<std::shared_ptr<NodeData>> RadixTree::SplitInternal(
raxNode* subTreeRootNode =
raxSplit(this->tree, tokens.data(), tokens.size(), rootToken);

raxShow(this->tree);
// raxShow(this->tree);
subTreeRootNode->issubtree = true;
DataWrapper* treeData = new DataWrapper();
treeData->data = nullptr;
Expand All @@ -540,14 +538,14 @@ std::vector<std::shared_ptr<NodeData>> RadixTree::TraverseTreeWithoutSubTree(
raxNode* headNode) {
std::vector<std::shared_ptr<NodeData>> nodes;
if (headNode == NULL) {
LOG(INFO) << "traverse failed";
VLOG(100) << "traverse failed";
return nodes;
}

std::vector<raxNode*> dataNodeList;
std::vector<int> pre_tmp;
raxTraverseSubTree(headNode, dataNodeList);
LOG(INFO) << "data node list:" << dataNodeList.size();
VLOG(100) << "data node list:" << dataNodeList.size();
for (size_t i = 0; i < dataNodeList.size(); i++) {
nodes.push_back(std::make_shared<NodeData>(
reinterpret_cast<DataWrapper*>(raxGetData(dataNodeList[i])),
Expand Down
13 changes: 6 additions & 7 deletions modules/kv-state-cache/radix-tree/radix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,6 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
if (!raxStackPush(&it->stack,it->node)) return 0;
// if (it->node->issubtree && it->add_to_subtree_list && it->subtree_list != NULL &&
// it->subtree_data_list != NULL) {
// std::cout << "second find subtree list is:" << std::endl;
// std::vector<int> token;
// for (size_t i = 0; i < it->key_len; i++) {
// token.push_back(it->key[i]);
Expand Down Expand Up @@ -2383,7 +2382,7 @@ raxNode *raxSplit(rax *rax, int *s, size_t len, std::vector<int>& token) {
token_str += std::to_string(token[i]);
token_str += " ";
}
LOG(INFO) << "split token: " << token_str;
VLOG(100) << "split token: " << token_str;

// if the splitNode is NULL, it means that the tree only has one node
if (splitNode == NULL) {
Expand Down Expand Up @@ -2528,10 +2527,10 @@ void mergeTree(rax* first_tree, rax* second_tree,
std::vector<std::vector<int>>& evicted_tokens,
std::set<std::vector<int>>& insert_tokens, int max_node) {
printf("merge tree!\n");
LOG(INFO) << "==============tree 1====================";
raxShow(first_tree);
LOG(INFO) << "==============tree 2====================";
raxShow(second_tree);
VLOG(100) << "==============tree 1====================";
//raxShow(first_tree);
VLOG(100) << "==============tree 2====================";
//raxShow(second_tree);
raxIterator first_tree_iter;
raxIterator second_tree_iter;
rax* tmp = raxNew();
Expand Down Expand Up @@ -2787,7 +2786,7 @@ void mergeTree(rax* first_tree, rax* second_tree,
}
} else if (second_tree_index >= second_tree_iter_list.size()) {
// second tree is empty
raxShow(tmp);
//raxShow(tmp);
printf("nodeCount:%d\n", nodeCount);
printf("first_tree_index:%ld\n", first_tree_index);
while (first_tree_index < first_tree_iter_list.size() &&
Expand Down
1 change: 0 additions & 1 deletion modules/kv-state-cache/utils/kv_state_cache_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ void sync() {

// 8. release the lock
while (1) {
LOG(INFO) << "stage 7";
client.TryReleaseLock(actualKey, result);
if (result == true) {
break;
Expand Down
Loading

0 comments on commit 9f6a64e

Please sign in to comment.