diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c983af0fb3ea716..79525319dafe0a1 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -394,8 +394,8 @@ class RuntimePredicateWrapper { BloomFilterFuncBase* get_bloomfilter() const { return _context->bloom_filter_func.get(); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) { - if (is_ignored()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored rf"); + if (is_ignored() || is_disabled()) { + throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet ignored/disabled rf"); } switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { @@ -476,6 +476,18 @@ class RuntimePredicateWrapper { const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { + if (wrapper->_context->disabled) { + _context->disabled = true; + _context->ignored = true; + + LOG(INFO) << "Runtime filter is disabled: " << _filter_id; + return Status::OK(); + } + + if (_context->disabled) { + return Status::OK(); + } + if (wrapper->is_ignored()) { return Status::OK(); } @@ -932,6 +944,10 @@ class RuntimePredicateWrapper { void set_ignored() { _context->ignored = true; } + bool is_disabled() const { return _context->disabled; } + + void set_disabled() { _context->disabled = true; } + void batch_assign(const PInFilter* filter, void (*assign_func)(std::shared_ptr& _hybrid_set, PColumnValue&)) { @@ -1211,9 +1227,10 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress merge_filter_callback->cntl_->ignore_eovercrowded(); } - if (get_ignored()) { + if (get_ignored() || get_disabled()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); - merge_filter_request->set_ignored(true); + merge_filter_request->set_disabled(get_disabled()); + merge_filter_request->set_ignored(get_ignored()); } else { RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len)); } @@ -1235,7 +1252,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listis_ignored()) { + if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) { _set_push_down(!is_late_arrival); RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } @@ -1284,10 +1301,10 @@ PrimitiveType IRuntimeFilter::column_type() const { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - if (!_wrapper->is_ignored() && _wrapper->is_bloomfilter() && + if (!_wrapper->is_ignored() && !_wrapper->is_disabled() && _wrapper->is_bloomfilter() && !_wrapper->get_bloomfilter()->inited()) { - throw Exception(ErrorCode::INTERNAL_ERROR, "bf not inited and not ignored, rf: {}", - debug_string()); + throw Exception(ErrorCode::INTERNAL_ERROR, + "bf not inited and not ignored and not disabled, rf: {}", debug_string()); } COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS)); @@ -1339,12 +1356,21 @@ bool IRuntimeFilter::get_ignored() { return _wrapper->is_ignored(); } +void IRuntimeFilter::set_disabled() { + _wrapper->_context->disabled = true; +} + +bool IRuntimeFilter::get_disabled() const { + return _wrapper->is_disabled(); +} + std::string IRuntimeFilter::formatted_state() const { return fmt::format( "[Id = {}, IsPushDown = {}, RuntimeFilterState = {}, HasRemoteTarget = {}, " - "HasLocalTarget = {}, Ignored = {}]", + "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {}, WaitTimeMS = {}]", _filter_id, _is_push_down, _get_explain_state_string(), _has_remote_target, - _has_local_target, _wrapper->_context->ignored); + _has_local_target, _wrapper->_context->ignored, _wrapper->_context->disabled, + _wrapper->get_real_type(), wait_time_ms()); } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, @@ -1497,6 +1523,10 @@ Status IRuntimeFilter::_create_wrapper(const T* param, *wrapper = std::make_unique(column_type, get_type(filter_type), param->request->filter_id()); + if (param->request->has_disabled() && param->request->disabled()) { + (*wrapper)->set_disabled(); + } + if (param->request->has_ignored() && param->request->ignored()) { (*wrapper)->set_ignored(); return Status::OK(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 63ef3e2dbd46537..4e3a9aff77e294a 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -285,6 +285,10 @@ class IRuntimeFilter { bool get_ignored(); + void set_disabled(); + + bool get_disabled() const; + RuntimeFilterType get_real_type(); bool need_sync_filter_size(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7eb8c131c8a303d..65232ad433fb8f2 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -66,11 +66,18 @@ class VRuntimeFilterSlots { return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size; } + Status disable_filters(RuntimeState* state) { + for (auto& filter : _runtime_filters) { + filter->set_disabled(); + } + return Status::OK(); + } + Status ignore_filters(RuntimeState* state) { // process ignore duplicate IN_FILTER std::unordered_set has_in_filter; for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) { @@ -89,7 +96,7 @@ class VRuntimeFilterSlots { // process ignore filter when it has IN_FILTER on same expr for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } if (filter->get_real_type() == RuntimeFilterType::IN_FILTER || @@ -135,7 +142,7 @@ class VRuntimeFilterSlots { int result_column_id = _build_expr_context[i]->get_last_result_column_id(); const auto& column = block->get_by_position(result_column_id).column; for (auto* filter : iter->second) { - if (filter->get_ignored()) { + if (filter->get_ignored() || filter->get_disabled()) { continue; } filter->insert_batch(column, 1); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 57ebfd24558b6b8..8e221a1c7e2341f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -261,7 +261,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b while (!state->is_cancelled() && !has_agg_data && !_shared_state->spill_partitions.empty()) { while (!_shared_state->spill_partitions[0]->spill_streams_.empty() && - !state->is_cancelled()) { + !state->is_cancelled() && !has_agg_data) { auto& stream = _shared_state->spill_partitions[0]->spill_streams_[0]; stream->set_read_counters(profile()); vectorized::Block block; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index f6cea157cd5d182..5f7d39750f3f5fe 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -968,7 +968,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori local_state._shared_state->inner_runtime_state.get(), block, eos)); if (*eos) { _update_profile_from_internal_states(local_state); - local_state._shared_state->inner_runtime_state.reset(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 95675004c701bda..e4a65d0cdf844fc 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -23,7 +23,9 @@ #include #include +#include "common/exception.h" #include "common/logging.h" +#include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/spill_utils.h" #include "pipeline/pipeline_task.h" @@ -31,6 +33,7 @@ #include "util/mem_info.h" #include "util/pretty_printer.h" #include "util/runtime_profile.h" +#include "vec/core/block.h" #include "vec/spill/spill_stream.h" #include "vec/spill/spill_stream_manager.h" @@ -47,11 +50,19 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _rows_in_partitions.assign(p._partition_count, 0); + _build_expr_ctxs.resize(p._build_expr_ctxs.size()); + for (size_t i = 0; i < _build_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._build_expr_ctxs[i]->clone(state, _build_expr_ctxs[i])); + } + + _finish_dependency = std::make_shared( + p.operator_id(), p.node_id(), p.get_name() + "_FINISH_DEPENDENCY"); + _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "HashJoinBuildSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); - _internal_runtime_profile.reset(new RuntimeProfile("internal_profile")); + _internal_runtime_profile = std::make_unique("internal_profile"); _partition_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionTime", 1); _partition_shuffle_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillPartitionShuffleTime", 1); @@ -60,6 +71,15 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1); + _runtime_filter_init_timer = ADD_TIMER_WITH_LEVEL(profile(), "RuntimeFilterInitTime", 1); + _publish_runtime_filter_timer = ADD_TIMER_WITH_LEVEL(profile(), "PublishRuntimeFilterTime", 1); + _runtime_filter_compute_timer = ADD_TIMER_WITH_LEVEL(profile(), "BuildRuntimeFilterTime", 1); + + _runtime_filters.resize(p._runtime_filter_descs.size()); + for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { + RETURN_IF_ERROR(state->register_producer_runtime_filter( + p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1)); + } return Status::OK(); } @@ -86,6 +106,46 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec return Status::OK(); } dec_running_big_mem_op_num(state); + + if (_runtime_filters.empty() || _shared_state->need_to_spill) { + return PipelineXSpillSinkLocalState::close(state, exec_status); + } + + if (state->get_task()->wake_up_by_downstream()) { + // partitial ignore rf to make global rf work + _runtime_filter_slots = + std::make_shared(_build_expr_ctxs, _runtime_filters); + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); + } else if (_runtime_filter_slots) { + DCHECK(_runtime_filter_slots != nullptr); + HashJoinBuildSinkLocalState* inner_sink_state = nullptr; + size_t build_rows = 0; + vectorized::Block* build_block = nullptr; + if (_shared_state->inner_runtime_state) { + if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { + inner_sink_state = assert_cast(tmp_sink_state); + build_block = inner_sink_state->_shared_state->build_block.get(); + build_rows = build_block != nullptr ? build_block->rows() : 0; + } + } + + { + SCOPED_TIMER(_runtime_filter_init_timer); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_filter_slots->init_filters(state, build_rows)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + + if (build_rows > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(build_block); + } + } + + SCOPED_TIMER(_publish_runtime_filter_timer); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_filter_slots->publish(state, false)); + return PipelineXSpillSinkLocalState::close(state, exec_status); } @@ -116,6 +176,47 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state return mem_size; } +Status PartitionedHashJoinSinkLocalState::_setup_runtime_filters(RuntimeState* state) { + if (_runtime_filters.empty()) { + return Status::OK(); + } + + DCHECK(_child_eos); + DCHECK(!_shared_state->need_to_spill); + + size_t build_rows = 0; + if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) { + HashJoinBuildSinkLocalState* inner_sink_state = nullptr; + if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { + inner_sink_state = assert_cast(tmp_sink_state); + auto* build_block = inner_sink_state->_shared_state->build_block.get(); + build_rows = build_block != nullptr ? build_block->rows() : 0; + _runtime_filter_slots = std::make_shared( + inner_sink_state->_build_expr_ctxs, _runtime_filters); + } + } + + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, build_rows, _finish_dependency)); + return Status::OK(); +} + +Status PartitionedHashJoinSinkLocalState::_setup_runtime_filters_for_spilling(RuntimeState* state) { + if (_runtime_filters.empty() || _runtime_filter_slots) { + return Status::OK(); + } + + DCHECK(_shared_state->need_to_spill); + _runtime_filter_slots = + std::make_shared(_build_expr_ctxs, _runtime_filters); + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->disable_filters(state)); + + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_filter_slots->publish(state, false)); + LOG(INFO) << "Query: " << print_id(state->query_id()) << " node: " << _parent->node_id() + << ", task: " << state->task_id() << " disable all runtime filters"; + return Status::OK(); +} + void PartitionedHashJoinSinkLocalState::update_memory_usage() { if (!_shared_state->need_to_spill) { if (_shared_state->inner_shared_state) { @@ -477,7 +578,27 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, ctx)); _build_exprs.emplace_back(eq_join_conjunct.right); partition_exprs.emplace_back(eq_join_conjunct.right); + + vectorized::VExprContextSPtr build_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, build_ctx)); + { + // for type check + vectorized::VExprContextSPtr probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, probe_ctx)); + auto build_side_expr_type = build_ctx->root()->data_type(); + auto probe_side_expr_type = probe_ctx->root()->data_type(); + if (!vectorized::make_nullable(build_side_expr_type) + ->equals(*vectorized::make_nullable(probe_side_expr_type))) { + return Status::InternalError( + "build side type {}, not match probe side type {} , node info " + "{}", + build_side_expr_type->get_name(), probe_side_expr_type->get_name(), + this->debug_string(0)); + } + } + _build_expr_ctxs.push_back(build_ctx); } + _partitioner = std::make_unique(_partition_count); RETURN_IF_ERROR(_partitioner->init(_build_exprs)); @@ -489,6 +610,8 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(_inner_sink_operator->set_child(_child)); RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::open(_build_expr_ctxs, state)); return _inner_sink_operator->open(state); } @@ -560,6 +683,10 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B PrettyPrinter::print_bytes(revocable_size)); } + if (need_to_spill) { + RETURN_IF_ERROR(local_state._setup_runtime_filters_for_spilling(state)); + } + if (rows == 0) { if (eos) { if (need_to_spill) { @@ -580,6 +707,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B // even if memory is low, and will cause cancel of queries. // So make a check here, if build blocks mem usage is too high, // then trigger revoke memory. + const auto revocable_size = revocable_mem_size(state); auto revocable_memory_high_watermark_percent = state->revocable_memory_high_watermark_percent(); if (revocable_memory_high_watermark_percent > 0 && @@ -603,6 +731,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B print_id(state->query_id()), state->task_id(), node_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); + RETURN_IF_ERROR(local_state._setup_runtime_filters(state)); } std::for_each(local_state._shared_state->partitioned_build_blocks.begin(), @@ -658,6 +787,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B print_id(state->query_id()), state->task_id(), node_id(), _inner_sink_operator->get_memory_usage_debug_str( local_state._shared_state->inner_runtime_state.get())); + + RETURN_IF_ERROR(local_state._setup_runtime_filters(state)); local_state._dependency->set_ready_to_read(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index b5e28f8b2443b7b..7fd4711acd29c28 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -50,6 +50,7 @@ class PartitionedHashJoinSinkLocalState size_t revocable_mem_size(RuntimeState* state) const; [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); void update_memory_usage(); + Dependency* finishdependency() override { return _finish_dependency.get(); } protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) @@ -66,11 +67,18 @@ class PartitionedHashJoinSinkLocalState Status _finish_spilling(); + Status _setup_runtime_filters(RuntimeState* state); + Status _setup_runtime_filters_for_spilling(RuntimeState* state); + friend class PartitionedHashJoinSinkOperatorX; bool _child_eos {false}; + vectorized::VExprContextSPtrs _build_expr_ctxs; std::unique_ptr _partitioner; + std::vector> _runtime_filters; + std::shared_ptr _runtime_filter_slots; + std::shared_ptr _finish_dependency; std::unique_ptr _internal_runtime_profile; @@ -79,6 +87,9 @@ class PartitionedHashJoinSinkLocalState RuntimeProfile::Counter* _spill_build_timer = nullptr; RuntimeProfile::Counter* _in_mem_rows_counter = nullptr; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; + RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; + RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; + RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; }; class PartitionedHashJoinSinkOperatorX @@ -142,6 +153,7 @@ class PartitionedHashJoinSinkOperatorX const TJoinDistributionType::type _join_distribution; std::vector _build_exprs; + vectorized::VExprContextSPtrs _build_expr_ctxs; std::shared_ptr _inner_sink_operator; std::shared_ptr _inner_probe_operator; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 42d2640441e03ba..1ebb71a9b47f23f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1363,7 +1363,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo const auto enable_spill = _runtime_state->enable_spill(); if (enable_spill && !is_broadcast_join) { auto tnode_ = tnode; - /// TODO: support rf in partitioned hash join tnode_.runtime_filters.clear(); const uint32_t partition_count = 32; auto inner_probe_operator = @@ -1389,12 +1388,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); auto sink_operator = std::make_shared( - pool, next_sink_operator_id(), tnode_, descs, partition_count); + pool, next_sink_operator_id(), tnode, descs, partition_count); sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); DataSinkOperatorPtr sink = std::move(sink_operator); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get())); + RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index ade4f228850dc23..0eb3483a5ddab21 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -395,7 +395,10 @@ void WorkloadGroupMgr::handle_paused_queries() { if (!has_changed_hard_limit) { update_queries_limit_(wg, true); has_changed_hard_limit = true; - LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " usage: " + << PrettyPrinter::print_bytes( + query_ctx->query_mem_tracker->consumption()) + << ", reserve memory(" << PrettyPrinter::print_bytes(query_it->reserve_size_) << ") failed due to workload group memory exceed, " "should set the workload group work in memory insufficent mode, " diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index fd3a753be8c0b35..d6f2c5cc582908f 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr hybrid_set; std::shared_ptr bloom_filter_func; std::shared_ptr bitmap_filter_func; + bool disabled = false; bool ignored = false; std::string err_msg; }; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8db8bab16cd499b..5eccc97bf5cf02b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -595,6 +595,7 @@ message PMergeFilterRequest { optional bool contain_null = 11; optional bool ignored = 12; optional uint64 local_merge_time = 13; + optional bool disabled = 14; // Disable the merged rf if true }; message PMergeFilterResponse { @@ -615,6 +616,7 @@ message PPublishFilterRequest { optional PColumnType column_type = 10; optional bool contain_null = 11; optional bool ignored = 12; + optional bool disabled = 13; // Disable the merged rf if true }; message PPublishFilterRequestV2 { @@ -631,6 +633,7 @@ message PPublishFilterRequestV2 { optional bool ignored = 11; repeated int32 fragment_ids = 12; optional uint64 local_merge_time = 13; + optional bool disabled = 14; // Disable the merged rf if true }; message PPublishFilterResponse {