Skip to content

Commit

Permalink
[Enhancement](runtime-filter) improvement for datetimev2 bloom filte…
Browse files Browse the repository at this point in the history
…r hash method (#44924)

### What problem does this PR solve?
Improve the bloom filter hash method for datetimev2.
Previously, a datetimev2 value was converted with to_int64 into a 64-bit
integer and then truncated to 32 bits, and that truncated 32-bit value was
used to build the bloom filter. This could lead to poor performance and poor
filtering effectiveness.
  • Loading branch information
BiteTheDDDDt authored Dec 5, 2024
1 parent c477fd1 commit 6a8ae77
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 94 deletions.
169 changes: 110 additions & 59 deletions be/src/exprs/bloom_filter_func.h

Large diffs are not rendered by default.

25 changes: 11 additions & 14 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ inline auto create_bitmap_filter(PrimitiveType type) {
template <PrimitiveType PT>
ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<BloomFilterFuncBase>& filter,
int be_exec_version, const TabletColumn*) {
const TabletColumn*) {
std::shared_ptr<BloomFilterFuncBase> filter_olap;
filter_olap.reset(create_bloom_filter(PT));
filter_olap->light_copy(filter.get());
Expand All @@ -243,26 +243,26 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
template <PrimitiveType PT>
ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<BitmapFilterFuncBase>& filter,
int be_exec_version, const TabletColumn*) {
const TabletColumn*) {
if constexpr (PT == TYPE_TINYINT || PT == TYPE_SMALLINT || PT == TYPE_INT ||
PT == TYPE_BIGINT) {
return new BitmapFilterColumnPredicate<PT>(column_id, filter, be_exec_version);
return new BitmapFilterColumnPredicate<PT>(column_id, filter);
} else {
throw Exception(ErrorCode::INTERNAL_ERROR, "bitmap filter do not support type {}", PT);
}
}

template <PrimitiveType PT>
ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<HybridSetBase>& filter, int,
const std::shared_ptr<HybridSetBase>& filter,
const TabletColumn* column = nullptr) {
return create_in_list_predicate<PT, PredicateType::IN_LIST>(column_id, filter,
column->length());
}

template <PrimitiveType PT>
ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<FunctionFilter>& filter, int,
const std::shared_ptr<FunctionFilter>& filter,
const TabletColumn* column = nullptr) {
// currently only support like predicate
if constexpr (PT == TYPE_CHAR) {
Expand All @@ -277,22 +277,19 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,

template <typename T>
ColumnPredicate* create_column_predicate(uint32_t column_id, const std::shared_ptr<T>& filter,
FieldType type, int be_exec_version,
const TabletColumn* column = nullptr) {
FieldType type, const TabletColumn* column = nullptr) {
switch (type) {
#define M(NAME) \
case FieldType::OLAP_FIELD_##NAME: { \
return create_olap_column_predicate<NAME>(column_id, filter, be_exec_version, column); \
#define M(NAME) \
case FieldType::OLAP_FIELD_##NAME: { \
return create_olap_column_predicate<NAME>(column_id, filter, column); \
}
APPLY_FOR_PRIMTYPE(M)
#undef M
case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
return create_olap_column_predicate<TYPE_DECIMALV2>(column_id, filter, be_exec_version,
column);
return create_olap_column_predicate<TYPE_DECIMALV2>(column_id, filter, column);
}
case FieldType::OLAP_FIELD_TYPE_BOOL: {
return create_olap_column_predicate<TYPE_BOOLEAN>(column_id, filter, be_exec_version,
column);
return create_olap_column_predicate<TYPE_BOOLEAN>(column_id, filter, column);
}
default:
return nullptr;
Expand Down
17 changes: 16 additions & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,12 @@ class RuntimePredicateWrapper {
return Status::OK();
}

// Forwards the "fixed_len_to_uint32 v2" switch to the underlying bloom filter
// function. Does nothing when is_bloomfilter() reports false.
void set_enable_fixed_len_to_uint32_v2() {
    if (!is_bloomfilter()) {
        return;
    }
    _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2();
}

// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data,
Expand Down Expand Up @@ -1357,6 +1363,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
_expr_order = desc->expr_order;
vectorized::VExprContextSPtr build_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, build_ctx));
_enable_fixed_len_to_uint32_v2 = options->__isset.enable_fixed_len_to_uint32_v2 &&
options->enable_fixed_len_to_uint32_v2;

RuntimeFilterParams params;
params.filter_id = _filter_id;
Expand Down Expand Up @@ -1407,7 +1415,11 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
}

_wrapper = std::make_shared<RuntimePredicateWrapper>(&params);
return _wrapper->init(&params);
RETURN_IF_ERROR(_wrapper->init(&params));
if (_enable_fixed_len_to_uint32_v2) {
_wrapper->set_enable_fixed_len_to_uint32_v2();
}
return Status::OK();
}

Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
Expand Down Expand Up @@ -1604,6 +1616,9 @@ void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrap
wrapper->_column_return_type = _wrapper->_column_return_type;
}
_wrapper = wrapper;
if (_enable_fixed_len_to_uint32_v2) {
_wrapper->set_enable_fixed_len_to_uint32_v2();
}
update_runtime_filter_type_to_profile(local_merge_time);
signal();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ class IRuntimeFilter {

int64_t _synced_size = -1;
std::shared_ptr<pipeline::CountedFinishDependency> _dependency;

bool _enable_fixed_len_to_uint32_v2 = false;
};

