Skip to content

Commit

Permalink
support pushdown project expression
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Sep 2, 2024
1 parent 1e10d20 commit e3c4f98
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 0 deletions.
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String ENABLE_PUSHDOWN_OR_PREDICATE = "enable_pushdown_or_predicate";

// Name of the session variable bound (via @VarAttr) to the enablePushDownJoinProjection field.
// NOTE(review): judging by the name it toggles pushing projections beneath joins -- confirm
// against the optimizer rule that is expected to read it.
public static final String ENABLE_PUSHDOWN_JOIN_PROJECTION = "enable_pushdown_join_projection";

public static final String ENABLE_SHOW_PREDICATE_TREE_IN_PROFILE = "enable_show_predicate_tree_in_profile";

public static final String SELECT_RATIO_THRESHOLD = "select_ratio_threshold";
Expand Down Expand Up @@ -1228,6 +1230,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = MAX_PUSHDOWN_CONDITIONS_PER_COLUMN)
private int maxPushdownConditionsPerColumn = -1;

// Backing field of the session variable named by ENABLE_PUSHDOWN_JOIN_PROJECTION; enabled by default.
// NOTE(review): presumably gates the project-to-join pushdown rewrite -- verify that a rule reads it.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_JOIN_PROJECTION)
private boolean enablePushDownJoinProjection = true;

@VariableMgr.VarAttr(name = HASH_JOIN_PUSH_DOWN_RIGHT_TABLE)
private boolean hashJoinPushDownRightTable = true;

Expand Down Expand Up @@ -2835,6 +2840,14 @@ public boolean isHashJoinPushDownRightTable() {
return this.hashJoinPushDownRightTable;
}

/**
 * Reports the current value of the {@code enablePushDownJoinProjection} session flag.
 *
 * @return the value currently stored in the flag
 */
public boolean isEnablePushDownJoinProjection() {
    return this.enablePushDownJoinProjection;
}

/**
 * Overwrites the {@code enablePushDownJoinProjection} session flag.
 *
 * @param enablePushDownJoinProjection the new value to store
 */
public void setEnablePushDownJoinProjection(final boolean enablePushDownJoinProjection) {
    this.enablePushDownJoinProjection = enablePushDownJoinProjection;
}

/**
 * Returns the value held in the {@code streamingPreaggregationMode} field.
 *
 * @return the stored mode string, exactly as kept by this object
 */
