From 163b4af4239776dcf2f81f9b70a221a4ea41e482 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Sat, 22 Jun 2024 01:34:45 +0800 Subject: [PATCH] fix local shuffle with bucket shuffle join --- .../worker/job/AbstractUnassignedScanJob.java | 9 +++--- .../nereids/worker/job/BucketScanSource.java | 5 ++- .../job/UnassignedScanBucketOlapTableJob.java | 22 ++++++++++--- .../distribute/local_shuffle.out | 5 +++ .../distribute/local_shuffle.groovy | 32 +++++++++++++++++++ 5 files changed, 61 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java index 7b384ffb9aa13aa..bd0a46779813239 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java @@ -100,14 +100,15 @@ protected List insideMachineParallelization( // |(share scan node, and local shuffle data to other local instances to parallel compute this data)| // +------------------------------------------------------------------------------------------------+ ScanSource shareScanSource = instanceToScanRanges.get(0); + + // one scan range generate multiple instances, + // different instances reference the same scan source + int shareScanId = shareScanIdGenerator.getAndIncrement(); for (int i = 0; i < instanceNum; i++) { - // one scan range generate multiple instances, - // different instances reference the same scan source LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( - instanceIndexInFragment++, shareScanIdGenerator.get(), this, worker, shareScanSource); + instanceIndexInFragment++, shareScanId, this, worker, shareScanSource); instances.add(instance); } - shareScanIdGenerator.incrementAndGet(); } else { // split the scanRanges to some partitions, one partition for one instance // for example: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java index 31f6014167ba501..51f3e9031502811 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java @@ -21,7 +21,6 @@ import org.apache.doris.planner.ScanNode; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -96,7 +95,7 @@ public List parallelize(List scanNodes, int instanceNum) { // rebuild BucketScanSource for each instance ImmutableList.Builder instancesScanSource = ImmutableList.builder(); for (List>> oneInstanceScanBuckets : scanBucketsPerInstance) { - ImmutableMap.Builder> bucketsScanSources = ImmutableMap.builder(); + Map> bucketsScanSources = Maps.newLinkedHashMap(); for (Entry> bucketIndexToScanNodeToScanRange : oneInstanceScanBuckets) { Integer bucketIndex = bucketIndexToScanNodeToScanRange.getKey(); Map scanNodeToScanRanges = bucketIndexToScanNodeToScanRange.getValue(); @@ -104,7 +103,7 @@ public List parallelize(List scanNodes, int instanceNum) { } instancesScanSource.add(new BucketScanSource( - bucketsScanSources.build() + bucketsScanSources )); } return instancesScanSource.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java index acb1daee1dcc7a5..dd45cd51f4197d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java @@ -205,19 +205,31 @@ private List fillUpInstances( scanEmptyBuckets.put(bucketIndex, scanTableWithEmptyData); } - AssignedJob fillUpInstance; + AssignedJob fillUpInstance = null; Worker worker = workerToBuckets.getKey(); BucketScanSource scanSource = new BucketScanSource(scanEmptyBuckets); if (useLocalShuffle) { - fillUpInstance = new LocalShuffleAssignedJob( - newInstances.size(), shareScanIdGenerator.getAndIncrement(), this, worker, scanSource - ); + boolean mergedBucketsInSameWorkerInstance = false; + for (AssignedJob newInstance : newInstances) { + if (newInstance.getAssignedWorker().equals(worker)) { + BucketScanSource bucketScanSource = (BucketScanSource) newInstance.getScanSource(); + bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets); + mergedBucketsInSameWorkerInstance = true; + } + } + if (!mergedBucketsInSameWorkerInstance) { + fillUpInstance = new LocalShuffleAssignedJob( + newInstances.size(), shareScanIdGenerator.getAndIncrement(), this, worker, scanSource + ); + } } else { fillUpInstance = assignWorkerAndDataSources( newInstances.size(), worker, scanSource ); } - newInstances.add(fillUpInstance); + if (fillUpInstance != null) { + newInstances.add(fillUpInstance); + } } return newInstances; } diff --git a/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out b/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out index 93a1f4ae1e0719d..7b83b20f270fab4 100644 --- a/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out +++ b/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out @@ -16,6 +16,7 @@ 2 2 2 2 -- !bucket_shuffle_with_prune_tablets -- +\N \N 2 2 \N \N 3 3 -- !bucket_shuffle_with_prune_tablets2 -- @@ -25,3 +26,7 @@ -- !bucket_shuffle_with_prune_tablets3 -- 1 1 \N \N +-- !fillup_bucket -- +\N 2 +\N 3 + diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy index ff52b5c549e4b0f..c5dbf65a1baa4ea 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy @@ -124,4 +124,36 @@ suite("local_shuffle") { test_local_shuffle2 on a.id=test_local_shuffle2.id2 """ + + multi_sql """ + drop table if exists test_outer_join_decimal1; + CREATE TABLE IF NOT EXISTS test_outer_join_decimal1 ( + c0 int + ) + DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1"); + + drop table if exists test_outer_join_decimal2; + CREATE TABLE IF NOT EXISTS test_outer_join_decimal2 ( + c0 int + ) + DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1"); + INSERT INTO test_outer_join_decimal1 (c0) VALUES (1), (3); + INSERT INTO test_outer_join_decimal2 (c0) VALUES (2), (3); + + sync; + + set enable_nereids_distribute_planner=true; + set enable_pipeline_x_engine=true; + set disable_join_reorder=true; + set enable_local_shuffle=true; + set force_to_local_shuffle=true; + """ + + order_qt_fillup_bucket """ + SELECT cast(a.c0 as int), cast(b.c0 as int) FROM + (select * from test_outer_join_decimal1 where c0 =1)a + RIGHT OUTER JOIN + (select * from test_outer_join_decimal2)b + ON a.c0 = b.c0 + """ }