// avoid expose RuntimePredicateWrapper
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/bitmap_filter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BitmapFilterColumnPredicate : public ColumnPredicate {
using SpecificFilter = BitmapFilterFunc<T>;

BitmapFilterColumnPredicate(uint32_t column_id,
const std::shared_ptr<BitmapFilterFuncBase>& filter, int)
const std::shared_ptr<BitmapFilterFuncBase>& filter)
: ColumnPredicate(column_id),
_filter(filter),
_specific_filter(assert_cast<SpecificFilter*>(_filter.get())) {}
Expand Down
12 changes: 4 additions & 8 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,7 @@ ColumnPredicate* TabletReader::_parse_to_predicate(
return nullptr;
}
const TabletColumn& column = materialize_column(_tablet_schema->column(index));
return create_column_predicate(index, bloom_filter.second, column.type(),
_reader_context.runtime_state->be_exec_version(), &column);
return create_column_predicate(index, bloom_filter.second, column.type(), &column);
}

ColumnPredicate* TabletReader::_parse_to_predicate(
Expand All @@ -609,8 +608,7 @@ ColumnPredicate* TabletReader::_parse_to_predicate(
return nullptr;
}
const TabletColumn& column = materialize_column(_tablet_schema->column(index));
return create_column_predicate(index, in_filter.second, column.type(),
_reader_context.runtime_state->be_exec_version(), &column);
return create_column_predicate(index, in_filter.second, column.type(), &column);
}

ColumnPredicate* TabletReader::_parse_to_predicate(
Expand All @@ -620,8 +618,7 @@ ColumnPredicate* TabletReader::_parse_to_predicate(
return nullptr;
}
const TabletColumn& column = materialize_column(_tablet_schema->column(index));
return create_column_predicate(index, bitmap_filter.second, column.type(),
_reader_context.runtime_state->be_exec_version(), &column);
return create_column_predicate(index, bitmap_filter.second, column.type(), &column);
}

ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& function_filter) {
Expand All @@ -631,8 +628,7 @@ ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& functio
}
const TabletColumn& column = materialize_column(_tablet_schema->column(index));
return create_column_predicate(index, std::make_shared<FunctionFilter>(function_filter),
column.type(), _reader_context.runtime_state->be_exec_version(),
&column);
column.type(), &column);
}

Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
Expand Down
8 changes: 0 additions & 8 deletions be/src/util/hash_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,6 @@ namespace doris {
// Utility class to compute hash values.
class HashUtil {
public:
// Maps a fixed-length value to a 32-bit integer.
//
// Values whose type is at most 32 bits wide are converted directly to
// uint32_t. Wider values are hashed with std::hash<T> and the resulting
// size_t is narrowed to its low 32 bits.
//
// The hash fallback lives in the `else` branch of the `if constexpr` so that
// std::hash<T> is only instantiated for types that actually take that path.
template <typename T>
static uint32_t fixed_len_to_uint32(T value) {
    if constexpr (sizeof(T) <= sizeof(uint32_t)) {
        return static_cast<uint32_t>(value);
    } else {
        // Intentional narrowing: only the low 32 bits of the hash are kept.
        return static_cast<uint32_t>(std::hash<T>()(value));
    }
}

static uint32_t zlib_crc_hash(const void* data, uint32_t bytes, uint32_t hash) {
return crc32(hash, (const unsigned char*)data, bytes);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/runtime/vdatetime_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,6 @@ class DateV2Value {
}
}

operator int64_t() const { return to_int64(); }

int64_t to_int64() const {
if constexpr (is_datetime) {
return (date_v2_value_.year_ * 10000L + date_v2_value_.month_ * 100 +
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/bitmap_filter_column_predicate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BitmapFilterColumnPredicateTest : public testing::Test {
template <PrimitiveType type>
BitmapFilterColumnPredicate<type> create_predicate(
const std::shared_ptr<BitmapFilterFunc<type>>& filter) {
return BitmapFilterColumnPredicate<type>(0, filter, 0);
return BitmapFilterColumnPredicate<type>(0, filter);
}

const std::string kTestDir = "./ut_dir/bitmap_filter_column_predicate_test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,11 @@ public enum IgnoreSplitType {
"Ignore the rf when it encounters an error" })
public boolean ignoreRuntimeFilterError = false;

@VariableMgr.VarAttr(name = "enable_fixed_len_to_uint32_v2", needForward = true, description = {
"使用新版本fixed_len_to_uint32_v2,对datetimev2类型bloom filter做了优化",
"Using the new version fixed_len_to_uint32_v2, the datetimev2 type bloom filter has been optimized" })
public boolean enableFixedLenToUint32V2 = true;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE, needForward = true)
private String runtimeFilterMode = "GLOBAL";

Expand Down Expand Up @@ -3984,6 +3989,7 @@ public TQueryOptions toThrift() {
tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);
tResult.setEnableFixedLenToUint32V2(enableFixedLenToUint32V2);

return tResult;
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ struct TQueryOptions {
140: optional i64 orc_max_merge_distance_bytes = 1048576;

141: optional bool ignore_runtime_filter_error = false;
142: optional bool enable_fixed_len_to_uint32_v2 = false;

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down

0 comments on commit 6a8ae77

Please sign in to comment.