From 7a1de3986290406bb86df21ea02df742b38f3dd0 Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 3 Dec 2024 15:42:07 +0800 Subject: [PATCH] [Bug](runtime-filter) fix wrong build_bf_exactly when sync filter size disabled (#44716) fix wrong build_bf_exactly when sync filter size disabled introduced by #44169 --- be/src/exprs/runtime_filter.cpp | 20 +++-------- be/src/exprs/runtime_filter.h | 5 ++- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +-- .../exec/nested_loop_join_build_operator.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 22 ++++++------ be/src/runtime/runtime_filter_mgr.h | 8 ++--- be/src/runtime/runtime_state.cpp | 13 ++++--- be/src/runtime/runtime_state.h | 3 +- .../translator/RuntimeFilterTranslator.java | 1 + .../apache/doris/planner/RuntimeFilter.java | 35 +++++++++++++++++-- gensrc/thrift/PlanNodes.thrift | 4 ++- 11 files changed, 66 insertions(+), 51 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 2169ec727b2428..6eee685b0d94f8 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -982,11 +982,10 @@ class RuntimePredicateWrapper { Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly) { + int node_id, std::shared_ptr* res) { *res = std::make_shared(state, desc); (*res)->set_role(role); - return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id); } RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() { @@ -1348,7 +1347,7 @@ std::string IRuntimeFilter::formatted_state() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id, bool build_bf_exactly) { + int node_id) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || 
(node_id == -1 && !is_consumer())); @@ -1370,21 +1369,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size ? options->runtime_bloom_filter_max_size : 0; - auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size; - // We build runtime filter by exact distinct count iff three conditions are met: - // 1. Only 1 join key - // 2. Bloom filter - // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join). - params.build_bf_exactly = - build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER || - _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER); + params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly; params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv; - if (!sync_filter_size) { - params.build_bf_exactly &= !_is_broadcast_join; - } - if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 8b52031d76566d..f8c8f073004d11 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -212,8 +212,7 @@ class IRuntimeFilter { static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, std::shared_ptr* res, - bool build_bf_exactly = false); + int node_id, std::shared_ptr* res); RuntimeFilterContextSPtr& get_shared_context_ref(); @@ -259,7 +258,7 @@ class IRuntimeFilter { // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - int node_id = -1, bool build_bf_exactly = false); + int node_id = -1); // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 46272ea1328731..1db31b87c29db2 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -91,8 +91,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _hash_table_init(state); _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter( - p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1)); + RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], + &_runtime_filters[i])); } _runtime_filter_slots = diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index c9f7ee7cf5ecf9..7b531fcd2d5bfd 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], - &_runtime_filters[i], false)); + &_runtime_filters[i])); } return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index a6883660be7723..55ddf8ea56e5e2 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -90,7 +90,7 @@ std::vector> RuntimeFilterMgr::get_consume_filte Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly, bool need_local_merge) { + bool need_local_merge) { 
SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; bool has_exist = false; @@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc if (!has_exist) { std::shared_ptr filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER, - node_id, &filter, build_bf_exactly)); + node_id, &filter)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; } else if (!need_local_merge) { @@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc Status RuntimeFilterMgr::register_local_merge_producer_filter( const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, - std::shared_ptr producer_filter, bool build_bf_exactly) { + std::shared_ptr producer_filter) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -143,8 +143,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( if (iter->second.filters.empty()) { std::shared_ptr merge_filter; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, - RuntimeFilterRole::PRODUCER, -1, &merge_filter, - build_bf_exactly)); + RuntimeFilterRole::PRODUCER, -1, &merge_filter)); merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } @@ -181,10 +180,9 @@ doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int return &iter->second; } -Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly) { +Status RuntimeFilterMgr::register_producer_filter( + const TRuntimeFilterDesc& desc, const TQueryOptions& options, + std::shared_ptr* producer_filter) { DCHECK(!_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc return Status::InvalidArgument("filter has registed"); } RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, - producer_filter, build_bf_exactly)); + producer_filter)); _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; - RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, - -1, false)); + RETURN_IF_ERROR( + cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1)); cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 0a6f8318feaba0..9f4cf5f4e22a07 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -100,19 +100,17 @@ class RuntimeFilterMgr { // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, std::shared_ptr* consumer_filter, - bool build_bf_exactly = false, bool need_local_merge = false); + bool need_local_merge = false); Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr producer_filter, - bool build_bf_exactly = false); + std::shared_ptr producer_filter); Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); LocalMergeFilters* get_local_merge_producer_filters(int filter_id); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr* producer_filter, - bool build_bf_exactly = false); + std::shared_ptr* producer_filter); // update filter by remote void set_runtime_filter_params(const 
TRuntimeFilterParams& runtime_filter_params); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index a22ad18ce04fb4..80b018a4a19d62 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -522,14 +522,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { } Status RuntimeState::register_producer_runtime_filter( - const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter, - bool build_bf_exactly) { + const TRuntimeFilterDesc& desc, std::shared_ptr* producer_filter) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. - RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly)); + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(), + producer_filter)); RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), *producer_filter, build_bf_exactly)); + desc, query_options(), *producer_filter)); return Status::OK(); } @@ -538,10 +537,10 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr* consumer_filter) { if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, true); + consumer_filter, true); } else { return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, - consumer_filter, false, false); + consumer_filter, false); } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e1891c23a4e014..ab2a193cec4eb1 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -561,8 +561,7 @@ class RuntimeState { } Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, - 
std::shared_ptr* producer_filter, - bool build_bf_exactly); + std::shared_ptr* producer_filter); Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 3dbd6cfcec7917..07e0af601739b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -143,6 +143,7 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P targetTupleIdMapList, context.getLimits()); if (node instanceof HashJoinNode) { origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); + origFilter.setSingleEq(((HashJoinNode) node).getEqJoinConjuncts().size()); } else { // nest loop join origFilter.setIsBroadcast(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 2f3948aee161b8..80497798083dc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -112,6 +112,8 @@ public final class RuntimeFilter { private boolean bloomFilterSizeCalculatedByNdv = false; + private boolean singleEq = false; + /** * Internal representation of a runtime filter target. 
*/ @@ -216,9 +218,36 @@ public TRuntimeFilterDesc toThrift() { tFilter.setIsBroadcastJoin(isBroadcastJoin); tFilter.setHasLocalTargets(hasLocalTargets); tFilter.setHasRemoteTargets(hasRemoteTargets); + + boolean hasSerialTargets = false; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift()); + hasSerialTargets = hasSerialTargets + || (target.node.isSerialOperator() && target.node.fragment.useSerialSource(ConnectContext.get())); } + + boolean enableSyncFilterSize = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize(); + + // there are two cases where there is a local exchange between join and scan + // 1. hasRemoteTargets is true means join probe side does at least one shuffle (shuffle between join and scan) + // 2. hasSerialTargets is true means scan is pooled (has local shuffle between join and scan) + boolean needShuffle = hasRemoteTargets || hasSerialTargets; + + // There are two cases where all instances of rf have the same size. + // 1. enableSyncFilterSize is true means backends will collect global size and send to every instance + // 2. isBroadcastJoin is true means each join node instance has the same full amount of data + boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin; + + // build runtime filter by exact distinct count if all 3 conditions are met: + // 1. only single eq conjunct + // 2. rf type may be bf + // 3. 
each filter only acts on its own instance (does not need any shuffle), or the size of + // all filters will be the same + boolean buildBfExactly = singleEq && (runtimeFilterType == TRuntimeFilterType.IN_OR_BLOOM + || runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needShuffle || hasGlobalSize); + tFilter.setBuildBfExactly(buildBfExactly); + tFilter.setType(runtimeFilterType); tFilter.setBloomFilterSizeBytes(filterSizeBytes); if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) { @@ -239,8 +268,6 @@ public TRuntimeFilterDesc toThrift() { tFilter.setNullAware(false); } } - tFilter.setSyncFilterSize(ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize()); return tFilter; } @@ -597,6 +624,10 @@ public void addTarget(RuntimeFilterTarget target) { targets.add(target); } + public void setSingleEq(int eqJoinConjunctsNumbers) { + singleEq = (eqJoinConjunctsNumbers == 1); + } + public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin = isBroadcast; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 9aaa7076901ce6..70c6722b9d8580 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1301,7 +1301,9 @@ struct TRuntimeFilterDesc { // true, if join type is null aware like <=>. rf should dispose the case 15: optional bool null_aware; - 16: optional bool sync_filter_size; // Deprecated + + 17: optional bool build_bf_exactly; }