Skip to content

Commit

Permalink
new scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 29, 2024
1 parent 518b712 commit 51ce209
Show file tree
Hide file tree
Showing 162 changed files with 4,388 additions and 372 deletions.
29 changes: 27 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.NereidsCoordinator;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
Expand Down Expand Up @@ -134,15 +141,33 @@ public BrokerLoadJob createBrokerLoadJob() {

public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner,
StatsErrorEstimator statsErrorEstimator) {
if (planner instanceof NereidsPlanner && SessionVariable.canUseNereidsDistributePlanner()) {
return new NereidsCoordinator(context, analyzer, (NereidsPlanner) planner, statsErrorEstimator);
}
return new Coordinator(context, analyzer, planner, statsErrorEstimator);
}

// Used for broker load task/export task/update coordinator
public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
List<PlanFragment> fragments, List<ScanNode> scanNodes,
String timezone, boolean loadZeroTolerance, boolean enableProfile) {
return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance,
enableProfile);
if (SessionVariable.canUseNereidsDistributePlanner()) {
ConnectContext connectContext = new ConnectContext();
connectContext.setQueryId(queryId);
StatementContext statementContext = new StatementContext(
connectContext, new OriginStatement("", 0)
);
DistributePlanner distributePlanner = new DistributePlanner(statementContext, fragments);
FragmentIdMapping<DistributedPlan> distributedPlans = distributePlanner.plan();

return new NereidsCoordinator(
jobId, queryId, descTable, fragments, distributedPlans.valueList(),
scanNodes, timezone, loadZeroTolerance, enableProfile
);
}
return new Coordinator(
jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile
);
}

public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class SummaryProfile {
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
NEREIDS_TRANSLATE_TIME,
NEREIDS_DISTRIBUTE_TIME,
WORKLOAD_GROUP,
ANALYSIS_TIME,
PLAN_TIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
}
}

private Plan planWithoutLock(
protected Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
Expand Down Expand Up @@ -311,7 +311,7 @@ private Plan planWithoutLock(
return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
protected LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

Expand All @@ -322,7 +322,7 @@ private void initCascadesContext(LogicalPlan plan, PhysicalProperties requirePro
}
}

private void analyze(boolean showPlanProcess) {
protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
}
Expand All @@ -337,7 +337,7 @@ private void analyze(boolean showPlanProcess) {
/**
* Logical plan rewrite based on a series of heuristic rules.
*/
private void rewrite(boolean showPlanProcess) {
protected void rewrite(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start rewrite plan");
}
Expand All @@ -349,7 +349,7 @@ private void rewrite(boolean showPlanProcess) {
}

// DependsRules: EnsureProjectOnTopJoin.class
private void optimize() {
protected void optimize() {
if (LOG.isDebugEnabled()) {
LOG.debug("Start optimize plan");
}
Expand All @@ -360,7 +360,7 @@ private void optimize() {
}
}

private void splitFragments(PhysicalPlan resultPlan) {
protected void splitFragments(PhysicalPlan resultPlan) {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -455,7 +455,7 @@ private void splitFragments(PhysicalPlan resultPlan) {
}
}

private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
protected void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
return;
Expand All @@ -465,18 +465,21 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
}

splitFragments(physicalPlan);
doDistribute(canUseNereidsDistributePlanner);
}

protected void doDistribute(boolean canUseNereidsDistributePlanner) {
if (!canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(fragments).plan();
distributedPlans = new DistributePlanner(statementContext, fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}

Expand Down Expand Up @@ -735,6 +738,10 @@ public CascadesContext getCascadesContext() {
return cascadesContext;
}

public ConnectContext getConnectContext() {
return cascadesContext.getConnectContext();
}

public static PhysicalProperties buildInitRequireProperties() {
return PhysicalProperties.GATHER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.system.Backend;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.sparkproject.guava.base.Throwables;

import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -171,6 +172,8 @@ public class StatementContext implements Closeable {

private String disableJoinReorderReason;

private Backend groupCommitMergeBackend;

public StatementContext() {
this(ConnectContext.get(), null, 0);
}
Expand Down Expand Up @@ -568,4 +571,13 @@ public Optional<String> getDisableJoinReorderReason() {
public void setDisableJoinReorderReason(String disableJoinReorderReason) {
this.disableJoinReorderReason = disableJoinReorderReason;
}

public Backend getGroupCommitMergeBackend() {
return groupCommitMergeBackend;
}

public void setGroupCommitMergeBackend(
Backend groupCommitMergeBackend) {
this.groupCommitMergeBackend = groupCommitMergeBackend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ private void ensureChildrenRewritten() {

// some rule return new plan tree, which the number of new plan node > 1,
// we should transform this new plan nodes too.
// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(plan)) {
pushChildrenJobs(plan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ protected final RewriteResult rewrite(Plan plan, List<Rule> rules, RewriteJobCon
}
Plan newPlan = newPlans.get(0);
if (!newPlan.deepEquals(plan)) {
// don't remove this comment, it can help us to trace some bug when developing.

NereidsTracer.logRewriteEvent(rule.toString(), pattern, plan, newPlan);
String traceBefore = null;
if (showPlanProcess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public void execute() {
RewriteJobContext newRewriteJobContext = rewriteJobContext.withChildrenVisited(true);
pushJob(new PlanTreeRewriteTopDownJob(newRewriteJobContext, context, isTraverseChildren, rules));

// NOTICE: this relay on pull up cte anchor
if (isTraverseChildren.test(rewriteJobContext.plan)) {
pushChildrenJobs(newRewriteJobContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,15 @@ private PhysicalProperties computeShuffleJoinOutputProperties(

switch (hashJoin.getJoinType()) {
case INNER_JOIN:
case CROSS_JOIN:
if (shuffleSide == ShuffleSide.LEFT) {
return new PhysicalProperties(DistributionSpecHash.merge(
rightHashSpec, leftHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType)
);
} else {
return new PhysicalProperties(DistributionSpecHash.merge(
leftHashSpec, rightHashSpec, outputShuffleType));
return new PhysicalProperties(
DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
);
}
case LEFT_SEMI_JOIN:
case LEFT_ANTI_JOIN:
Expand All @@ -526,12 +529,13 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
case RIGHT_SEMI_JOIN:
case RIGHT_ANTI_JOIN:
case RIGHT_OUTER_JOIN:
if (shuffleSide == ShuffleSide.RIGHT) {
return new PhysicalProperties(
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
);
} else {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
return new PhysicalProperties(rightHashSpec);
} else {
// retain left shuffle type, since coordinator use left most node to schedule fragment
// forbid colocate join, since right table already shuffle
return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
leftHashSpec.getShuffleType()));
}
case FULL_OUTER_JOIN:
return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec);
Expand Down Expand Up @@ -563,6 +567,9 @@ private ShuffleSide computeShuffleSide(DistributionSpecHash leftHashSpec, Distri
case STORAGE_BUCKETED:
// use storage hash to shuffle right to left to do bucket shuffle join
return ShuffleSide.RIGHT;
case EXECUTION_BUCKETED:
// compatible old ut
return ShuffleSide.RIGHT;
default:
}
break;
Expand Down
Loading

0 comments on commit 51ce209

Please sign in to comment.