Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 6, 2024
1 parent 9cd1d3a commit bfa8161
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,33 @@ public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector {
private final Map<Worker, WorkerWorkload> workloads = Maps.newLinkedHashMap();

@Override
public Map<Worker, Map<ScanNode, ScanRanges>> selectReplicaAndWorkerWithoutBucket(
UnassignedJob unassignedJob) {
List<ScanNode> scanNodes = unassignedJob.getScanNodes();
if (scanNodes.size() != 1) {
throw new IllegalStateException("Illegal fragment type, "
+ "should only contains one ScanNode but meet " + scanNodes.size() + " ScanNodes");
public Map<Worker, ScanRanges> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) {
Map<Worker, ScanRanges> workerScanRanges = Maps.newLinkedHashMap();
// allScanRangesLocations is all scan ranges in all partition which need to scan
List<TScanRangeLocations> allScanRangesLocations = scanNode.getScanRangeLocations(0);
for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) {
// usually, the onePartitionOneScanRangeLocation is a tablet in one partition
long bytes = getScanRangeSize(scanNode, onePartitionOneScanRangeLocation);

WorkerScanRanges assigned = selectScanReplicaAndMinWorkloadWorker(
onePartitionOneScanRangeLocation, bytes);
ScanRanges scanRanges = workerScanRanges.computeIfAbsent(assigned.worker, w -> new ScanRanges());
scanRanges.addScanRanges(scanRanges);
}
return selectForSingleScanTable(scanNodes.get(0));
return workerScanRanges;
}

// @Override
// public Map<Worker, Map<ScanNode, ScanRanges>> selectReplicaAndWorkerWithoutBucket(
// UnassignedJob unassignedJob) {
// List<ScanNode> scanNodes = unassignedJob.getScanNodes();
// if (scanNodes.size() != 1) {
// throw new IllegalStateException("Illegal fragment type, "
// + "should only contains one ScanNode but meet " + scanNodes.size() + " ScanNodes");
// }
// return selectForSingleScanTable(scanNodes.get(0));
// }

@Override
public Map<Worker, Map<Integer, Map<ScanNode, ScanRanges>>> selectReplicaAndWorkerWithBucket(
UnassignedJob unassignedJob) {
Expand Down Expand Up @@ -148,36 +165,7 @@ private Map<Worker, Map<Integer, Map<ScanNode, ScanRanges>>> selectForBucket(
return assignment;
}

private Map<Worker, Map<ScanNode, ScanRanges>> selectForSingleScanTable(ScanNode scanNode) {
Map<Worker, Map<ScanNode, ScanRanges>> workerToScanNodeAndReplicas = Maps.newHashMap();
List<TScanRangeLocations> allScanTabletLocations = scanNode.getScanRangeLocations(0);
for (TScanRangeLocations onePartitionOneTabletLocation : allScanTabletLocations) {
Long tabletBytes = getScanRangeSize(scanNode, onePartitionOneTabletLocation);

SelectResult selectedReplicaAndWorker
= selectScanReplicaAndMinWorkloadWorker(onePartitionOneTabletLocation, tabletBytes);
Worker selectedWorker = selectedReplicaAndWorker.selectWorker;
TScanRangeLocation selectedReplica = selectedReplicaAndWorker.selectReplica;

Map<ScanNode, ScanRanges> scanNodeToRanges
= workerToScanNodeAndReplicas.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap());
ScanRanges selectedReplicas
= scanNodeToRanges.computeIfAbsent(scanNode, node -> new ScanRanges());
TScanRangeParams scanReplicaParam = buildScanReplicaParams(onePartitionOneTabletLocation, selectedReplica);
selectedReplicas.addScanRange(scanReplicaParam, tabletBytes);
}

// scan empty table, assign a random worker with empty
if (workerToScanNodeAndReplicas.isEmpty()) {
workerToScanNodeAndReplicas.put(
workerManager.randomAvailableWorker(),
ImmutableMap.of(scanNode, new ScanRanges())
);
}
return workerToScanNodeAndReplicas;
}

private SelectResult selectScanReplicaAndMinWorkloadWorker(
private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
TScanRangeLocations tabletLocation, long tabletBytes) {
List<TScanRangeLocation> replicaLocations = tabletLocation.getLocations();
int replicaNum = replicaLocations.size();
Expand All @@ -203,7 +191,10 @@ private SelectResult selectScanReplicaAndMinWorkloadWorker(
throw new AnalysisException("No available workers");
} else {
minWorkload.recordOneScanTask(tabletBytes);
return new SelectResult(minWorkLoadWorker, selectedReplicaLocation, minWorkload.scanBytes);
ScanRanges scanRanges = new ScanRanges();
TScanRangeParams scanReplicaParams = buildScanReplicaParams(tabletLocation, selectedReplicaLocation);
scanRanges.addScanRange(scanReplicaParams, tabletBytes);
return new WorkerScanRanges(minWorkLoadWorker, scanRanges);
}
}

Expand Down Expand Up @@ -303,18 +294,6 @@ private long getScanRangeSize(ScanNode scanNode, TScanRangeLocations scanRangeLo
return 0L;
}

private static class SelectResult {
private final Worker selectWorker;
private final TScanRangeLocation selectReplica;
private final long bytes;

public SelectResult(Worker selecteWorker, TScanRangeLocation selectReplica, long bytes) {
this.selectWorker = selecteWorker;
this.selectReplica = selectReplica;
this.bytes = bytes;
}
}

private static class WorkerWorkload implements Comparable<WorkerWorkload> {
private int taskNum;
private long scanBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@

/** ScanWorkerSelector */
public interface ScanWorkerSelector {
// for a scan job:
// 1. select some workers
// 2. select replicas for each worker
//
// return
// key: backend
// value: which data should scan
Map<Worker, Map<ScanNode, ScanRanges>> selectReplicaAndWorkerWithoutBucket(
UnassignedJob unassignedJob);
// for a scan node, select replica for each scan range(denote tablet if the ScanNode is OlapScanNode),
// use the replica location to build a worker execute the instance
Map<Worker, ScanRanges> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode);

// return
// key: Worker, the backend which will process this fragment
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;

import org.apache.doris.nereids.worker.job.ScanRanges;

import java.util.Objects;

/** WorkerScanRange */
public class WorkerScanRanges {
public final Worker worker;
public final ScanRanges scanRanges;

public WorkerScanRanges(Worker worker, ScanRanges scanRanges) {
this.worker = Objects.requireNonNull(worker, "scanRangeParams can not be null");
this.scanRanges = Objects.requireNonNull(scanRanges, "scanRanges can not be null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.nereids.worker.job;

import org.apache.doris.common.util.ListUtil;
import org.apache.doris.nereids.trees.AbstractTreeNode;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.WorkerScanRanges;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
Expand Down Expand Up @@ -46,6 +48,21 @@ public AbstractUnassignedJob(PlanFragment fragment, List<ScanNode> scanNodes,
= Objects.requireNonNull(exchangeToChildJob, "exchangeToChildJob can not be null");
}

// one instance per tablets is too expensive, we should coalesce to less instances.
// for example:
// assignedScanRanges: [tablet_1001, tablet_1002, tablet_1003, tablet_1004],
// instanceNumPerWorker: 2
//
// we will generate two instances, every instance process two tablets:
// [
// [tablet_1001, tablet_1002],
// [tablet_1003, tablet_1004]
// ]
protected List<List<WorkerScanRanges>> coalesceInstances(
List<WorkerScanRanges> assignedScanRanges, int instanceNumPerWorker) {
return ListUtil.splitBySize(assignedScanRanges, instanceNumPerWorker);
}

@Override
public PlanFragment getFragment() {
return fragment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.doris.nereids.worker.job;

import org.apache.doris.planner.ScanNode;
/** ScanRange */
public class ScanRange {

import java.util.List;

/** UnassignedNearStorageJob */
public interface UnassignedNearStorageJob extends UnassignedJob {
List<ScanNode> nearStorageScanNodes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;

/**ScanRanges */
public class ScanRanges {
public class ScanRanges implements Splittable<ScanRanges> {
// usually, it's tablets
public final List<TScanRangeParams> params;
// size corresponding to tablets one by one
Expand All @@ -47,12 +47,33 @@ public ScanRanges(List<TScanRangeParams> params, List<Long> bytes) {
this.totalBytes = totalBytes;
}

public void addScanRanges(ScanRanges scanRanges) {
this.params.addAll(scanRanges.params);
this.bytes.addAll(scanRanges.bytes);
this.totalBytes += scanRanges.totalBytes;
}

public void addScanRange(TScanRangeParams params, long bytes) {
this.params.add(params);
this.bytes.add(bytes);
this.totalBytes += bytes;
}

@Override
public int itemSize() {
return params.size();
}

@Override
public void addItem(ScanRanges other, int index) {
addScanRange(other.params.get(index), other.bytes.get(index));
}

@Override
public ScanRanges newSplittable() {
return new ScanRanges();
}

@Override
public String toString() {
StringBuilder str = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker.job;

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.List;

/** Splittable */
public interface Splittable<S extends Splittable<S>> {

int itemSize();

void addItem(S other, int index);

default List<S> split(int splitSize) {
return Splittable.split(this, splitSize);
}

S newSplittable();

/**
* split a list to multi expected number sublist
* for example:
*
* list is : [1, 2, 3, 4, 5, 6, 7]
* expectedSize is : 3
*
* return :
* [1, 4, 7]
* [2, 5]
* [3, 6]
*/
static <S extends Splittable<S>> List<S> split(Splittable<S> splittable, int splitSize) {
Preconditions.checkNotNull(splittable, "splittable must not be null");
Preconditions.checkArgument(splitSize > 0, "splitSize must larger than 0");

int itemSize = splittable.itemSize();
splitSize = Math.min(splitSize, itemSize);

List<S> result = new ArrayList<>(splitSize);
for (int i = 0; i < splitSize; i++) {
result.add(splittable.newSplittable());
}

int index = 0;
for (int i = 0; i < itemSize; i++) {
result.get(index).addItem((S) splittable, i);
index = (index + 1) % splitSize;
}
return result;
}
}
Loading

0 comments on commit bfa8161

Please sign in to comment.