From e8ef9adb078bce9193ff0ce0b0555e486ffc010c Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Thu, 22 Aug 2024 17:44:21 +0800 Subject: [PATCH] Support push down aggregate functions in mv rewrite Signed-off-by: shuming.li --- .../AggregateFunctionRollupUtils.java | 13 + .../materialization/EquationRewriter.java | 104 ++++++++ ...aterializedViewAggPushDownRewriteTest.java | 7 +- .../planner/MaterializedViewManualTest.java | 126 +++++++++ .../com/starrocks/sql/plan/TracerMVTest.java | 4 +- .../test_mv_rewrite_with_push_down_aggregate | 242 ++++++++++++++++++ .../test_mv_rewrite_with_push_down_aggregate | 116 +++++++++ 7 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 test/sql/test_materialized_view_rewrite/R/test_mv_rewrite_with_push_down_aggregate create mode 100644 test/sql/test_materialized_view_rewrite/T/test_mv_rewrite_with_push_down_aggregate diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/AggregateFunctionRollupUtils.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/AggregateFunctionRollupUtils.java index c5be262cfd6fda..55ef969c37edb6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/AggregateFunctionRollupUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/AggregateFunctionRollupUtils.java @@ -63,6 +63,19 @@ public class AggregateFunctionRollupUtils { .put(FunctionSet.ARRAY_AGG_DISTINCT, FunctionSet.ARRAY_UNIQUE_AGG) .build(); + // Functions that can be pushed down to mv union rewrite. + // eg: + // sum(fn(col)) = fn(sum(col)) + // min(fn(col)) = fn(min(col)) + // max(fn(col)) = fn(max(col)) + // if fn is a scalar function, it can be pushed down to mv union rewrite. + public static final Map MV_REWRITE_PUSH_DOWN_FUNCTION_MAP = ImmutableMap.builder() + // Functions and rollup functions are the same. 
+ .put(FunctionSet.SUM, FunctionSet.SUM) + .put(FunctionSet.MAX, FunctionSet.MAX) + .put(FunctionSet.MIN, FunctionSet.MIN) + .build(); + public static final Set NON_CUMULATIVE_ROLLUP_FUNCTION_MAP = ImmutableSet.builder() .add(FunctionSet.MAX) .add(FunctionSet.MIN) diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/EquationRewriter.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/EquationRewriter.java index 63a517ff54a34c..bee49c60f73e98 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/EquationRewriter.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/materialization/EquationRewriter.java @@ -25,10 +25,12 @@ import com.starrocks.common.Pair; import com.starrocks.sql.optimizer.operator.scalar.BinaryPredicateOperator; import com.starrocks.sql.optimizer.operator.scalar.CallOperator; +import com.starrocks.sql.optimizer.operator.scalar.CaseWhenOperator; import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator; import com.starrocks.sql.optimizer.operator.scalar.ScalarOperatorVisitor; import com.starrocks.sql.optimizer.rewrite.BaseScalarOperatorShuttle; +import com.starrocks.sql.optimizer.rewrite.ReplaceColumnRefRewriter; import com.starrocks.sql.optimizer.rule.transformation.materialization.equivalent.EquivalentShuttleContext; import com.starrocks.sql.optimizer.rule.transformation.materialization.equivalent.IRewriteEquivalent; import com.starrocks.sql.optimizer.rule.transformation.materialization.equivalent.RewriteEquivalent; @@ -48,6 +50,9 @@ public class EquationRewriter { private AggregateFunctionRewriter aggregateFunctionRewriter; boolean underAggFunctionRewriteContext; + // Replace the corresponding ColumnRef with ScalarOperator if this call operator can be pushed down. 
+ private Map> pushDownAggOperatorMap = Maps.newHashMap(); + public EquationRewriter() { this.equationMap = ArrayListMultimap.create(); this.rewriteEquivalents = Maps.newHashMap(); @@ -162,9 +167,88 @@ public ScalarOperator visitCall(CallOperator call, Void context) { } } + // rewrite by pushing down aggregate + rewritten = rewriteByPushDownAggregation(call); + if (rewritten != null) { + return rewritten; + } + return super.visitCall(call, context); } + private ScalarOperator rewriteByPushDownAggregation(CallOperator call) { + if (AggregateFunctionRollupUtils.MV_REWRITE_PUSH_DOWN_FUNCTION_MAP.containsKey(call.getFnName()) && + pushDownAggOperatorMap.containsKey(call.getFnName())) { + Map operatorMap = pushDownAggOperatorMap.get(call.getFnName()); + ScalarOperator arg0 = call.getChild(0); + if (call.getChildren().size() != 1) { + return null; + } + // push down aggregate now only supports one child + if (!canPushDownCallOperator(arg0, operatorMap)) { + return null; + } + + ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(operatorMap); + ScalarOperator pdCall = rewriter.rewrite(arg0); + if (pdCall == null || pdCall.equals(arg0)) { + return null; + } + ScalarOperator rewritten = pdCall.accept(this, null); + if (rewritten != null) { + shuttleContext.setRewrittenByEquivalent(true); + return new CallOperator(call.getFnName(), call.getType(), Lists.newArrayList(rewritten), + call.getFunction()); + } + } + return null; + } + + private boolean canPushDownCallOperator(ScalarOperator arg0, + Map operatorMap) { + if (arg0 == null || !(arg0 instanceof CallOperator)) { + return false; + } + CallOperator call = (CallOperator) arg0; + List columnRefs = arg0.getColumnRefs(); + if (columnRefs.size() == 1) { + return operatorMap.containsKey(columnRefs.get(0)); + } else { + // if there are many column refs in arg0, the agg column must be the same. 
+ if (call.getFnName().equalsIgnoreCase(FunctionSet.IF)) { + if (columnRefs.size() != 3) { + return false; + } + // if the first column ref is in the operatorMap, the agg column may be used as the condition, + // which cannot be rewritten + if (operatorMap.containsKey(columnRefs.get(0))) { + return false; + } + return true; + } else if (call instanceof CaseWhenOperator) { + CaseWhenOperator caseWhen = (CaseWhenOperator) call; + // if case condition contains any agg column ref, return false + if (caseWhen.getCaseClause() != null) { + List caseColumnRefs = caseWhen.getCaseClause().getColumnRefs(); + if (caseColumnRefs.stream().anyMatch(x -> operatorMap.containsKey(x))) { + return false; + } + } + // if any when clause contains any agg column ref, return false + for (int i = 0; i < caseWhen.getWhenClauseSize(); i++) { + ScalarOperator when = caseWhen.getWhenClause(i); + if (when != null) { + List whenColumnRefs = when.getColumnRefs(); + if (whenColumnRefs.stream().anyMatch(x -> operatorMap.containsKey(x))) { + return false; + } + } + } + } + return false; + } + } + Optional replace(ScalarOperator scalarOperator) { if (equationMap.containsKey(scalarOperator)) { Optional> mappedColumnAndExprRef = @@ -233,6 +317,7 @@ public void addMapping(ScalarOperator expr, ColumnRefOperator col) { equationMap.put(extendedEntry.first, Pair.create(col, extendedEntry.second)); } + // add into equivalents for (IRewriteEquivalent equivalent : EQUIVALENTS) { IRewriteEquivalent.RewriteEquivalentContext eqContext = equivalent.prepare(expr); if (eqContext != null) { @@ -241,6 +326,25 @@ public void addMapping(ScalarOperator expr, ColumnRefOperator col) { .add(eq); } } + + // add into a push-down operator map + if (expr instanceof CallOperator) { + CallOperator call = (CallOperator) expr; + String fnName = call.getFnName(); + if (AggregateFunctionRollupUtils.REWRITE_ROLLUP_FUNCTION_MAP.containsKey(fnName) && call.getChildren().size() == 1) { + ScalarOperator arg0 = call.getChild(0); + // NOTE: only 
support push down when the argument is a column ref. + // eg: + // mv: sum(cast(col as tinyint)) + // query: 2 * sum(col) + // query cannot be used to push down, because the argument is not a column ref. + if (arg0 != null && arg0.isColumnRef()) { + pushDownAggOperatorMap + .computeIfAbsent(fnName, x -> Maps.newHashMap()) + .put((ColumnRefOperator) arg0, call); + } + } + } } private static class EquationTransformer extends ScalarOperatorVisitor { diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewAggPushDownRewriteTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewAggPushDownRewriteTest.java index e6e6b23cabd2bc..6cd85116d6ae86 100644 --- a/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewAggPushDownRewriteTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewAggPushDownRewriteTest.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Set; +import static com.starrocks.sql.optimizer.rule.transformation.materialization.AggregateFunctionRollupUtils.MV_REWRITE_PUSH_DOWN_FUNCTION_MAP; import static com.starrocks.sql.optimizer.rule.transformation.materialization.AggregateFunctionRollupUtils.REWRITE_ROLLUP_FUNCTION_MAP; import static com.starrocks.sql.optimizer.rule.transformation.materialization.AggregateFunctionRollupUtils.SAFE_REWRITE_ROLLUP_FUNCTION_MAP; @@ -162,7 +163,11 @@ public void testAggPushDown_RollupFunctions_QueryMV_NoMatch() { String query = String.format("select LO_ORDERDATE, %s as revenue_sum\n" + " from lineorder l join dates d on l.LO_ORDERDATE = d.d_datekey\n" + " group by LO_ORDERDATE", queryAggFunc); - sql(query).nonMatch("mv0"); + if (MV_REWRITE_PUSH_DOWN_FUNCTION_MAP.containsKey(funcName)) { + sql(query).contains("mv0"); + } else { + sql(query).nonMatch("mv0"); + } }); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewManualTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewManualTest.java index 
c28479f2a39262..1dd42e02c4b118 100644 --- a/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewManualTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/planner/MaterializedViewManualTest.java @@ -208,6 +208,132 @@ public void testNullableTestCase1() throws Exception { .match("join_null_mv_2"); } + @Test + public void testRewriteWithPushDownEquivalent1() throws Exception { + starRocksAssert.withTable("CREATE TABLE `tbl1` (\n" + + " `k1` date,\n" + + " `k2` decimal64(18, 2),\n" + + " `k3` varchar(255),\n" + + " `v1` bigint \n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`k1`, `k2`, `k3`)\n" + + "DISTRIBUTED BY RANDOM\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");"); + starRocksAssert.withMaterializedView("CREATE MATERIALIZED VIEW `mv1` \n" + + "DISTRIBUTED BY RANDOM\n" + + "REFRESH ASYNC\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n" + + "AS SELECT k1, k2, k3, sum(v1) from tbl1 group by k1, k2, k3"); + { + String sql = "select t1.k1, " + + " sum(case when t1.k1 between date_add('2024-07-20', interval -1 month) and " + + " date_add('2024-07-20', interval 1 month) then t1.v1 else 0 end) " + + " from tbl1 t1 group by t1.k1"; + sql(sql).contains("mv1") + .contains(" 1:AGGREGATE (update serialize)\n" + + " | STREAMING\n" + + " | output: sum(if((7: k1 >= '2024-06-20') AND (7: k1 <= '2024-08-20'), 10: sum(v1), 0))\n" + + " | group by: 7: k1\n" + + " | \n" + + " 0:OlapScanNode\n" + + " TABLE: mv1"); + } + { + String sql = "select t1.k1, " + + " 2 * sum(case when t1.k1 between date_add('2024-07-20', interval -1 month) and " + + " date_add('2024-07-20', interval 1 month) then t1.v1 else 0 end) " + + " from tbl1 t1 group by t1.k1"; + sql(sql).contains("mv1") + .contains(" 1:AGGREGATE (update serialize)\n" + + " | STREAMING\n" + + " | output: sum(if((7: k1 >= '2024-06-20') AND (7: k1 <= '2024-08-20'), 10: sum(v1), 0))\n" + + " | group by: 7: k1\n" + + " | \n" + + " 0:OlapScanNode\n" + + " TABLE: mv1") + .contains(" 
4:Project\n" + + " | : 8: k1\n" + + " | : 2 * 12: sum"); + } + starRocksAssert.dropMaterializedView("mv1"); + starRocksAssert.dropTable("tbl1"); + } + + @Test + public void testRewriteWithPushDownEquivalent2() throws Exception { + starRocksAssert.withTable("CREATE TABLE `tbl1` (\n" + + " `k1` date,\n" + + " `k2` decimal64(18, 2),\n" + + " `k3` varchar(255),\n" + + " `v1` bigint \n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`k1`, `k2`, `k3`)\n" + + "DISTRIBUTED BY RANDOM\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");"); + starRocksAssert.withMaterializedView("CREATE MATERIALIZED VIEW `mv1` \n" + + "DISTRIBUTED BY RANDOM\n" + + "REFRESH ASYNC\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n" + + "AS SELECT k1, k2, k3, sum(2 * v1) from tbl1 group by k1, k2, k3"); + { + // mv's sum doesn't contain column ref which cannot be used for rewrite + String sql = "select t1.k1, " + + " sum(case when t1.k1 between date_add('2024-07-20', interval -1 month) and " + + " date_add('2024-07-20', interval 1 month) then 2 * t1.v1 else 0 end) " + + " from tbl1 t1 group by t1.k1"; + sql(sql).notContain("mv1"); + } + starRocksAssert.dropMaterializedView("mv1"); + starRocksAssert.dropTable("tbl1"); + } + + @Test + public void testRewriteWithPushDownEquivalent3() throws Exception { + starRocksAssert.withTable("CREATE TABLE `tbl1` (\n" + + " `k1` date,\n" + + " `k2` decimal64(18, 2),\n" + + " `k3` varchar(255),\n" + + " `v1` bigint \n" + + ") ENGINE=OLAP \n" + + "DUPLICATE KEY(`k1`, `k2`, `k3`)\n" + + "DISTRIBUTED BY RANDOM\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ");"); + starRocksAssert.withMaterializedView("CREATE MATERIALIZED VIEW `mv1` \n" + + "DISTRIBUTED BY RANDOM\n" + + "REFRESH ASYNC\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n" + + "AS SELECT k1, k2, k3, sum(v1), max(v1) from tbl1 group by k1, k2, k3"); + { + String sql = "select t1.k1, 2 * min(v1 + 1) from tbl1 t1 group by t1.k1"; + 
sql(sql).notContain("mv1"); + } + { + String sql = "select t1.k1, 2 * sum(case when t1.v1 > 10 then t1.v1 else 0 end) " + + " from tbl1 t1 group by t1.k1"; + sql(sql).notContain("mv1"); + } + { + String sql = "select t1.k1, " + + " 2 * sum(v1 + cast(k3 as int)) " + + " from tbl1 t1 group by t1.k1"; + sql(sql).notContain("mv1"); + } + starRocksAssert.dropMaterializedView("mv1"); + starRocksAssert.dropTable("tbl1"); + } + @Test public void testNullableTestCase2() throws Exception { String mv = "create materialized view join_null_mv\n" + diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/plan/TracerMVTest.java b/fe/fe-core/src/test/java/com/starrocks/sql/plan/TracerMVTest.java index 57babceb01260a..c680acfbc587d0 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/plan/TracerMVTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/plan/TracerMVTest.java @@ -148,7 +148,7 @@ public void testTracerLogMV_Fail1() { Tracers.init(connectContext, Tracers.Mode.LOGS, "MV"); String mv = "select locations.locationid, empid, sum(emps.deptno) as col3 from emps " + "join locations on emps.locationid = locations.locationid group by empid,locations.locationid"; - testRewriteFail(mv, "select emps.locationid, empid, sum(emps.deptno + 1) as col3 from emps " + + testRewriteFail(mv, "select emps.locationid, empid, min(emps.deptno + 1) as col3 from emps " + "join locations on emps.locationid = locations.locationid where empid > 10 group by empid,emps.locationid"); String pr = Tracers.printLogs(); Tracers.close(); @@ -163,7 +163,7 @@ public void testTracerLogMV_Fail2() { Tracers.init(connectContext, Tracers.Mode.LOGS, "MV"); String mv = "select locations.locationid, empid, sum(emps.deptno) as col3 from emps " + "join locations on emps.locationid = locations.locationid group by empid,locations.locationid"; - testRewriteFail(mv, "select emps.locationid, empid, sum(emps.deptno + 1) as col3 from emps " + + testRewriteFail(mv, "select emps.locationid, empid, min(emps.deptno + 1) as 
col3 from emps " + "join locations on emps.locationid = locations.locationid where empid > 10 group by empid,emps.locationid"); String pr = Tracers.printLogs(); Tracers.close(); diff --git a/test/sql/test_materialized_view_rewrite/R/test_mv_rewrite_with_push_down_aggregate b/test/sql/test_materialized_view_rewrite/R/test_mv_rewrite_with_push_down_aggregate new file mode 100644 index 00000000000000..ee2c1c65a149af --- /dev/null +++ b/test/sql/test_materialized_view_rewrite/R/test_mv_rewrite_with_push_down_aggregate @@ -0,0 +1,242 @@ +-- name: test_mv_rewrite_with_push_down_aggregate +create database db_${uuid0}; +-- result: +-- !result +use db_${uuid0}; +-- result: +-- !result +CREATE TABLE t1 ( + k1 date NULL COMMENT "", + k2 varchar(255) NULL COMMENT "", + k3 varchar(255) NULL COMMENT "", + v1 decimal64(18, 2) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(k1, k2, k3) +DISTRIBUTED BY RANDOM +PROPERTIES ( + "replication_num" = "1" +); +-- result: +-- !result +INSERT INTO t1 (k1, v1, k3, k2) VALUES +('2024-07-20', 10000.00, 'LOB1', 'Region1'), +('2024-07-19', 8500.00, 'LOB1', 'Region1'), +('2024-06-20', 9500.00, 'LOB1', 'Region1'), +('2024-06-19', 9000.00, 'LOB2', 'Region2'), +('2024-07-01', 12000.00, 'LOB2', 'Region2'); +-- result: +-- !result +CREATE MATERIALIZED VIEW test_mv1 +DISTRIBUTED BY RANDOM +REFRESH ASYNC +PROPERTIES ( + "replication_num" = "1" +) +AS SELECT t1.k1, t1.k3,t1.k2, sum(t1.v1) AS period_actual_value FROM t1 GROUP BY t1.k1, t1.k3, t1.k2; +-- result: +-- !result +refresh materialized view test_mv1 with sync mode; +result=EXPLAIN WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-20' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT date_cte.date1, date_cte.date2, k3, k2, SUM(v1) +FROM t1 JOIN date_cte ON TRUE +WHERE k1 BETWEEN date_cte.date4 AND date_cte.date5 OR k1 BETWEEN 
date_cte.date6 AND date_cte.date7 +GROUP BY date_cte.date1, date_cte.date2, k3, k2; +-- result: +PLAN FRAGMENT 0 + OUTPUT EXPRS:10: expr | 7: expr | 3: k3 | 2: k2 | 13: sum + PARTITION: UNPARTITIONED + + RESULT SINK + + 5:EXCHANGE + +PLAN FRAGMENT 1 + OUTPUT EXPRS: + PARTITION: HASH_PARTITIONED: 15: k3, 16: k2 + + STREAM DATA SINK + EXCHANGE ID: 05 + UNPARTITIONED + + 4:Project + | : 16: k2 + | : 15: k3 + | : 'last_month_period' + | : '2024-07-20' + | : 18: sum + | + 3:AGGREGATE (merge finalize) + | output: sum(18: sum) + | group by: 15: k3, 16: k2 + | + 2:EXCHANGE + +PLAN FRAGMENT 2 + OUTPUT EXPRS: + colocate exec groups: ExecGroup{groupId=1, nodeIds=[0, 1]} + PARTITION: RANDOM + + STREAM DATA SINK + EXCHANGE ID: 02 + HASH_PARTITIONED: 15: k3, 16: k2 + + 1:AGGREGATE (update serialize) + | STREAMING + | output: sum(17: period_actual_value) + | group by: 15: k3, 16: k2 + | + 0:OlapScanNode + TABLE: test_mv1 + PREAGGREGATION: ON + PREDICATES: 14: k1 IN ('2024-06-20', '2024-07-20') + partitions=1/1 + rollup: test_mv1 + tabletRatio=2/2 + tabletList=95555,95557 + cardinality=1 + avgRowSize=0.0 + MaterializedView: true +-- !result +function: check_hit_materialized_view_plan("""${result}""", "test_mv1") +-- result: +None +-- !result +WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-20' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT date_cte.date1, date_cte.date2, k3, k2, SUM(v1) +FROM t1 JOIN date_cte ON TRUE +WHERE k1 BETWEEN date_cte.date4 AND date_cte.date5 OR k1 BETWEEN date_cte.date6 AND date_cte.date7 +GROUP BY date_cte.date1, date_cte.date2, k3, k2 order by k3, k2; +-- result: +2024-07-20 last_month_period LOB1 Region1 19500.00 +-- !result +result=EXPLAIN WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-26' AS 
date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT + date_cte.date1, + date_cte.date2, + k2, + SUM(CASE + WHEN k1 BETWEEN date_cte.date6 AND date_cte.date7 + THEN v1 + ELSE 0 + END), + SUM(CASE + WHEN k1 BETWEEN date_cte.date1 AND date_cte.date5 + THEN v1 + ELSE 0 + END) +FROM t1 JOIN date_cte ON TRUE GROUP BY date_cte.date1, date_cte.date2, k2; +-- result: +PLAN FRAGMENT 0 + OUTPUT EXPRS:9: expr | 7: expr | 2: k2 | 15: sum | 16: sum + PARTITION: UNPARTITIONED + + RESULT SINK + + 5:EXCHANGE + +PLAN FRAGMENT 1 + OUTPUT EXPRS: + PARTITION: HASH_PARTITIONED: 23: k2 + + STREAM DATA SINK + EXCHANGE ID: 05 + UNPARTITIONED + + 4:Project + | : DictDecode(23: k2, []) + | : 'last_month_period' + | : '2024-07-20' + | : 21: sum + | : 22: sum + | + 3:AGGREGATE (merge finalize) + | output: sum(21: sum), sum(22: sum) + | group by: 23: k2 + | + 2:EXCHANGE + +PLAN FRAGMENT 2 + OUTPUT EXPRS: + colocate exec groups: ExecGroup{groupId=1, nodeIds=[0, 1]} + PARTITION: RANDOM + + STREAM DATA SINK + EXCHANGE ID: 02 + HASH_PARTITIONED: 23: k2 + + 1:AGGREGATE (update serialize) + | STREAMING + | output: sum(if((17: k1 >= CAST('2024-06-20 00:00:00' AS DATE)) AND (17: k1 <= CAST('2024-06-20 00:00:00' AS DATE)), 20: period_actual_value, 0)), sum(if((17: k1 >= CAST('2024-07-20' AS DATE)) AND (17: k1 <= CAST('2024-07-26' AS DATE)), 20: period_actual_value, 0)) + | group by: 23: k2 + | + 0:OlapScanNode + TABLE: test_mv1 + PREAGGREGATION: ON + partitions=1/1 + rollup: test_mv1 + tabletRatio=2/2 + tabletList=95555,95557 + cardinality=1 + avgRowSize=0.0 + MaterializedView: true +-- !result +function: check_hit_materialized_view_plan("""${result}""", "test_mv1") +-- result: +None +-- !result +WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-26' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval 
-1 month) AS date7 +) +SELECT + date_cte.date1, + date_cte.date2, + k2, + SUM(CASE + WHEN k1 BETWEEN date_cte.date6 AND date_cte.date7 + THEN v1 + ELSE 0 + END), + SUM(CASE + WHEN k1 BETWEEN date_cte.date1 AND date_cte.date5 + THEN v1 + ELSE 0 + END) +FROM t1 JOIN date_cte ON TRUE GROUP BY date_cte.date1, date_cte.date2, k2 order by k2; +-- result: +2024-07-20 last_month_period Region1 9500.00 12000.00 +2024-07-20 last_month_period Region2 0.00 0.00 +-- !result \ No newline at end of file diff --git a/test/sql/test_materialized_view_rewrite/T/test_mv_rewrite_with_push_down_aggregate b/test/sql/test_materialized_view_rewrite/T/test_mv_rewrite_with_push_down_aggregate new file mode 100644 index 00000000000000..d437b56ecbfb91 --- /dev/null +++ b/test/sql/test_materialized_view_rewrite/T/test_mv_rewrite_with_push_down_aggregate @@ -0,0 +1,116 @@ +-- name: test_mv_rewrite_with_push_down_aggregate + +create database db_${uuid0}; +use db_${uuid0}; +--数据准备 +CREATE TABLE t1 ( + k1 date NULL COMMENT "", + k2 varchar(255) NULL COMMENT "", + k3 varchar(255) NULL COMMENT "", + v1 decimal64(18, 2) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(k1, k2, k3) +DISTRIBUTED BY RANDOM +PROPERTIES ( + "replication_num" = "1" +); +INSERT INTO t1 (k1, v1, k3, k2) VALUES +('2024-07-20', 10000.00, 'LOB1', 'Region1'), +('2024-07-19', 8500.00, 'LOB1', 'Region1'), +('2024-06-20', 9500.00, 'LOB1', 'Region1'), +('2024-06-19', 9000.00, 'LOB2', 'Region2'), +('2024-07-01', 12000.00, 'LOB2', 'Region2'); +--mv +CREATE MATERIALIZED VIEW test_mv1 +DISTRIBUTED BY RANDOM +REFRESH ASYNC +PROPERTIES ( + "replication_num" = "1" +) +AS SELECT t1.k1, t1.k3,t1.k2, sum(t1.v1) AS period_actual_value FROM t1 GROUP BY t1.k1, t1.k3, t1.k2; +refresh materialized view test_mv1 with sync mode; + +-- case1 +result=EXPLAIN WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-20' AS date5, + date_add('2024-07-20', interval -1 month) AS 
date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT date_cte.date1, date_cte.date2, k3, k2, SUM(v1) +FROM t1 JOIN date_cte ON TRUE +WHERE k1 BETWEEN date_cte.date4 AND date_cte.date5 OR k1 BETWEEN date_cte.date6 AND date_cte.date7 +GROUP BY date_cte.date1, date_cte.date2, k3, k2; +function: check_hit_materialized_view_plan("""${result}""", "test_mv1") +WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-20' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT date_cte.date1, date_cte.date2, k3, k2, SUM(v1) +FROM t1 JOIN date_cte ON TRUE +WHERE k1 BETWEEN date_cte.date4 AND date_cte.date5 OR k1 BETWEEN date_cte.date6 AND date_cte.date7 +GROUP BY date_cte.date1, date_cte.date2, k3, k2 order by k3, k2; + +-- case2 +result=EXPLAIN WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-26' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT + date_cte.date1, + date_cte.date2, + k2, + SUM(CASE + WHEN k1 BETWEEN date_cte.date6 AND date_cte.date7 + THEN v1 + ELSE 0 + END), + SUM(CASE + WHEN k1 BETWEEN date_cte.date1 AND date_cte.date5 + THEN v1 + ELSE 0 + END) +FROM t1 JOIN date_cte ON TRUE GROUP BY date_cte.date1, date_cte.date2, k2; +function: check_hit_materialized_view_plan("""${result}""", "test_mv1") + +WITH date_cte AS ( + SELECT + '2024-07-20' AS date1, + 'last_month_period' AS date2, + 'LD' AS date3, + '2024-07-20' AS date4, + '2024-07-26' AS date5, + date_add('2024-07-20', interval -1 month) AS date6, + date_add('2024-07-20',interval -1 month) AS date7 +) +SELECT + date_cte.date1, + date_cte.date2, + k2, + SUM(CASE + WHEN k1 BETWEEN date_cte.date6 AND date_cte.date7 + THEN v1 + ELSE 0 + END), + SUM(CASE + WHEN k1 BETWEEN 
date_cte.date1 AND date_cte.date5 + THEN v1 + ELSE 0 + END) +FROM t1 JOIN date_cte ON TRUE GROUP BY date_cte.date1, date_cte.date2, k2 order by k2; \ No newline at end of file