Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 9, 2024
1 parent 40e54c8 commit d5022d0
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void distribute(LogicalPlanAdapter logicalPlanAdapter) throws UserExcept
return;
}
DistributePlanner distributePlanner = new DistributePlanner(fragments);
// distributePlanner.plan();
distributePlanner.plan();
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.nereids.worker.job.ExchangeInputs.AllInputs;
import org.apache.doris.nereids.worker.job.UnassignedGatherJob;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedLeafJob;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob;
import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob.DataId;
import org.apache.doris.planner.ExchangeNode;
Expand Down Expand Up @@ -56,10 +55,10 @@ default List<AssignedJob> offerJob(
} else if (unassignedJob instanceof UnassignedNearStorageJob) {
// scan native olap table, we can assign a worker near the storage
return offerNearStorageJob(workerSelector, (UnassignedNearStorageJob) unassignedJob);
} else if (unassignedJob instanceof UnassignedLeafJob) {
} else if (unassignedJob.getFragment().getChildren().isEmpty()) {
// it should be a leaf job which not contains scan native olap table node,
// for example, select literal without table, or scan an external table
return offerRemoteStorageJob(workerSelector, (UnassignedLeafJob) unassignedJob);
return offerRemoteStorageJob(workerSelector, unassignedJob);
} else if (unassignedJob instanceof UnassignedGatherJob) {
return offerGatherJob((UnassignedGatherJob) unassignedJob, inputJobs);
} else {
Expand Down Expand Up @@ -92,7 +91,7 @@ default List<AssignedJob> offerNearStorageJob(
return assignments;
}

default List<AssignedJob> offerRemoteStorageJob(WorkerSelector workerSelector, UnassignedLeafJob leafJob) {
default List<AssignedJob> offerRemoteStorageJob(WorkerSelector workerSelector, UnassignedJob scanJob) {

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,48 @@
package org.apache.doris.nereids.worker.job;

import org.apache.doris.nereids.trees.AbstractTreeNode;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;

import com.google.common.collect.ImmutableList;
import org.apache.doris.planner.ScanNode;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/** AbstractUnassignedJob */
public abstract class AbstractUnassignedJob
extends AbstractTreeNode<UnassignedJob> implements UnassignedJob {
protected final PlanFragment fragment;
protected final List<ScanNode> scanNodes;
protected final Map<ExchangeNode, UnassignedJob> exchangeToChildJob;

public AbstractUnassignedJob(PlanFragment fragment) {
this(fragment, ImmutableList.of());
}

public AbstractUnassignedJob(PlanFragment fragment, List<UnassignedJob> children) {
super(children);
public AbstractUnassignedJob(PlanFragment fragment, List<ScanNode> scanNodes,
Map<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(Utils.fastToImmutableList(exchangeToChildJob.values()));
this.fragment = Objects.requireNonNull(fragment, "fragment can not be null");
this.scanNodes = Utils.fastToImmutableList(
Objects.requireNonNull(scanNodes, "scanNodes can not be null")
);
this.exchangeToChildJob
= Objects.requireNonNull(exchangeToChildJob, "exchangeToChildJob can not be null");
}

@Override
public PlanFragment getFragment() {
return fragment;
}

@Override
public List<ScanNode> getScanNodes() {
return scanNodes;
}

@Override
public Map<ExchangeNode, UnassignedJob> getExchangeToChildJob() {
return exchangeToChildJob;
}

@Override
public String toString() {
return getClass().getSimpleName() + ": " + fragment.getFragmentId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import org.apache.doris.planner.PlanFragment;

import com.google.common.collect.ImmutableList;

import java.util.Objects;
import com.google.common.collect.ImmutableMap;

/** UnassignedGatherJob */
public class UnassignedGatherJob extends AbstractUnassignedJob {
private final ExchangeNode exchangeNode;
private int assignedJobNum;
private ExchangeNode exchangeNode;

public UnassignedGatherJob(PlanFragment fragment, ExchangeNode exchangeNode, UnassignedJob inputJob) {
super(fragment, ImmutableList.of(inputJob));
this.exchangeNode = Objects.requireNonNull(exchangeNode, "exchangeNode can not be null");
super(fragment, ImmutableList.of(), ImmutableMap.of(exchangeNode, inputJob));
this.exchangeNode = exchangeNode;
}

public ExchangeNode getExchangeNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.doris.planner.ScanNode;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/**
* WorkerJob.
Expand All @@ -37,6 +39,10 @@ default int computeDegreeOfParallelism() {

PlanFragment getFragment();

List<ScanNode> getScanNodes();

Map<ExchangeNode, UnassignedJob> getExchangeToChildJob();

// generate an instance job
// e.g. build an instance job by a backends and the replica ids it contains
default AssignedJob assignWorkerAndDataSources(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@
package org.apache.doris.nereids.worker.job;

import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;

import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
Expand All @@ -40,19 +46,21 @@ public class UnassignedJobBuilder {
*/
public static FragmentIdMapping<UnassignedJob> buildJobs(FragmentIdMapping<PlanFragment> fragments) {
// build from leaf to parent

FragmentLineage fragmentLineage = buildFragmentLineage(fragments);
FragmentIdMapping<UnassignedJob> unassignedJobs = new FragmentIdMapping<>();
for (Entry<PlanFragmentId, PlanFragment> kv : fragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();

List<UnassignedJob> inputJobs = unassignedJobs.getByChildrenFragments(fragment);
Map<ExchangeNode, UnassignedJob> inputJobs = findInputJobs(fragmentLineage, fragmentId, unassignedJobs);
UnassignedJob unassignedJob = buildJob(fragment, inputJobs);
unassignedJobs.put(fragmentId, unassignedJob);
}
return unassignedJobs;
}

private static UnassignedJob buildJob(PlanFragment planFragment, List<UnassignedJob> inputJobs) {
private static UnassignedJob buildJob(PlanFragment planFragment, Map<ExchangeNode, UnassignedJob> inputJobs) {
List<ScanNode> scanNodes = collectScanNodesInThisFragment(planFragment);
if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
return buildLeafOrScanJob(planFragment, scanNodes, inputJobs);
Expand All @@ -61,13 +69,13 @@ private static UnassignedJob buildJob(PlanFragment planFragment, List<Unassigned
}
}

private static UnassignedLeafJob buildLeafOrScanJob(
PlanFragment planFragment, List<ScanNode> scanNodes, List<UnassignedJob> inputJobs) {
List<OlapScanNode> olapScanNodes = filterOlapScanNodes(scanNodes);
if (!olapScanNodes.isEmpty()) {
private static UnassignedJob buildLeafOrScanJob(
PlanFragment planFragment, List<ScanNode> scanNodes, Map<ExchangeNode, UnassignedJob> inputJobs) {
boolean hasOlapScanNodes = hasOlapScanNodes(scanNodes);
if (hasOlapScanNodes) {
// we need assign a backend which contains the data,
// so that the OlapScanNode can find the data in the backend
return buildScanLocalTableJob(planFragment, olapScanNodes, inputJobs);
return buildScanLocalTableJob(planFragment, scanNodes, inputJobs);
} else if (scanNodes.isEmpty()) {
// select constant without table,
// e.g. select 100 union select 200
Expand All @@ -79,7 +87,8 @@ private static UnassignedLeafJob buildLeafOrScanJob(
}
}

private static UnassignedJob buildExchangeJob(PlanFragment planFragment, List<UnassignedJob> inputJobs) {
private static UnassignedJob buildExchangeJob(
PlanFragment planFragment, Map<ExchangeNode, UnassignedJob> inputJobs) {
if (planFragment.getDataPartition().isPartitioned()) {
// shuffle to some backends,
// it means every child is from exchange node
Expand All @@ -91,22 +100,21 @@ private static UnassignedJob buildExchangeJob(PlanFragment planFragment, List<Un
}

private static UnassignedScanNativeTableJob buildScanLocalTableJob(
PlanFragment planFragment, List<OlapScanNode> olapScanNodes, List<UnassignedJob> inputJobs) {
return new UnassignedScanNativeTableJob(planFragment, olapScanNodes, inputJobs);
PlanFragment planFragment, List<ScanNode> scanNodes, Map<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedScanNativeTableJob(planFragment, scanNodes, inputJobs);
}

private static List<ScanNode> collectScanNodesInThisFragment(PlanFragment planFragment) {
return planFragment.getPlanRoot().collectUntil(ScanNode.class, ExchangeNode.class);
}

private static List<OlapScanNode> filterOlapScanNodes(List<ScanNode> scanNodes) {
ImmutableList.Builder<OlapScanNode> olapScanNodes = ImmutableList.builderWithExpectedSize(scanNodes.size());
private static boolean hasOlapScanNodes(List<ScanNode> scanNodes) {
for (ScanNode scanNode : scanNodes) {
if (scanNode instanceof OlapScanNode) {
olapScanNodes.add((OlapScanNode) scanNode);
return true;
}
}
return olapScanNodes.build();
return false;
}

private static boolean isLeafFragment(PlanFragment planFragment) {
Expand All @@ -118,27 +126,85 @@ private static UnassignedQueryConstantJob buildQueryConstantJob(PlanFragment pla
}

private static UnassignedScanRemoteTableJob buildScanRemoteTableJob(
PlanFragment planFragment, List<ScanNode> scanNodes, List<UnassignedJob> inputJobs) {
PlanFragment planFragment, List<ScanNode> scanNodes, Map<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedScanRemoteTableJob(planFragment, scanNodes, inputJobs);
}

private static UnassignedShuffleJob buildShuffleJob(PlanFragment planFragment, List<UnassignedJob> inputJobs) {
// collect ExchangeNodes in this fragment
List<ExchangeNode> exchangeNodes = planFragment
.getPlanRoot()
.collectUntil(ExchangeNode.class, ExchangeNode.class);
return new UnassignedShuffleJob(planFragment, exchangeNodes);
private static UnassignedShuffleJob buildShuffleJob(
PlanFragment planFragment, Map<ExchangeNode, UnassignedJob> inputJobs) {
return new UnassignedShuffleJob(planFragment, inputJobs);
}

private static UnassignedGatherJob buildGatherJob(PlanFragment planFragment, List<UnassignedJob> inputJobs) {
private static UnassignedGatherJob buildGatherJob(
PlanFragment planFragment, Map<ExchangeNode, UnassignedJob> inputJobs) {
// collect ExchangeNodes in this fragment
List<ExchangeNode> exchangeNodes = planFragment
Preconditions.checkArgument(inputJobs.size() == 1,
"Gather node should only have one exchange");
Entry<ExchangeNode, UnassignedJob> exchangeAndJob = inputJobs.entrySet().iterator().next();
return new UnassignedGatherJob(planFragment, exchangeAndJob.getKey(), exchangeAndJob.getValue());
}

private static Map<ExchangeNode, UnassignedJob> findInputJobs(
FragmentLineage lineage, PlanFragmentId fragmentId, FragmentIdMapping<UnassignedJob> unassignedJobs) {
Map<ExchangeNode, UnassignedJob> candidate = new IdentityHashMap<>();

Map<PlanNodeId, ExchangeNode> exchangeNodes = lineage.parentFragmentToExchangeNode.get(fragmentId);
if (exchangeNodes != null) {
for (Entry<PlanNodeId, ExchangeNode> idToExchange : exchangeNodes.entrySet()) {
PlanNodeId exchangeId = idToExchange.getKey();
ExchangeNode exchangeNode = idToExchange.getValue();
PlanFragmentId childFragmentId = lineage.exchangeToChildFragment.get(exchangeId);
candidate.put(exchangeNode, unassignedJobs.get(childFragmentId));
}
}
return candidate;
}

private static List<ExchangeNode> collectExchangeNodesInThisFragment(PlanFragment planFragment) {
return planFragment
.getPlanRoot()
.collectUntil(ExchangeNode.class, ExchangeNode.class);
Preconditions.checkArgument(exchangeNodes.size() == 1,
"Gather node should only have one exchange");
Preconditions.checkArgument(inputJobs.size() == 1,
"Gather node should only have one input job");
return new UnassignedGatherJob(planFragment, exchangeNodes.get(0), inputJobs.get(0));
}

private static FragmentLineage buildFragmentLineage(
FragmentIdMapping<PlanFragment> fragments) {
Map<PlanNodeId, PlanFragmentId> exchangeToChildFragment = new LinkedHashMap<>();
FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode = new FragmentIdMapping<>();

for (PlanFragment fragment : fragments.values()) {
PlanFragmentId fragmentId = fragment.getFragmentId();

// 1. link child fragment to exchange node
DataSink sink = fragment.getSink();
if (sink instanceof DataStreamSink) {
PlanNodeId exchangeNodeId = sink.getExchNodeId();
exchangeToChildFragment.put(exchangeNodeId, fragmentId);
}

// 2. link parent fragment to exchange node
List<ExchangeNode> exchangeNodes = collectExchangeNodesInThisFragment(fragment);
Map<PlanNodeId, ExchangeNode> exchangeNodesInFragment = Maps.newLinkedHashMap();
for (ExchangeNode exchangeNode : exchangeNodes) {
exchangeNodesInFragment.put(exchangeNode.getId(), exchangeNode);
}
parentFragmentToExchangeNode.put(fragmentId, exchangeNodesInFragment);
}

return new FragmentLineage(fragments, parentFragmentToExchangeNode, exchangeToChildFragment);
}

private static class FragmentLineage {
private final FragmentIdMapping<PlanFragment> idToFragments;
private final FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode;
private final Map<PlanNodeId, PlanFragmentId> exchangeToChildFragment;

public FragmentLineage(
FragmentIdMapping<PlanFragment> idToFragments,
FragmentIdMapping<Map<PlanNodeId, ExchangeNode>> parentFragmentToExchangeNode,
Map<PlanNodeId, PlanFragmentId> exchangeToChildFragment) {
this.idToFragments = idToFragments;
this.parentFragmentToExchangeNode = parentFragmentToExchangeNode;
this.exchangeToChildFragment = exchangeToChildFragment;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.doris.nereids.worker.DataTopology;

/** UnassignedNearStorageJob */
public interface UnassignedNearStorageJob extends UnassignedLeafJob {
public interface UnassignedNearStorageJob {
// get the map which mapping tablet to backend, note that it is maybe partial topology,
// for example, just select partial tablets to scan
DataTopology usedDataTopology();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import org.apache.doris.planner.PlanFragment;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/** UnassignedQueryConstantJob */
public class UnassignedQueryConstantJob extends AbstractUnassignedJob implements UnassignedLeafJob {
public class UnassignedQueryConstantJob extends AbstractUnassignedJob {
public UnassignedQueryConstantJob(PlanFragment fragment) {
super(fragment);
super(fragment, ImmutableList.of(), ImmutableMap.of());
}
}
Loading

0 comments on commit d5022d0

Please sign in to comment.