Skip to content

Commit

Permalink
fix local shuffle with bucket shuffle join
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 21, 2024
1 parent b0a99fd commit 163b4af
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,15 @@ protected List<AssignedJob> insideMachineParallelization(
// |(share scan node, and local shuffle data to other local instances to parallel compute this data)|
// +------------------------------------------------------------------------------------------------+
ScanSource shareScanSource = instanceToScanRanges.get(0);

// one scan range generate multiple instances,
// different instances reference the same scan source
int shareScanId = shareScanIdGenerator.getAndIncrement();
for (int i = 0; i < instanceNum; i++) {
// one scan range generate multiple instances,
// different instances reference the same scan source
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
instanceIndexInFragment++, shareScanIdGenerator.get(), this, worker, shareScanSource);
instanceIndexInFragment++, shareScanId, this, worker, shareScanSource);
instances.add(instance);
}
shareScanIdGenerator.incrementAndGet();
} else {
// split the scanRanges to some partitions, one partition for one instance
// for example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.planner.ScanNode;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

Expand Down Expand Up @@ -96,15 +95,15 @@ public List<ScanSource> parallelize(List<ScanNode> scanNodes, int instanceNum) {
// rebuild BucketScanSource for each instance
ImmutableList.Builder<ScanSource> instancesScanSource = ImmutableList.builder();
for (List<Entry<Integer, Map<ScanNode, ScanRanges>>> oneInstanceScanBuckets : scanBucketsPerInstance) {
ImmutableMap.Builder<Integer, Map<ScanNode, ScanRanges>> bucketsScanSources = ImmutableMap.builder();
Map<Integer, Map<ScanNode, ScanRanges>> bucketsScanSources = Maps.newLinkedHashMap();
for (Entry<Integer, Map<ScanNode, ScanRanges>> bucketIndexToScanNodeToScanRange : oneInstanceScanBuckets) {
Integer bucketIndex = bucketIndexToScanNodeToScanRange.getKey();
Map<ScanNode, ScanRanges> scanNodeToScanRanges = bucketIndexToScanNodeToScanRange.getValue();
bucketsScanSources.put(bucketIndex, scanNodeToScanRanges);
}

instancesScanSource.add(new BucketScanSource(
bucketsScanSources.build()
bucketsScanSources
));
}
return instancesScanSource.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,31 @@ private List<AssignedJob> fillUpInstances(
scanEmptyBuckets.put(bucketIndex, scanTableWithEmptyData);
}

AssignedJob fillUpInstance;
AssignedJob fillUpInstance = null;
Worker worker = workerToBuckets.getKey();
BucketScanSource scanSource = new BucketScanSource(scanEmptyBuckets);
if (useLocalShuffle) {
fillUpInstance = new LocalShuffleAssignedJob(
newInstances.size(), shareScanIdGenerator.getAndIncrement(), this, worker, scanSource
);
boolean mergedBucketsInSameWorkerInstance = false;
for (AssignedJob newInstance : newInstances) {
if (newInstance.getAssignedWorker().equals(worker)) {
BucketScanSource bucketScanSource = (BucketScanSource) newInstance.getScanSource();
bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
mergedBucketsInSameWorkerInstance = true;
}
}
if (!mergedBucketsInSameWorkerInstance) {
fillUpInstance = new LocalShuffleAssignedJob(
newInstances.size(), shareScanIdGenerator.getAndIncrement(), this, worker, scanSource
);
}
} else {
fillUpInstance = assignWorkerAndDataSources(
newInstances.size(), worker, scanSource
);
}
newInstances.add(fillUpInstance);
if (fillUpInstance != null) {
newInstances.add(fillUpInstance);
}
}
return newInstances;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
2 2 2 2

-- !bucket_shuffle_with_prune_tablets --
\N \N 2 2
\N \N 3 3

-- !bucket_shuffle_with_prune_tablets2 --
Expand All @@ -25,3 +26,7 @@
-- !bucket_shuffle_with_prune_tablets3 --
1 1 \N \N

-- !fillup_bucket --
\N 2
\N 3

Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,36 @@ suite("local_shuffle") {
test_local_shuffle2
on a.id=test_local_shuffle2.id2
"""

multi_sql """
drop table if exists test_outer_join_decimal1;
CREATE TABLE IF NOT EXISTS test_outer_join_decimal1 (
c0 int
)
DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1");
drop table if exists test_outer_join_decimal2;
CREATE TABLE IF NOT EXISTS test_outer_join_decimal2 (
c0 int
)
DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1");
INSERT INTO test_outer_join_decimal1 (c0) VALUES (1), (3);
INSERT INTO test_outer_join_decimal2 (c0) VALUES (2), (3);
sync;
set enable_nereids_distribute_planner=true;
set enable_pipeline_x_engine=true;
set disable_join_reorder=true;
set enable_local_shuffle=true;
set force_to_local_shuffle=true;
"""

order_qt_fillup_bucket """
SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
(select * from test_outer_join_decimal1 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_outer_join_decimal2)b
ON a.c0 = b.c0
"""
}

0 comments on commit 163b4af

Please sign in to comment.