Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 9, 2024
1 parent bad3055 commit ce87d5e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
Expand All @@ -60,8 +62,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJobBuilder;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
Expand Down Expand Up @@ -365,7 +365,8 @@ private void distribute(LogicalPlanAdapter logicalPlanAdapter) throws UserExcept
if (!statementContext.getConnectContext().getSessionVariable().isEnableNereidsCoordinator()) {
return;
}
List<UnassignedJob> unassignedJobs = UnassignedJobBuilder.buildJobs(fragments);
DistributePlanner distributePlanner = new DistributePlanner(fragments);
List<DistributedPlan> distributedPlans = distributePlanner.plan();
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.NativeBackendWorkerManager;
import org.apache.doris.nereids.worker.RoundRobinWorkerSelector;
import org.apache.doris.nereids.worker.WorkerManager;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJobBuilder;
import org.apache.doris.planner.PlanFragment;

import java.util.List;
import java.util.Objects;

/** DistributePlanner */
public class DistributePlanner {
private final List<PlanFragment> fragments;

public DistributePlanner(List<PlanFragment> fragments) {
this.fragments = Utils.fastToImmutableList(
Objects.requireNonNull(fragments, "fragments can not be null")
);
}

public List<DistributedPlan> plan() {
List<UnassignedJob> unassignedJobs = UnassignedJobBuilder.buildJobs(fragments);
WorkerManager workerManager = new NativeBackendWorkerManager(new RoundRobinWorkerSelector());
// List<AssignedJob> assignedJobs = workerManager.offerJobs(unassignedJobs);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public AssignedJobImpl(
this.dataSourceIds = dataSourceIds;
}


@Override
public int indexInUnassignedJob() {
return indexInUnassignedJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.ScanNode;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

/**
* UnassignedJobBuilder.
Expand All @@ -34,14 +37,20 @@
public class UnassignedJobBuilder {
public static List<UnassignedJob> buildJobs(List<PlanFragment> fragments) {
// build from leaf to parent
Map<PlanFragmentId, UnassignedJob> unassignedJobs = Maps.newLinkedHashMap();
ImmutableList.Builder<UnassignedJob> jobs = ImmutableList.builderWithExpectedSize(fragments.size());
for (int i = fragments.size() - 1; i >= 0; i--) {
jobs.add(buildJob(fragments.get(i)));
PlanFragment planFragment = fragments.get(i);
List<UnassignedJob> inputJobs = getInputJobs(planFragment, unassignedJobs);
UnassignedJob unassignedJob = buildJob(planFragment, inputJobs);
unassignedJobs.put(planFragment.getFragmentId(), unassignedJob);
jobs.add(unassignedJob);

}
return jobs.build();
}

private static UnassignedJob buildJob(PlanFragment planFragment) {
private static UnassignedJob buildJob(PlanFragment planFragment, List<UnassignedJob> inputJobs) {
List<ScanNode> scanNodes = collectScanNodesInThisFragment(planFragment);
if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) {
return buildLeafOrScanJob(planFragment, scanNodes);
Expand Down Expand Up @@ -127,4 +136,15 @@ private static UnassignedGatherJob buildGatherJob(PlanFragment planFragment) {
"Gather node should only have one exchange");
return new UnassignedGatherJob(planFragment, exchangeNodes.get(0));
}

private static List<UnassignedJob> getInputJobs(
PlanFragment planFragment, Map<PlanFragmentId, UnassignedJob> fragmentIdToJob) {
List<PlanFragment> childrenFragments = planFragment.getChildren();
ImmutableList.Builder<UnassignedJob> inputJobs
= ImmutableList.builderWithExpectedSize(childrenFragments.size());
for (PlanFragment childrenFragment : childrenFragments) {
inputJobs.add(fragmentIdToJob.get(childrenFragment.getFragmentId()));
}
return inputJobs.build();
}
}

0 comments on commit ce87d5e

Please sign in to comment.