public String getStreamingPreaggregationMode() {
    return this.streamingPreaggregationMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public Map<ColumnRefOperator, ScalarOperator> getColumnRefMap() {
return columnRefMap;
}

/**
 * Collects every column referenced by the expressions of this projection.
 *
 * @return a freshly allocated set holding the union of the used columns of all
 *         values in {@code columnRefMap}; empty when the map has no entries
 */
public ColumnRefSet getUsedColumns() {
    ColumnRefSet usedColumns = new ColumnRefSet();
    for (ScalarOperator projectedExpr : columnRefMap.values()) {
        usedColumns.union(projectedExpr.getUsedColumns());
    }
    return usedColumns;
}

@Override
public ColumnRefSet getOutputColumns(ExpressionContext expressionContext) {
ColumnRefSet columns = new ColumnRefSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import com.starrocks.sql.optimizer.rule.transformation.PushDownPredicateWindowRule;
import com.starrocks.sql.optimizer.rule.transformation.PushDownProjectLimitRule;
import com.starrocks.sql.optimizer.rule.transformation.PushDownProjectToCTEAnchorRule;
import com.starrocks.sql.optimizer.rule.transformation.PushDownProjectToJoinRule;
import com.starrocks.sql.optimizer.rule.transformation.QuantifiedApply2JoinRule;
import com.starrocks.sql.optimizer.rule.transformation.QuantifiedApply2OuterJoinRule;
import com.starrocks.sql.optimizer.rule.transformation.ReorderIntersectRule;
Expand Down Expand Up @@ -384,6 +385,7 @@ public class RuleSet {
));

REWRITE_RULES.put(RuleSetType.PRUNE_PROJECT, ImmutableList.of(
PushDownProjectToJoinRule.getInstance(),
new PruneProjectRule(),
new PruneProjectEmptyRule(),
new MergeTwoProjectRule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public enum RuleType {
TF_PUSH_DOWN_APPLY_FILTER,
TF_PUSH_DOWN_APPLY_AGG,
TF_PUSH_DOWN_PROJECT_TO_CTE_ANCHOR,
TF_PUSH_DOWN_PROJECT_TO_JOIN,

TF_PRUNE_ASSERT_ONE_ROW,
TF_PUSH_DOWN_ASSERT_ONE_ROW_PROJECT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql.optimizer.rule.transformation;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Type;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.base.ColumnRefSet;
import com.starrocks.sql.optimizer.operator.OperatorType;
import com.starrocks.sql.optimizer.operator.logical.LogicalJoinOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalProjectOperator;
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperatorVisitor;
import com.starrocks.sql.optimizer.rule.RuleType;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Pushes profitable projection expressions beneath a join, onto the side that
 * provides all of their input columns:
 * <pre>
 *        Project                          Project
 *           |                                |
 *         Join             --->            Join
 *        /    \                           /    \
 *   LEFT_CHILD RIGHT_CHILD        LEFT_CHILD   Project
 *                                                 |
 *                                            RIGHT_CHILD
 * </pre>
 * An expression is considered profitable when it is a function call that reads at
 * least one complex-typed column and returns a non-complex type, so that the join
 * carries the small result instead of the large input.
 * <p>
 * The rule is skipped when:
 * <ul>
 *   <li>the session variable {@code enable_pushdown_join_projection} is off;</li>
 *   <li>the join is an outer join: evaluating an expression below the join would
 *   yield NULL for null-extended rows, which differs from evaluating it on the NULL
 *   inputs for expressions that are not NULL-preserving (e.g. {@code ifnull}).</li>
 * </ul>
 */
public class PushDownProjectToJoinRule extends TransformationRule {

    private static final PushDownProjectToJoinRule INSTANCE = new PushDownProjectToJoinRule();

    protected PushDownProjectToJoinRule() {
        super(RuleType.TF_PUSH_DOWN_PROJECT_TO_JOIN,
                Pattern.create(OperatorType.LOGICAL_PROJECT).addChildren(
                        Pattern.create(OperatorType.LOGICAL_JOIN, OperatorType.PATTERN_LEAF,
                                OperatorType.PATTERN_LEAF)
                ));
    }

    public static TransformationRule getInstance() {
        return INSTANCE;
    }

    /**
     * Decides whether the rule should run on {@code input}.
     *
     * @param input   a Project whose only child is a Join (guaranteed by the pattern)
     * @param context optimizer context, used to read the session switch
     * @return true when the switch is on, the join is not an outer join, and at least
     *         one projected expression is profitable
     */
    @Override
    public boolean check(final OptExpression input, OptimizerContext context) {
        if (!context.getSessionVariable().isEnablePushDownJoinProjection()) {
            return false;
        }
        LogicalJoinOperator joinOp = input.inputAt(0).getOp().cast();
        if (joinOp.getJoinType().isOuterJoin()) {
            // Not safe in general: see the class comment.
            return false;
        }
        LogicalProjectOperator projectOp = input.getOp().cast();
        for (var entry : projectOp.getColumnRefMap().entrySet()) {
            if (isProfitableExpr(entry.getValue())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether {@code expr} is a call whose arguments reference at least one
     * complex-typed column while the call itself returns a non-complex type.
     * Only the top-level operator is inspected; every non-call operator is rejected.
     */
    private boolean isProfitableExpr(ScalarOperator expr) {
        ScalarOperatorVisitor<Boolean, Void> visitor = new ScalarOperatorVisitor<>() {

            @Override
            public Boolean visit(ScalarOperator scalarOperator, Void context) {
                return false;
            }

            @Override
            public Boolean visitCall(CallOperator call, Void context) {
                Type retType = call.getType();
                List<ColumnRefOperator> argumentColumns =
                        call.getArguments().stream()
                                .flatMap(x -> x.getColumnRefs().stream())
                                .collect(Collectors.toList());
                return CollectionUtils.isNotEmpty(argumentColumns) &&
                        !retType.isComplexType() &&
                        argumentColumns.stream().anyMatch(x -> x.getType().isComplexType());
            }
        };
        return expr.accept(visitor, null);
    }

    /**
     * Moves every profitable expression of {@code projectMap} that can be evaluated
     * from a single join input into {@code leftMap} or {@code rightMap} under a newly
     * created column, and rewrites the entry in {@code projectMap} to reference that
     * column. Expressions that are not profitable, or that need columns from both
     * inputs, are left untouched.
     *
     * @param projectMap   mutable copy of the parent projection; updated in place
     * @param leftColumns  output columns of the left join input
     * @param rightColumns output columns of the right join input
     * @param leftMap      receives the expressions pushed to the left input
     * @param rightMap     receives the expressions pushed to the right input
     * @param context      optimizer context providing the column factory
     */
    private void pushdownProject(Map<ColumnRefOperator, ScalarOperator> projectMap,
                                 ColumnRefSet leftColumns,
                                 ColumnRefSet rightColumns,
                                 Map<ColumnRefOperator, ScalarOperator> leftMap,
                                 Map<ColumnRefOperator, ScalarOperator> rightMap,
                                 OptimizerContext context) {
        ColumnRefFactory factory = context.getColumnRefFactory();
        Map<ColumnRefOperator, ColumnRefOperator> replacingMap = Maps.newHashMap();
        for (var entry : projectMap.entrySet()) {
            ScalarOperator expr = entry.getValue();
            // Only move what the rule is meant for; this also keeps plain column
            // references and constants (which use no column at all) in place.
            if (!isProfitableExpr(expr)) {
                continue;
            }
            ColumnRefSet usedColumns = expr.getUsedColumns();
            Map<ColumnRefOperator, ScalarOperator> targetMap;
            if (leftColumns.containsAll(usedColumns)) {
                targetMap = leftMap;
            } else if (rightColumns.containsAll(usedColumns)) {
                targetMap = rightMap;
            } else {
                continue;
            }
            ColumnRefOperator newRef = factory.create(expr, expr.getType(), expr.isNullable());
            targetMap.put(newRef, expr);
            replacingMap.put(entry.getKey(), newRef);
        }

        // The parent now simply forwards the columns computed below the join.
        projectMap.putAll(replacingMap);
    }

    /**
     * Places a new Project above {@code child} that computes {@code pushedDown} and
     * forwards every output column of {@code child} unchanged, so the join condition
     * and the parent projection still find the columns they reference.
     * Returns {@code child} itself when nothing was pushed to this side.
     */
    private OptExpression addProjectAbove(OptExpression child,
                                          Map<ColumnRefOperator, ScalarOperator> pushedDown,
                                          ColumnRefFactory factory) {
        if (pushedDown.isEmpty()) {
            return child;
        }
        Map<ColumnRefOperator, ScalarOperator> columnRefMap = Maps.newHashMap(pushedDown);
        // Identity mappings work for any kind of child operator; the child is not
        // required to be a Project.
        for (ColumnRefOperator column : child.getOutputColumns().getColumnRefOperators(factory)) {
            columnRefMap.put(column, column);
        }
        // Built from scratch on purpose: the parent's limit/predicate must not be
        // inherited by the new child projection.
        return OptExpression.create(new LogicalProjectOperator(columnRefMap), child);
    }

    /**
     * Rewrites {@code Project(Join(L, R))} into {@code Project'(Join(L', R'))} where
     * {@code L'} / {@code R'} are the original inputs, wrapped in a Project when at
     * least one expression was pushed to that side.
     *
     * @return a single-element list with the rewritten tree, or an empty list when no
     *         expression could be pushed down
     */
    @Override
    public List<OptExpression> transform(OptExpression input, OptimizerContext context) {
        LogicalProjectOperator projectOp = input.getOp().cast();
        OptExpression joinExpr = input.inputAt(0);
        LogicalJoinOperator joinOp = joinExpr.getOp().cast();
        OptExpression joinLeft = joinExpr.inputAt(0);
        OptExpression joinRight = joinExpr.inputAt(1);

        Map<ColumnRefOperator, ScalarOperator> existingProject = Maps.newHashMap(projectOp.getColumnRefMap());

        Map<ColumnRefOperator, ScalarOperator> leftMap = Maps.newHashMap();
        Map<ColumnRefOperator, ScalarOperator> rightMap = Maps.newHashMap();
        pushdownProject(existingProject,
                joinLeft.getOutputColumns(), joinRight.getOutputColumns(),
                leftMap, rightMap, context);

        if (leftMap.isEmpty() && rightMap.isEmpty()) {
            return Lists.newArrayList();
        }

        ColumnRefFactory factory = context.getColumnRefFactory();
        OptExpression newLeft = addProjectAbove(joinLeft, leftMap, factory);
        OptExpression newRight = addProjectAbove(joinRight, rightMap, factory);

        // NOTE(review): assumes the join derives its output from its inputs; if the
        // join carries its own projection, the new columns must be added there too.
        OptExpression newJoin = OptExpression.create(new LogicalJoinOperator.Builder()
                .withOperator(joinOp)
                .build(), newLeft, newRight);

        LogicalProjectOperator newProjectOp =
                new LogicalProjectOperator.Builder()
                        .withOperator(projectOp)
                        .setColumnRefMap(existingProject)
                        .build();
        return Lists.newArrayList(OptExpression.create(newProjectOp, newJoin));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql.plan;

import org.junit.Test;

/**
 * Plan tests asserting that the EXPLAIN output of join queries contains the expected
 * plan fragments, in particular a Project evaluating the array/map function directly
 * above the scan node.
 * NOTE(review): t0, t1, tarray and tmap are presumably fixtures created by PlanTestBase -- confirm.
 */
public class PushDownJoinProjectionTest extends PlanTestBase {

@Test
public void testPushdown() throws Exception {
// two-table join: array_contains is expected in the Project above the tarray scan
starRocksAssert.query("select tarray.v1, array_contains(tarray.v3, 123) " +
"from t0 join tarray on(t0.v1 = tarray.v1) ")
.explainContains(" 1:Project\n" +
" | <slot 4> : 4: v1\n" +
" | <slot 6> : 6: v3\n" +
" | <slot 9> : array_contains(6: v3, 123)\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: tarray");
// same join, with additional plain columns selected from both tables
starRocksAssert.query("select t0.v1, t0.v2, tarray.v1 as c1, array_contains(tarray.v3, 123) as c2\n" +
"from t0 join tarray on(t0.v1 = tarray.v1) ")
.explainContains(" 1:Project\n" +
" | <slot 4> : 4: v1\n" +
" | <slot 6> : 6: v3\n" +
" | <slot 9> : array_contains(6: v3, 123)\n" +
" | \n" +
" 0:OlapScanNode");
// the function call is nested inside an arithmetic expression; the whole expression is expected below
starRocksAssert.query("select t0.v1, t0.v2, tarray.v1 as c1, 1 + array_contains(tarray.v3, 123) " +
"from t0 join tarray on(t0.v1 = tarray.v1) ")
.explainContains(" 1:Project\n" +
" | <slot 4> : 4: v1\n" +
" | <slot 6> : 6: v3\n" +
" | <slot 9> : 1 + CAST(array_contains(6: v3, 123) AS SMALLINT)\n" +
" | \n" +
" 0:OlapScanNode");

// multiple table join
starRocksAssert.query("select t0.v1, t0.v2, " +
" a1.v1 as c1, array_contains(a1.v3, 123) as c2, " +
" t1.v4, t1.v5" +
" from t0 " +
" join tarray a1 on(t0.v1 = a1.v1) " +
" join t1 on(t0.v1 = t1.v4) ")
.explainContains(" 1:Project\n" +
" | <slot 4> : 4: v1\n" +
" | <slot 6> : 6: v3\n" +
" | <slot 13> : array_contains(6: v3, 123)\n" +
" | \n" +
" 0:OlapScanNode");
// three-way join with one array table and one map table: two fragments are expected
starRocksAssert.query("select t0.v1, t0.v2, " +
" a1.v1 as c1, array_contains(a1.v3, 123) as c2, " +
" a2.v1 as c3, map_size(a2.v3) as c4 " +
" from t0 " +
" join tarray a1 on(t0.v1 = a1.v1) " +
" join tmap a2 on(t0.v1 = a2.v1) ")
.explainContains(" 1:Project\n" +
" | <slot 4> : 4: v1\n" +
" | <slot 6> : 6: v3\n" +
" | <slot 15> : array_contains(6: v3, 123)\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: tarray", " 8:Project\n" +
" | <slot 7> : 7: v1\n" +
" | <slot 13> : map_size(9: v3)\n" +
" | \n" +
" 7:OlapScanNode\n" +
" TABLE: tmap");
// tarray joined twice under different aliases; only the fragment for alias a2 is asserted
starRocksAssert.query("select t0.v1, t0.v2, " +
" a1.v1 as c1, array_contains(a1.v3, 123) as c2, " +
" a2.v1 as c3, array_contains(a2.v3, 456) as c4 " +
" from t0 " +
" join tarray a1 on(t0.v1 = a1.v1) " +
" join tarray a2 on(t0.v1 = a2.v1) ")
.explainContains(" 4:Project\n" +
" | <slot 7> : 7: v1\n" +
" | <slot 9> : 9: v3\n" +
" | <slot 15> : array_contains(9: v3, 456)\n" +
" | \n" +
" 3:OlapScanNode");

// simple expression: do not pushdown
starRocksAssert.query("select t0.v1, t0.v2, " +
"tarray.v1 as c1, 1 + tarray.v1 as c2\n" +
"from t0 join tarray on(t0.v1 = tarray.v1) ")
.explainContains(" UNPARTITIONED\n" +
"\n" +
" 1:OlapScanNode\n" +
" TABLE: tarray", "-2:EXCHANGE\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: t0");
}
}

0 comments on commit e3c4f98

Please sign in to comment.