From 92787fe13b752a1c5d73333e2518b4858b6c903b Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Wed, 9 Oct 2024 20:50:24 +0800 Subject: [PATCH] share hash table --- .../trees/plans/distribute/DistributePlanner.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index 65bc401b7dbcf6a..13e00eb69672f7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -116,14 +116,13 @@ private void linkPipelinePlan( PipelineDistributedPlan senderPlan, ExchangeNode linkNode, boolean enableShareHashTableForBroadcastJoin) { - List receiverInstances = filterInstancesWhichCanReceiveDataFromRemote(receiverPlan); + List receiverInstances = filterInstancesWhichCanReceiveDataFromRemote( + receiverPlan, enableShareHashTableForBroadcastJoin, linkNode); boolean receiveSideIsBucketShuffleJoinSide = receiverPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob; if (receiveSideIsBucketShuffleJoinSide) { receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances); - } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) { - receiverInstances = getFirstInstancePerWorker(receiverInstances); } senderPlan.setDestinations(receiverInstances); } @@ -136,11 +135,16 @@ private List getDestinationsByBuckets( return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum); } - private List filterInstancesWhichCanReceiveDataFromRemote(PipelineDistributedPlan receiverPlan) { + private List filterInstancesWhichCanReceiveDataFromRemote( + PipelineDistributedPlan receiverPlan, + boolean enableShareHashTableForBroadcastJoin, + ExchangeNode linkNode) { boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream() .anyMatch(LocalShuffleAssignedJob.class::isInstance); if (useLocalShuffle) { return getFirstInstancePerShareScan(receiverPlan); + } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) { + return getFirstInstancePerShareScan(receiverPlan); } else { return receiverPlan.getInstanceJobs(); }