Skip to content

Commit

Permalink
share hash table
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 9, 2024
1 parent 27692b7 commit 92787fe
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ private void linkPipelinePlan(
PipelineDistributedPlan senderPlan,
ExchangeNode linkNode,
boolean enableShareHashTableForBroadcastJoin) {
List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote(receiverPlan);
List<AssignedJob> receiverInstances = filterInstancesWhichCanReceiveDataFromRemote(
receiverPlan, enableShareHashTableForBroadcastJoin, linkNode);

boolean receiveSideIsBucketShuffleJoinSide
= receiverPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob;
if (receiveSideIsBucketShuffleJoinSide) {
receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances);
} else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
receiverInstances = getFirstInstancePerWorker(receiverInstances);
}
senderPlan.setDestinations(receiverInstances);
}
Expand All @@ -136,11 +135,16 @@ private List<AssignedJob> getDestinationsByBuckets(
return sortDestinationInstancesByBuckets(joinSide, receiverInstances, bucketNum);
}

private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(PipelineDistributedPlan receiverPlan) {
private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
PipelineDistributedPlan receiverPlan,
boolean enableShareHashTableForBroadcastJoin,
ExchangeNode linkNode) {
boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
.anyMatch(LocalShuffleAssignedJob.class::isInstance);
if (useLocalShuffle) {
return getFirstInstancePerShareScan(receiverPlan);
} else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
return getFirstInstancePerShareScan(receiverPlan);
} else {
return receiverPlan.getInstanceJobs();
}
Expand Down

0 comments on commit 92787fe

Please sign in to comment.