From 7dbb956997e0ae1258bd31db7c40eaeffcd38136 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Mon, 6 May 2024 15:48:16 +0800 Subject: [PATCH] refactor coordinator --- be/CMakeLists.txt | 3 +- .../main/java/org/apache/doris/common/Id.java | 7 +- .../org/apache/doris/common/TreeNode.java | 28 ++ .../apache/doris/nereids/NereidsPlanner.java | 23 +- .../ChildrenPropertiesRegulator.java | 4 +- .../plans/distribute/DistributePlanner.java | 66 +++++ .../plans/distribute/DistributedPlan.java | 84 ++++++ .../plans/distribute/FragmentIdMapping.java | 71 +++++ .../org/apache/doris/nereids/util/Utils.java | 14 + .../doris/nereids/worker/BackendWorker.java | 76 +++++ .../nereids/worker/BackendWorkerManager.java | 63 ++++ .../nereids/worker/BackendWorkerPool.java | 43 +++ .../worker/LoadBalanceScanWorkerSelector.java | 268 ++++++++++++++++++ .../nereids/worker/ScanWorkerSelector.java | 51 ++++ .../apache/doris/nereids/worker/Worker.java | 38 +++ .../doris/nereids/worker/WorkerGroup.java | 47 +++ .../doris/nereids/worker/WorkerManager.java | 25 ++ .../doris/nereids/worker/WorkerPool.java | 31 ++ .../apache/doris/nereids/worker/Workload.java | 22 ++ .../worker/job/AbstractUnassignedJob.java | 73 +++++ .../doris/nereids/worker/job/AssignedJob.java | 36 +++ .../worker/job/AssignedJobBuilder.java | 61 ++++ .../nereids/worker/job/AssignedJobImpl.java | 67 +++++ .../nereids/worker/job/BucketScanSource.java | 51 ++++ .../worker/job/CustomAssignmentJob.java | 29 ++ .../nereids/worker/job/DefaultScanSource.java | 61 ++++ .../doris/nereids/worker/job/ScanRanges.java | 81 ++++++ .../doris/nereids/worker/job/ScanSource.java | 30 ++ .../nereids/worker/job/UnassignedJob.java | 55 ++++ .../worker/job/UnassignedJobBuilder.java | 204 +++++++++++++ .../worker/job/UnassignedNearStorageJob.java | 27 ++ .../job/UnassignedQueryConstantJob.java | 47 +++ .../job/UnassignedScanNativeTableJob.java | 136 +++++++++ .../job/UnassignedScanRemoteTableJob.java | 39 +++ .../worker/job/UnassignedShuffleJob.java | 113 ++++++++ .../job/UnassignedSpecifyInstancesJob.java | 50 ++++ .../nereids/worker/job/WorkerScanSource.java | 34 +++ .../doris/planner/BucketSpecifyInstances.java | 33 +++ .../planner/DefaultSpecifyInstances.java | 33 +++ .../planner/NereidsSpecifyInstances.java | 55 ++++ .../apache/doris/planner/OlapScanNode.java | 23 +- .../apache/doris/planner/PlanFragment.java | 30 ++ .../org/apache/doris/planner/PlanNode.java | 25 ++ .../java/org/apache/doris/qe/Coordinator.java | 110 +++++++ .../apache/doris/qe/NereidsCoordinator.java | 64 +++++ .../org/apache/doris/qe/SessionVariable.java | 13 + .../org/apache/doris/qe/StmtExecutor.java | 6 +- 47 files changed, 2540 insertions(+), 10 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerPool.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerGroup.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerPool.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 9dd71d40245fa80..7c56a17032c8bad 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -269,6 +269,7 @@ if (COMPILER_GCC) endif () if (COMPILER_CLANG) + set(CMAKE_CXX_COMPILER_VERSION "17.0.10") if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "16") message(FATAL_ERROR "Need Clang version at least 16") endif() @@ -370,7 +371,7 @@ endif() # For CMAKE_BUILD_TYPE=Debug if (OS_MACOSX AND ARCH_ARM) # Using -O0 may meet ARM64 branch out of range errors when linking with tcmalloc. - set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Og") + set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Og -std=c++20") else() set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -O0") endif() diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Id.java b/fe/fe-core/src/main/java/org/apache/doris/common/Id.java index 9d6dad50a462380..a6bd3896708058c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Id.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Id.java @@ -25,7 +25,7 @@ /** * Integer ids that cannot accidentally be compared with ints. */ -public class Id> { +public class Id> implements Comparable> { protected final int id; public Id(int id) { @@ -62,4 +62,9 @@ public ArrayList asList() { public String toString() { return Integer.toString(id); } + + @Override + public int compareTo(Id idTypeId) { + return id - idTypeId.id; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java index 7693acf3bb33d28..1870c231ae5331f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java @@ -239,4 +239,32 @@ public C findFirstOf(Class cl) { return null; } + /** anyMatch */ + public boolean anyMatch(Predicate> func) { + if (func.apply(this)) { + return true; + } + + for (NodeType child : children) { + if (child.anyMatch(func)) { + return true; + } + } + return false; + } + + + /** foreachDown */ + public boolean foreachDown(Predicate> visitor) { + if (!visitor.test(this)) { + return false; + } + + for (TreeNode child : getChildren()) { + if (!child.foreachDown(visitor)) { + return false; + } + } + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index ac7a5ffbc586cd5..18a6372336a18f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -52,6 +52,9 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; +import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner; +import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; @@ -99,6 +102,7 @@ public class NereidsPlanner extends Planner { private Plan rewrittenPlan; private Plan optimizedPlan; private PhysicalPlan physicalPlan; + private FragmentIdMapping distributedPlans; // The cost of optimized plan private double cost = 0; private LogicalPlanAdapter logicalPlanAdapter; @@ -126,6 +130,7 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan(); NereidsTracer.logImportantTime("EndParsePlan"); setParsedPlan(parsedPlan); + PhysicalProperties requireProperties = buildInitRequireProperties(); statementContext.getStopwatch().start(); boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions()); @@ -135,8 +140,9 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions if (explainLevel.isPlanLevel) { return; } + physicalPlan = (PhysicalPlan) resultPlan; - translate(physicalPlan); + distribute(physicalPlan); } @VisibleForTesting @@ -311,7 +317,7 @@ private void optimize() { } } - private void translate(PhysicalPlan resultPlan) throws UserException { + private void splitFragments(PhysicalPlan resultPlan) throws UserException { if (resultPlan instanceof PhysicalSqlCache) { return; } @@ -355,6 +361,15 @@ private void translate(PhysicalPlan resultPlan) throws UserException { ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes()); } + private void distribute(PhysicalPlan physicalPlan) throws UserException { + splitFragments(physicalPlan); + + if (!statementContext.getConnectContext().getSessionVariable().isEnableNereidsCoordinator()) { + return; + } + distributedPlans = new DistributePlanner(fragments).plan(); + } + private PhysicalPlan postProcess(PhysicalPlan physicalPlan) { return new PlanPostProcessors(cascadesContext).process(physicalPlan); } @@ -665,6 +680,10 @@ public PhysicalPlan getPhysicalPlan() { return physicalPlan; } + public FragmentIdMapping getDistributedPlans() { + return distributedPlans; + } + public LogicalPlanAdapter getLogicalPlanAdapter() { return logicalPlanAdapter; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 038e2646a6dd996..4a6a8d884b48506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -227,8 +227,8 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp } @Override - public Boolean visitPhysicalHashJoin(PhysicalHashJoin hashJoin, - Void context) { + public Boolean visitPhysicalHashJoin( + PhysicalHashJoin hashJoin, Void context) { Preconditions.checkArgument(children.size() == 2, "children.size() != 2"); Preconditions.checkArgument(childrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java new file mode 100644 index 000000000000000..7ff373d39bf2289 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute; + +import org.apache.doris.nereids.worker.job.AssignedJob; +import org.apache.doris.nereids.worker.job.AssignedJobBuilder; +import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.worker.job.UnassignedJobBuilder; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; + +import com.google.common.collect.ListMultimap; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +/** DistributePlanner */ +public class DistributePlanner { + private final List fragments; + private final FragmentIdMapping idToFragments; + + public DistributePlanner(List fragments) { + this.fragments = Objects.requireNonNull(fragments, "fragments can not be null"); + this.idToFragments = FragmentIdMapping.buildFragmentMapping(fragments); + } + + public FragmentIdMapping plan() { + FragmentIdMapping fragmentJobs = UnassignedJobBuilder.buildJobs(idToFragments); + ListMultimap instanceJobs = AssignedJobBuilder.buildJobs(fragmentJobs); + return buildDistributePlans(fragmentJobs, instanceJobs); + } + + private FragmentIdMapping buildDistributePlans( + Map idToUnassignedJobs, + ListMultimap idToAssignedJobs) { + FragmentIdMapping idToDistributedPlans = new FragmentIdMapping<>(); + for (Entry kv : idToFragments.entrySet()) { + PlanFragmentId fragmentId = kv.getKey(); + PlanFragment fragment = kv.getValue(); + + UnassignedJob fragmentJob = idToUnassignedJobs.get(fragmentId); + List instanceJobs = idToAssignedJobs.get(fragmentId); + + List childrenPlans = idToDistributedPlans.getByChildrenFragments(fragment); + idToDistributedPlans.put(fragmentId, new DistributedPlan(fragmentJob, instanceJobs, childrenPlans)); + } + return idToDistributedPlans; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java new file mode 100644 index 000000000000000..1812687d600d32d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute; + +import org.apache.doris.nereids.trees.TreeNode; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.nereids.worker.job.AssignedJob; +import org.apache.doris.nereids.worker.job.UnassignedJob; + +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** DistributedPlan */ +@lombok.Getter +public class DistributedPlan implements TreeNode { + private final UnassignedJob fragmentJob; + private final List instanceJobs; + private final List inputs; + + public DistributedPlan( + UnassignedJob fragmentJob, List instanceJobs, List inputs) { + this.fragmentJob = Objects.requireNonNull(fragmentJob, "fragmentJob can not be null"); + this.instanceJobs = Utils.fastToImmutableList( + Objects.requireNonNull(instanceJobs, "instanceJobs can not be null")); + this.inputs = Utils.fastToImmutableList(Objects.requireNonNull(inputs, "inputs can not be null")); + } + + @Override + public List children() { + return inputs; + } + + @Override + public DistributedPlan child(int index) { + return inputs.get(index); + } + + @Override + public int arity() { + return inputs.size(); + } + + @Override + public Optional getMutableState(String key) { + return Optional.empty(); + } + + @Override + public void setMutableState(String key, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public DistributedPlan withChildren(List children) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + String instancesStr = StringUtils.join(instanceJobs, ",\n"); + String instancesStrWithIndent = Utils.addLinePrefix(instancesStr, " "); + + return "DistributedPlan(\n parallel: " + instanceJobs.size() + ",\n fragmentJob: " + fragmentJob + + "\n instanceJobs: [\n" + instancesStrWithIndent + "\n ]\n)"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java new file mode 100644 index 000000000000000..95bf36051d2033b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/FragmentIdMapping.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute; + +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; + +import com.google.common.collect.ImmutableList; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * FragmentIdMapping: + * key: PlanFragmentId + * value: T + * + * NOTE: this map should order by PlanFragmentId asc + */ +public class FragmentIdMapping extends TreeMap { + public FragmentIdMapping() { + } + + public FragmentIdMapping(Comparator comparator) { + super(comparator); + } + + public FragmentIdMapping(Map m) { + super(m); + } + + public FragmentIdMapping(SortedMap m) { + super(m); + } + + /** getByChildrenFragments */ + public List getByChildrenFragments(PlanFragment fragment) { + List children = fragment.getChildren(); + ImmutableList.Builder values = ImmutableList.builderWithExpectedSize(children.size()); + for (PlanFragment child : children) { + values.add(get(child.getFragmentId())); + } + return values.build(); + } + + public static FragmentIdMapping buildFragmentMapping(List fragments) { + FragmentIdMapping idToFragments = new FragmentIdMapping<>(); + for (PlanFragment fragment : fragments) { + idToFragments.put(fragment.getFragmentId(), fragment); + } + return idToFragments; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java index 852e148ef1d9cbf..908f37b7d87a5e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java @@ -431,4 +431,18 @@ public static Optional fastReduce(List list, BiFunction> backends = Suppliers.memoize(() -> { + try { + return Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster(); + } catch (Exception t) { + throw new NereidsException("Can not get backends: " + t, t); + } + }); + + @Override + public Worker getWorker(long backendId) { + ImmutableMap backends = this.backends.get(); + Backend backend = backends.get(backendId); + if (backend == null) { + throw new IllegalStateException("Backend " + backendId + " is not exist"); + } + return new BackendWorker(backend); + } + + @Override + public Worker randomAvailableWorker() { + try { + Reference selectedBackendId = new Reference<>(); + ImmutableMap backends = this.backends.get(); + SimpleScheduler.getHost(backends, selectedBackendId); + Backend selctedBackend = backends.get(selectedBackendId.getRef()); + return new BackendWorker(selctedBackend); + } catch (Exception t) { + throw new NereidsException("Can not get backends: " + t, t); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerPool.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerPool.java new file mode 100644 index 000000000000000..66e45e9af345122 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerPool.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +import java.util.List; + +/** BackendWorkerPool */ +public class BackendWorkerPool implements WorkerPool { + @Override + public List getAvailableWorkers() { + return null; + } + + @Override + public Worker getWorker(String address) { + return null; + } + + @Override + public List getWorkers() { + return null; + } + + @Override + public List getWorkers(WorkerGroup workerGroup) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java new file mode 100644 index 000000000000000..2ff3536ebcc7c5c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.worker.job.ScanRanges; +import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TScanRangeParams; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** LoadBalanceScanWorkerSelector */ +public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector { + private final BackendWorkerManager workerManager = new BackendWorkerManager(); + private final Map workloads = Maps.newLinkedHashMap(); + + @Override + public Map> selectReplicaAndWorkerWithoutBucket( + UnassignedNearStorageJob unassignedJob) { + List nearStorageScanNodes = unassignedJob.nearStorageScanNodes(); + if (nearStorageScanNodes.size() != 1 || !(nearStorageScanNodes.get(0) instanceof OlapScanNode)) { + throw new IllegalStateException("Illegal fragment type, " + + "should only contains one OlapScanNode but meet " + nearStorageScanNodes); + } + return selectForSingleOlapTable((OlapScanNode) nearStorageScanNodes.get(0)); + } + + @Override + public Map>> selectReplicaAndWorkerWithBucket( + UnassignedNearStorageJob unassignedJob) { + PlanFragment fragment = unassignedJob.getFragment(); + List olapScanNodes = filterOlapScanNodes(unassignedJob.getScanNodes()); + if (olapScanNodes.size() == 1 && fragment.isBucketShuffleJoinInput()) { + return selectForBucket(unassignedJob, olapScanNodes); + } else if (!olapScanNodes.isEmpty() && fragment.hasColocatePlanNode()) { + return selectForBucket(unassignedJob, olapScanNodes); + } else { + throw new IllegalStateException( + "Illegal bucket shuffle join or colocate join in fragment: " + fragment.getFragmentId()); + } + } + + private Map>> selectForBucket( + UnassignedNearStorageJob unassignedJob, List olapScanNodes) { + Map>> assignment = Maps.newLinkedHashMap(); + + Map bucketIndexToBytes = + computeEachBucketScanBytes(unassignedJob.getFragment(), olapScanNodes); + + OlapScanNode firstOlapScanNode = olapScanNodes.get(0); + for (Entry kv : bucketIndexToBytes.entrySet()) { + Integer bucketIndex = kv.getKey(); + long allScanNodeScanBytesInOneBucket = kv.getValue(); + + List allPartitionTabletsInOneBucket + = firstOlapScanNode.bucketSeq2locations.get(bucketIndex); + SelectResult replicaAndWorker = selectScanReplicaAndMinWorkloadWorker( + allPartitionTabletsInOneBucket.get(0), allScanNodeScanBytesInOneBucket); + Worker selectedWorker = replicaAndWorker.selectWorker; + long workerId = selectedWorker.id(); + for (OlapScanNode olapScanNode : olapScanNodes) { + List> selectedReplicasInOneBucket = + filterReplicaByWorkerInBucket(olapScanNode, workerId, bucketIndex); + Map> bucketIndexToScanNodeToTablets + = assignment.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap()); + Map scanNodeToScanRanges = bucketIndexToScanNodeToTablets + .computeIfAbsent(bucketIndex, bucket -> Maps.newLinkedHashMap()); + ScanRanges scanRanges = scanNodeToScanRanges.computeIfAbsent(olapScanNode, node -> new ScanRanges()); + for (Pair replica : selectedReplicasInOneBucket) { + TScanRangeParams replicaParam = replica.first; + Long scanBytes = replica.second; + scanRanges.addScanRange(replicaParam, scanBytes); + } + } + } + return assignment; + } + + private Map> selectForSingleOlapTable( + OlapScanNode nearStorageScanNode) { + Map> workerToScanNodeAndReplicas = Maps.newHashMap(); + List allScanTabletLocations = nearStorageScanNode.getScanRangeLocations(0); + for (TScanRangeLocations onePartitionOneTabletLocation : allScanTabletLocations) { + long tabletId = onePartitionOneTabletLocation.getScanRange().getPaloScanRange().getTabletId(); + Long tabletBytes = nearStorageScanNode.getTabletSingleReplicaSize(tabletId); + + SelectResult selectedReplicaAndWorker + = selectScanReplicaAndMinWorkloadWorker(onePartitionOneTabletLocation, tabletBytes); + Worker selectedWorker = selectedReplicaAndWorker.selectWorker; + TScanRangeLocation selectedReplica = selectedReplicaAndWorker.selectReplica; + + Map scanNodeToRanges + = workerToScanNodeAndReplicas.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap()); + ScanRanges selectedReplicas + = scanNodeToRanges.computeIfAbsent(nearStorageScanNode, node -> new ScanRanges()); + TScanRangeParams scanReplicaParam = buildScanReplicaParams(onePartitionOneTabletLocation, selectedReplica); + selectedReplicas.addScanRange(scanReplicaParam, tabletBytes); + } + + // scan empty table, assign a random worker with empty + if (workerToScanNodeAndReplicas.isEmpty()) { + workerToScanNodeAndReplicas.put( + workerManager.randomAvailableWorker(), + ImmutableMap.of(nearStorageScanNode, new ScanRanges()) + ); + } + return workerToScanNodeAndReplicas; + } + + private SelectResult selectScanReplicaAndMinWorkloadWorker( + TScanRangeLocations tabletLocation, long tabletBytes) { + List replicaLocations = tabletLocation.getLocations(); + int replicaNum = replicaLocations.size(); + WorkerWorkload minWorkload = new WorkerWorkload(Integer.MAX_VALUE, Long.MAX_VALUE); + Worker minWorkLoadWorker = null; + TScanRangeLocation selectedReplicaLocation = null; + + for (int i = 0; i < replicaNum; i++) { + TScanRangeLocation replicaLocation = replicaLocations.get(i); + Worker worker = workerManager.getWorker(replicaLocation.getBackendId()); + if (!worker.available()) { + continue; + } + + WorkerWorkload workload = getWorkload(worker); + if (workload.compareTo(minWorkload) < 0) { + minWorkLoadWorker = worker; + minWorkload = workload; + selectedReplicaLocation = replicaLocation; + } + } + if (minWorkLoadWorker == null) { + throw new AnalysisException("No available workers"); + } else { + minWorkload.recordOneScanTask(tabletBytes); + return new SelectResult(minWorkLoadWorker, selectedReplicaLocation, minWorkload.scanBytes); + } + } + + private List filterOlapScanNodes(List scanNodes) { + ImmutableList.Builder olapScanNodes = ImmutableList.builderWithExpectedSize(scanNodes.size()); + for (ScanNode scanNode : scanNodes) { + if (scanNode instanceof OlapScanNode) { + olapScanNodes.add((OlapScanNode) scanNode); + } + } + return olapScanNodes.build(); + } + + private List> filterReplicaByWorkerInBucket( + OlapScanNode olapScanNode, long filterWorkerId, int bucketIndex) { + List> selectedReplicasInOneBucket = Lists.newArrayList(); + for (TScanRangeLocations onePartitionOneTabletLocation + : olapScanNode.bucketSeq2locations.get(bucketIndex)) { + long tabletId = onePartitionOneTabletLocation.getScanRange().getPaloScanRange().getTabletId(); + for (TScanRangeLocation replicaLocation : onePartitionOneTabletLocation.getLocations()) { + if (replicaLocation.getBackendId() == filterWorkerId) { + TScanRangeParams scanReplicaParams = + buildScanReplicaParams(onePartitionOneTabletLocation, replicaLocation); + Long replicaSize = olapScanNode.getTabletSingleReplicaSize(tabletId); + selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize)); + break; + } + } + } + return selectedReplicasInOneBucket; + } + + private Map computeEachBucketScanBytes(PlanFragment fragment, List olapScanNodes) { + Map bucketIndexToBytes = Maps.newLinkedHashMap(); + for (OlapScanNode olapScanNode : olapScanNodes) { + Set> bucketSeq2Bytes = olapScanNode.bucketSeq2Bytes.entrySet(); + if (!bucketIndexToBytes.isEmpty() && bucketIndexToBytes.size() != bucketSeq2Bytes.size()) { + throw new IllegalStateException("Illegal fragment " + fragment.getFragmentId() + + ", every OlapScanNode should has same bucket num"); + } + + for (Entry bucketSeq2Byte : bucketSeq2Bytes) { + Integer bucketIndex = bucketSeq2Byte.getKey(); + Long scanBytes = bucketSeq2Byte.getValue(); + bucketIndexToBytes.merge(bucketIndex, scanBytes, Long::sum); + } + } + return bucketIndexToBytes; + } + + private TScanRangeParams buildScanReplicaParams( + TScanRangeLocations tabletLocation, TScanRangeLocation replicaLocation) { + TScanRangeParams replicaParam = new TScanRangeParams(); + replicaParam.scan_range = tabletLocation.scan_range; + // Volume is optional, so we need to set the value and the is-set bit + replicaParam.setVolumeId(replicaLocation.volume_id); + return replicaParam; + } + + private WorkerWorkload getWorkload(Worker worker) { + return workloads.computeIfAbsent(worker, w -> new WorkerWorkload()); + } + + private static class SelectResult { + private final Worker selectWorker; + private final TScanRangeLocation selectReplica; + private final long bytes; + + public SelectResult(Worker selecteWorker, TScanRangeLocation selectReplica, long bytes) { + this.selectWorker = selecteWorker; + this.selectReplica = selectReplica; + this.bytes = bytes; + } + } + + private static class WorkerWorkload implements Comparable { + private int taskNum; + private long scanBytes; + + public WorkerWorkload() { + this(0, 1); + } + + public WorkerWorkload(int taskNum, long scanBytes) { + this.taskNum = taskNum; + this.scanBytes = scanBytes; + } + + public void recordOneScanTask(long scanBytes) { + this.scanBytes += scanBytes; + } + + // order by scanBytes asc, taskNum asc + @Override + public int compareTo(WorkerWorkload workerWorkload) { + int compareScanBytes = Long.compare(this.scanBytes, workerWorkload.scanBytes); + if (compareScanBytes != 0) { + return compareScanBytes; + } + return taskNum - workerWorkload.taskNum; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java new file mode 100644 index 000000000000000..55dc9d2b2651e43 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +import org.apache.doris.nereids.worker.job.ScanRanges; +import org.apache.doris.nereids.worker.job.UnassignedNearStorageJob; +import org.apache.doris.planner.ScanNode; + +import java.util.Map; + +/** ScanWorkerSelector */ +public interface ScanWorkerSelector { + // for a scan job: + // 1. select some workers + // 2. select replicas for each worker + // + // return + // key: backend + // value: which data should scan + Map> selectReplicaAndWorkerWithoutBucket( + UnassignedNearStorageJob unassignedJob); + + // return + // key: Worker, the backend which will process this fragment + // value.key: Integer, the bucket index, from 0 to (bucket_num - 1) + // for example, create table statement contains: distributed by hash(id) buckets 10, + // the bucket index will from 0 to 9 + // value.value.key: ScanNode, which ScanNode the worker will process scan task + // value.value.value: ScanRanges, the tablets in current bucket, + // for example, colocate table `tbl` has 2 range partitions: + // p1 values[(1), (10)) and p2 values[(10), 11) with integer partition column part, + // and distributed by hash(id) buckets 10. And, so, there has 10 buckets from bucket 0 to + // bucket 9, and every bucket contains two tablets, because there are two partitions. + Map>> selectReplicaAndWorkerWithBucket( + UnassignedNearStorageJob unassignedJob); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java new file mode 100644 index 000000000000000..8d1179fc1e20124 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +/** Worker */ +public interface Worker extends Comparable { + long id(); + + // ipv4/ipv6 address + String address(); + + String host(); + + int port(); + + // whether is this worker alive? + boolean available(); + + @Override + default int compareTo(Worker worker) { + return address().compareTo(worker.address()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerGroup.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerGroup.java new file mode 100644 index 000000000000000..eeed506f1275a74 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerGroup.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +import java.util.List; + +/** WorkerGroup */ +public interface WorkerGroup { + AllWorkers ALL_WORKERS = new AllWorkers(); + AllAliveWorkers ALL_ALIVE_WORKERS = new AllAliveWorkers(); + + List filterWorkers(WorkerPool workerPool); + + /** AllWorkers */ + class AllWorkers implements WorkerGroup { + private AllWorkers() {} + + @Override + public List filterWorkers(WorkerPool workerPool) { + return workerPool.getAvailableWorkers(); + } + } + + /** AllAliveWorkers */ + class AllAliveWorkers implements WorkerGroup { + private AllAliveWorkers() {} + + public List filterWorkers(WorkerPool workerPool) { + return workerPool.getAvailableWorkers(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java new file mode 100644 index 000000000000000..5db890d78227778 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +/** WorkerManager */ +public interface WorkerManager { + Worker getWorker(long backendId); + + Worker randomAvailableWorker(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerPool.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerPool.java new file mode 100644 index 000000000000000..ced498e5fa578b8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerPool.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +import java.util.List; + +/** WorkerPool */ +public interface WorkerPool { + List getAvailableWorkers(); + + Worker getWorker(String address); + + List getWorkers(); + + List getWorkers(WorkerGroup workerGroup); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java new file mode 100644 index 000000000000000..f67c3660c1520ce --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker; + +/** Workload */ +public interface Workload extends Comparable { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java new file mode 100644 index 000000000000000..1e03d97a12429a8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.trees.AbstractTreeNode; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** AbstractUnassignedJob */ +public abstract class AbstractUnassignedJob + extends AbstractTreeNode implements UnassignedJob { + protected final PlanFragment fragment; + protected final List scanNodes; + protected final Map exchangeToChildJob; + + /** AbstractUnassignedJob */ + public AbstractUnassignedJob(PlanFragment fragment, List scanNodes, + Map exchangeToChildJob) { + super(Utils.fastToImmutableList(exchangeToChildJob.values())); + this.fragment = Objects.requireNonNull(fragment, "fragment can not be null"); + this.scanNodes = Utils.fastToImmutableList( + Objects.requireNonNull(scanNodes, "scanNodes can not be null") + ); + this.exchangeToChildJob + = Objects.requireNonNull(exchangeToChildJob, "exchangeToChildJob can not be null"); + } + + @Override + public PlanFragment getFragment() { + return fragment; + } + + @Override + public List getScanNodes() { + return scanNodes; + } + + @Override + public Map getExchangeToChildJob() { + return exchangeToChildJob; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ": " + fragment.getFragmentId(); + } + + @Override + public UnassignedJob withChildren(List children) { + throw new UnsupportedOperationException(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java new file mode 100644 index 000000000000000..6e467042968876c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.Worker; + +import java.util.Set; + +/** + * AssignedJob. + * for example: an instance job in a fragment job, which already assign to a worker and some data sources + */ +public interface AssignedJob { + int indexInUnassignedJob(); + + UnassignedJob unassignedJob(); + + Worker getAssignedWorker(); + + ScanSource getScanSource(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java new file mode 100644 index 000000000000000..42eb3e3ead35216 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.BackendWorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragmentId; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** AssignedJobBuilder */ +public class AssignedJobBuilder { + /** buildJobs */ + public static ListMultimap buildJobs( + Map unassignedJobs) { + BackendWorkerManager workerManager = new BackendWorkerManager(); + ListMultimap allAssignedJobs = ArrayListMultimap.create(); + for (Entry kv : unassignedJobs.entrySet()) { + PlanFragmentId fragmentId = kv.getKey(); + UnassignedJob unassignedJob = kv.getValue(); + ListMultimap inputAssignedJobs + = getInputAssignedJobs(unassignedJob, allAssignedJobs); + List fragmentAssignedJobs = + unassignedJob.computeAssignedJobs(workerManager, inputAssignedJobs); + allAssignedJobs.putAll(fragmentId, fragmentAssignedJobs); + } + return allAssignedJobs; + } + + private static ListMultimap getInputAssignedJobs( + UnassignedJob unassignedJob, ListMultimap assignedJobs) { + ListMultimap inputJobs = ArrayListMultimap.create(); + for (Entry exchangeNodeToChildJob + : unassignedJob.getExchangeToChildJob().entrySet()) { + ExchangeNode exchangeNode = exchangeNodeToChildJob.getKey(); + UnassignedJob childJob = exchangeNodeToChildJob.getValue(); + inputJobs.putAll(exchangeNode, assignedJobs.get(childJob.getFragment().getFragmentId())); + } + return inputJobs; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java new file mode 100644 index 000000000000000..4beedd58520cb74 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobImpl.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.Worker; + +import java.util.Objects; + +/** AssignedJobImpl */ +public class AssignedJobImpl implements AssignedJob { + private final int indexInUnassignedJob; + private final UnassignedJob unassignedJob; + private final Worker worker; + private final ScanSource scanSource; + + public AssignedJobImpl( + int indexInUnassignedJob, UnassignedJob unassignedJob, Worker worker, + ScanSource scanSource) { + this.indexInUnassignedJob = indexInUnassignedJob; + this.unassignedJob = Objects.requireNonNull(unassignedJob, "unassignedJob can not be null"); + this.worker = worker; + this.scanSource = Objects.requireNonNull(scanSource, "scanSource can not be null"); + } + + @Override + public int indexInUnassignedJob() { + return indexInUnassignedJob; + } + + @Override + public UnassignedJob unassignedJob() { + return unassignedJob; + } + + @Override + public Worker getAssignedWorker() { + return worker; + } + + @Override + public ScanSource getScanSource() { + return scanSource; + } + + @Override + public String toString() { + StringBuilder scanSourceString = new StringBuilder(); + scanSource.toString(scanSourceString, " "); + return "AssignedJobImpl(\n unassignedJob: " + unassignedJob + + ",\n worker: " + worker + "\n scanSource: " + scanSourceString + "\n)"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java new file mode 100644 index 000000000000000..488b4fd76ffb242 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.planner.ScanNode; + +import java.util.Map; +import java.util.Map.Entry; + +/** BucketScanSource */ +public class BucketScanSource extends ScanSource { + // for example: + // 1. bucket 0 use OlapScanNode(tableName=`tbl`) to scan with tablet: [tablet 10001, tablet 10003] + // 2. bucket 1 use OlapScanNode(tableName=`tbl`) to scan with tablet: [tablet 10002, tablet 10004] + public final Map> bucketIndexToScanNodeToTablets; + + public BucketScanSource(Map> bucketIndexToScanNodeToTablets) { + this.bucketIndexToScanNodeToTablets = bucketIndexToScanNodeToTablets; + } + + public void toString(StringBuilder str, String prefix) { + int i = 0; + String nextIndent = prefix + " "; + str.append("{\n"); + for (Entry> entry : bucketIndexToScanNodeToTablets.entrySet()) { + Integer bucketId = entry.getKey(); + Map scanNodeToScanRanges = entry.getValue(); + str.append(prefix).append(" bucket ").append(bucketId).append(": "); + DefaultScanSource.toString(scanNodeToScanRanges, str, nextIndent); + if (++i < bucketIndexToScanNodeToTablets.size()) { + str.append(",\n"); + } + } + str.append("\n").append(prefix).append("}"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java new file mode 100644 index 000000000000000..b1f9d6d8a875a13 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.planner.ExchangeNode; + +import com.google.common.collect.ListMultimap; + +import java.util.List; + +/** CustomAssignmentJob */ +public interface CustomAssignmentJob { + List customAssignment(ListMultimap inputJobs); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java new file mode 100644 index 000000000000000..2025b71b36027aa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.planner.ScanNode; + +import java.util.Map; +import java.util.Map.Entry; + +/** DefaultScanSource */ +public class DefaultScanSource extends ScanSource { + // for example: + // 1. use OlapScanNode(tableName=`tbl1`) to scan with tablet: [tablet 10001, tablet 10002] + // 2. use OlapScanNode(tableName=`tbl2`) to scan with tablet: [tablet 10003, tablet 10004] + public final Map scanNodeToTablets; + + public DefaultScanSource(Map scanNodeToTablets) { + this.scanNodeToTablets = scanNodeToTablets; + } + + @Override + public void toString(StringBuilder str, String prefix) { + toString(scanNodeToTablets, str, prefix); + } + + public static void toString(Map scanNodeToScanRanges, StringBuilder str, String prefix) { + int i = 0; + String nextIndent = prefix + " "; + str.append("{\n"); + for (Entry entry : scanNodeToScanRanges.entrySet()) { + ScanNode scanNode = entry.getKey(); + ScanRanges scanRanges = entry.getValue(); + str.append(prefix).append(" {\n") + .append(prefix).append(" scanNode: ").append(scanNode).append(",\n") + .append(prefix).append(" scanRanges: "); + + scanRanges.toString(str, nextIndent); + str.append("\n").append(prefix).append(" }"); + + if (++i < scanNodeToScanRanges.size()) { + str.append(",\n"); + } + } + str.append("\n").append(prefix).append("}"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java new file mode 100644 index 000000000000000..11592f4b47f8c30 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.thrift.TPaloScanRange; +import org.apache.doris.thrift.TScanRangeParams; + +import com.google.common.collect.Lists; + +import java.util.List; + +/**ScanRanges */ +public class ScanRanges { + // usually, it's tablets + public final List params; + // size corresponding to tablets one by one + public final List bytes; + public long totalBytes; + + public ScanRanges() { + this(Lists.newArrayList(), Lists.newArrayList()); + } + + public ScanRanges(List params, List bytes) { + this.params = params; + this.bytes = bytes; + long totalBytes = 0; + for (Long size : bytes) { + totalBytes += size; + } + this.totalBytes = totalBytes; + } + + public void addScanRange(TScanRangeParams params, long bytes) { + this.params.add(params); + this.bytes.add(bytes); + this.totalBytes += bytes; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + toString(str, ""); + return str.toString(); + } + + public void toString(StringBuilder str, String prefix) { + str.append("ScanRanges(bytes:" + totalBytes + ", ranges: [\n"); + for (int i = 0; i < params.size(); i++) { + str.append(prefix).append(" " + toString(params.get(i)) + ", bytes: " + bytes.get(i)); + if (i + 1 < params.size()) { + str.append(",\n"); + } + } + str.append("\n").append(prefix).append("])"); + } + + private String toString(TScanRangeParams scanRange) { + TPaloScanRange paloScanRange = scanRange.getScanRange().getPaloScanRange(); + if (paloScanRange != null) { + return "tablet " + paloScanRange.getTabletId(); + } else { + return scanRange.toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java new file mode 100644 index 000000000000000..0b123bcd1f471fc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +/** ScanSource */ +public abstract class ScanSource { + @Override + public String toString() { + StringBuilder str = new StringBuilder(); + toString(str, ""); + return str.toString(); + } + + abstract void toString(StringBuilder str, String prefix); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java new file mode 100644 index 000000000000000..b96e5fa5e5cd763 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.trees.TreeNode; +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; + +import java.util.List; +import java.util.Map; + +/** + * WorkerJob. + * for example: a fragment job, which doesn't parallelization to some instance jobs and also no worker to invoke it + */ +public interface UnassignedJob extends TreeNode { + PlanFragment getFragment(); + + List getScanNodes(); + + Map getExchangeToChildJob(); + + default List computeAssignedJobs( + WorkerManager workerManager, ListMultimap inputJobs) { + return ImmutableList.of(); + } + + // generate an instance job + // e.g. build an instance job by a backends and the replica ids it contains + default AssignedJob assignWorkerAndDataSources( + int instanceIndexInFragment, Worker worker, ScanSource scanSource) { + return new AssignedJobImpl(instanceIndexInFragment, this, worker, scanSource); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java new file mode 100644 index 000000000000000..8b90cb8f24925e9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.nereids.worker.LoadBalanceScanWorkerSelector; +import org.apache.doris.nereids.worker.ScanWorkerSelector; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.DataStreamSink; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; + +import com.google.common.collect.Maps; + +import java.util.IdentityHashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * UnassignedJobBuilder. + * build UnassignedJob by fragment + */ +public class UnassignedJobBuilder { + private final ScanWorkerSelector scanWorkerSelector = new LoadBalanceScanWorkerSelector(); + + /** + * build job from fragment. + */ + public static FragmentIdMapping buildJobs(FragmentIdMapping fragments) { + UnassignedJobBuilder builder = new UnassignedJobBuilder(); + + FragmentLineage fragmentLineage = buildFragmentLineage(fragments); + FragmentIdMapping unassignedJobs = new FragmentIdMapping<>(); + + // build from leaf to parent + for (Entry kv : fragments.entrySet()) { + PlanFragmentId fragmentId = kv.getKey(); + PlanFragment fragment = kv.getValue(); + + Map inputJobs = findInputJobs( + fragmentLineage, fragmentId, unassignedJobs); + UnassignedJob unassignedJob = builder.buildJob(fragment, inputJobs); + unassignedJobs.put(fragmentId, unassignedJob); + } + return unassignedJobs; + } + + private UnassignedJob buildJob( + PlanFragment planFragment, Map inputJobs) { + List scanNodes = collectScanNodesInThisFragment(planFragment); + if (planFragment.specifyInstances.isPresent()) { + return buildSpecifyInstancesJob(planFragment, scanNodes, inputJobs); + } else if (!scanNodes.isEmpty() || isLeafFragment(planFragment)) { + return buildLeafOrScanJob(planFragment, scanNodes, inputJobs); + } else { + return buildShuffleJob(planFragment, inputJobs); + } + } + + private UnassignedJob buildLeafOrScanJob( + PlanFragment planFragment, List scanNodes, Map inputJobs) { + boolean hasOlapScanNodes = hasOlapScanNode(scanNodes); + if (hasOlapScanNodes) { + // we need assign a backend which contains the data, + // so that the OlapScanNode can find the data in the backend + return buildScanLocalTableJob(planFragment, scanNodes, inputJobs, scanWorkerSelector); + } else if (scanNodes.isEmpty()) { + // select constant without table, + // e.g. select 100 union select 200 + return buildQueryConstantJob(planFragment); + } else { + // only scan external tables or cloud tables or table valued functions + // e,g. select * from numbers('number'='100') + return buildScanRemoteTableJob(planFragment, scanNodes, inputJobs); + } + } + + private UnassignedJob buildSpecifyInstancesJob( + PlanFragment planFragment, List scanNodes, Map inputJobs) { + return new UnassignedSpecifyInstancesJob(planFragment, scanNodes, inputJobs); + } + + private UnassignedScanNativeTableJob buildScanLocalTableJob( + PlanFragment planFragment, List scanNodes, Map inputJobs, + ScanWorkerSelector scanWorkerSelector) { + return new UnassignedScanNativeTableJob(planFragment, scanNodes, inputJobs, scanWorkerSelector); + } + + private List collectScanNodesInThisFragment(PlanFragment planFragment) { + return planFragment.getPlanRoot().collectInCurrentFragment(ScanNode.class::isInstance); + } + + private boolean hasOlapScanNode(List scanNodes) { + for (ScanNode scanNode : scanNodes) { + if (scanNode instanceof OlapScanNode) { + return true; + } + } + return false; + } + + private boolean isLeafFragment(PlanFragment planFragment) { + return planFragment.getChildren().isEmpty(); + } + + private UnassignedQueryConstantJob buildQueryConstantJob(PlanFragment planFragment) { + return new UnassignedQueryConstantJob(planFragment); + } + + private UnassignedScanRemoteTableJob buildScanRemoteTableJob( + PlanFragment planFragment, List scanNodes, Map inputJobs) { + return new UnassignedScanRemoteTableJob(planFragment, scanNodes, inputJobs); + } + + private UnassignedShuffleJob buildShuffleJob( + PlanFragment planFragment, Map inputJobs) { + return new UnassignedShuffleJob(planFragment, inputJobs); + } + + private static Map findInputJobs( + FragmentLineage lineage, PlanFragmentId fragmentId, FragmentIdMapping unassignedJobs) { + Map inputJobs = new IdentityHashMap<>(); + Map exchangeNodes = lineage.parentFragmentToExchangeNode.get(fragmentId); + if (exchangeNodes != null) { + for (Entry idToExchange : exchangeNodes.entrySet()) { + PlanNodeId exchangeId = idToExchange.getKey(); + ExchangeNode exchangeNode = idToExchange.getValue(); + PlanFragmentId childFragmentId = lineage.exchangeToChildFragment.get(exchangeId); + inputJobs.put(exchangeNode, unassignedJobs.get(childFragmentId)); + } + } + return inputJobs; + } + + private static List collectExchangeNodesInThisFragment(PlanFragment planFragment) { + return planFragment + .getPlanRoot() + .collectInCurrentFragment(ExchangeNode.class::isInstance); + } + + private static FragmentLineage buildFragmentLineage( + FragmentIdMapping fragments) { + Map exchangeToChildFragment = new LinkedHashMap<>(); + FragmentIdMapping> parentFragmentToExchangeNode = new FragmentIdMapping<>(); + + for (PlanFragment fragment : fragments.values()) { + PlanFragmentId fragmentId = fragment.getFragmentId(); + + // 1. link child fragment to exchange node + DataSink sink = fragment.getSink(); + if (sink instanceof DataStreamSink) { + PlanNodeId exchangeNodeId = sink.getExchNodeId(); + exchangeToChildFragment.put(exchangeNodeId, fragmentId); + } + + // 2. link parent fragment to exchange node + List exchangeNodes = collectExchangeNodesInThisFragment(fragment); + Map exchangeNodesInFragment = Maps.newLinkedHashMap(); + for (ExchangeNode exchangeNode : exchangeNodes) { + exchangeNodesInFragment.put(exchangeNode.getId(), exchangeNode); + } + parentFragmentToExchangeNode.put(fragmentId, exchangeNodesInFragment); + } + + return new FragmentLineage(fragments, parentFragmentToExchangeNode, exchangeToChildFragment); + } + + // the class support find exchange nodes in the fragment, and find child fragment by exchange node id + private static class FragmentLineage { + private final FragmentIdMapping idToFragments; + private final FragmentIdMapping> parentFragmentToExchangeNode; + private final Map exchangeToChildFragment; + + public FragmentLineage( + FragmentIdMapping idToFragments, + FragmentIdMapping> parentFragmentToExchangeNode, + Map exchangeToChildFragment) { + this.idToFragments = idToFragments; + this.parentFragmentToExchangeNode = parentFragmentToExchangeNode; + this.exchangeToChildFragment = exchangeToChildFragment; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java new file mode 100644 index 000000000000000..5e5158bb5c0d49d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.planner.ScanNode; + +import java.util.List; + +/** UnassignedNearStorageJob */ +public interface UnassignedNearStorageJob extends UnassignedJob { + List nearStorageScanNodes(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java new file mode 100644 index 000000000000000..ae7a2540ae646c0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ListMultimap; + +import java.util.List; + +/** UnassignedQueryConstantJob */ +public class UnassignedQueryConstantJob extends AbstractUnassignedJob { + public UnassignedQueryConstantJob(PlanFragment fragment) { + super(fragment, ImmutableList.of(), ImmutableMap.of()); + } + + @Override + public List computeAssignedJobs(WorkerManager workerManager, + ListMultimap inputJobs) { + Worker randomWorker = workerManager.randomAvailableWorker(); + return ImmutableList.of( + new AssignedJobImpl(0, this, randomWorker, + new DefaultScanSource(ImmutableMap.of()) + ) + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java new file mode 100644 index 000000000000000..b08c1bb4aad8fa8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.ScanWorkerSelector; +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +/** + * UnassignedScanNativeTableJob. + * scan native olap table, we can assign a worker near the storage + */ +public class UnassignedScanNativeTableJob extends AbstractUnassignedJob implements UnassignedNearStorageJob { + private final ScanWorkerSelector scanWorkerSelector; + private final List olapScanNodes; + + /** UnassignedScanNativeTableJob */ + public UnassignedScanNativeTableJob( + PlanFragment fragment, List allScanNodes, + Map exchangeToChildJob, + ScanWorkerSelector scanWorkerSelector) { + super(fragment, allScanNodes, exchangeToChildJob); + this.scanWorkerSelector = Objects.requireNonNull( + scanWorkerSelector, "scanWorkerSelector cat not be null"); + + // filter scan nodes + ImmutableList.Builder olapScanNodes = ImmutableList.builderWithExpectedSize(allScanNodes.size()); + for (ScanNode allScanNode : allScanNodes) { + if (allScanNode instanceof OlapScanNode) { + olapScanNodes.add((OlapScanNode) allScanNode); + } + } + this.olapScanNodes = olapScanNodes.build(); + } + + @Override + public List nearStorageScanNodes() { + return (List) olapScanNodes; + } + + @Override + public List computeAssignedJobs( + WorkerManager workerManager, ListMultimap inputJobs) { + if (shouldAssignByBucket()) { + return assignWithBucket(); + } else { + return assignWithoutBucket(); + } + } + + private boolean shouldAssignByBucket() { + if (fragment.hasColocatePlanNode()) { + return true; + } + if (enableBucketShuffleJoin() && fragment.isBucketShuffleJoinInput()) { + return true; + } + return false; + } + + private boolean enableBucketShuffleJoin() { + if (ConnectContext.get() != null) { + SessionVariable sessionVariable = ConnectContext.get().getSessionVariable(); + if (!sessionVariable.isEnableBucketShuffleJoin() && !sessionVariable.isEnableNereidsPlanner()) { + return false; + } + } + return true; + } + + private List assignWithBucket() { + Map>> workerToReplicas + = scanWorkerSelector.selectReplicaAndWorkerWithBucket(this); + + List assignments = Lists.newArrayListWithCapacity(workerToReplicas.size()); + int instanceIndexInFragment = 0; + for (Entry>> entry : workerToReplicas.entrySet()) { + Worker selectedWorker = entry.getKey(); + Map> bucketIndexToScanNodeToToReplicas = entry.getValue(); + + AssignedJob instanceJob = assignWorkerAndDataSources( + instanceIndexInFragment++, selectedWorker, + new BucketScanSource(bucketIndexToScanNodeToToReplicas) + ); + assignments.add(instanceJob); + } + return assignments; + } + + private List assignWithoutBucket() { + Map> workerToReplicas + = scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(this); + + List assignments = Lists.newArrayListWithCapacity(workerToReplicas.size()); + int instanceIndexInFragment = 0; + for (Entry> entry : workerToReplicas.entrySet()) { + Worker selectedWorker = entry.getKey(); + Map scanNodeToRanges = entry.getValue(); + AssignedJob instanceJob = assignWorkerAndDataSources( + instanceIndexInFragment++, selectedWorker, new DefaultScanSource(scanNodeToRanges) + ); + assignments.add(instanceJob); + } + return assignments; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java new file mode 100644 index 000000000000000..27841ca26ced121 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanRemoteTableJob.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; + +import java.util.List; +import java.util.Map; + +/** + * UnassignedScanRemoteTableJob + * it should be a leaf job which not contains scan native olap table node, + * for example, select literal without table, or scan an external table + */ +public class UnassignedScanRemoteTableJob extends AbstractUnassignedJob { + private int assignedJobNum; + + public UnassignedScanRemoteTableJob( + PlanFragment fragment, List scanNodes, Map exchangeToChildJob) { + super(fragment, scanNodes, exchangeToChildJob); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java new file mode 100644 index 000000000000000..573bec43746fa3f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** UnassignedExchangeJob */ +public class UnassignedShuffleJob extends AbstractUnassignedJob { + public UnassignedShuffleJob(PlanFragment fragment, Map exchangeToChildJob) { + super(fragment, ImmutableList.of(), exchangeToChildJob); + } + + @Override + public List computeAssignedJobs( + WorkerManager workerManager, ListMultimap inputJobs) { + int expectInstanceNum = degreeOfParallelism(); + List biggestParallelChildFragment = getInstancesOfBiggestParallelChildFragment(inputJobs); + + if (expectInstanceNum > 0 && expectInstanceNum < biggestParallelChildFragment.size()) { + List shuffleWorkersInBiggestParallelChildFragment = shuffleWorkers(biggestParallelChildFragment); + + // random select expectInstanceNum workers in the biggestParallelChildFragment + Function workerSelector = shuffleWorkersInBiggestParallelChildFragment::get; + return buildInstances(expectInstanceNum, workerSelector); + } else { + // select workers based on the same position as biggestParallelChildFragment + Function workerSelector = + instanceIndex -> biggestParallelChildFragment.get(instanceIndex).getAssignedWorker(); + return buildInstances(biggestParallelChildFragment.size(), workerSelector); + } + } + + private int degreeOfParallelism() { + if (!fragment.getDataPartition().isPartitioned()) { + return 1; + } + + int expectInstanceNum = -1; + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { + expectInstanceNum = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel(); + } + return expectInstanceNum; + } + + private List getInstancesOfBiggestParallelChildFragment( + ListMultimap inputJobs) { + int maxInstanceNum = -1; + List biggestParallelChildFragment = ImmutableList.of(); + // skip broadcast exchange + for (Entry> exchangeToChildInstances : inputJobs.asMap().entrySet()) { + List instances = (List) exchangeToChildInstances.getValue(); + if (instances.size() > maxInstanceNum) { + biggestParallelChildFragment = instances; + maxInstanceNum = instances.size(); + } + } + return biggestParallelChildFragment; + } + + private List buildInstances( + int instanceNum, + Function workerSelector) { + ImmutableList.Builder instances = ImmutableList.builderWithExpectedSize(instanceNum); + for (int i = 0; i < instanceNum; i++) { + Worker selectedWorker = workerSelector.apply(i); + AssignedJob assignedJob = assignWorkerAndDataSources( + i, selectedWorker, new DefaultScanSource(ImmutableMap.of()) + ); + instances.add(assignedJob); + } + return instances.build(); + } + + private List shuffleWorkers(List instances) { + List candidateWorkers = Lists.newArrayList(); + for (AssignedJob instance : instances) { + candidateWorkers.add(instance.getAssignedWorker()); + } + Collections.shuffle(candidateWorkers); + return candidateWorkers; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java new file mode 100644 index 000000000000000..f9a66a96552f3da --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.NereidsSpecifyInstances; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ListMultimap; + +import java.util.List; +import java.util.Map; + +/** UnassignedSpecifyInstancesJob */ +public class UnassignedSpecifyInstancesJob extends AbstractUnassignedJob { + private final NereidsSpecifyInstances specifyInstances; + + public UnassignedSpecifyInstancesJob( + PlanFragment fragment, List scanNodes, + Map exchangeToChildJob) { + super(fragment, scanNodes, exchangeToChildJob); + Preconditions.checkArgument(fragment.specifyInstances.isPresent(), + "Missing fragment specifyInstances"); + this.specifyInstances = fragment.specifyInstances.get(); + } + + @Override + public List computeAssignedJobs(WorkerManager workerManager, + ListMultimap inputJobs) { + return specifyInstances.buildAssignedJobs(this); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java new file mode 100644 index 000000000000000..dbb6ed130aeaa2e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.nereids.worker.job; + +import org.apache.doris.nereids.worker.Worker; + +/** WorkerScanSource */ +public class WorkerScanSource { + public final Worker worker; + public final S scanSource; + + public WorkerScanSource(Worker worker, S scanSource) { + this.worker = worker; + this.scanSource = scanSource; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java new file mode 100644 index 000000000000000..3b775145981d195 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.nereids.worker.job.BucketScanSource; +import org.apache.doris.nereids.worker.job.WorkerScanSource; + +import java.util.List; + +/** DefaultNereidsSpecifyInstances */ +public class BucketSpecifyInstances extends NereidsSpecifyInstances { + public BucketSpecifyInstances(List> workerScanSources) { + super(workerScanSources); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java new file mode 100644 index 000000000000000..bda38b28614ea7b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.nereids.worker.job.DefaultScanSource; +import org.apache.doris.nereids.worker.job.WorkerScanSource; + +import java.util.List; + +/** DefaultSpecifyInstances */ +public class DefaultSpecifyInstances extends NereidsSpecifyInstances { + public DefaultSpecifyInstances(List> workerToScanSources) { + super(workerToScanSources); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java new file mode 100644 index 000000000000000..1d65fbd8e712ca2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/PlanFragment.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.job.AssignedJob; +import org.apache.doris.nereids.worker.job.AssignedJobImpl; +import org.apache.doris.nereids.worker.job.ScanSource; +import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.worker.job.WorkerScanSource; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Objects; + +/** NereidsSpecifyInstances */ +public abstract class NereidsSpecifyInstances { + public final List> workerScanSources; + + public NereidsSpecifyInstances(List> workerScanSources) { + this.workerScanSources = Objects.requireNonNull(workerScanSources, + "workerScanSources can not be null"); + } + + public List buildAssignedJobs(UnassignedJob unassignedJob) { + List instances = Lists.newArrayListWithCapacity(workerScanSources.size()); + int instanceNum = 0; + for (WorkerScanSource workerToScanSource: workerScanSources) { + Worker worker = workerToScanSource.worker; + ScanSource scanSource = workerToScanSource.scanSource; + AssignedJobImpl assignedJob = new AssignedJobImpl(instanceNum++, unassignedJob, worker, scanSource); + instances.add(assignedJob); + } + return instances; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 3e32853119b6c48..4b80793a40555f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -170,6 +170,8 @@ public class OlapScanNode extends ScanNode { private int selectedPartitionNum = 0; private Collection selectedPartitionIds = Lists.newArrayList(); private long totalBytes = 0; + // tablet id to single replica bytes + private Map tabletBytes = Maps.newLinkedHashMap(); private SortInfo sortInfo = null; private Set outputColumnUniqueIds = new HashSet<>(); @@ -192,6 +194,7 @@ public class OlapScanNode extends ScanNode { // a bucket seq may map to many tablets, and each tablet has a // TScanRangeLocations. public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + public Map bucketSeq2Bytes = Maps.newLinkedHashMap(); boolean isFromPrepareStmt = false; // For point query @@ -749,6 +752,10 @@ public void updateScanRangeVersions(Map visibleVersionMap) { } } + public Long getTabletSingleReplicaSize(Long tabletId) { + return tabletBytes.get(tabletId); + } + private void addScanRangeLocations(Partition partition, List tablets) throws UserException { long visibleVersion = Partition.PARTITION_INIT_VERSION; @@ -878,6 +885,9 @@ private void addScanRangeLocations(Partition partition, boolean tabletIsNull = true; boolean collectedStat = false; List errs = Lists.newArrayList(); + + int replicaInTablet = 0; + long oneReplicaBytes = 0; for (Replica replica : replicas) { Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendId()); if (backend == null || !backend.isAlive()) { @@ -917,7 +927,13 @@ private void addScanRangeLocations(Partition partition, // for CBO if (!collectedStat && replica.getRowCount() != -1) { - totalBytes += replica.getDataSize(); + long dataSize = replica.getDataSize(); + if (replicaInTablet == 0) { + oneReplicaBytes = dataSize; + tabletBytes.put(tabletId, dataSize); + } + replicaInTablet++; + totalBytes += dataSize; collectedStat = true; } scanBackendIds.add(backend.getId()); @@ -935,8 +951,9 @@ private void addScanRangeLocations(Partition partition, scanRange.setPaloScanRange(paloRange); locations.setScanRange(scanRange); - bucketSeq2locations.put(tabletId2BucketSeq.get(tabletId), locations); - + Integer bucketSeq = tabletId2BucketSeq.get(tabletId); + bucketSeq2locations.put(bucketSeq, locations); + bucketSeq2Bytes.merge(bucketSeq, oneReplicaBytes, Long::sum); scanRangeLocations.add(locations); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 2469d087cdd5a50..44146997b774039 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.TreeNode; +import org.apache.doris.nereids.worker.job.ScanSource; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; @@ -34,6 +35,7 @@ import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; @@ -43,7 +45,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -148,9 +152,12 @@ public class PlanFragment extends TreeNode { // has colocate plan node protected boolean hasColocatePlanNode = false; + protected final Supplier hasBucketShuffleJoin; private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + public Optional> specifyInstances = Optional.empty(); + /** * C'tor for fragment with specific partition; the output is by default broadcast. */ @@ -162,6 +169,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { this.transferQueryStatisticsWithEveryBatch = false; this.builderRuntimeFilterIds = new HashSet<>(); this.targetRuntimeFilterIds = new HashSet<>(); + this.hasBucketShuffleJoin = buildHasBucketShuffleJoin(); setParallelExecNumIfExists(); setFragmentInPlanTree(planRoot); } @@ -178,6 +186,18 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition, this.targetRuntimeFilterIds = new HashSet<>(targetRuntimeFilterIds); } + private Supplier buildHasBucketShuffleJoin() { + return Suppliers.memoize(() -> { + List hashJoinNodes = getPlanRoot().collectInCurrentFragment(HashJoinNode.class::isInstance); + for (HashJoinNode hashJoinNode : hashJoinNodes) { + if (hashJoinNode.isBucketShuffle()) { + return true; + } + } + return false; + }); + } + /** * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node. * Does not traverse the children of ExchangeNodes because those must belong to a @@ -240,6 +260,16 @@ public void setHasColocatePlanNode(boolean hasColocatePlanNode) { this.hasColocatePlanNode = hasColocatePlanNode; } + public boolean isBucketShuffleJoinInput() { + if (hasBucketShuffleJoin.get()) { + return true; + } + if (destNode != null && destNode.getFragment().hasBucketShuffleJoin.get()) { + return true; + } + return false; + } + public void setResultSinkType(TResultSinkType resultSinkType) { this.resultSinkType = resultSinkType; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 8cc18a527a86d6a..2b634d8554730fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -59,6 +59,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -1255,4 +1257,27 @@ public void addIntermediateOutputTupleDescList(TupleDescriptor tupleDescriptor) public void addIntermediateProjectList(List exprs) { intermediateProjectListList.add(exprs); } + + public List collectInCurrentFragment(Predicate predicate) { + List result = Lists.newArrayList(); + foreachDownInCurrentFragment(child -> { + if (predicate.test(child)) { + result.add(child); + } + }); + return (List) result; + } + + /** foreachDownInCurrentFragment */ + public void foreachDownInCurrentFragment(Consumer visitor) { + int currentFragmentId = getFragmentId().asInt(); + foreachDown(child -> { + PlanNode childNode = (PlanNode) child; + if (childNode.getFragmentId().asInt() != currentFragmentId) { + return false; + } + visitor.accept(childNode); + return true; + }); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a9bcc69c4a6d8ee..d4edc63d82b2d09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -42,6 +42,15 @@ import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.stats.StatsErrorEstimator; +import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.worker.job.AssignedJob; +import org.apache.doris.nereids.worker.job.BucketScanSource; +import org.apache.doris.nereids.worker.job.DefaultScanSource; +import org.apache.doris.nereids.worker.job.ScanRanges; +import org.apache.doris.nereids.worker.job.ScanSource; +import org.apache.doris.nereids.worker.job.UnassignedJob; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; @@ -184,6 +193,7 @@ public class Coordinator implements CoordInterface { // copied from TQueryExecRequest; constant across all fragments private final TDescriptorTable descTable; + private FragmentIdMapping distributedPlans; // scan node id -> TFileScanRangeParams private Map fileScanRangeParamsMap = Maps.newHashMap(); @@ -339,6 +349,8 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { if (!useNereids) { // Enable local shuffle on pipelineX engine only if Nereids planner is applied. queryOptions.setEnableLocalShuffle(false); + } else { + distributedPlans = ((NereidsPlanner) planner).getDistributedPlans(); } setFromUserProperty(context); @@ -1934,9 +1946,96 @@ private boolean containsSetOperationNode(PlanNode node) { return false; } + private void setForDefaultScanSource(FInstanceExecParam instanceExecParam, DefaultScanSource scanSource) { + for (Entry scanNodeIdToReplicaIds : scanSource.scanNodeToTablets.entrySet()) { + ScanNode scanNode = scanNodeIdToReplicaIds.getKey(); + ScanRanges scanReplicas = scanNodeIdToReplicaIds.getValue(); + instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(), scanReplicas.params); + } + } + + private void setForBucketScanSource(FInstanceExecParam instanceExecParam, BucketScanSource bucketScanSource) { + for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) { + instanceExecParam.addBucketSeq(bucketIndex); + } + + for (Entry> bucketIndexToScanTablets : + bucketScanSource.bucketIndexToScanNodeToTablets.entrySet()) { + Integer bucketIndex = bucketIndexToScanTablets.getKey(); + Map scanNodeToRangeMap = bucketIndexToScanTablets.getValue(); + for (Entry scanNodeToRange : scanNodeToRangeMap.entrySet()) { + ScanNode scanNode = scanNodeToRange.getKey(); + ScanRanges scanRanges = scanNodeToRange.getValue(); + instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(), scanRanges.params); + + if (scanNode instanceof OlapScanNode) { + OlapScanNode olapScanNode = (OlapScanNode) scanNode; + if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + // In bucket shuffle join, we have 2 situation. + // 1. Only one partition: in this case, we use scanNode.getTotalTabletsNum() to get the right bucket num + // because when table turn on dynamic partition, the bucket number in default distribution info + // is not correct. + // 2. Table is colocated: in this case, table could have more than one partition, but all partition's + // bucket number must be same, so we use default bucket num is ok. + int bucketNum = 0; + if (olapScanNode.getOlapTable().isColocateTable()) { + bucketNum = olapScanNode.getOlapTable().getDefaultDistributionInfo() + .getBucketNum(); + } else { + bucketNum = (int) (olapScanNode.getTotalTabletsNum()); + } + fragmentIdToSeqToAddressMap.put(olapScanNode.getFragmentId(), new HashMap<>()); + bucketShuffleJoinController.fragmentIdBucketSeqToScanRangeMap + .put(scanNode.getFragmentId(), new BucketSeqToScanRange()); + bucketShuffleJoinController.fragmentIdToBucketNumMap + .put(scanNode.getFragmentId(), bucketNum); + olapScanNode.getFragment().setBucketNum(bucketNum); + } + } + + BucketSeqToScanRange bucketSeqToScanRange = bucketShuffleJoinController. + fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId()); + + Map> scanNodeIdToReplicas + = bucketSeqToScanRange.computeIfAbsent(bucketIndex, set -> Maps.newLinkedHashMap()); + List tablets = scanNodeIdToReplicas.computeIfAbsent( + scanNode.getId().asInt(), id -> new ArrayList<>()); + tablets.addAll(scanRanges.params); + } + } + } + // For each fragment in fragments, computes hosts on which to run the instances // and stores result in fragmentExecParams.hosts. private void computeFragmentHosts() throws Exception { + if (context.getState().isNereids && context.getSessionVariable().isEnableNereidsCoordinator()) { + for (DistributedPlan distributedPlan : distributedPlans.values()) { + UnassignedJob fragmentJob = distributedPlan.getFragmentJob(); + PlanFragment fragment = fragmentJob.getFragment(); + FragmentExecParams fragmentExecParams = fragmentExecParamsMap.computeIfAbsent( + fragment.getFragmentId(), id -> new FragmentExecParams(fragment) + ); + + bucketShuffleJoinController + .isBucketShuffleJoin(fragment.getFragmentId().asInt(), fragment.getPlanRoot()); + + for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) { + Worker worker = instanceJob.getAssignedWorker(); + TNetworkAddress address = new TNetworkAddress(worker.host(), worker.port()); + FInstanceExecParam instanceExecParam = new FInstanceExecParam( + null, address, 0, fragmentExecParams); + fragmentExecParams.instanceExecParams.add(instanceExecParam); + addressToBackendID.put(address, worker.id()); + ScanSource scanSource = instanceJob.getScanSource(); + if (scanSource instanceof BucketScanSource) { + setForBucketScanSource(instanceExecParam, (BucketScanSource) scanSource); + } else { + setForDefaultScanSource(instanceExecParam, (DefaultScanSource) scanSource); + } + } + } + return; + } // compute hosts of producer fragment before those of consumer fragment(s), // the latter might inherit the set of hosts from the former // compute hosts *bottom up*. @@ -2273,6 +2372,17 @@ private Map getReplicaNumPerHostForOlapTable() { // Populates scan_range_assignment_. // > protected void computeScanRangeAssignment() throws Exception { + if (context.getState().isNereids && context.getSessionVariable().isEnableNereidsCoordinator()) { + // for (ScanNode scanNode : scanNodes) { + // if (bucketShuffleJoinController + // .isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) { + // Map replicaNumPerHost = getReplicaNumPerHostForOlapTable(); + // bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, + // idToBackend, addressToBackendID, replicaNumPerHost); + // } + // } + return; + } Map assignedBytesPerHost = Maps.newHashMap(); Map replicaNumPerHost = getReplicaNumPerHostForOlapTable(); Collections.shuffle(scanNodes); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java new file mode 100644 index 000000000000000..d8caba578520dff --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.Status; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.thrift.TNetworkAddress; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; + +/** NereidsCoordinator */ +public class NereidsCoordinator implements CoordInterface { + private NereidsPlanner nereidsPlanner; + private FragmentIdMapping distributedPlans; + + public NereidsCoordinator(NereidsPlanner nereidsPlanner) { + this.nereidsPlanner = Objects.requireNonNull(nereidsPlanner, "nereidsPlanner can not be null"); + this.distributedPlans = Objects.requireNonNull( + nereidsPlanner.getDistributedPlans(), "distributedPlans can not be null" + ); + } + + @Override + public void exec() throws Exception { + // build fragment from leaf to root + } + + @Override + public RowBatch getNext() throws Exception { + RowBatch rowBatch = new RowBatch(); + rowBatch.setEos(true); + return rowBatch; + } + + @Override + public void cancel(Status cancelReason) { + + } + + @Override + public List getInvolvedBackends() { + return ImmutableList.of(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 615569e4c83efec..8b5368a90f0e460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -289,6 +289,7 @@ public class SessionVariable implements Serializable, Writable { public static final String NTH_OPTIMIZED_PLAN = "nth_optimized_plan"; public static final String ENABLE_NEREIDS_PLANNER = "enable_nereids_planner"; + public static final String ENABLE_NEREIDS_COORDINATOR = "enable_nereids_coordinator"; public static final String DISABLE_NEREIDS_RULES = "disable_nereids_rules"; public static final String ENABLE_NEREIDS_RULES = "enable_nereids_rules"; public static final String ENABLE_NEW_COST_MODEL = "enable_new_cost_model"; @@ -1190,6 +1191,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { @VariableMgr.VarAttr(name = NEREIDS_STAR_SCHEMA_SUPPORT) private boolean nereidsStarSchemaSupport = true; + @VariableMgr.VarAttr(name = ENABLE_NEREIDS_COORDINATOR, needForward = true, + fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL) + private boolean enableNereidsCoordinator = false; + @VariableMgr.VarAttr(name = REWRITE_OR_TO_IN_PREDICATE_THRESHOLD, fuzzy = true) private int rewriteOrToInPredicateThreshold = 2; @@ -2930,6 +2935,14 @@ public void setEnableNereidsPlanner(boolean enableNereidsPlanner) { this.enableNereidsPlanner = enableNereidsPlanner; } + public boolean isEnableNereidsCoordinator() { + return enableNereidsCoordinator; + } + + public void setEnableNereidsCoordinator(boolean enableNereidsCoordinator) { + this.enableNereidsCoordinator = enableNereidsCoordinator; + } + public int getNthOptimizedPlan() { return nthOptimizedPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4a127e514973f57..9f914bcfb891ec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1837,8 +1837,12 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) { coordBase = new PointQueryExec(planner, analyzer, context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); + // } else if (context.getState().isNereids() + // && context.getSessionVariable().isEnableNereidsCoordinator() + // && planner instanceof NereidsPlanner) { + // coordBase = new NereidsCoordinator((NereidsPlanner) planner); } else { - coord = EnvFactory.getInstance().createCoordinator(context, analyzer, + coord = EnvFactory.getInstance().createCoordinator(context, analyzer, planner, context.getStatsErrorEstimator()); profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),