Skip to content

Commit

Permalink
fix bucket shuffle join
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 25, 2024
1 parent a98f8c1 commit d82d391
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,23 @@ public Boolean visitPhysicalHashJoin(
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL
&& !SessionVariable.canUseNereidsDistributePlanner()) {
// TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
// TODO: we must do shuffle on right because coordinator
// after we fix coordinator problem, we could do right to left bucket shuffle
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
if (SessionVariable.canUseNereidsDistributePlanner()) {
// nereids coordinator can exchange left side to right side to do bucket shuffle join
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
} else {
// legacy coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
}
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.rewrite.ForeignKeyContext;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.ExprId;
Expand Down Expand Up @@ -224,12 +225,25 @@ public static EqualPredicate swapEqualToForChildrenOrder(EqualPredicate equalTo,
* return true if we should do bucket shuffle join when translate plan.
*/
public static boolean shouldBucketShuffleJoin(AbstractPhysicalJoin<PhysicalPlan, PhysicalPlan> join) {
DistributionSpec rightDistributionSpec = join.right().getPhysicalProperties().getDistributionSpec();
if (!(rightDistributionSpec instanceof DistributionSpecHash)) {
if (isStorageBucketed(join.right().getPhysicalProperties())) {
return true;
} else if (SessionVariable.canUseNereidsDistributePlanner()
&& isStorageBucketed(join.left().getPhysicalProperties())) {
return true;
}
return false;
}

private static boolean isStorageBucketed(PhysicalProperties physicalProperties) {
DistributionSpec distributionSpec = physicalProperties.getDistributionSpec();
if (!(distributionSpec instanceof DistributionSpecHash)) {
return false;
}
DistributionSpecHash rightHash = (DistributionSpecHash) rightDistributionSpec;
return rightHash.getShuffleType() == ShuffleType.STORAGE_BUCKETED;
DistributionSpecHash rightHash = (DistributionSpecHash) distributionSpec;
if (rightHash.getShuffleType() == ShuffleType.STORAGE_BUCKETED) {
return true;
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Map<Worker, UninstancedScanSource> selectReplicaAndWorkerWithBucket(
Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier = bucketBytesSupplier();
// all are olap scan nodes
if (!scanNodes.isEmpty() && scanNodes.size() == olapScanNodes.size()) {
if (olapScanNodes.size() == 1 && fragment.isBucketShuffleJoinInput()) {
if (olapScanNodes.size() == 1 && fragment.hasBucketShuffleJoin()) {
return selectForBucket(unassignedJob, scanNodes, bucketScanRangeSupplier, bucketBytesSupplier);
} else if (fragment.hasColocatePlanNode()) {
return selectForBucket(unassignedJob, scanNodes, bucketScanRangeSupplier, bucketBytesSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private static boolean shouldAssignByBucket(PlanFragment fragment) {
if (fragment.hasColocatePlanNode()) {
return true;
}
if (enableBucketShuffleJoin() && fragment.isBucketShuffleJoinInput()) {
if (enableBucketShuffleJoin() && fragment.hasBucketShuffleJoin()) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,8 @@ public void setHasColocatePlanNode(boolean hasColocatePlanNode) {
this.hasColocatePlanNode = hasColocatePlanNode;
}

public boolean isBucketShuffleJoinInput() {
if (hasBucketShuffleJoin.get()) {
return true;
}
if (destNode != null && destNode.getFragment().hasBucketShuffleJoin.get()) {
return true;
}
return false;
public boolean hasBucketShuffleJoin() {
return hasBucketShuffleJoin.get();
}

public void setResultSinkType(TResultSinkType resultSinkType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@
2 2 2 2

-- !bucket_shuffle_with_prune_tablets --
\N \N 2 2
\N \N 3 3

-- !bucket_shuffle_with_prune_tablets2 --
2 2 \N \N
3 3 \N \N

-- !bucket_shuffle_with_prune_tablets3 --
1 1 \N \N

-- !fillup_bucket --
\N 2
\N 3

-- !shuffle_left --

Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,19 @@ suite("local_shuffle") {
"""

multi_sql """
drop table if exists test_outer_join_decimal1;
CREATE TABLE IF NOT EXISTS test_outer_join_decimal1 (
drop table if exists test_local_shuffle3;
CREATE TABLE IF NOT EXISTS test_local_shuffle3 (
c0 int
)
DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1");
drop table if exists test_outer_join_decimal2;
CREATE TABLE IF NOT EXISTS test_outer_join_decimal2 (
drop table if exists test_local_shuffle4;
CREATE TABLE IF NOT EXISTS test_local_shuffle4 (
c0 int
)
DISTRIBUTED BY HASH (c0) BUCKETS 10 PROPERTIES ("replication_num" = "1");
INSERT INTO test_outer_join_decimal1 (c0) VALUES (1), (3);
INSERT INTO test_outer_join_decimal2 (c0) VALUES (2), (3);
INSERT INTO test_local_shuffle3 (c0) VALUES (1), (3);
INSERT INTO test_local_shuffle4 (c0) VALUES (2), (3);
sync;
Expand All @@ -151,9 +151,37 @@ suite("local_shuffle") {

order_qt_fillup_bucket """
SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
(select * from test_outer_join_decimal1 where c0 =1)a
(select * from test_local_shuffle3 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_outer_join_decimal2)b
(select * from test_local_shuffle4)b
ON a.c0 = b.c0
"""

multi_sql """
drop table if exists test_shuffle_left_with_local_shuffle;
CREATE TABLE `test_shuffle_left_with_local_shuffle` (
id int,
id2 int
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
insert into test_shuffle_left values (1, 1), (2, 2), (3, 4);
"""

order_qt_shuffle_left """
select *
from
(
select id2
from test_shuffle_left_with_local_shuffle
group by id2
) a
inner join [shuffle]
test_shuffle_left_with_local_shuffle b
on a.id2=b.id;
"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ suite("shuffle_left_join") {

def sqlStr = """
select *
from test_shuffle_left a
from
(
select id2
from test_shuffle_left
group by id2
) a
inner join [shuffle]
test_shuffle_left b
on a.id2=b.id;
Expand All @@ -90,7 +95,7 @@ suite("shuffle_left_join") {
.collect(Collectors.joining("\n"))
logger.info("Variables:\n${variableString}")

extractFragment(sqlStr, "INNER JOIN(PARTITIONED)") { exchangeNum ->
extractFragment(sqlStr, "INNER JOIN(BUCKET_SHUFFLE)") { exchangeNum ->
assertTrue(exchangeNum == 1)
}

Expand Down

0 comments on commit d82d391

Please sign in to comment.