diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index d1567a8fa79cb4..45158dc9aebdbe 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -476,9 +476,15 @@ class RuntimePredicateWrapper { const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - if (wrapper->is_ignored()) { + if (wrapper->is_disabled()) { + set_disabled(); return Status::OK(); } + + if (wrapper->is_ignored() || is_disabled()) { + return Status::OK(); + } + _context->ignored = false; bool can_not_merge_in_or_bloom = @@ -938,6 +944,10 @@ class RuntimePredicateWrapper { void set_ignored() { _context->ignored = true; } + bool is_disabled() const { return _context->disabled; } + + void set_disabled() { _context->disabled = true; } + void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr& _hybrid_set, PColumnValue&)) { @@ -1216,9 +1226,10 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress merge_filter_callback->cntl_->ignore_eovercrowded(); } - if (get_ignored()) { + if (get_ignored() || get_disabled()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); - merge_filter_request->set_ignored(true); + merge_filter_request->set_ignored(get_ignored()); + merge_filter_request->set_disabled(get_disabled()); } else { RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len)); } @@ -1240,7 +1251,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listis_ignored()) { + if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { _set_push_down(!is_late_arrival); RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } @@ -1289,9 +1300,9 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - if (!_wrapper->is_ignored() && _wrapper->is_bloomfilter() && + if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() && !_wrapper->get_bloomfilter()->inited()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored, rf: {}", + throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored/disabled, rf: {}", debug_string()); } @@ -1344,12 +1355,21 @@ bool IRuntimeFilter::get_ignored() { return _wrapper->is_ignored(); } +void IRuntimeFilter::set_disabled() { + _wrapper->set_disabled(); +} + +bool IRuntimeFilter::get_disabled() const { + return _wrapper->is_disabled(); +} + std::string IRuntimeFilter::formatted_state() const { return fmt::format( "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}, Ignored = {}]", + "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target, - _has_local_target, _wrapper->_context->ignored); + _has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled, + _wrapper->get_real_type(), wait_time_ms()); } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, @@ -1451,6 +1471,11 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, *wrapper = std::make_shared(column_type, get_type(filter_type), param->request->filter_id()); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); @@ -1497,6 +1522,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param, *wrapper = std::make_unique(column_type, get_type(filter_type), param->request->filter_id()); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + return Status::OK(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b0e82a75335cc5..441de7d4da340c 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -284,6 +284,9 @@ class IRuntimeFilter { bool get_ignored(); + void set_disabled(); + bool get_disabled() const; + RuntimeFilterType get_real_type(); bool need_sync_filter_size(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7eb8c131c8a303..a9dd631e3581a2 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -66,11 +66,17 @@ class VRuntimeFilterSlots { return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size; } - Status ignore_filters(RuntimeState* state) { + /** + Disable meaningless filters, such as filters: + RF1: col1 in (1, 3, 5) + RF2: col1 min: 1, max: 5 + We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter. + */ + Status disable_meaningless_filters(RuntimeState* state) { // process ignore duplicate IN_FILTER std::unordered_set has_in_filter; for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { @@ -81,7 +87,7 @@ class VRuntimeFilterSlots { continue; } if (has_in_filter.contains(filter->expr_order())) { - filter->set_ignored(); + filter->set_disabled(); continue; } has_in_filter.insert(filter->expr_order()); @@ -89,14 +95,14 @@ class VRuntimeFilterSlots { // process ignore filter when it has IN_FILTER on same expr for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || !has_in_filter.contains(filter->expr_order())) { continue; } - filter->set_ignored(); + filter->set_disabled(); } return Status::OK(); } @@ -108,6 +114,13 @@ class VRuntimeFilterSlots { return Status::OK(); } + Status disable_all_filters() { + for (auto filter : _runtime_filters) { + filter->set_disabled(); + } + return Status::OK(); + } + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto filter : _runtime_filters) { @@ -135,7 +148,7 @@ class VRuntimeFilterSlots { int result_column_id = _build_expr_context[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; for (auto* filter : iter->second) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } filter->insert_batch(column, 1); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 47560875b51252..e5d6a952d47992 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -17,6 +17,7 @@ #include "hashjoin_build_sink.h" +#include #include #include "exprs/bloom_filter_func.h" @@ -105,6 +106,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); + +#ifndef NDEBUG + if (state->fuzzy_disable_runtime_filter_in_be()) { + if ((_parent->operator_id() + random()) % 2 == 0) { + RETURN_IF_ERROR(disable_runtime_filters(state)); + } + } +#endif + return Status::OK(); } @@ -135,7 +145,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos || + _runtime_filters_disabled) { return Base::close(state, exec_status); } @@ -150,7 +161,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu { SCOPED_TIMER(_runtime_filter_init_timer); RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state)); } if (hash_table_size > 1) { SCOPED_TIMER(_runtime_filter_compute_timer); @@ -179,6 +190,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu return Base::close(state, exec_status); } +Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState* state) { + if (_runtime_filters_disabled) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + if (!_should_build_hash_table) { + return Status::OK(); + } + + if (_runtime_filters.empty()) { + return Status::OK(); + } + + DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be initialized"; + + _runtime_filters_disabled = true; + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters()); + + SCOPED_TIMER(_publish_runtime_filter_timer); + return _runtime_filter_slots->publish(state, !_should_build_hash_table); +} + bool HashJoinBuildSinkLocalState::build_unique() const { return _parent->cast()._build_unique; } @@ -509,9 +547,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = std::make_shared( local_state._build_side_mutable_block.to_block()); - RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( - state, local_state._shared_state->build_block->rows(), - local_state._finish_dependency)); + if (!local_state._runtime_filters_disabled) { + RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( + state, local_state._shared_state->build_block->rows(), + local_state._finish_dependency)); + } + RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index cc78e6a769f3c3..906fc5f9bd4fe5 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -54,6 +54,8 @@ class HashJoinBuildSinkLocalState final Status close(RuntimeState* state, Status exec_status) override; + Status disable_runtime_filters(RuntimeState* state); + protected: Status _hash_table_init(RuntimeState* state); void _set_build_side_has_external_nullmap(vectorized::Block& block, @@ -76,6 +78,8 @@ class HashJoinBuildSinkLocalState final bool _should_build_hash_table = true; + bool _runtime_filters_disabled = false; + size_t _build_side_rows = 0; vectorized::MutableBlock _build_side_mutable_block; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b4a38173d72222..f43b037fc19e3d 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -391,10 +391,11 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr que void* data = nullptr; int len = 0; bool has_attachment = false; - if (!cnt_val->filter->get_ignored()) { + if (!cnt_val->filter->get_ignored() && !cnt_val->filter->get_disabled()) { RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data, &len)); } else { - apply_request.set_ignored(true); + apply_request.set_ignored(cnt_val->filter->get_ignored()); + apply_request.set_disabled(cnt_val->filter->get_disabled()); apply_request.set_filter_type(PFilterType::UNKNOW_FILTER); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 1e7c1e579f7735..3cc3d97b5d22c5 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -592,6 +592,11 @@ class RuntimeState { _query_options.enable_shared_exchange_sink_buffer; } + bool fuzzy_disable_runtime_filter_in_be() const { + return _query_options.__isset.fuzzy_disable_runtime_filter_in_be && + _query_options.fuzzy_disable_runtime_filter_in_be; + } + int64_t min_revocable_mem() const { if (_query_options.__isset.min_revocable_mem) { return std::max(_query_options.min_revocable_mem, (int64_t)1); diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index fd3a753be8c0b3..ff9ad4d0ef48c2 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -47,6 +47,7 @@ struct RuntimeFilterContext { std::shared_ptr bloom_filter_func; std::shared_ptr bitmap_filter_func; bool ignored = false; + bool disabled = false; std::string err_msg; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 75f21c786b8c37..e0702d0015b022 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -574,6 +574,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FORCE_SPILL = "enable_force_spill"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; + public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE = "fuzzy_disable_runtime_filter_in_be"; + public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS @@ -2250,6 +2252,13 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { needForward = true, fuzzy = true) public long dataQueueMaxBlocks = 1; + @VariableMgr.VarAttr( + name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE, + description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试", + "Disable the runtime filter on the BE for testing purposes."}, + needForward = true, fuzzy = false) + public boolean fuzzyDisableRuntimeFilterInBE = false; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -2499,6 +2508,8 @@ public void initFuzzyModeVariables() { this.batchSize = 1024; this.enableFoldConstantByBe = false; } + + this.fuzzyDisableRuntimeFilterInBE = true; } } @@ -3979,6 +3990,7 @@ public TQueryOptions toThrift() { tResult.setEnableForceSpill(enableForceSpill); tResult.setMinRevocableMem(minRevocableMem); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE); tResult.setEnableLocalMergeSort(enableLocalMergeSort); tResult.setEnableSharedExchangeSinkBuffer(enableSharedExchangeSinkBuffer); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8db8bab16cd499..547b2588168755 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -595,6 +595,7 @@ message PMergeFilterRequest { optional bool contain_null = 11; optional bool ignored = 12; optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PMergeFilterResponse { @@ -615,6 +616,7 @@ message PPublishFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional bool disabled = 13; }; message PPublishFilterRequestV2 { @@ -631,6 +633,7 @@ message PPublishFilterRequestV2 { optional bool ignored = 11; repeated int32 fragment_ids = 12; optional uint64 local_merge_time = 13; + optional bool disabled = 14; }; message PPublishFilterResponse { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 0a1ea4a98fca94..6463935c5c8e88 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -360,6 +360,9 @@ struct TQueryOptions { 141: optional bool ignore_runtime_filter_error = false; 142: optional bool enable_fixed_len_to_uint32_v2 = false; 143: optional bool enable_shared_exchange_sink_buffer = true; + + 144: optional bool fuzzy_disable_runtime_filter_in_be = false; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.