Skip to content

Commit

Permalink
new distribute planner
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 19, 2024
1 parent 695743b commit b647c8c
Show file tree
Hide file tree
Showing 68 changed files with 3,979 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ planType
| OPTIMIZED | PHYSICAL // same type
| SHAPE
| MEMO
| DISTRIBUTED
| ALL // default type
;

Expand Down
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Id.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Integer ids that cannot accidentally be compared with ints.
*/
public class Id<IdType extends Id<IdType>> {
public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
protected final int id;

public Id(int id) {
Expand Down Expand Up @@ -62,4 +62,9 @@ public ArrayList<IdType> asList() {
public String toString() {
return Integer.toString(id);
}

@Override
public int compareTo(Id<IdType> idTypeId) {
return id - idTypeId.id;
}
}
25 changes: 25 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,29 @@ public <C extends NodeType> C findFirstOf(Class<C> cl) {
return null;
}

/** anyMatch */
public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
if (func.apply(this)) {
return true;
}

for (NodeType child : children) {
if (child.anyMatch(func)) {
return true;
}
}
return false;
}


/** foreachDown */
public void foreachDown(Predicate<TreeNode<NodeType>> visitor) {
if (!visitor.test(this)) {
return;
}

for (TreeNode<NodeType> child : getChildren()) {
child.foreachDown(visitor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public class SummaryProfile {
private long nereidsRewriteFinishTime = -1;
private long nereidsOptimizeFinishTime = -1;
private long nereidsTranslateFinishTime = -1;
private long nereidsDistributeFinishTime = -1;
// timestamp of query begin
private long queryBeginTime = -1;
// Analysis end time
Expand Down Expand Up @@ -421,6 +422,10 @@ public void setNereidsTranslateTime() {
this.nereidsTranslateFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsDistributeTime() {
this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
}

public void setQueryBeginTime() {
this.queryBeginTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -661,6 +666,10 @@ public String getPrettyNereidsTranslateTime() {
return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsDistributeTime() {
return getPrettyTime(nereidsDistributeFinishTime, nereidsTranslateFinishTime, TUnit.TIME_MS);
}

private String getPrettyGetPartitionVersionTime() {
if (getPartitionVersionTime == 0) {
return "N/A";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
Expand All @@ -70,6 +73,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -102,6 +106,7 @@ public class NereidsPlanner extends Planner {
private Plan rewrittenPlan;
private Plan optimizedPlan;
private PhysicalPlan physicalPlan;
private FragmentIdMapping<DistributedPlan> distributedPlans;
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
Expand Down Expand Up @@ -129,17 +134,16 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
NereidsTracer.logImportantTime("EndParsePlan");
setParsedPlan(parsedPlan);

PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().start();
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
statementContext.getStopwatch().stop();
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;
}

physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
distribute(physicalPlan);
}

@VisibleForTesting
Expand Down Expand Up @@ -314,7 +318,7 @@ private void optimize() {
}
}

private void translate(PhysicalPlan resultPlan) throws UserException {
private void splitFragments(PhysicalPlan resultPlan) throws UserException {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -359,6 +363,18 @@ private void translate(PhysicalPlan resultPlan) throws UserException {
ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}

private void distribute(PhysicalPlan physicalPlan) throws UserException {
splitFragments(physicalPlan);

if (!SessionVariable.canUseNereidsDistributePlanner()) {
return;
}
distributedPlans = new DistributePlanner(fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}
Expand Down Expand Up @@ -490,6 +506,17 @@ public String getExplainString(ExplainOptions explainOptions) {
+ "\n\n========== MATERIALIZATIONS ==========\n"
+ materializationStringBuilder;
break;
case DISTRIBUTED_PLAN:
StringBuilder distributedPlanStringBuilder = new StringBuilder();

distributedPlanStringBuilder.append("========== DISTRIBUTED PLAN ==========\n");
if (distributedPlans == null || distributedPlans.isEmpty()) {
plan = "Distributed plan not generated, please set enable_nereids_distribute_planner "
+ "and enable_pipeline_x_engine to true";
} else {
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
case ALL_PLAN:
plan = "========== PARSED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyParseSqlTime) + " ==========\n"
Expand All @@ -502,7 +529,13 @@ public String getExplainString(ExplainOptions explainOptions) {
+ rewrittenPlan.treeString() + "\n\n"
+ "========== OPTIMIZED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsOptimizeTime) + " ==========\n"
+ optimizedPlan.treeString();
+ optimizedPlan.treeString() + "\n\n";

if (distributedPlans != null && !distributedPlans.isEmpty()) {
plan += "========== DISTRIBUTED PLAN "
+ getTimeMetricString(SummaryProfile::getPrettyNereidsDistributeTime) + " ==========\n";
plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
}
break;
default:
plan = super.getExplainString(explainOptions)
Expand Down Expand Up @@ -673,6 +706,10 @@ public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public FragmentIdMapping<DistributedPlan> getDistributedPlans() {
return distributedPlans;
}

public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3417,6 +3417,9 @@ private ExplainLevel parseExplainPlanType(PlanTypeContext planTypeContext) {
if (planTypeContext.MEMO() != null) {
return ExplainLevel.MEMO_PLAN;
}
if (planTypeContext.DISTRIBUTED() != null) {
return ExplainLevel.DISTRIBUTED_PLAN;
}
return ExplainLevel.ALL_PLAN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -217,8 +218,8 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
}

@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
public Boolean visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
Preconditions.checkArgument(childrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
Expand Down Expand Up @@ -248,7 +249,8 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
// check colocate join with scan
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) {
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)
&& !SessionVariable.canUseNereidsDistributePlanner()) {
// right anti, right outer, full outer join could not do bucket shuffle join
// TODO remove this after we refactor coordinator
updatedForLeft = Optional.of(calAnotherSideRequired(
Expand Down Expand Up @@ -302,7 +304,8 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL
&& !SessionVariable.canUseNereidsDistributePlanner()) {
// TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
// since it always to check the left most node whether olap scan node.
// after we fix coordinator problem, we could do right to left bucket shuffle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.NereidsException;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -93,6 +95,14 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitNumbers(this, context);
}

@Override
public PhysicalProperties getPhysicalProperties() {
if (SessionVariable.canUseNereidsDistributePlanner()) {
return PhysicalProperties.ANY;
}
return super.getPhysicalProperties();
}

@Override
public Numbers withChildren(List<Expression> children) {
Preconditions.checkArgument(children().size() == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* placeholder id for prepared statement parameters
*/
public class PlaceholderId extends Id<PlaceholderId> implements Comparable<PlaceholderId> {
public class PlaceholderId extends Id<PlaceholderId> {

public PlaceholderId(int id) {
super(id);
Expand Down Expand Up @@ -55,9 +55,4 @@ public boolean equals(Object obj) {
public int hashCode() {
return super.hashCode();
}

@Override
public int compareTo(PlaceholderId o) {
return this.id - o.id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum ExplainLevel {
OPTIMIZED_PLAN(true),
SHAPE_PLAN(true),
MEMO_PLAN(true),
DISTRIBUTED_PLAN(true),
ALL_PLAN(true)
;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.AssignedJobBuilder;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJobBuilder;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;

import com.google.common.collect.ListMultimap;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/** DistributePlanner */
public class DistributePlanner {
private final List<PlanFragment> fragments;
private final FragmentIdMapping<PlanFragment> idToFragments;

public DistributePlanner(List<PlanFragment> fragments) {
this.fragments = Objects.requireNonNull(fragments, "fragments can not be null");
this.idToFragments = FragmentIdMapping.buildFragmentMapping(fragments);
}

public FragmentIdMapping<DistributedPlan> plan() {
FragmentIdMapping<UnassignedJob> fragmentJobs = UnassignedJobBuilder.buildJobs(idToFragments);
ListMultimap<PlanFragmentId, AssignedJob> instanceJobs = AssignedJobBuilder.buildJobs(fragmentJobs);
return buildDistributePlans(fragmentJobs, instanceJobs);
}

private FragmentIdMapping<DistributedPlan> buildDistributePlans(
Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
FragmentIdMapping<PipelineDistributedPlan> idToDistributedPlans = new FragmentIdMapping<>();
for (Entry<PlanFragmentId, PlanFragment> kv : idToFragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();

UnassignedJob fragmentJob = idToUnassignedJobs.get(fragmentId);
List<AssignedJob> instanceJobs = idToAssignedJobs.get(fragmentId);

List<PipelineDistributedPlan> childrenPlans = idToDistributedPlans.getByChildrenFragments(fragment);
idToDistributedPlans.put(fragmentId, new PipelineDistributedPlan(fragmentJob, instanceJobs, childrenPlans));
}
return (FragmentIdMapping) idToDistributedPlans;
}
}
Loading

0 comments on commit b647c8c

Please sign in to comment.