From f8fd405dcdd012dd31aa3bf8c1eace3f9998e27f Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 6 Jun 2024 22:52:07 +0800 Subject: [PATCH] fix --- .../worker/LoadBalanceScanWorkerSelector.java | 16 +++++-- .../nereids/worker/ScanWorkerSelector.java | 3 +- .../nereids/worker/job/BucketScanSource.java | 5 ++ .../nereids/worker/job/DefaultScanSource.java | 17 +++++-- .../doris/nereids/worker/job/ScanSource.java | 8 ++++ .../job/UnassignedScanNativeTableJob.java | 48 +++++++++---------- .../worker/job/UnparallelizedScanSource.java | 30 ++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 2 +- 8 files changed, 94 insertions(+), 35 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnparallelizedScanSource.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java index 8b5d9bec8e2a191..69feed4ea1c26a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java @@ -19,8 +19,10 @@ import org.apache.doris.common.Pair; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.worker.job.DefaultScanSource; import org.apache.doris.nereids.worker.job.ScanRanges; import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.worker.job.UnparallelizedScanSource; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; @@ -51,8 +53,8 @@ public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector { private final Map workloads = Maps.newLinkedHashMap(); @Override - public Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) { - Map workerScanRanges = Maps.newLinkedHashMap(); + public Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) { + Map workerScanRanges = Maps.newLinkedHashMap(); // allScanRangesLocations is all scan ranges in all partition which need to scan List allScanRangesLocations = scanNode.getScanRangeLocations(0); for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) { @@ -61,8 +63,14 @@ public Map selectReplicaAndWorkerWithoutBucket(ScanNode scan WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker( onePartitionOneScanRangeLocation, bytes); - ScanRanges scanRanges = workerScanRanges.computeIfAbsent(assigned.worker, w -> new ScanRanges()); - scanRanges.addScanRanges(scanRanges); + UnparallelizedScanSource scanRanges = workerScanRanges.computeIfAbsent( + assigned.worker, + w -> new UnparallelizedScanSource( + new DefaultScanSource(ImmutableMap.of(scanNode, new ScanRanges())) + ) + ); + DefaultScanSource scanSource = (DefaultScanSource) scanRanges.scanSource; + scanSource.scanNodeToScanRanges.get(scanNode).addScanRanges(assigned.scanRanges); } return workerScanRanges; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java index d402bf11e359c87..5c824403a157d52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.worker.job.ScanRanges; import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.worker.job.UnparallelizedScanSource; import org.apache.doris.planner.ScanNode; import java.util.Map; @@ -27,7 +28,7 @@ public interface ScanWorkerSelector { // for a scan node, select replica for each scan range(denote tablet if the ScanNode is OlapScanNode), // use the replica location to build a worker execute the instance - Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode); + Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode); // return // key: Worker, the backend which will process this fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java index 226994bdb294361..bd2d37dd2b4fdde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java @@ -33,6 +33,11 @@ public BucketScanSource(Map> bucketIndexToSca this.bucketIndexToScanNodeToTablets = bucketIndexToScanNodeToTablets; } + @Override + int maxParallel(ScanNode scanNode) { + return bucketIndexToScanNodeToTablets.size(); + } + /** toString */ public void toString(StringBuilder str, String prefix) { int i = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java index 8733e9cd2aac082..1be94aba953b9e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java @@ -27,15 +27,24 @@ public class DefaultScanSource extends ScanSource { // for example: // 1. use OlapScanNode(tableName=`tbl1`) to scan with tablet: [tablet 10001, tablet 10002] // 2. use OlapScanNode(tableName=`tbl2`) to scan with tablet: [tablet 10003, tablet 10004] - public final Map scanNodeToTablets; + public final Map scanNodeToScanRanges; - public DefaultScanSource(Map scanNodeToTablets) { - this.scanNodeToTablets = scanNodeToTablets; + public DefaultScanSource(Map scanNodeToScanRanges) { + this.scanNodeToScanRanges = scanNodeToScanRanges; + } + + @Override + int maxParallel(ScanNode scanNode) { + ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode); + if (scanRanges != null) { + return scanRanges.params.size(); + } + return 0; } @Override public void toString(StringBuilder str, String prefix) { - toString(scanNodeToTablets, str, prefix); + toString(scanNodeToScanRanges, str, prefix); } /** toString */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java index 0b123bcd1f471fc..6ec612f97894236 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java @@ -17,6 +17,10 @@ package org.apache.doris.nereids.worker.job; +import org.apache.doris.planner.ScanNode; + +import java.util.List; + /** ScanSource */ public abstract class ScanSource { @Override @@ -26,5 +30,9 @@ public String toString() { return str.toString(); } + abstract int maxParallel(ScanNode scanNode); + + abstract List parallelize(ScanNode scanNode, int instanceNum); + abstract void toString(StringBuilder str, String prefix); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java index 597b016503d4cb3..5afb8cb2c72fb41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java @@ -28,7 +28,6 @@ import org.apache.doris.qe.SessionVariable; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -125,11 +124,11 @@ private List assignWithoutBucket(OlapScanNode olapScanNode) { // for example: // { // BackendWorker("172.0.0.1"): - // ScanRanges([tablet_10001, tablet_10002, tablet_10003, tablet_10004]), + // olapScanNode1: ScanRanges([tablet_10001, tablet_10002, tablet_10003, tablet_10004]), // BackendWorker("172.0.0.2"): - // ScanRanges([tablet_10005, tablet_10006, tablet_10007, tablet_10008, tablet_10009]) + // olapScanNode1: ScanRanges([tablet_10005, tablet_10006, tablet_10007, tablet_10008, tablet_10009]) // } - Map assignedScanRanges = multipleMachinesParallelization(olapScanNode); + Map assignedScanRanges = multipleMachinesParallelization(olapScanNode); // for each worker, compute how many instances should be generated, and which data should be scanned. // for example: @@ -144,7 +143,7 @@ private List assignWithoutBucket(OlapScanNode olapScanNode) { // instance 5: ScanRanges([tablet_10007]) // ], // } - Map> workerToPerInstanceScanRanges + Map> workerToPerInstanceScanRanges = insideMachineParallelization(olapScanNode, assignedScanRanges); // flatten to instances. @@ -156,28 +155,31 @@ private List assignWithoutBucket(OlapScanNode olapScanNode) { // instance 4: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10006, tablet_10009])), // instance 5: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10007])), // ] - return buildInstances(olapScanNode, workerToPerInstanceScanRanges); + return buildInstances(workerToPerInstanceScanRanges); } - protected Map multipleMachinesParallelization(OlapScanNode olapScanNode) { + protected Map multipleMachinesParallelization(OlapScanNode olapScanNode) { return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(olapScanNode); } - protected > Map> insideMachineParallelization( - OlapScanNode olapScanNode, Map workerToScanRanges) { + protected Map> insideMachineParallelization( + OlapScanNode olapScanNode, Map workerToScanRanges) { - Map> workerToInstances = Maps.newLinkedHashMap(); + Map> workerToInstances = Maps.newLinkedHashMap(); - for (Entry entry : workerToScanRanges.entrySet()) { + for (Entry entry : workerToScanRanges.entrySet()) { Worker worker = entry.getKey(); // the scanRanges which this worker should scan, // for example: scan [tablet_10001, tablet_10002, tablet_10003, tablet_10004] - S allScanRanges = entry.getValue(); + ScanSource scanSource = entry.getValue().scanSource; + + // usually, its tablets num + int maxParallel = scanSource.maxParallel(olapScanNode); // now we should compute how many instances to process the data, // for example: two instances - int instanceNum = degreeOfParallelism(olapScanNode, allScanRanges.itemSize()); + int instanceNum = degreeOfParallelism(olapScanNode, maxParallel); // split the scanRanges to some partitions, one partition for one instance // for example: @@ -185,7 +187,7 @@ protected > Map> insideMachineParalleliz // instance 1: [tablet_10001, tablet_10003] // instance 2: [tablet_10002, tablet_10004] // ] - List instanceToScanRanges = allScanRanges.split(instanceNum); + List instanceToScanRanges = scanSource.parallelize(olapScanNode, instanceNum); workerToInstances.put(worker, instanceToScanRanges); } @@ -193,32 +195,28 @@ protected > Map> insideMachineParalleliz return workerToInstances; } - protected > List buildInstances( - OlapScanNode olapScanNode, - Map> workerToPerInstanceScanRanges) { + protected List buildInstances(Map> workerToPerInstanceScanSource) { List assignments = Lists.newArrayList(); int instanceIndexInFragment = 0; - for (Entry> entry : workerToPerInstanceScanRanges.entrySet()) { + for (Entry> entry : workerToPerInstanceScanSource.entrySet()) { Worker selectedWorker = entry.getKey(); - List scanRangesPerInstance = entry.getValue(); - for (ScanRanges oneInstanceScanRanges : scanRangesPerInstance) { + List scanSourcePerInstance = entry.getValue(); + for (ScanSource oneInstanceScanSource : scanSourcePerInstance) { AssignedJob instanceJob = assignWorkerAndDataSources( - instanceIndexInFragment++, selectedWorker, - new DefaultScanSource(ImmutableMap.of(olapScanNode, oneInstanceScanRanges)) - ); + instanceIndexInFragment++, selectedWorker, oneInstanceScanSource); assignments.add(instanceJob); } } return assignments; } - protected int degreeOfParallelism(ScanNode olapScanNode, int scanRangesSize) { + protected int degreeOfParallelism(ScanNode olapScanNode, int maxParallel) { // if the scan node have limit and no conjuncts, only need 1 instance to save cpu and mem resource if (ConnectContext.get() != null && olapScanNode.shouldUseOneInstance(ConnectContext.get())) { return 1; } // the scan instance num should not larger than the tablets num - return Math.min(scanRangesSize, Math.max(fragment.getParallelExecNum(), 1)); + return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnparallelizedScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnparallelizedScanSource.java new file mode 100644 index 000000000000000..fabe9bd5d92ef70 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnparallelizedScanSource.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.worker.job; + +/** + * UnparallelizedScanRanges: + * a ScanRanges which doesn't parallelize/split to instances + */ +public class UnparallelizedScanSource { + public final ScanSource scanSource; + + public UnparallelizedScanSource(ScanSource scanSource) { + this.scanSource = scanSource; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a6e7ab31bbafda4..7ec88ab5a18f440 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1957,7 +1957,7 @@ private boolean containsSetOperationNode(PlanNode node) { } private void setForDefaultScanSource(FInstanceExecParam instanceExecParam, DefaultScanSource scanSource) { - for (Entry scanNodeIdToReplicaIds : scanSource.scanNodeToTablets.entrySet()) { + for (Entry scanNodeIdToReplicaIds : scanSource.scanNodeToScanRanges.entrySet()) { ScanNode scanNode = scanNodeIdToReplicaIds.getKey(); ScanRanges scanReplicas = scanNodeIdToReplicaIds.getValue(); instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(), scanReplicas.params);