From cb1c15623cbf284a448caf774322d2566ef4f2af Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 6 Jun 2024 11:27:34 +0800 Subject: [PATCH] [enhancement](Nereids) support 4 phases distinct aggregate with full distribution (#35871) The origin implementation of 4 phases distinct aggregate only support the pattern which not contains `group by`, and only one distinct aggregate function for example: ```sql select count(distinct sex), sum(age) from student ``` This pr complement the 4 phases distinct aggregate with full distribution, to avoid data skew in the `group by`. for example ```sql select sex, sum(distinct age) from student group by sex; ``` The sex only contains two distinct values, `male` and `female`, and the table store millions rows. Shuffle by the `sex` cause the data skew and lots of instances process empty rows. The 4 phase aggregate shuffle `sex, age` to distinct rows first, so more instances can do parallel distinct, the plan shape will like this: ``` PhysicalAggregate(groupBy=[sex], output=[sex, sum(partial_sum(age))], mode=BUFFER_TO_RESULT) | PhysicalDistribute(columns=[sex]) | PhysicalAggregate(groupBy=[sex], output=[sex, partial_sum(age)], mode=INPUT_TO_BUFFER) | PhysicalAggregate(groupBy=[sex, age], output=[sex, age], mode=BUFFER_TO_BUFFER) | PhysicalDistribute(columns=[sex, age]) # more columns to shuffle avoid data skew | PhysicalAggregate(groupBy=[sex, age], output=[sex, age], mode=INPUT_TO_BUFFER) | PhysicalOlapScan(name=student) ``` (cherry picked from commit 03f1cbde7aea4aca9a1a2ac5b3ef43e250d2ca92) --- .../ChildrenPropertiesRegulator.java | 10 -- .../apache/doris/nereids/rules/RuleType.java | 1 + .../implementation/AggregateStrategies.java | 119 ++++++++++++++---- .../trees/plans/algebra/Aggregate.java | 22 ++++ .../data/nereids_p0/aggregate/aggregate.out | 4 + .../nereids_p0/aggregate/aggregate.groovy | 22 ++++ .../nereids_syntax_p0/agg_4_phase.groovy | 4 +- 7 files changed, 144 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 038e2646a6dd996..3beed014aac9109 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -113,16 +113,6 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate // this means one stage gather agg, usually bad pattern return false; } - // forbid three or four stage distinct agg inter by distribute - if (agg.getAggMode() == AggMode.BUFFER_TO_BUFFER && children.get(0).getPlan() instanceof PhysicalDistribute) { - // if distinct without group by key, we prefer three or four stage distinct agg - // because the second phase of multi-distinct only have one instance, and it is slow generally. - if (agg.getGroupByExpressions().size() == 1 - && agg.getOutputExpressions().size() == 1) { - return true; - } - return false; - } // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle // TODO: this is forbid good plan after cte reuse by mistake diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 45439e4cd51bf55..24ba21c06a35cfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -443,6 +443,7 @@ public enum RuleType { TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT(RuleTypeClass.IMPLEMENTATION), THREE_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION), FOUR_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION), + FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE(RuleTypeClass.IMPLEMENTATION), LOGICAL_UNION_TO_PHYSICAL_UNION(RuleTypeClass.IMPLEMENTATION), LOGICAL_EXCEPT_TO_PHYSICAL_EXCEPT(RuleTypeClass.IMPLEMENTATION), LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index e1095df7bab0b97..9cb7b4d84a47f7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -75,6 +75,7 @@ import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -293,15 +294,89 @@ && couldConvertToMulti(agg)) // .thenApplyMulti(ctx -> twoPhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) // ), RuleType.THREE_PHASE_AGGREGATE_WITH_DISTINCT.build( - basePattern - .when(agg -> agg.getDistinctArguments().size() == 1) - .thenApplyMulti(ctx -> threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) + basePattern + .when(agg -> agg.getDistinctArguments().size() == 1) + .thenApplyMulti(ctx -> threePhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) ), + /* + * sql: + * select count(distinct name), sum(age) from student; + *

+ * 4 phase plan + * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(), + * output[count(partial_count(name)), sum(partial_sum(partial_sum(age)))], + * GATHER) + * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(), + * output(partial_count(name), partial_sum(partial_sum(age))), + * hash distribute by name) + * +--GLOBAL(BUFFER_TO_BUFFER, groupBy(name), + * output(name, partial_sum(age)), + * hash_distribute by name) + * +--LOCAL(INPUT_TO_BUFFER, groupBy(name), output(name, partial_sum(age))) + * +--scan(name, age) + */ RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT.build( - basePattern - .when(agg -> agg.getDistinctArguments().size() == 1) - .when(agg -> agg.getGroupByExpressions().isEmpty()) - .thenApplyMulti(ctx -> fourPhaseAggregateWithDistinct(ctx.root, ctx.connectContext)) + basePattern + .when(agg -> agg.getDistinctArguments().size() == 1) + .when(agg -> agg.getGroupByExpressions().isEmpty()) + .thenApplyMulti(ctx -> { + Function, RequireProperties> secondPhaseRequireDistinctHash = + groupByAndDistinct -> RequireProperties.of( + PhysicalProperties.createHash( + ctx.root.getDistinctArguments(), ShuffleType.REQUIRE + ) + ); + Function, RequireProperties> fourPhaseRequireGather = + agg -> RequireProperties.of(PhysicalProperties.GATHER); + return fourPhaseAggregateWithDistinct( + ctx.root, ctx.connectContext, + secondPhaseRequireDistinctHash, fourPhaseRequireGather + ); + }) + ), + /* + * sql: + * select age, count(distinct name) from student group by age; + *

+ * 4 phase plan + * DISTINCT_GLOBAL(BUFFER_TO_RESULT, groupBy(age), + * output[age, sum(partial_count(name))], + * hash distribute by name) + * +--DISTINCT_LOCAL(INPUT_TO_BUFFER, groupBy(age), + * output(age, partial_count(name)), + * hash distribute by age, name) + * +--GLOBAL(BUFFER_TO_BUFFER, groupBy(age, name), + * output(age, name), + * hash_distribute by age, name) + * +--LOCAL(INPUT_TO_BUFFER, groupBy(age, name), output(age, name)) + * +--scan(age, name) + */ + RuleType.FOUR_PHASE_AGGREGATE_WITH_DISTINCT_WITH_FULL_DISTRIBUTE.build( + basePattern + .when(agg -> agg.everyDistinctArgumentNumIsOne() && !agg.getGroupByExpressions().isEmpty()) + .when(agg -> + ImmutableSet.builder() + .addAll(agg.getGroupByExpressions()) + .addAll(agg.getDistinctArguments()) + .build().size() > agg.getGroupByExpressions().size() + ) + .thenApplyMulti(ctx -> { + Function, RequireProperties> secondPhaseRequireGroupByAndDistinctHash = + groupByAndDistinct -> RequireProperties.of( + PhysicalProperties.createHash(groupByAndDistinct, ShuffleType.REQUIRE) + ); + + Function, RequireProperties> fourPhaseRequireGroupByHash = + agg -> RequireProperties.of( + PhysicalProperties.createHash( + agg.getGroupByExpressions(), ShuffleType.REQUIRE + ) + ); + return fourPhaseAggregateWithDistinct( + ctx.root, ctx.connectContext, + secondPhaseRequireGroupByAndDistinctHash, fourPhaseRequireGroupByHash + ); + }) ) ); } @@ -1649,19 +1724,10 @@ private boolean enablePushDownNoGroupAgg() { return connectContext == null || connectContext.getSessionVariable().enablePushDownNoGroupAgg(); } - /** - * sql: - * select count(distinct name), sum(age) from student; - *

- * 4 phase plan - * DISTINCT_GLOBAL, BUFFER_TO_RESULT groupBy(), output[count(name), sum(age#5)], [GATHER] - * +--DISTINCT_LOCAL, INPUT_TO_BUFFER, groupBy()), output(count(name), partial_sum(age)), hash distribute by name - * +--GLOBAL, BUFFER_TO_BUFFER, groupBy(name), output(name, partial_sum(age)), hash_distribute by name - * +--LOCAL, INPUT_TO_BUFFER, groupBy(name), output(name, partial_sum(age)) - * +--scan(name, age) - */ private List> fourPhaseAggregateWithDistinct( - LogicalAggregate logicalAgg, ConnectContext connectContext) { + LogicalAggregate logicalAgg, ConnectContext connectContext, + Function, RequireProperties> secondPhaseRequireSupplier, + Function, RequireProperties> fourPhaseRequireSupplier) { boolean couldBanned = couldConvertToMulti(logicalAgg); Set aggregateFunctions = logicalAgg.getAggregateFunctions(); @@ -1734,16 +1800,13 @@ private List> fourPhaseAggregateWithDistin globalAggOutput = ImmutableList.of(new Alias(new NullLiteral(TinyIntType.INSTANCE))); } - RequireProperties requireGather = RequireProperties.of(PhysicalProperties.GATHER); - - RequireProperties requireDistinctHash = RequireProperties.of( - PhysicalProperties.createHash(logicalAgg.getDistinctArguments(), ShuffleType.REQUIRE)); + RequireProperties secondPhaseRequire = secondPhaseRequireSupplier.apply(localAggGroupBy); //phase 2 PhysicalHashAggregate anyLocalHashGlobalAgg = new PhysicalHashAggregate<>( localAggGroupBy, globalAggOutput, Optional.of(ImmutableList.copyOf(logicalAgg.getDistinctArguments())), bufferToBufferParam, false, logicalAgg.getLogicalProperties(), - requireDistinctHash, anyLocalAgg); + secondPhaseRequire, anyLocalAgg); // phase 3 AggregateParam distinctLocalParam = new AggregateParam( @@ -1787,7 +1850,7 @@ private List> fourPhaseAggregateWithDistin PhysicalHashAggregate distinctLocal = new PhysicalHashAggregate<>( logicalAgg.getGroupByExpressions(), localDistinctOutput, Optional.empty(), distinctLocalParam, false, logicalAgg.getLogicalProperties(), - requireDistinctHash, anyLocalHashGlobalAgg); + secondPhaseRequire, anyLocalHashGlobalAgg); //phase 4 AggregateParam distinctGlobalParam = new AggregateParam( @@ -1801,7 +1864,7 @@ private List> fourPhaseAggregateWithDistin if (aggregateFunction.isDistinct()) { Set aggChild = Sets.newLinkedHashSet(aggregateFunction.children()); Preconditions.checkArgument(aggChild.size() == 1 - || aggregateFunction.getDistinctArguments().size() == 1, + || aggregateFunction.getDistinctArguments().size() == 1, "cannot process more than one child in aggregate distinct function: " + aggregateFunction); AggregateFunction nonDistinct = aggregateFunction @@ -1821,10 +1884,12 @@ private List> fourPhaseAggregateWithDistin }); globalDistinctOutput.add(outputExprPhase4); } + + RequireProperties fourPhaseRequire = fourPhaseRequireSupplier.apply(logicalAgg); PhysicalHashAggregate distinctGlobal = new PhysicalHashAggregate<>( logicalAgg.getGroupByExpressions(), globalDistinctOutput, Optional.empty(), distinctGlobalParam, false, logicalAgg.getLogicalProperties(), - requireGather, distinctLocal); + fourPhaseRequire, distinctLocal); return ImmutableList.>builder() .add(distinctGlobal) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java index e7d09b8cf8b9ce4..acce8eef309de39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Common interface for logical/physical Aggregate. @@ -68,4 +69,25 @@ default Set getDistinctArguments() { } return distinctArguments.build(); } + + /** everyDistinctArgumentNumIsOne */ + default boolean everyDistinctArgumentNumIsOne() { + AtomicBoolean hasDistinctArguments = new AtomicBoolean(false); + for (NamedExpression outputExpression : getOutputExpressions()) { + boolean distinctArgumentSizeNotOne = outputExpression.anyMatch(expr -> { + if (expr instanceof AggregateFunction) { + AggregateFunction aggFun = (AggregateFunction) expr; + if (aggFun.isDistinct()) { + hasDistinctArguments.set(true); + return aggFun.getDistinctArguments().size() != 1; + } + } + return false; + }); + if (distinctArgumentSizeNotOne) { + return false; + } + } + return hasDistinctArguments.get(); + } } diff --git a/regression-test/data/nereids_p0/aggregate/aggregate.out b/regression-test/data/nereids_p0/aggregate/aggregate.out index 9578f6bff092e2e..9264ba994e8c29d 100644 --- a/regression-test/data/nereids_p0/aggregate/aggregate.out +++ b/regression-test/data/nereids_p0/aggregate/aggregate.out @@ -685,3 +685,7 @@ TESTING AGAIN -- !having_with_limit -- 7 -32767.0 + +-- !four_phase_full_distribute -- +hello 1 1 +world 1 1 diff --git a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy index 60601cee7cef829..43b6fa65e057509 100644 --- a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy +++ b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy @@ -347,4 +347,26 @@ suite("aggregate") { sql "insert into table_10_undef_partitions2_keys3_properties4_distributed_by5(pk,col_bigint_undef_signed,col_varchar_10__undef_signed,col_varchar_64__undef_signed) values (0,111,'from','t'),(1,null,'h','out'),(2,3814,'get','q'),(3,5166561111626303305,'s','right'),(4,2688963514917402600,'b','hey'),(5,-5065987944147755706,'p','mean'),(6,31061,'v','d'),(7,122,'the','t'),(8,-2882446,'going','a'),(9,-43,'y','a');" sql "SELECT MIN( `pk` ) FROM table_10_undef_partitions2_keys3_properties4_distributed_by5 WHERE ( col_varchar_64__undef_signed LIKE CONCAT ('come' , '%' ) OR col_varchar_10__undef_signed IN ( 'could' , 'was' , 'that' ) ) OR ( `pk` IS NULL OR ( `pk` <> 186 ) ) AND ( `pk` IS NOT NULL OR `pk` BETWEEN 255 AND -99 + 8 ) AND ( ( `pk` != 6 ) OR `pk` IS NULL );" + + sql "drop table if exists test_four_phase_full_distribute" + sql """CREATE TABLE `test_four_phase_full_distribute` ( + `id` INT NULL, + `age` INT NULL, + `name` VARCHAR(65533) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql "insert into test_four_phase_full_distribute values(1, 21, 'hello'), (2, 22, 'world')" + sql " sync " + order_qt_four_phase_full_distribute """select + /*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,TWO_PHASE_AGGREGATE_WITH_MULTI_DISTINCT,THREE_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/ + name, count(distinct name), count(distinct age) + from test_four_phase_full_distribute + group by name + """ } diff --git a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy index 19cac99c153fa7b..d3b418660abe824 100644 --- a/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy +++ b/regression-test/suites/nereids_syntax_p0/agg_4_phase.groovy @@ -58,4 +58,6 @@ suite("agg_4_phase") { qt_4phase (test_sql) sql """select GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl;""" -} \ No newline at end of file + + sql """select /*+SET_VAR(disable_nereids_rules='TWO_PHASE_AGGREGATE_SINGLE_DISTINCT_TO_MULTI,THREE_PHASE_AGGREGATE_WITH_DISTINCT,FOUR_PHASE_AGGREGATE_WITH_DISTINCT')*/ GROUP_CONCAT(distinct name, " ") from agg_4_phase_tbl group by gender;""" +}