Skip to content

Commit

Permalink
pass through StatementContext
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 25, 2024
1 parent 9d8a411 commit 10db0f8
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ protected void doDistribute(boolean canUseNereidsDistributePlanner) {
return;
}

distributedPlans = new DistributePlanner(this, fragments).plan();
distributedPlans = new DistributePlanner(statementContext, fragments).plan();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DummyWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
Expand Down Expand Up @@ -64,28 +63,26 @@
/** DistributePlanner */
public class DistributePlanner {
private static final Logger LOG = LogManager.getLogger(DistributePlanner.class);
private final NereidsPlanner planner;
private final CascadesContext cascadesContext;
private final StatementContext statementContext;
private final FragmentIdMapping<PlanFragment> idToFragments;

public DistributePlanner(NereidsPlanner planner, List<PlanFragment> fragments) {
this.planner = Objects.requireNonNull(planner, "planner can not be null");
this.cascadesContext = planner.getCascadesContext();
public DistributePlanner(StatementContext statementContext, List<PlanFragment> fragments) {
this.statementContext = Objects.requireNonNull(statementContext, "statementContext can not be null");
this.idToFragments = FragmentIdMapping.buildFragmentMapping(fragments);
}

/** plan */
public FragmentIdMapping<DistributedPlan> plan() {
try {
FragmentIdMapping<UnassignedJob> fragmentJobs = UnassignedJobBuilder.buildJobs(planner, idToFragments);
FragmentIdMapping<UnassignedJob> fragmentJobs
= UnassignedJobBuilder.buildJobs(statementContext, idToFragments);
ListMultimap<PlanFragmentId, AssignedJob> instanceJobs = AssignedJobBuilder.buildJobs(fragmentJobs);
FragmentIdMapping<DistributedPlan> distributedPlans = buildDistributePlans(fragmentJobs, instanceJobs);
FragmentIdMapping<DistributedPlan> linkedPlans = linkPlans(distributedPlans);
updateProfileIfPresent(SummaryProfile::setAssignFragmentTime);
return linkedPlans;
} catch (Throwable t) {
LOG.error("Failed to build distribute plans.\nPlan:\n"
+ planner.getOptimizedPlan().treeString(), t);
LOG.error("Failed to build distribute plans", t);
throw t;
}
}
Expand Down Expand Up @@ -127,7 +124,7 @@ private FragmentIdMapping<DistributedPlan> buildDistributePlans(
}

private FragmentIdMapping<DistributedPlan> linkPlans(FragmentIdMapping<DistributedPlan> plans) {
boolean enableShareHashTableForBroadcastJoin = cascadesContext.getConnectContext()
boolean enableShareHashTableForBroadcastJoin = statementContext.getConnectContext()
.getSessionVariable()
.enableShareHashTableForBroadcastJoin;
for (DistributedPlan receiverPlan : plans.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.AbstractTreeNode;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.ExchangeNode;
Expand All @@ -32,16 +32,16 @@
/** AbstractUnassignedJob */
public abstract class AbstractUnassignedJob
extends AbstractTreeNode<UnassignedJob> implements UnassignedJob {
protected final NereidsPlanner planner;
protected final StatementContext statementContext;
protected final PlanFragment fragment;
protected final List<ScanNode> scanNodes;
protected final ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob;

/** AbstractUnassignedJob */
public AbstractUnassignedJob(NereidsPlanner planner, PlanFragment fragment,
public AbstractUnassignedJob(StatementContext statementContext, PlanFragment fragment,
List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(Utils.fastToImmutableList(exchangeToChildJob.values()));
this.planner = Objects.requireNonNull(planner, "planner can not be null");
this.statementContext = Objects.requireNonNull(statementContext, "statementContext can not be null");
this.fragment = Objects.requireNonNull(fragment, "fragment can not be null");
this.scanNodes = Utils.fastToImmutableList(
Objects.requireNonNull(scanNodes, "scanNodes can not be null")
Expand All @@ -51,8 +51,8 @@ public AbstractUnassignedJob(NereidsPlanner planner, PlanFragment fragment,
}

@Override
public NereidsPlanner getPlanner() {
return planner;
public StatementContext getStatementContext() {
return statementContext;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
import org.apache.doris.planner.ExchangeNode;
Expand All @@ -40,9 +40,9 @@
public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
protected final AtomicInteger shareScanIdGenerator = new AtomicInteger();

public AbstractUnassignedScanJob(NereidsPlanner planner, PlanFragment fragment,
public AbstractUnassignedScanJob(StatementContext statementContext, PlanFragment fragment,
List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(planner, fragment, scanNodes, exchangeToChildJob);
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}

@Override
Expand Down Expand Up @@ -71,7 +71,7 @@ protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
ListMultimap<ExchangeNode, AssignedJob> inputJobs, DistributedPlanWorkerManager workerManager) {

ConnectContext context = planner.getCascadesContext().getConnectContext();
ConnectContext context = statementContext.getConnectContext();
boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel(workerToScanRanges);
int instanceIndexInFragment = 0;
List<AssignedJob> instances = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
Expand All @@ -39,9 +39,9 @@
/** UnassignedGatherScanMultiRemoteTablesJob */
public class UnassignedGatherScanMultiRemoteTablesJob extends AbstractUnassignedJob {

public UnassignedGatherScanMultiRemoteTablesJob(NereidsPlanner planner, PlanFragment fragment,
public UnassignedGatherScanMultiRemoteTablesJob(StatementContext statementContext, PlanFragment fragment,
List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(planner, fragment, scanNodes, exchangeToChildJob);
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}

/** canApply */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
import org.apache.doris.planner.ExchangeNode;
Expand All @@ -33,19 +32,17 @@

/** UnassignedGroupCommitJob */
public class UnassignedGroupCommitJob extends AbstractUnassignedJob {
public UnassignedGroupCommitJob(NereidsPlanner planner,
public UnassignedGroupCommitJob(StatementContext statementContext,
PlanFragment fragment, List<ScanNode> scanNodes,
ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(planner, fragment, scanNodes, exchangeToChildJob);
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}

@Override
public List<AssignedJob> computeAssignedJobs(
DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
CascadesContext cascadesContext = planner.getCascadesContext();
TUniqueId instanceId = cascadesContext.getConnectContext().nextInstanceId();
BackendWorker selectBackend = new BackendWorker(
cascadesContext.getStatementContext().getGroupCommitMergeBackend());
TUniqueId instanceId = statementContext.getConnectContext().nextInstanceId();
BackendWorker selectBackend = new BackendWorker(statementContext.getGroupCommitMergeBackend());
return ImmutableList.of(
new StaticAssignedJob(
0, instanceId, this, selectBackend, DefaultScanSource.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
Expand All @@ -35,7 +35,7 @@
* for example: a fragment job, which doesn't parallelization to some instance jobs and also no worker to invoke it
*/
public interface UnassignedJob extends TreeNode<UnassignedJob> {
NereidsPlanner getPlanner();
StatementContext getStatementContext();

PlanFragment getFragment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.distribute.worker.LoadBalanceScanWorkerSelector;
import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
Expand Down Expand Up @@ -53,7 +53,7 @@ public class UnassignedJobBuilder {
* build job from fragment.
*/
public static FragmentIdMapping<UnassignedJob> buildJobs(
NereidsPlanner planner, FragmentIdMapping<PlanFragment> fragments) {
StatementContext statementContext, FragmentIdMapping<PlanFragment> fragments) {
UnassignedJobBuilder builder = new UnassignedJobBuilder();

FragmentLineage fragmentLineage = buildFragmentLineage(fragments);
Expand All @@ -70,31 +70,31 @@ public static FragmentIdMapping<UnassignedJob> buildJobs(

ListMultimap<ExchangeNode, UnassignedJob> inputJobs = findInputJobs(
fragmentLineage, fragmentId, unassignedJobs);
UnassignedJob unassignedJob = builder.buildJob(planner, fragment, inputJobs, isTopFragment);
UnassignedJob unassignedJob = builder.buildJob(statementContext, fragment, inputJobs, isTopFragment);
unassignedJobs.put(fragmentId, unassignedJob);
}

return unassignedJobs;
}

private UnassignedJob buildJob(
NereidsPlanner planner, PlanFragment planFragment,
StatementContext statementContext, PlanFragment planFragment,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs, boolean isTopFragment) {
List<ScanNode> scanNodes = collectScanNodesInThisFragment(planFragment);
if (planFragment.specifyInstances.isPresent()) {
return buildSpecifyInstancesJob(planner, planFragment, scanNodes, inputJobs);
return buildSpecifyInstancesJob(statementContext, planFragment, scanNodes, inputJobs);
} else if (scanNodes.isEmpty() && isTopFragment
&& planner.getCascadesContext().getStatementContext().getGroupCommitMergeBackend() != null) {
return new UnassignedGroupCommitJob(planner, planFragment, scanNodes, inputJobs);
&& statementContext.getGroupCommitMergeBackend() != null) {
return new UnassignedGroupCommitJob(statementContext, planFragment, scanNodes, inputJobs);
} else if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
return buildLeafOrScanJob(planner, planFragment, scanNodes, inputJobs);
return buildLeafOrScanJob(statementContext, planFragment, scanNodes, inputJobs);
} else {
return buildShuffleJob(planner, planFragment, inputJobs);
return buildShuffleJob(statementContext, planFragment, inputJobs);
}
}

private UnassignedJob buildLeafOrScanJob(
NereidsPlanner planner, PlanFragment planFragment, List<ScanNode> scanNodes,
StatementContext statementContext, PlanFragment planFragment, List<ScanNode> scanNodes,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
int olapScanNodeNum = olapScanNodeNum(scanNodes);

Expand All @@ -104,24 +104,24 @@ private UnassignedJob buildLeafOrScanJob(
// so that the OlapScanNode can find the data in the backend
// e.g. select * from olap_table
unassignedJob = buildScanOlapTableJob(
planner, planFragment, (List) scanNodes, inputJobs, scanWorkerSelector
statementContext, planFragment, (List) scanNodes, inputJobs, scanWorkerSelector
);
} else if (scanNodes.isEmpty()) {
// select constant without table,
// e.g. select 100 union select 200
unassignedJob = buildQueryConstantJob(planner, planFragment);
unassignedJob = buildQueryConstantJob(statementContext, planFragment);
} else if (olapScanNodeNum == 0) {
ScanNode scanNode = scanNodes.get(0);
if (scanNode instanceof SchemaScanNode) {
// select * from information_schema.tables
unassignedJob = buildScanMetadataJob(
planner, planFragment, (SchemaScanNode) scanNode, scanWorkerSelector
statementContext, planFragment, (SchemaScanNode) scanNode, scanWorkerSelector
);
} else {
// only scan external tables or cloud tables or table valued functions
// e,g. select * from numbers('number'='100')
unassignedJob = buildScanRemoteTableJob(
planner, planFragment, scanNodes, inputJobs, scanWorkerSelector
statementContext, planFragment, scanNodes, inputJobs, scanWorkerSelector
);
}
}
Expand All @@ -135,21 +135,21 @@ private UnassignedJob buildLeafOrScanJob(
}

private UnassignedJob buildSpecifyInstancesJob(
NereidsPlanner planner, PlanFragment planFragment,
StatementContext statementContext, PlanFragment planFragment,
List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedSpecifyInstancesJob(planner, planFragment, scanNodes, inputJobs);
return new UnassignedSpecifyInstancesJob(statementContext, planFragment, scanNodes, inputJobs);
}

private UnassignedJob buildScanOlapTableJob(
NereidsPlanner planner, PlanFragment planFragment, List<OlapScanNode> olapScanNodes,
StatementContext statementContext, PlanFragment planFragment, List<OlapScanNode> olapScanNodes,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs,
ScanWorkerSelector scanWorkerSelector) {
if (shouldAssignByBucket(planFragment)) {
return new UnassignedScanBucketOlapTableJob(
planner, planFragment, olapScanNodes, inputJobs, scanWorkerSelector);
statementContext, planFragment, olapScanNodes, inputJobs, scanWorkerSelector);
} else if (olapScanNodes.size() == 1) {
return new UnassignedScanSingleOlapTableJob(
planner, planFragment, olapScanNodes.get(0), inputJobs, scanWorkerSelector);
statementContext, planFragment, olapScanNodes.get(0), inputJobs, scanWorkerSelector);
} else {
throw new IllegalStateException("Not supported multiple scan multiple "
+ "OlapTable but not contains colocate join or bucket shuffle join: "
Expand All @@ -176,35 +176,36 @@ private boolean isLeafFragment(PlanFragment planFragment) {
}

private UnassignedQueryConstantJob buildQueryConstantJob(
NereidsPlanner nereidsPlanner, PlanFragment planFragment) {
return new UnassignedQueryConstantJob(nereidsPlanner, planFragment);
StatementContext statementContext, PlanFragment planFragment) {
return new UnassignedQueryConstantJob(statementContext, planFragment);
}

private UnassignedJob buildScanMetadataJob(
NereidsPlanner planner, PlanFragment fragment,
StatementContext statementContext, PlanFragment fragment,
SchemaScanNode schemaScanNode, ScanWorkerSelector scanWorkerSelector) {
return new UnassignedScanMetadataJob(planner, fragment, schemaScanNode, scanWorkerSelector);
return new UnassignedScanMetadataJob(statementContext, fragment, schemaScanNode, scanWorkerSelector);
}

private UnassignedJob buildScanRemoteTableJob(
NereidsPlanner planner, PlanFragment planFragment, List<ScanNode> scanNodes,
StatementContext statementContext, PlanFragment planFragment, List<ScanNode> scanNodes,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs,
ScanWorkerSelector scanWorkerSelector) {
if (scanNodes.size() == 1) {
return new UnassignedScanSingleRemoteTableJob(
planner, planFragment, scanNodes.get(0), inputJobs, scanWorkerSelector);
statementContext, planFragment, scanNodes.get(0), inputJobs, scanWorkerSelector);
} else if (UnassignedGatherScanMultiRemoteTablesJob.canApply(scanNodes)) {
// select * from numbers("number" = "10") a union all select * from numbers("number" = "20") b;
// use an instance to scan table a and table b
return new UnassignedGatherScanMultiRemoteTablesJob(planner, planFragment, scanNodes, inputJobs);
return new UnassignedGatherScanMultiRemoteTablesJob(statementContext, planFragment, scanNodes, inputJobs);
} else {
return null;
}
}

private UnassignedShuffleJob buildShuffleJob(
NereidsPlanner planner, PlanFragment planFragment, ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedShuffleJob(planner, planFragment, inputJobs);
StatementContext statementContext, PlanFragment planFragment,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedShuffleJob(statementContext, planFragment, inputJobs);
}

private static ListMultimap<ExchangeNode, UnassignedJob> findInputJobs(
Expand Down
Loading

0 comments on commit 10db0f8

Please sign in to comment.