From d82d3915ee76c8a6ebb9a1d352a101c1e4442a4d Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 25 Jun 2024 21:38:24 +0800 Subject: [PATCH] fix bucket shuffle join --- .../ChildrenPropertiesRegulator.java | 24 ++++++---- .../apache/doris/nereids/util/JoinUtils.java | 22 ++++++++-- .../worker/LoadBalanceScanWorkerSelector.java | 2 +- .../worker/job/UnassignedJobBuilder.java | 2 +- .../apache/doris/planner/PlanFragment.java | 10 +---- .../distribute/local_shuffle.out | 8 +--- .../distribute/local_shuffle.groovy | 44 +++++++++++++++---- .../distribute/shuffle_left_join.groovy | 9 +++- 8 files changed, 83 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index af9174d31325ed7..bfb6a4952ce68f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -304,15 +304,23 @@ public Boolean visitPhysicalHashJoin( (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED - && rightHashSpec.getShuffleType() == ShuffleType.NATURAL - && !SessionVariable.canUseNereidsDistributePlanner()) { - // TODO: we must do shuffle on right because coordinator could not do right be selection in this case, - // since it always to check the left most node whether olap scan node. 
+ && rightHashSpec.getShuffleType() == ShuffleType.NATURAL) { + // TODO: the legacy coordinator forces a shuffle on the right side; // after we fix coordinator problem, we could do right to left bucket shuffle - updatedForRight = Optional.of(calAnotherSideRequired( - ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, - (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), - (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); + if (SessionVariable.canUseNereidsDistributePlanner()) { + // nereids coordinator can exchange left side to right side to do bucket shuffle join + updatedForLeft = Optional.of(calAnotherSideRequired( + ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec, + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + } else { + // legacy coordinator cannot do BE selection by the right side in this case, + // since it always checks whether the leftmost node is an olap scan node. 
+ updatedForRight = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); + } } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) { if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index c9067e7cc4643df..986aef4fc176663 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; import org.apache.doris.nereids.properties.DistributionSpecReplicated; +import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.rewrite.ForeignKeyContext; import org.apache.doris.nereids.trees.expressions.EqualPredicate; import org.apache.doris.nereids.trees.expressions.ExprId; @@ -224,12 +225,25 @@ public static EqualPredicate swapEqualToForChildrenOrder(EqualPredicate equalTo, * return true if we should do bucket shuffle join when translate plan. 
*/ public static boolean shouldBucketShuffleJoin(AbstractPhysicalJoin join) { - DistributionSpec rightDistributionSpec = join.right().getPhysicalProperties().getDistributionSpec(); - if (!(rightDistributionSpec instanceof DistributionSpecHash)) { + if (isStorageBucketed(join.right().getPhysicalProperties())) { + return true; + } else if (SessionVariable.canUseNereidsDistributePlanner() + && isStorageBucketed(join.left().getPhysicalProperties())) { + return true; + } + return false; + } + + private static boolean isStorageBucketed(PhysicalProperties physicalProperties) { + DistributionSpec distributionSpec = physicalProperties.getDistributionSpec(); + if (!(distributionSpec instanceof DistributionSpecHash)) { return false; } - DistributionSpecHash rightHash = (DistributionSpecHash) rightDistributionSpec; - return rightHash.getShuffleType() == ShuffleType.STORAGE_BUCKETED; + DistributionSpecHash rightHash = (DistributionSpecHash) distributionSpec; + if (rightHash.getShuffleType() == ShuffleType.STORAGE_BUCKETED) { + return true; + } + return false; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java index f3c1657e220ae85..31da2cddb02f65e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java @@ -107,7 +107,7 @@ public Map selectReplicaAndWorkerWithBucket( Function> bucketBytesSupplier = bucketBytesSupplier(); // all are olap scan nodes if (!scanNodes.isEmpty() && scanNodes.size() == olapScanNodes.size()) { - if (olapScanNodes.size() == 1 && fragment.isBucketShuffleJoinInput()) { + if (olapScanNodes.size() == 1 && fragment.hasBucketShuffleJoin()) { return selectForBucket(unassignedJob, scanNodes, bucketScanRangeSupplier, bucketBytesSupplier); } else if 
(fragment.hasColocatePlanNode()) { return selectForBucket(unassignedJob, scanNodes, bucketScanRangeSupplier, bucketBytesSupplier); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java index 6020a00655dc948..71ec4823745346d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java @@ -235,7 +235,7 @@ private static boolean shouldAssignByBucket(PlanFragment fragment) { if (fragment.hasColocatePlanNode()) { return true; } - if (enableBucketShuffleJoin() && fragment.isBucketShuffleJoinInput()) { + if (enableBucketShuffleJoin() && fragment.hasBucketShuffleJoin()) { return true; } return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 9eac5875b70db28..23b392889dfcff7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -260,14 +260,8 @@ public void setHasColocatePlanNode(boolean hasColocatePlanNode) { this.hasColocatePlanNode = hasColocatePlanNode; } - public boolean isBucketShuffleJoinInput() { - if (hasBucketShuffleJoin.get()) { - return true; - } - if (destNode != null && destNode.getFragment().hasBucketShuffleJoin.get()) { - return true; - } - return false; + public boolean hasBucketShuffleJoin() { + return hasBucketShuffleJoin.get(); } public void setResultSinkType(TResultSinkType resultSinkType) { diff --git a/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out b/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out index 7b83b20f270fab4..189034710a342ab 100644 --- a/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out +++ 
b/regression-test/data/nereids_syntax_p0/distribute/local_shuffle.out @@ -16,17 +16,15 @@ 2 2 2 2 -- !bucket_shuffle_with_prune_tablets -- -\N \N 2 2 -\N \N 3 3 -- !bucket_shuffle_with_prune_tablets2 -- -2 2 \N \N -3 3 \N \N -- !bucket_shuffle_with_prune_tablets3 -- 1 1 \N \N -- !fillup_bucket -- -\N 2 -\N 3 + +-- !shuffle_left -- +1 1 1 +2 2 2 diff --git a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy index c5dbf65a1baa4ea..fbe2705ad6f20c5 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy @@ -126,19 +126,19 @@ suite("local_shuffle") { """ multi_sql """ - drop table if exists test_outer_join_decimal1; - CREATE TABLE IF NOT EXISTS test_outer_join_decimal1 ( + drop table if exists test_local_shuffle3; + CREATE TABLE IF NOT EXISTS test_local_shuffle3 ( c0 int ) DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1"); - drop table if exists test_outer_join_decimal2; - CREATE TABLE IF NOT EXISTS test_outer_join_decimal2 ( + drop table if exists test_local_shuffle4; + CREATE TABLE IF NOT EXISTS test_local_shuffle4 ( c0 int ) DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1"); - INSERT INTO test_outer_join_decimal1 (c0) VALUES (1), (3); - INSERT INTO test_outer_join_decimal2 (c0) VALUES (2), (3); + INSERT INTO test_local_shuffle3 (c0) VALUES (1), (3); + INSERT INTO test_local_shuffle4 (c0) VALUES (2), (3); sync; @@ -151,9 +151,37 @@ suite("local_shuffle") { order_qt_fillup_bucket """ SELECT cast(a.c0 as int), cast(b.c0 as int) FROM - (select * from test_outer_join_decimal1 where c0 =1)a + (select * from test_local_shuffle3 where c0 =1)a RIGHT OUTER JOIN - (select * from test_outer_join_decimal2)b + (select * from test_local_shuffle4)b ON a.c0 = b.c0 """ + + multi_sql """ + drop table if exists test_shuffle_left_with_local_shuffle; + 
CREATE TABLE `test_shuffle_left_with_local_shuffle` ( + id int, + id2 int + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + + insert into test_shuffle_left_with_local_shuffle values (1, 1), (2, 2), (3, 4); + """ + + order_qt_shuffle_left """ + select * + from + ( + select id2 + from test_shuffle_left_with_local_shuffle + group by id2 + ) a + inner join [shuffle] + test_shuffle_left_with_local_shuffle b + on a.id2=b.id; + """ } diff --git a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy index 48b5a55e2d45d26..8c56c257b0e2d97 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy @@ -66,7 +66,12 @@ suite("shuffle_left_join") { def sqlStr = """ select * - from test_shuffle_left a + from + ( + select id2 + from test_shuffle_left + group by id2 + ) a inner join [shuffle] test_shuffle_left b on a.id2=b.id; @@ -90,7 +95,7 @@ suite("shuffle_left_join") { .collect(Collectors.joining("\n")) logger.info("Variables:\n${variableString}") - extractFragment(sqlStr, "INNER JOIN(PARTITIONED)") { exchangeNum -> + extractFragment(sqlStr, "INNER JOIN(BUCKET_SHUFFLE)") { exchangeNum -> assertTrue(exchangeNum == 1) }