Commit f8fd405

fix
924060929 committed Jun 6, 2024
1 parent bfa8161 commit f8fd405
Showing 8 changed files with 94 additions and 35 deletions.
@@ -19,8 +19,10 @@

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.worker.job.DefaultScanSource;
import org.apache.doris.nereids.worker.job.ScanRanges;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnparallelizedScanSource;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
@@ -51,8 +53,8 @@ public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector {
private final Map<Worker, WorkerWorkload> workloads = Maps.newLinkedHashMap();

@Override
public Map<Worker, ScanRanges> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) {
Map<Worker, ScanRanges> workerScanRanges = Maps.newLinkedHashMap();
public Map<Worker, UnparallelizedScanSource> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) {
Map<Worker, UnparallelizedScanSource> workerScanRanges = Maps.newLinkedHashMap();
// allScanRangesLocations contains all the scan ranges of every partition that needs to be scanned
List<TScanRangeLocations> allScanRangesLocations = scanNode.getScanRangeLocations(0);
for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) {
@@ -61,8 +63,14 @@ public Map<Worker, ScanRanges> selectReplicaAndWorkerWithoutBucket(ScanNode scan

WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker(
onePartitionOneScanRangeLocation, bytes);
ScanRanges scanRanges = workerScanRanges.computeIfAbsent(assigned.worker, w -> new ScanRanges());
scanRanges.addScanRanges(scanRanges);
UnparallelizedScanSource scanRanges = workerScanRanges.computeIfAbsent(
assigned.worker,
w -> new UnparallelizedScanSource(
new DefaultScanSource(ImmutableMap.of(scanNode, new ScanRanges()))
)
);
DefaultScanSource scanSource = (DefaultScanSource) scanRanges.scanSource;
scanSource.scanNodeToScanRanges.get(scanNode).addScanRanges(assigned.scanRanges);
}
return workerScanRanges;
}
@@ -19,6 +19,7 @@

import org.apache.doris.nereids.worker.job.ScanRanges;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnparallelizedScanSource;
import org.apache.doris.planner.ScanNode;

import java.util.Map;
@@ -27,7 +28,7 @@
public interface ScanWorkerSelector {
// for a scan node, select a replica for each scan range (i.e. a tablet if the ScanNode is an OlapScanNode),
// and use the replica location to choose a worker to execute the instance
Map<Worker, ScanRanges> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode);
Map<Worker, UnparallelizedScanSource> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode);

// return
// key: Worker, the backend which will process this fragment
@@ -33,6 +33,11 @@ public BucketScanSource(Map<Integer, Map<ScanNode, ScanRanges>> bucketIndexToSca
this.bucketIndexToScanNodeToTablets = bucketIndexToScanNodeToTablets;
}

@Override
int maxParallel(ScanNode scanNode) {
return bucketIndexToScanNodeToTablets.size();
}

/** toString */
public void toString(StringBuilder str, String prefix) {
int i = 0;
@@ -27,15 +27,24 @@ public class DefaultScanSource extends ScanSource {
// for example:
// 1. use OlapScanNode(tableName=`tbl1`) to scan with tablet: [tablet 10001, tablet 10002]
// 2. use OlapScanNode(tableName=`tbl2`) to scan with tablet: [tablet 10003, tablet 10004]
public final Map<ScanNode, ScanRanges> scanNodeToTablets;
public final Map<ScanNode, ScanRanges> scanNodeToScanRanges;

public DefaultScanSource(Map<ScanNode, ScanRanges> scanNodeToTablets) {
this.scanNodeToTablets = scanNodeToTablets;
public DefaultScanSource(Map<ScanNode, ScanRanges> scanNodeToScanRanges) {
this.scanNodeToScanRanges = scanNodeToScanRanges;
}

@Override
int maxParallel(ScanNode scanNode) {
ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode);
if (scanRanges != null) {
return scanRanges.params.size();
}
return 0;
}

@Override
public void toString(StringBuilder str, String prefix) {
toString(scanNodeToTablets, str, prefix);
toString(scanNodeToScanRanges, str, prefix);
}

/** toString */
@@ -17,6 +17,10 @@

package org.apache.doris.nereids.worker.job;

import org.apache.doris.planner.ScanNode;

import java.util.List;

/** ScanSource */
public abstract class ScanSource {
@Override
@@ -26,5 +30,9 @@ public String toString() {
return str.toString();
}

abstract int maxParallel(ScanNode scanNode);

abstract List<ScanSource> parallelize(ScanNode scanNode, int instanceNum);

abstract void toString(StringBuilder str, String prefix);
}
@@ -28,7 +28,6 @@
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -125,11 +124,11 @@ private List<AssignedJob> assignWithoutBucket(OlapScanNode olapScanNode) {
// for example:
// {
// BackendWorker("172.0.0.1"):
// ScanRanges([tablet_10001, tablet_10002, tablet_10003, tablet_10004]),
// olapScanNode1: ScanRanges([tablet_10001, tablet_10002, tablet_10003, tablet_10004]),
// BackendWorker("172.0.0.2"):
// ScanRanges([tablet_10005, tablet_10006, tablet_10007, tablet_10008, tablet_10009])
// olapScanNode1: ScanRanges([tablet_10005, tablet_10006, tablet_10007, tablet_10008, tablet_10009])
// }
Map<Worker, ScanRanges> assignedScanRanges = multipleMachinesParallelization(olapScanNode);
Map<Worker, UnparallelizedScanSource> assignedScanRanges = multipleMachinesParallelization(olapScanNode);

// for each worker, compute how many instances should be generated, and which data should be scanned.
// for example:
@@ -144,7 +143,7 @@ private List<AssignedJob> assignWithoutBucket(OlapScanNode olapScanNode) {
// instance 5: ScanRanges([tablet_10007])
// ],
// }
Map<Worker, List<ScanRanges>> workerToPerInstanceScanRanges
Map<Worker, List<ScanSource>> workerToPerInstanceScanRanges
= insideMachineParallelization(olapScanNode, assignedScanRanges);

// flatten to instances.
@@ -156,69 +155,68 @@
// instance 4: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10006, tablet_10009])),
// instance 5: AssignedJob(BackendWorker("172.0.0.2"), ScanRanges([tablet_10007])),
// ]
return buildInstances(olapScanNode, workerToPerInstanceScanRanges);
return buildInstances(workerToPerInstanceScanRanges);
}

protected Map<Worker, ScanRanges> multipleMachinesParallelization(OlapScanNode olapScanNode) {
protected Map<Worker, UnparallelizedScanSource> multipleMachinesParallelization(OlapScanNode olapScanNode) {
return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(olapScanNode);
}

protected <S extends Splittable<S>> Map<Worker, List<S>> insideMachineParallelization(
OlapScanNode olapScanNode, Map<Worker, S> workerToScanRanges) {
protected Map<Worker, List<ScanSource>> insideMachineParallelization(
OlapScanNode olapScanNode, Map<Worker, UnparallelizedScanSource> workerToScanRanges) {

Map<Worker, List<S>> workerToInstances = Maps.newLinkedHashMap();
Map<Worker, List<ScanSource>> workerToInstances = Maps.newLinkedHashMap();

for (Entry<Worker, S> entry : workerToScanRanges.entrySet()) {
for (Entry<Worker, UnparallelizedScanSource> entry : workerToScanRanges.entrySet()) {
Worker worker = entry.getKey();

// the scanRanges which this worker should scan,
// for example: scan [tablet_10001, tablet_10002, tablet_10003, tablet_10004]
S allScanRanges = entry.getValue();
ScanSource scanSource = entry.getValue().scanSource;

// usually, this is the number of tablets
int maxParallel = scanSource.maxParallel(olapScanNode);

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(olapScanNode, allScanRanges.itemSize());
int instanceNum = degreeOfParallelism(olapScanNode, maxParallel);

// split the scanRanges to some partitions, one partition for one instance
// for example:
// [
// instance 1: [tablet_10001, tablet_10003]
// instance 2: [tablet_10002, tablet_10004]
// ]
List<S> instanceToScanRanges = allScanRanges.split(instanceNum);
List<ScanSource> instanceToScanRanges = scanSource.parallelize(olapScanNode, instanceNum);

workerToInstances.put(worker, instanceToScanRanges);
}

return workerToInstances;
}

protected <S extends Splittable<S>> List<AssignedJob> buildInstances(
OlapScanNode olapScanNode,
Map<Worker, List<ScanRanges>> workerToPerInstanceScanRanges) {
protected List<AssignedJob> buildInstances(Map<Worker, List<ScanSource>> workerToPerInstanceScanSource) {
List<AssignedJob> assignments = Lists.newArrayList();
int instanceIndexInFragment = 0;
for (Entry<Worker, List<ScanRanges>> entry : workerToPerInstanceScanRanges.entrySet()) {
for (Entry<Worker, List<ScanSource>> entry : workerToPerInstanceScanSource.entrySet()) {
Worker selectedWorker = entry.getKey();
List<ScanRanges> scanRangesPerInstance = entry.getValue();
for (ScanRanges oneInstanceScanRanges : scanRangesPerInstance) {
List<ScanSource> scanSourcePerInstance = entry.getValue();
for (ScanSource oneInstanceScanSource : scanSourcePerInstance) {
AssignedJob instanceJob = assignWorkerAndDataSources(
instanceIndexInFragment++, selectedWorker,
new DefaultScanSource(ImmutableMap.of(olapScanNode, oneInstanceScanRanges))
);
instanceIndexInFragment++, selectedWorker, oneInstanceScanSource);
assignments.add(instanceJob);
}
}
return assignments;
}

protected int degreeOfParallelism(ScanNode olapScanNode, int scanRangesSize) {
protected int degreeOfParallelism(ScanNode olapScanNode, int maxParallel) {
// if the scan node has a limit and no conjuncts, only 1 instance is needed, to save cpu and mem resources
if (ConnectContext.get() != null && olapScanNode.shouldUseOneInstance(ConnectContext.get())) {
return 1;
}

// the scan instance num should not be larger than the tablet num
return Math.min(scanRangesSize, Math.max(fragment.getParallelExecNum(), 1));
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
}
}
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker.job;

/**
* UnparallelizedScanSource:
* a ScanSource which has not yet been parallelized/split into instances
*/
public class UnparallelizedScanSource {
public final ScanSource scanSource;

public UnparallelizedScanSource(ScanSource scanSource) {
this.scanSource = scanSource;
}
}
@@ -1957,7 +1957,7 @@ private boolean containsSetOperationNode(PlanNode node) {
}

private void setForDefaultScanSource(FInstanceExecParam instanceExecParam, DefaultScanSource scanSource) {
for (Entry<ScanNode, ScanRanges> scanNodeIdToReplicaIds : scanSource.scanNodeToTablets.entrySet()) {
for (Entry<ScanNode, ScanRanges> scanNodeIdToReplicaIds : scanSource.scanNodeToScanRanges.entrySet()) {
ScanNode scanNode = scanNodeIdToReplicaIds.getKey();
ScanRanges scanReplicas = scanNodeIdToReplicaIds.getValue();
instanceExecParam.perNodeScanRanges.put(scanNode.getId().asInt(), scanReplicas.params);
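For reference, below is a minimal, self-contained sketch of the per-worker grouping pattern that the first hunk corrects: each assigned scan range is accumulated under its chosen worker via computeIfAbsent, and the newly assigned ranges (not the accumulator itself, as in the removed line scanRanges.addScanRanges(scanRanges)) are appended. The Worker, ScanRange, ScanRanges, and ScanRangeGroupingSketch names are simplified stand-ins for illustration, not the actual Doris classes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScanRangeGroupingSketch {

    // Simplified stand-ins for the Doris types; only the grouping pattern is illustrated.
    record Worker(String host) {}

    record ScanRange(long tabletId) {}

    static class ScanRanges {
        final List<ScanRange> ranges = new ArrayList<>();

        void add(ScanRange range) {
            ranges.add(range);
        }
    }

    // Group each assigned scan range under the worker selected for it.
    static Map<Worker, ScanRanges> groupByWorker(Map<ScanRange, Worker> assignments) {
        Map<Worker, ScanRanges> workerToRanges = new LinkedHashMap<>();
        for (Map.Entry<ScanRange, Worker> e : assignments.entrySet()) {
            // create the accumulator lazily for each worker
            ScanRanges acc = workerToRanges.computeIfAbsent(e.getValue(), w -> new ScanRanges());
            // append the newly assigned range, not the accumulator itself
            acc.add(e.getKey());
        }
        return workerToRanges;
    }

    public static void main(String[] args) {
        Map<ScanRange, Worker> assignments = new LinkedHashMap<>();
        assignments.put(new ScanRange(10001), new Worker("172.0.0.1"));
        assignments.put(new ScanRange(10002), new Worker("172.0.0.1"));
        assignments.put(new ScanRange(10005), new Worker("172.0.0.2"));

        // prints "172.0.0.1 -> 2 scan ranges" and "172.0.0.2 -> 1 scan ranges"
        groupByWorker(assignments).forEach((worker, scanRanges) ->
                System.out.println(worker.host() + " -> " + scanRanges.ranges.size() + " scan ranges"));
    }
}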
