From 844cbf810ab273e5fb9dad46b6e7f48190ee2ebd Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 28 Nov 2024 12:53:52 +0800 Subject: [PATCH 1/5] fix wrong build_bf_exactly when sync filter size disabled --- be/src/exprs/runtime_filter.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 6eed49e8567e1c..430c6f910e5932 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1371,16 +1371,13 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue // 1. Only 1 join key // 2. Bloom filter // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join). - params.build_bf_exactly = - build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || - _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + params.build_bf_exactly = build_bf_exactly && + (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || + _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) && + (sync_filter_size || _is_broadcast_join); params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; - if (!sync_filter_size) { - params.build_bf_exactly &= !_is_broadcast_join; - } - if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } From 4bbedc9fe960e47e7ca92b6d01e3269e1213b908 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 28 Nov 2024 15:44:49 +0800 Subject: [PATCH 2/5] update --- be/src/exprs/runtime_filter.cpp | 17 ++------ be/src/exprs/runtime_filter.h | 5 +-- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +- .../exec/nested_loop_join_build_operator.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 20 +++++----- be/src/runtime/runtime_filter_mgr.h | 8 ++-- be/src/runtime/runtime_state.cpp | 13 +++---- be/src/runtime/runtime_state.h | 3 +- .../translator/RuntimeFilterTranslator.java | 1 + .../apache/doris/planner/RuntimeFilter.java | 39 ++++++++++++++++++- gensrc/thrift/PlanNodes.thrift | 4 +- 11 files changed, 69 insertions(+), 47 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 430c6f910e5932..b491158513ec39 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -975,11 +975,10 @@ class RuntimePredicateWrapper { Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly) { + int node_id, std::shared_ptr* res) { *res = std::make_shared(state, desc); (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id); } RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() { @@ -1344,7 +1343,7 @@ std::string IRuntimeFilter::formatted_state() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id, bool build_bf_exactly) { + int node_id) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1366,16 +1365,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size ? options->runtime_bloom_filter_max_size : 0; - auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size; - // We build runtime filter by exact distinct count if all of 3 conditions are met: - // 1. Only 1 join key - // 2. Bloom filter - // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join). - params.build_bf_exactly = build_bf_exactly && - (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || - _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) && - (sync_filter_size || _is_broadcast_join); + params.build_bf_exactly = desc->build_bf_exactly; params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; if (desc->__isset.bloom_filter_size_bytes) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 50ee52865be6d6..1223c4a36c85fe 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -213,8 +213,7 @@ class IRuntimeFilter { static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly = false); + int node_id, std::shared_ptr* res); RuntimeFilterContextSPtr& get_shared_context_ref(); @@ -260,7 +259,7 @@ class IRuntimeFilter { // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id = -1, bool build_bf_exactly = false); + int node_id = -1); // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 5c1fa9daa0d837..55ec22e83b6fa8 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -91,8 +91,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(_hash_table_init(state)); _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter( - p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1)); + RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], + &_runtime_filters[i])); } _runtime_filter_slots = diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 9e3e8a08ca83a5..35b9de619f393d 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], - &_runtime_filters[i], false)); + &_runtime_filters[i])); } return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index bb100fcbb42ec5..8c5934bb164746 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -90,7 +90,7 @@ std::vector> RuntimeFilterMgr::get_consume_filte Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly, bool need_local_merge) { + bool need_local_merge) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; bool has_exist = false; @@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc if (!has_exist) { std::shared_ptr filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER, - node_id, &filter, build_bf_exactly)); + node_id, &filter)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; } else if (!need_local_merge) { @@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_local_merge_producer_filter( const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, - std::shared_ptr producer_filter, bool build_bf_exactly) { + std::shared_ptr producer_filter) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -143,8 +143,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( if (iter->second.filters.empty()) { std::shared_ptr merge_filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, - RuntimeFilterRole::PRODUCER, -1, &merge_filter, - build_bf_exactly)); + RuntimeFilterRole::PRODUCER, -1, &merge_filter)); merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } @@ -181,10 +180,9 @@ doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int return &iter->second; } -Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly) { +Status RuntimeFilterMgr::register_producer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, + std::shared_ptr* producer_filter) { DCHECK(!_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc return Status::InvalidArgument("filter has registed"); } RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, - producer_filter, build_bf_exactly)); + producer_filter)); _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -234,7 +232,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, - -1, false)); + -1)); cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 0a6f8318feaba0..9f4cf5f4e22a07 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -100,19 +100,17 @@ class RuntimeFilterMgr { // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly = false, bool need_local_merge = false); + bool need_local_merge = false); Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr producer_filter, - bool build_bf_exactly = false); + std::shared_ptr producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); LocalMergeFilters* get_local_merge_producer_filters(int filter_id); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly = false); + std::shared_ptr* producer_filter); // update filter by remote void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 344180bad771ac..75129ea13f597b 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -516,14 +516,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { } Status RuntimeState::register_producer_runtime_filter( - const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter, - bool build_bf_exactly) { + const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. - RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly)); + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(), + producer_filter)); RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), *producer_filter, build_bf_exactly)); + desc, query_options(), *producer_filter)); return Status::OK(); } @@ -532,10 +531,10 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr* consumer_filter) { if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, true); + consumer_filter, true); } else { return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, false); + consumer_filter, false); } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 0bc81bca4d99a1..ad63510e2af82c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -561,8 +561,7 @@ class RuntimeState { } Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - std::shared_ptr* producer_filter, - bool build_bf_exactly); + std::shared_ptr* producer_filter); Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 3dbd6cfcec7917..07e0af601739b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -143,6 +143,7 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P targetTupleIdMapList, context.getLimits()); if (node instanceof HashJoinNode) { origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); + origFilter.setSingleEq(((HashJoinNode) node).getEqJoinConjuncts().size()); } else { // nest loop join origFilter.setIsBroadcast(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 2f3948aee161b8..da0c934f2c853a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -112,6 +112,8 @@ public final class RuntimeFilter { private boolean bloomFilterSizeCalculatedByNdv = false; + private boolean singleEq = false; + /** * Internal representation of a runtime filter target. */ @@ -216,9 +218,36 @@ public TRuntimeFilterDesc toThrift() { tFilter.setIsBroadcastJoin(isBroadcastJoin); tFilter.setHasLocalTargets(hasLocalTargets); tFilter.setHasRemoteTargets(hasRemoteTargets); + + boolean hasSerialTargets = false; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift()); + hasSerialTargets = hasSerialTargets + || (target.node.isSerialOperator() && target.node.fragment.useSerialSource(ConnectContext.get())); } + + boolean enableSyncFilterSize = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize(); + // there are two cases need merge rf + // 1. hasRemoteTargets is true means join type is hash shuffle join then rf will + // merged into one + // 2. hasSerialTargets is true means scan is pooled then rf need merged into one + boolean needMerge = hasRemoteTargets || hasSerialTargets; + + // There are two cases where all instances of rf have the same size. + // 1. enableSyncFilterSize is true means backends will collect global size and send to every instance + // 2. isBroadcastJoin is true means each join node instance have the same full amount of data + boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin; + + // build runtime filter by exact distinct count if all of 3 conditions are met: + // 1. only single eq conjunct + // 2. rf type may be bf + // 3. each filter only acts on self instance(do not need any merge), or size of + // all filters will be same + boolean buildBfExactly = singleEq && (runtimeFilterType == TRuntimeFilterType.IN_OR_BLOOM + || runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needMerge || hasGlobalSize); + tFilter.setBuildBfExactly(buildBfExactly); + tFilter.setType(runtimeFilterType); tFilter.setBloomFilterSizeBytes(filterSizeBytes); if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) { @@ -239,8 +268,6 @@ public TRuntimeFilterDesc toThrift() { tFilter.setNullAware(false); } } - tFilter.setSyncFilterSize(ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize()); return tFilter; } @@ -597,6 +624,14 @@ public void addTarget(RuntimeFilterTarget target) { targets.add(target); } + public void setSingleEq(int eqJoinConjunctsNumbers) { + singleEq = (eqJoinConjunctsNumbers == 1); + } + + public boolean getSingleEq() { + return singleEq; + } + public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0bbd364fda1c2a..961433ab28d7e4 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1293,7 +1293,9 @@ struct TRuntimeFilterDesc { // true, if join type is null aware like <=>. rf should dispose the case 15: optional bool null_aware; - 16: optional bool sync_filter_size; + 16: optional bool sync_filter_size; // Deprecated + + 17: optional bool build_bf_exactly; } From 5a0245266bb1896a52441c397da755b65e75f931 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 28 Nov 2024 15:47:51 +0800 Subject: [PATCH 3/5] format --- be/src/runtime/runtime_filter_mgr.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 8c5934bb164746..c16db7c67d3420 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -231,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; - RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, - -1)); + RETURN_IF_ERROR( + cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1)); cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); From f8b8ad3c65f93762e97cc32fe12ae2d2899c7aca Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 2 Dec 2024 11:30:31 +0800 Subject: [PATCH 4/5] update --- be/src/exprs/runtime_filter.cpp | 2 +- .../src/main/java/org/apache/doris/planner/RuntimeFilter.java | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b491158513ec39..54b13dae313f94 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1366,7 +1366,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue ? options->runtime_bloom_filter_max_size : 0; - params.build_bf_exactly = desc->build_bf_exactly; + params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly; params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; if (desc->__isset.bloom_filter_size_bytes) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index da0c934f2c853a..43b311aeabd8e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -628,10 +628,6 @@ public void setSingleEq(int eqJoinConjunctsNumbers) { singleEq = (eqJoinConjunctsNumbers == 1); } - public boolean getSingleEq() { - return singleEq; - } - public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; } From f1e4db4dc0176edb7669ebe937a0d0c80a4ce7d2 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 3 Dec 2024 11:23:05 +0800 Subject: [PATCH 5/5] update comment --- .../org/apache/doris/planner/RuntimeFilter.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 43b311aeabd8e3..80497798083dc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -228,11 +228,11 @@ public TRuntimeFilterDesc toThrift() { boolean enableSyncFilterSize = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize(); - // there are two cases need merge rf - // 1. hasRemoteTargets is true means join type is hash shuffle join then rf will - // merged into one - // 2. hasSerialTargets is true means scan is pooled then rf need merged into one - boolean needMerge = hasRemoteTargets || hasSerialTargets; + + // there are two cases has local exchange between join and scan + // 1. hasRemoteTargets is true means join probe side do least once shuffle (has shuffle between join and scan) + // 2. hasSerialTargets is true means scan is pooled (has local shuffle between join and scan) + boolean needShuffle = hasRemoteTargets || hasSerialTargets; // There are two cases where all instances of rf have the same size. // 1. enableSyncFilterSize is true means backends will collect global size and send to every instance @@ -242,10 +242,10 @@ public TRuntimeFilterDesc toThrift() { // build runtime filter by exact distinct count if all of 3 conditions are met: // 1. only single eq conjunct // 2. rf type may be bf - // 3. each filter only acts on self instance(do not need any merge), or size of + // 3. each filter only acts on self instance(do not need any shuffle), or size of // all filters will be same boolean buildBfExactly = singleEq && (runtimeFilterType == TRuntimeFilterType.IN_OR_BLOOM - || runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needMerge || hasGlobalSize); + || runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needShuffle || hasGlobalSize); tFilter.setBuildBfExactly(buildBfExactly); tFilter.setType(runtimeFilterType);