Skip to content

Commit

Permalink
[opt](bloomfilter index) optimize memory usage for bloom filter index…
Browse files Browse the repository at this point in the history
… writer #45833 (#46047)

cherry pick from #45833
  • Loading branch information
airborne12 authored Dec 27, 2024
1 parent f2c7eb9 commit d2c1087
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
10 changes: 10 additions & 0 deletions be/src/olap/rowset/segment_v2/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ class BloomFilter {
return hash_code;
}

static Result<uint64_t> hash(const char* buf, uint32_t size, HashStrategyPB strategy) {
if (strategy == HASH_MURMUR3_X64_64) {
uint64_t hash_code;
murmur_hash3_x64_64(buf, size, DEFAULT_SEED, &hash_code);
return hash_code;
} else {
return Status::InvalidArgument("invalid strategy:{}", strategy);
}
}

virtual void add_bytes(const char* buf, uint32_t size) {
if (buf == nullptr) {
*_has_null = true;
Expand Down
27 changes: 16 additions & 11 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
for (int i = 0; i < count; ++i) {
if (_values.find(*v) == _values.end()) {
if constexpr (_is_slice_type()) {
CppType new_value;
RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, &_arena));
_values.insert(new_value);
const auto* s = reinterpret_cast<const Slice*>(v);
auto hash =
DORIS_TRY(BloomFilter::hash(s->data, s->size, _bf_options.strategy));
_hash_values.insert(hash);
} else if constexpr (_is_int128()) {
int128_t new_value;
memcpy(&new_value, v, sizeof(PackedInt128));
Expand All @@ -105,25 +106,28 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
Status flush() override {
std::unique_ptr<BloomFilter> bf;
RETURN_IF_ERROR(BloomFilter::create(BLOCK_BLOOM_FILTER, &bf));
RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy));
bf->set_has_null(_has_null);
for (auto& v : _values) {
if constexpr (_is_slice_type()) {
Slice* s = (Slice*)&v;
bf->add_bytes(s->data, s->size);
} else {
if constexpr (_is_slice_type()) {
RETURN_IF_ERROR(bf->init(_hash_values.size(), _bf_options.fpp, _bf_options.strategy));
for (const auto& h : _hash_values) {
bf->add_hash(h);
}
} else {
RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy));
for (auto& v : _values) {
bf->add_bytes((char*)&v, sizeof(CppType));
}
}
bf->set_has_null(_has_null);
_bf_buffer_size += bf->size();
_bfs.push_back(std::move(bf));
_values.clear();
_hash_values.clear();
_has_null = false;
return Status::OK();
}

Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) override {
if (_values.size() > 0) {
if (_values.size() > 0 || !_hash_values.empty()) {
RETURN_IF_ERROR(flush());
}
index_meta->set_type(BLOOM_FILTER_INDEX);
Expand Down Expand Up @@ -172,6 +176,7 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
// distinct values
ValueDict _values;
std::vector<std::unique_ptr<BloomFilter>> _bfs;
std::set<uint64_t> _hash_values;
};

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ void test_bloom_filter_index_reader_writer_template(
}
// test nullptr
EXPECT_TRUE(bf->test_bytes(nullptr, 1));

if (is_slice_type) {
Slice* value = (Slice*)(not_exist_value);
EXPECT_FALSE(bf->test_bytes(value->data, value->size));
} else {
EXPECT_FALSE(bf->test_bytes((char*)not_exist_value, sizeof(CppType)));
}
delete reader;
}
}
Expand Down

0 comments on commit d2c1087

Please sign in to comment.