Skip to content

Commit

Permalink
refactor coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 22, 2024
1 parent 76bf419 commit 7dbb956
Show file tree
Hide file tree
Showing 47 changed files with 2,540 additions and 10 deletions.
3 changes: 2 additions & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ if (COMPILER_GCC)
endif ()

if (COMPILER_CLANG)
set(CMAKE_CXX_COMPILER_VERSION "17.0.10")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "16")
message(FATAL_ERROR "Need Clang version at least 16")
endif()
Expand Down Expand Up @@ -370,7 +371,7 @@ endif()
# For CMAKE_BUILD_TYPE=Debug
if (OS_MACOSX AND ARCH_ARM)
# Using -O0 may meet ARM64 branch out of range errors when linking with tcmalloc.
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Og")
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Og -std=c++20")
else()
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -O0")
endif()
Expand Down
7 changes: 6 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/common/Id.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* Integer ids that cannot accidentally be compared with ints.
*/
public class Id<IdType extends Id<IdType>> {
public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
protected final int id;

public Id(int id) {
Expand Down Expand Up @@ -62,4 +62,9 @@ public ArrayList<IdType> asList() {
public String toString() {
return Integer.toString(id);
}

@Override
public int compareTo(Id<IdType> idTypeId) {
return id - idTypeId.id;
}
}
28 changes: 28 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,32 @@ public <C extends NodeType> C findFirstOf(Class<C> cl) {
return null;
}

/** anyMatch */
public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
if (func.apply(this)) {
return true;
}

for (NodeType child : children) {
if (child.anyMatch(func)) {
return true;
}
}
return false;
}


/** foreachDown */
public boolean foreachDown(Predicate<TreeNode<NodeType>> visitor) {
if (!visitor.test(this)) {
return false;
}

for (TreeNode<NodeType> child : getChildren()) {
if (!child.foreachDown(visitor)) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
Expand Down Expand Up @@ -99,6 +102,7 @@ public class NereidsPlanner extends Planner {
private Plan rewrittenPlan;
private Plan optimizedPlan;
private PhysicalPlan physicalPlan;
private FragmentIdMapping<DistributedPlan> distributedPlans;
// The cost of optimized plan
private double cost = 0;
private LogicalPlanAdapter logicalPlanAdapter;
Expand Down Expand Up @@ -126,6 +130,7 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
NereidsTracer.logImportantTime("EndParsePlan");
setParsedPlan(parsedPlan);

PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().start();
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
Expand All @@ -135,8 +140,9 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
if (explainLevel.isPlanLevel) {
return;
}

physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
distribute(physicalPlan);
}

@VisibleForTesting
Expand Down Expand Up @@ -311,7 +317,7 @@ private void optimize() {
}
}

private void translate(PhysicalPlan resultPlan) throws UserException {
private void splitFragments(PhysicalPlan resultPlan) throws UserException {
if (resultPlan instanceof PhysicalSqlCache) {
return;
}
Expand Down Expand Up @@ -355,6 +361,15 @@ private void translate(PhysicalPlan resultPlan) throws UserException {
ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
}

private void distribute(PhysicalPlan physicalPlan) throws UserException {
splitFragments(physicalPlan);

if (!statementContext.getConnectContext().getSessionVariable().isEnableNereidsCoordinator()) {
return;
}
distributedPlans = new DistributePlanner(fragments).plan();
}

private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}
Expand Down Expand Up @@ -665,6 +680,10 @@ public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}

public FragmentIdMapping<DistributedPlan> getDistributedPlans() {
return distributedPlans;
}

public LogicalPlanAdapter getLogicalPlanAdapter() {
return logicalPlanAdapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
}

@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
public Boolean visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
Preconditions.checkArgument(childrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.AssignedJobBuilder;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJobBuilder;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;

import com.google.common.collect.ListMultimap;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/** DistributePlanner */
public class DistributePlanner {
private final List<PlanFragment> fragments;
private final FragmentIdMapping<PlanFragment> idToFragments;

public DistributePlanner(List<PlanFragment> fragments) {
this.fragments = Objects.requireNonNull(fragments, "fragments can not be null");
this.idToFragments = FragmentIdMapping.buildFragmentMapping(fragments);
}

public FragmentIdMapping<DistributedPlan> plan() {
FragmentIdMapping<UnassignedJob> fragmentJobs = UnassignedJobBuilder.buildJobs(idToFragments);
ListMultimap<PlanFragmentId, AssignedJob> instanceJobs = AssignedJobBuilder.buildJobs(fragmentJobs);
return buildDistributePlans(fragmentJobs, instanceJobs);
}

private FragmentIdMapping<DistributedPlan> buildDistributePlans(
Map<PlanFragmentId, UnassignedJob> idToUnassignedJobs,
ListMultimap<PlanFragmentId, AssignedJob> idToAssignedJobs) {
FragmentIdMapping<DistributedPlan> idToDistributedPlans = new FragmentIdMapping<>();
for (Entry<PlanFragmentId, PlanFragment> kv : idToFragments.entrySet()) {
PlanFragmentId fragmentId = kv.getKey();
PlanFragment fragment = kv.getValue();

UnassignedJob fragmentJob = idToUnassignedJobs.get(fragmentId);
List<AssignedJob> instanceJobs = idToAssignedJobs.get(fragmentId);

List<DistributedPlan> childrenPlans = idToDistributedPlans.getByChildrenFragments(fragment);
idToDistributedPlans.put(fragmentId, new DistributedPlan(fragmentJob, instanceJobs, childrenPlans));
}
return idToDistributedPlans;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJob;

import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** DistributedPlan */
@lombok.Getter
public class DistributedPlan implements TreeNode<DistributedPlan> {
private final UnassignedJob fragmentJob;
private final List<AssignedJob> instanceJobs;
private final List<DistributedPlan> inputs;

public DistributedPlan(
UnassignedJob fragmentJob, List<AssignedJob> instanceJobs, List<DistributedPlan> inputs) {
this.fragmentJob = Objects.requireNonNull(fragmentJob, "fragmentJob can not be null");
this.instanceJobs = Utils.fastToImmutableList(
Objects.requireNonNull(instanceJobs, "instanceJobs can not be null"));
this.inputs = Utils.fastToImmutableList(Objects.requireNonNull(inputs, "inputs can not be null"));
}

@Override
public List<DistributedPlan> children() {
return inputs;
}

@Override
public DistributedPlan child(int index) {
return inputs.get(index);
}

@Override
public int arity() {
return inputs.size();
}

@Override
public <T> Optional<T> getMutableState(String key) {
return Optional.empty();
}

@Override
public void setMutableState(String key, Object value) {
throw new UnsupportedOperationException();
}

@Override
public DistributedPlan withChildren(List<DistributedPlan> children) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
String instancesStr = StringUtils.join(instanceJobs, ",\n");
String instancesStrWithIndent = Utils.addLinePrefix(instancesStr, " ");

return "DistributedPlan(\n parallel: " + instanceJobs.size() + ",\n fragmentJob: " + fragmentJob
+ "\n instanceJobs: [\n" + instancesStrWithIndent + "\n ]\n)";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;

import com.google.common.collect.ImmutableList;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* FragmentIdMapping:
* key: PlanFragmentId
* value: T
*
* NOTE: this map should order by PlanFragmentId asc
*/
public class FragmentIdMapping<T> extends TreeMap<PlanFragmentId, T> {
public FragmentIdMapping() {
}

public FragmentIdMapping(Comparator<? super PlanFragmentId> comparator) {
super(comparator);
}

public FragmentIdMapping(Map<? extends PlanFragmentId, ? extends T> m) {
super(m);
}

public FragmentIdMapping(SortedMap<PlanFragmentId, ? extends T> m) {
super(m);
}

/** getByChildrenFragments */
public List<T> getByChildrenFragments(PlanFragment fragment) {
List<PlanFragment> children = fragment.getChildren();
ImmutableList.Builder<T> values = ImmutableList.builderWithExpectedSize(children.size());
for (PlanFragment child : children) {
values.add(get(child.getFragmentId()));
}
return values.build();
}

public static FragmentIdMapping<PlanFragment> buildFragmentMapping(List<PlanFragment> fragments) {
FragmentIdMapping<PlanFragment> idToFragments = new FragmentIdMapping<>();
for (PlanFragment fragment : fragments) {
idToFragments.put(fragment.getFragmentId(), fragment);
}
return idToFragments;
}
}
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,4 +431,18 @@ public static <M, T extends M> Optional<M> fastReduce(List<T> list, BiFunction<M
}
return Optional.of(merge);
}

/** addLinePrefix */
public static String addLinePrefix(String str, String prefix) {
StringBuilder newStr = new StringBuilder((int) (str.length() * 1.2));
String[] lines = str.split("\n");
for (int i = 0; i < lines.length; i++) {
String line = lines[i];
newStr.append(prefix).append(line);
if (i + 1 < lines.length) {
newStr.append("\n");
}
}
return newStr.toString();
}
}
Loading

0 comments on commit 7dbb956

Please sign in to comment.