Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable code formatting of radix tree and fix a bug of serialization and deserialization #1771

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ endfunction()

file_glob_recurse(FILES_NEED_FORMAT DIRECTORIES "src" "modules" "python" "test" "benchmark"
PATTERNS ".*\\.(cc|cpp|h|hpp|vineyard-mod)$"
EXCLUDE_PATTERNS "(.*\\.vineyard.h$)|(modules/kv-state-cache/radix-tree/ra.*)"
EXCLUDE_PATTERNS "(.*\\.vineyard.h$)|(.*modules/kv-state-cache/radix-tree/radix\.(cc|h)$)"
)

# the `memcpy.h` is borrowed from external project
Expand Down
110 changes: 63 additions & 47 deletions modules/kv-state-cache/radix-tree/radix-tree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#include "radix-tree.h"
#include "kv-state-cache/radix-tree/radix-tree.h"

#include "common/util/base64.h"
#include "common/util/logging.h"
#include "common/util/status.h"

#include "zstd/lib/zstd.h"

using namespace vineyard;
using namespace vineyard; // NOLINT(build/namespaces)

RadixTree::RadixTree(int cacheCapacity) {
this->tree = raxNew();
Expand Down Expand Up @@ -52,8 +52,8 @@ RadixTree::~RadixTree() {
raxNode* dataNode = raxFindAndReturnDataNode(this->tree, rootToken.data(),
rootToken.size(), NULL, false);
if (dataNode != nullptr) {
delete (DataWrapper*) dataNode->custom_data;
delete (DataWrapper*) raxGetData(dataNode);
delete reinterpret_cast<DataWrapper*>(dataNode->custom_data);
delete reinterpret_cast<DataWrapper*>(raxGetData(dataNode));
}

raxFree(this->tree);
Expand Down Expand Up @@ -99,7 +99,7 @@ std::shared_ptr<NodeData> RadixTree::InsertInternal(
raxNode* dataNode = NULL;
int retval = raxInsertAndReturnDataNode(
this->tree, insertTokensArray, insertTokensArrayLen, dummyData,
(void**) &dataNode, (void**) &oldData);
reinterpret_cast<void**>(&dataNode), reinterpret_cast<void**>(&oldData));
if (dataNode == NULL) {
throw std::runtime_error("Insert token list failed");
return NULL;
Expand Down Expand Up @@ -140,8 +140,8 @@ std::shared_ptr<NodeData> RadixTree::InsertInternal(
if (subTreeNode == nullptr) {
return std::make_shared<NodeData>(dummyData, nullptr);
}
return std::make_shared<NodeData>(dummyData,
(DataWrapper*) subTreeNode->custom_data);
return std::make_shared<NodeData>(
dummyData, reinterpret_cast<DataWrapper*>(subTreeNode->custom_data));
}

void RadixTree::DeleteInternal(std::vector<int> tokens,
Expand All @@ -162,10 +162,10 @@ void RadixTree::DeleteInternal(std::vector<int> tokens,
nodeIsSubTree = true;
}
int retval = raxRemove(this->tree, deleteTokensArray, deleteTokensArrayLen,
(void**) &oldData);
reinterpret_cast<void**>(&oldData));
if (retval == 1) {
evictedNode = std::make_shared<NodeData>(
oldData, (DataWrapper*) subTreeNode->custom_data);
oldData, reinterpret_cast<DataWrapper*>(subTreeNode->custom_data));
nodeCount--;
if (nodeIsSubTree) {
evictedNode->cleanTreeData = true;
Expand Down Expand Up @@ -193,8 +193,9 @@ std::shared_ptr<NodeData> RadixTree::QueryInternal(std::vector<int> key) {
return NULL;
}

return std::make_shared<NodeData>((DataWrapper*) raxGetData(dataNode),
(DataWrapper*) subTreeNode->custom_data);
return std::make_shared<NodeData>(
reinterpret_cast<DataWrapper*>(raxGetData(dataNode)),
reinterpret_cast<DataWrapper*>(subTreeNode->custom_data));
}

std::string RadixTree::Serialize() {
Expand Down Expand Up @@ -241,10 +242,13 @@ std::string RadixTree::Serialize() {
serializedStr += subTreeSizeOSS.str() + "|";

// convert data to hex string
char* bytes = (char*) ((DataWrapper*) dataList[index])->data;
char* bytes = reinterpret_cast<char*>(
(reinterpret_cast<DataWrapper*>(dataList[index]))->data);
std::ostringstream dataOSS;

for (int i = 0; i < ((DataWrapper*) dataList[index])->dataLength; i++) {
for (int i = 0;
i < (reinterpret_cast<DataWrapper*>(dataList[index]))->dataLength;
i++) {
dataOSS << std::hex << std::setw(2) << std::setfill('0')
<< static_cast<int>(static_cast<unsigned char>(bytes[i]));
}
Expand All @@ -264,38 +268,45 @@ std::string RadixTree::Serialize() {
serializedStr += "|";

// convert custom data to hex string
char* bytes = (char*) ((DataWrapper*) subTreeDataList[index])->data;
char* bytes = reinterpret_cast<char*>(
(reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->data);
std::ostringstream dataOSS;

LOG(INFO) << "data length:"
<< ((DataWrapper*) subTreeDataList[index])->dataLength;
for (int i = 0; i < ((DataWrapper*) subTreeDataList[index])->dataLength;
LOG(INFO)
<< "data length:"
<< (reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->dataLength;
for (int i = 0;
i <
(reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->dataLength;
++i) {
dataOSS << std::hex << std::setw(2) << std::setfill('0')
<< static_cast<int>(static_cast<unsigned char>(bytes[i]));
}
LOG(INFO) << "data:" << ((DataWrapper*) subTreeDataList[index])->data;
LOG(INFO) << "data:"
<< (reinterpret_cast<DataWrapper*>(subTreeDataList[index]))->data;
LOG(INFO) << "data oss:" << dataOSS.str();
serializedStr += dataOSS.str() + "\n";
}
LOG(INFO) << "serializedStr:" << serializedStr;

// use ZSTD to compress the serialized string
size_t srcSize = serializedStr.size();
std::string compressedStr(srcSize, '\0');
int compressedSize =
ZSTD_compress((void*) (compressedStr.c_str()), compressedStr.length(),
serializedStr.c_str(), srcSize, 3);
size_t dstSize = ZSTD_compressBound(srcSize);
std::string compressedStr(dstSize + 1, '\0');
LOG(INFO) << "src size:" << srcSize << " dst size:" << dstSize;
int compressedSize = ZSTD_compress(compressedStr.data(), compressedStr.size(),
serializedStr.c_str(), srcSize, 3);
if (ZSTD_isError(compressedSize)) {
LOG(ERROR) << "ZSTD compression failed: "
<< ZSTD_getErrorName(compressedSize);
}
int cacheCapacity = this->cacheCapacity - 1;

std::string result =
std::string((char*) &srcSize, sizeof(int)) +
std::string((char*) &cacheCapacity, sizeof(int)) +
std::string((char*) &(this->tree->head->numnodes), sizeof(uint32_t)) +
std::string(reinterpret_cast<char*>(&compressedSize), sizeof(int)) +
std::string(reinterpret_cast<char*>(&cacheCapacity), sizeof(int)) +
std::string(reinterpret_cast<char*>(&(this->tree->head->numnodes)),
sizeof(uint32_t)) +
compressedStr;

return result;
Expand All @@ -304,16 +315,23 @@ std::string RadixTree::Serialize() {
std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
LOG(INFO) << "Deserialize......";
// use LZ4 to decompress the serialized string
int srcSize = *(int*) data.c_str();
int compressedSize = *reinterpret_cast<int*>(data.data());
data.erase(0, sizeof(int));
int cacheCapacity = *(int*) data.c_str();
int cacheCapacity = *reinterpret_cast<int*>(data.data());
data.erase(0, sizeof(int));
int rootNumNodes = *(uint32_t*) data.c_str();
int rootNumNodes = *reinterpret_cast<uint32_t*>(data.data());
data.erase(0, sizeof(uint32_t));
std::string decompressedStr(srcSize, '\0');
int ds = ZSTD_getFrameContentSize(data.c_str(), data.size());
if (ds == ZSTD_CONTENTSIZE_ERROR) {
LOG(ERROR) << "Error: not a valid compressed frame";
} else if (ds == ZSTD_CONTENTSIZE_UNKNOWN) {
LOG(ERROR)
<< "Error: original size unknown. Use streaming decompression instead.";
}

std::string decompressedStr(ds + 1, '\0');
int decompressedSize =
ZSTD_decompress((void*) (decompressedStr.c_str()), decompressedStr.size(),
data.c_str(), srcSize);
ZSTD_decompress(decompressedStr.data(), ds, data.c_str(), compressedSize);
if (ZSTD_isError(decompressedSize)) {
LOG(ERROR) << "ZSTD decompression failed: "
<< ZSTD_getErrorName(decompressedSize);
Expand All @@ -338,7 +356,6 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
line.pop_back();
continue;
}
LOG(INFO) << "data line:" << line << std::endl;
std::istringstream lineStream(line);
std::string tokenListPart, timestampPart, dataPart, subTreeSizePart;

Expand All @@ -357,7 +374,7 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
}
}
if (!std::getline(lineStream, dataPart)) {
LOG(INFO) << "data length is 0";
LOG(ERROR) << "data length is 0";
}

std::istringstream keyStream(tokenListPart);
Expand All @@ -371,17 +388,14 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
if (isMainTree) {
std::istringstream timestampStream(timestampPart);
if (!(timestampStream >> std::hex >> timestamp)) {
LOG(INFO) << "Invalid timestamp format.";
throw std::runtime_error("Invalid timestamp format.");
LOG(ERROR) << "Invalid timestamp format.";
}

std::istringstream subTreeSizeStream(subTreeSizePart);
uint32_t subTreeSize;
if (!(subTreeSizeStream >> std::hex >> subTreeSize)) {
LOG(INFO) << "Invalid sub tree size format.";
throw std::runtime_error("Invalid sub tree size format.");
LOG(ERROR) << "Invalid sub tree size format.";
}
LOG(INFO) << "Deserialize sub tree size:" << subTreeSize;
subTreeSizeList.push_back(subTreeSize);
}

Expand Down Expand Up @@ -455,8 +469,8 @@ std::shared_ptr<RadixTree> RadixTree::Deserialize(std::string data) {
// TBD
// check retval
raxInsertAndReturnDataNode(radixTree->tree, insertTokensArray,
insertTokensArrayLen, data, (void**) &dataNode,
NULL);
insertTokensArrayLen, data,
reinterpret_cast<void**>(&dataNode), NULL);

if (dataNode == NULL) {
throw std::runtime_error("Insert token list failed");
Expand Down Expand Up @@ -517,7 +531,7 @@ std::vector<std::shared_ptr<NodeData>> RadixTree::SplitInternal(
treeData->dataLength = 0;
subTreeRootNode->custom_data = treeData;
header = std::make_shared<NodeData>(
(DataWrapper*) raxGetData(subTreeRootNode), treeData);
reinterpret_cast<DataWrapper*>(raxGetData(subTreeRootNode)), treeData);
return TraverseTreeWithoutSubTree(subTreeRootNode);
}

Expand All @@ -536,8 +550,8 @@ std::vector<std::shared_ptr<NodeData>> RadixTree::TraverseTreeWithoutSubTree(
LOG(INFO) << "data node list:" << dataNodeList.size();
for (size_t i = 0; i < dataNodeList.size(); i++) {
nodes.push_back(std::make_shared<NodeData>(
(DataWrapper*) raxGetData(dataNodeList[i]),
(DataWrapper*) dataNodeList[i]->custom_data));
reinterpret_cast<DataWrapper*>(raxGetData(dataNodeList[i])),
reinterpret_cast<DataWrapper*>(dataNodeList[i]->custom_data)));
}
return nodes;
}
Expand All @@ -555,8 +569,9 @@ void RadixTree::ClearSubtreeData(void* data) {
std::shared_ptr<NodeData> RadixTree::GetRootNode() {
raxNode* node = raxFindAndReturnDataNode(this->tree, rootToken.data(),
rootToken.size(), NULL);
return std::make_shared<NodeData>((DataWrapper*) raxGetData(node),
(DataWrapper*) node->custom_data);
return std::make_shared<NodeData>(
reinterpret_cast<DataWrapper*>(raxGetData(node)),
reinterpret_cast<DataWrapper*>(node->custom_data));
}

void RadixTree::MergeTree(std::shared_ptr<RadixTree> tree_1,
Expand Down Expand Up @@ -591,7 +606,8 @@ std::set<void*> RadixTree::GetAllNodeData() {
if (node->isnull) {
continue;
}
nodeDataSet.insert(((DataWrapper*) raxGetData(node))->data);
nodeDataSet.insert(
(reinterpret_cast<DataWrapper*>(raxGetData(node)))->data);
}
return nodeDataSet;
}
}
19 changes: 10 additions & 9 deletions modules/kv-state-cache/radix-tree/radix-tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef RADIX_TREE_H
#define RADIX_TREE_H
#ifndef MODULES_KV_STATE_CACHE_RADIX_TREE_RADIX_TREE_H_
#define MODULES_KV_STATE_CACHE_RADIX_TREE_RADIX_TREE_H_

#include "radix.h"

#include "common/util/base64.h"
#include "common/util/logging.h"
#include "kv-state-cache/radix-tree/radix.h"

#include <iomanip>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

using namespace vineyard;
#include "common/util/base64.h"
#include "common/util/logging.h"

using namespace vineyard; // NOLINT(build/namespaces)

struct DataWrapper {
void* data;
Expand Down Expand Up @@ -75,7 +76,7 @@ class RadixTree : public std::enable_shared_from_this<RadixTree> {
std::vector<int> tokens, std::shared_ptr<NodeData>& header);

public:
RadixTree(int cacheCapacity);
RadixTree(int cacheCapacity); // NOLINT(runtime/explicit)

~RadixTree();

Expand Down Expand Up @@ -117,4 +118,4 @@ class RadixTree : public std::enable_shared_from_this<RadixTree> {
std::set<void*> GetAllNodeData();
};

#endif
#endif // MODULES_KV_STATE_CACHE_RADIX_TREE_RADIX_TREE_H_"
Loading
Loading