diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java index 3578194d28c89cf..8b5d9bec8e2a191 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java @@ -51,16 +51,33 @@ public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector { private final Map workloads = Maps.newLinkedHashMap(); @Override - public Map> selectReplicaAndWorkerWithoutBucket( - UnassignedJob unassignedJob) { - List scanNodes = unassignedJob.getScanNodes(); - if (scanNodes.size() != 1) { - throw new IllegalStateException("Illegal fragment type, " - + "should only contains one ScanNode but meet " + scanNodes.size() + " ScanNodes"); + public Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) { + Map workerScanRanges = Maps.newLinkedHashMap(); + // allScanRangesLocations is all scan ranges in all partition which need to scan + List allScanRangesLocations = scanNode.getScanRangeLocations(0); + for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) { + // usually, the onePartitionOneScanRangeLocation is a tablet in one partition + long bytes = getScanRangeSize(scanNode, onePartitionOneScanRangeLocation); + + WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker( + onePartitionOneScanRangeLocation, bytes); + ScanRanges scanRanges = workerScanRanges.computeIfAbsent(assigned.worker, w -> new ScanRanges()); + scanRanges.addScanRanges(assigned.scanRanges); } - return selectForSingleScanTable(scanNodes.get(0)); + return workerScanRanges; } + // @Override + // public Map> selectReplicaAndWorkerWithoutBucket( + // UnassignedJob unassignedJob) { + // List scanNodes = unassignedJob.getScanNodes(); + // if (scanNodes.size() != 1) { + // throw new 
IllegalStateException("Illegal fragment type, " + // + "should only contains one ScanNode but meet " + scanNodes.size() + " ScanNodes"); + // } + // return selectForSingleScanTable(scanNodes.get(0)); + // } + @Override public Map>> selectReplicaAndWorkerWithBucket( UnassignedJob unassignedJob) { @@ -148,36 +165,7 @@ private Map>> selectForBucket( return assignment; } - private Map> selectForSingleScanTable(ScanNode scanNode) { - Map> workerToScanNodeAndReplicas = Maps.newHashMap(); - List allScanTabletLocations = scanNode.getScanRangeLocations(0); - for (TScanRangeLocations onePartitionOneTabletLocation : allScanTabletLocations) { - Long tabletBytes = getScanRangeSize(scanNode, onePartitionOneTabletLocation); - - SelectResult selectedReplicaAndWorker - = selectScanReplicaAndMinWorkloadWorker(onePartitionOneTabletLocation, tabletBytes); - Worker selectedWorker = selectedReplicaAndWorker.selectWorker; - TScanRangeLocation selectedReplica = selectedReplicaAndWorker.selectReplica; - - Map scanNodeToRanges - = workerToScanNodeAndReplicas.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap()); - ScanRanges selectedReplicas - = scanNodeToRanges.computeIfAbsent(scanNode, node -> new ScanRanges()); - TScanRangeParams scanReplicaParam = buildScanReplicaParams(onePartitionOneTabletLocation, selectedReplica); - selectedReplicas.addScanRange(scanReplicaParam, tabletBytes); - } - - // scan empty table, assign a random worker with empty - if (workerToScanNodeAndReplicas.isEmpty()) { - workerToScanNodeAndReplicas.put( - workerManager.randomAvailableWorker(), - ImmutableMap.of(scanNode, new ScanRanges()) - ); - } - return workerToScanNodeAndReplicas; - } - - private SelectResult selectScanReplicaAndMinWorkloadWorker( + private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker( TScanRangeLocations tabletLocation, long tabletBytes) { List replicaLocations = tabletLocation.getLocations(); int replicaNum = replicaLocations.size(); @@ -203,7 +191,10 @@ private 
SelectResult selectScanReplicaAndMinWorkloadWorker( throw new AnalysisException("No available workers"); } else { minWorkload.recordOneScanTask(tabletBytes); - return new SelectResult(minWorkLoadWorker, selectedReplicaLocation, minWorkload.scanBytes); + ScanRanges scanRanges = new ScanRanges(); + TScanRangeParams scanReplicaParams = buildScanReplicaParams(tabletLocation, selectedReplicaLocation); + scanRanges.addScanRange(scanReplicaParams, tabletBytes); + return new WorkerScanRanges(minWorkLoadWorker, scanRanges); } } @@ -303,18 +294,6 @@ private long getScanRangeSize(ScanNode scanNode, TScanRangeLocations scanRangeLo return 0L; } - private static class SelectResult { - private final Worker selectWorker; - private final TScanRangeLocation selectReplica; - private final long bytes; - - public SelectResult(Worker selecteWorker, TScanRangeLocation selectReplica, long bytes) { - this.selectWorker = selecteWorker; - this.selectReplica = selectReplica; - this.bytes = bytes; - } - } - private static class WorkerWorkload implements Comparable { private int taskNum; private long scanBytes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java index a96033f7f0594a8..d402bf11e359c87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java @@ -25,15 +25,9 @@ /** ScanWorkerSelector */ public interface ScanWorkerSelector { - // for a scan job: - // 1. select some workers - // 2. 
select replicas for each worker - // - // return - // key: backend - // value: which data should scan - Map> selectReplicaAndWorkerWithoutBucket( - UnassignedJob unassignedJob); + // for a scan node, select replica for each scan range(denote tablet if the ScanNode is OlapScanNode), + // use the replica location to build a worker execute the instance + Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode); // return // key: Worker, the backend which will process this fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java new file mode 100644 index 000000000000000..7f861d33cfc13c2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.worker; + +import org.apache.doris.nereids.worker.job.ScanRanges; + +import java.util.Objects; + +/** WorkerScanRanges */ +public class WorkerScanRanges { + public final Worker worker; + public final ScanRanges scanRanges; + + public WorkerScanRanges(Worker worker, ScanRanges scanRanges) { + this.worker = Objects.requireNonNull(worker, "worker can not be null"); + this.scanRanges = Objects.requireNonNull(scanRanges, "scanRanges can not be null"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java index 1e03d97a12429a8..45d66b3c4df28ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java @@ -17,8 +17,10 @@ package org.apache.doris.nereids.worker.job; +import org.apache.doris.common.util.ListUtil; import org.apache.doris.nereids.trees.AbstractTreeNode; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.nereids.worker.WorkerScanRanges; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; @@ -46,6 +48,21 @@ public AbstractUnassignedJob(PlanFragment fragment, List scanNodes, = Objects.requireNonNull(exchangeToChildJob, "exchangeToChildJob can not be null"); } + // one instance per tablet is too expensive, we should coalesce them into fewer instances. 
+ // for example: + // assignedScanRanges: [tablet_1001, tablet_1002, tablet_1003, tablet_1004], + // instanceNumPerWorker: 2 + // + // we will generate two instances, every instance process two tablets: + // [ + // [tablet_1001, tablet_1002], + // [tablet_1003, tablet_1004] + // ] + protected List> coalesceInstances( + List assignedScanRanges, int instanceNumPerWorker) { + return ListUtil.splitBySize(assignedScanRanges, instanceNumPerWorker); + } + @Override public PlanFragment getFragment() { return fragment; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java similarity index 80% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java index 5e5158bb5c0d49d..a20897eee9426fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedNearStorageJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java @@ -17,11 +17,7 @@ package org.apache.doris.nereids.worker.job; -import org.apache.doris.planner.ScanNode; +/** ScanRange */ +public class ScanRange { -import java.util.List; - -/** UnassignedNearStorageJob */ -public interface UnassignedNearStorageJob extends UnassignedJob { - List nearStorageScanNodes(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java index e15f2c0bead0d5f..5767995ec1ac9e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java @@ -25,7 +25,7 @@ import java.util.List; /**ScanRanges */ -public class ScanRanges { +public class ScanRanges implements Splittable { // usually, it's tablets public final 
List params; // size corresponding to tablets one by one @@ -47,12 +47,33 @@ public ScanRanges(List params, List bytes) { this.totalBytes = totalBytes; } + public void addScanRanges(ScanRanges scanRanges) { + this.params.addAll(scanRanges.params); + this.bytes.addAll(scanRanges.bytes); + this.totalBytes += scanRanges.totalBytes; + } + public void addScanRange(TScanRangeParams params, long bytes) { this.params.add(params); this.bytes.add(bytes); this.totalBytes += bytes; } + @Override + public int itemSize() { + return params.size(); + } + + @Override + public void addItem(ScanRanges other, int index) { + addScanRange(other.params.get(index), other.bytes.get(index)); + } + + @Override + public ScanRanges newSplittable() { + return new ScanRanges(); + } + @Override public String toString() { StringBuilder str = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java new file mode 100644 index 000000000000000..78c353b940be1c8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.worker.job; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** Splittable */ +public interface Splittable> { + + int itemSize(); + + void addItem(S other, int index); + + default List split(int splitSize) { + return Splittable.split(this, splitSize); + } + + S newSplittable(); + + /** + * split the items round-robin into at most splitSize parts + * for example: + * + * items are : [1, 2, 3, 4, 5, 6, 7] + * splitSize is : 3 + * + * return : + * [1, 4, 7] + * [2, 5] + * [3, 6] + */ + static > List split(Splittable splittable, int splitSize) { + Preconditions.checkNotNull(splittable, "splittable must not be null"); + Preconditions.checkArgument(splitSize > 0, "splitSize must larger than 0"); + + int itemSize = splittable.itemSize(); + splitSize = Math.min(splitSize, itemSize); + + List result = new ArrayList<>(splitSize); + for (int i = 0; i < splitSize; i++) { + result.add(splittable.newSplittable()); + } + + int index = 0; + for (int i = 0; i < itemSize; i++) { + result.get(index).addItem((S) splittable, i); + index = (index + 1) % splitSize; + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java index b08c1bb4aad8fa8..597b016503d4cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanNativeTableJob.java @@ -27,9 +27,11 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; -import com.google.common.collect.ImmutableList; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; 
import java.util.List; import java.util.Map; @@ -40,7 +42,7 @@ * UnassignedScanNativeTableJob. * scan native olap table, we can assign a worker near the storage */ -public class UnassignedScanNativeTableJob extends AbstractUnassignedJob implements UnassignedNearStorageJob { +public class UnassignedScanNativeTableJob extends AbstractUnassignedJob { private final ScanWorkerSelector scanWorkerSelector; private final List olapScanNodes; @@ -53,19 +55,16 @@ public UnassignedScanNativeTableJob( this.scanWorkerSelector = Objects.requireNonNull( scanWorkerSelector, "scanWorkerSelector cat not be null"); - // filter scan nodes - ImmutableList.Builder olapScanNodes = ImmutableList.builderWithExpectedSize(allScanNodes.size()); - for (ScanNode allScanNode : allScanNodes) { - if (allScanNode instanceof OlapScanNode) { - olapScanNodes.add((OlapScanNode) allScanNode); + Preconditions.checkArgument(!allScanNodes.isEmpty(), "OlapScanNode is empty"); + + for (ScanNode scanNode : allScanNodes) { + if (!(scanNode instanceof OlapScanNode)) { + throw new IllegalStateException( + "UnassignedScanNativeTableJob only support process OlapScanNode, but meet: " + + scanNode.getClass().getSimpleName()); } } - this.olapScanNodes = olapScanNodes.build(); - } - - @Override - public List nearStorageScanNodes() { - return (List) olapScanNodes; + this.olapScanNodes = (List) allScanNodes; } @Override @@ -74,7 +73,11 @@ public List computeAssignedJobs( if (shouldAssignByBucket()) { return assignWithBucket(); } else { - return assignWithoutBucket(); + Preconditions.checkState( + olapScanNodes.size() == 1, + "One fragment contains multiple OlapScanNodes but not contains colocate join or bucket shuffle join" + ); + return assignWithoutBucket(olapScanNodes.get(0)); } } @@ -117,20 +120,105 @@ private List assignWithBucket() { return assignments; } - private List assignWithoutBucket() { - Map> workerToReplicas - = scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(this); + private List 
assignWithoutBucket(OlapScanNode olapScanNode) { + // for every tablet, select its replica and worker. + // for example: + // { + // BackendWorker("172.0.0.1"): + // ScanRanges([tablet_10001, tablet_10002, tablet_10003, tablet_10004]), + // BackendWorker("172.0.0.2"): + // ScanRanges([tablet_10005, tablet_10006, tablet_10007, tablet_10008, tablet_10009]) + // } + Map assignedScanRanges = multipleMachinesParallelization(olapScanNode); + + // for each worker, compute how many instances should be generated, and which data should be scanned. + // for example: + // { + // BackendWorker("172.0.0.1"): [ + // instance 1: ScanRanges([tablet_10001, tablet_10003]) + // instance 2: ScanRanges([tablet_10002, tablet_10004]) + // ], + // BackendWorker("172.0.0.2"): [ + // instance 3: ScanRanges([tablet_10005, tablet_10008]) + // instance 4: ScanRanges([tablet_10006, tablet_10009]) + // instance 5: ScanRanges([tablet_10007]) + // ], + // } + Map> workerToPerInstanceScanRanges + = insideMachineParallelization(olapScanNode, assignedScanRanges); + + // flatten to instances. 
+ // for example: + // [ + // instance 1: AssignedJob(BackendWorker("172.0.0.1"), ScanRanges([tablet_10001, tablet_10003])), + // instance 2: AssignedJob(BackendWorker("172.0.0.1"), ScanRanges([tablet_10002, tablet_10004])), + // instance 3: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10005, tablet_10008])), + // instance 4: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10006, tablet_10009])), + // instance 5: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10007])), + // ] + return buildInstances(olapScanNode, workerToPerInstanceScanRanges); + } - List assignments = Lists.newArrayListWithCapacity(workerToReplicas.size()); + protected Map multipleMachinesParallelization(OlapScanNode olapScanNode) { + return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(olapScanNode); + } + + protected > Map> insideMachineParallelization( + OlapScanNode olapScanNode, Map workerToScanRanges) { + + Map> workerToInstances = Maps.newLinkedHashMap(); + + for (Entry entry : workerToScanRanges.entrySet()) { + Worker worker = entry.getKey(); + + // the scanRanges which this worker should scan, + // for example: scan [tablet_10001, tablet_10002, tablet_10003, tablet_10004] + S allScanRanges = entry.getValue(); + + // now we should compute how many instances to process the data, + // for example: two instances + int instanceNum = degreeOfParallelism(olapScanNode, allScanRanges.itemSize()); + + // split the scanRanges to some partitions, one partition for one instance + // for example: + // [ + // instance 1: [tablet_10001, tablet_10003] + // instance 2: [tablet_10002, tablet_10004] + // ] + List instanceToScanRanges = allScanRanges.split(instanceNum); + + workerToInstances.put(worker, instanceToScanRanges); + } + + return workerToInstances; + } + + protected > List buildInstances( + OlapScanNode olapScanNode, + Map> workerToPerInstanceScanRanges) { + List assignments = Lists.newArrayList(); int instanceIndexInFragment = 0; - for (Entry> entry : 
workerToReplicas.entrySet()) { + for (Entry> entry : workerToPerInstanceScanRanges.entrySet()) { Worker selectedWorker = entry.getKey(); - Map scanNodeToRanges = entry.getValue(); - AssignedJob instanceJob = assignWorkerAndDataSources( - instanceIndexInFragment++, selectedWorker, new DefaultScanSource(scanNodeToRanges) - ); - assignments.add(instanceJob); + List scanRangesPerInstance = entry.getValue(); + for (ScanRanges oneInstanceScanRanges : scanRangesPerInstance) { + AssignedJob instanceJob = assignWorkerAndDataSources( + instanceIndexInFragment++, selectedWorker, + new DefaultScanSource(ImmutableMap.of(olapScanNode, oneInstanceScanRanges)) + ); + assignments.add(instanceJob); + } } return assignments; } + + protected int degreeOfParallelism(ScanNode olapScanNode, int scanRangesSize) { + // if the scan node have limit and no conjuncts, only need 1 instance to save cpu and mem resource + if (ConnectContext.get() != null && olapScanNode.shouldUseOneInstance(ConnectContext.get())) { + return 1; + } + + // the scan instance num should not larger than the tablets num + return Math.min(scanRangesSize, Math.max(fragment.getParallelExecNum(), 1)); + } }