Skip to content

Commit

Permalink
1. fix distinct multi receivers
Browse files Browse the repository at this point in the history
2. support bucket shuffle join
  • Loading branch information
924060929 committed Sep 26, 2024
1 parent 33069bb commit a81b969
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,27 @@
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DummyWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBuilder;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -103,7 +110,36 @@ private void linkPipelinePlan(PipelineDistributedPlan receiverPlan, PipelineDist

private void linkBucketShuffleJoinPlan(
PipelineDistributedPlan joinSide, PipelineDistributedPlan shuffleSide, boolean useLocalShuffle) {
// joinSide.getFragmentJob().getFragment().getBucketNum()
UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob();
int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum();
List<AssignedJob> instancePerBucket = sortInstanceByBuckets(joinSide, bucketNum);
shuffleSide.setDestinations(instancePerBucket);

// joinSide.getFragmentJob().getFragment().getBucketNum();
// linkAllInstances(joinSide, shuffleSide);
}

private List<AssignedJob> sortInstanceByBuckets(PipelineDistributedPlan plan, int bucketNum) {
AssignedJob[] instances = new AssignedJob[bucketNum];
for (AssignedJob instanceJob : plan.getInstanceJobs()) {
BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
instances[bucketIndex] = instanceJob;
}
}

for (int i = 0; i < instances.length; i++) {
if (instances[i] == null) {
instances[i] = new StaticAssignedJob(
i,
new TUniqueId(-1, -1),
plan.getFragmentJob(),
DummyWorker.INSTANCE,
new DefaultScanSource(ImmutableMap.of())
);
}
}
return Arrays.asList(instances);
}

private void linkNormalPlans(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public List<AssignedJob> getDestinations() {
return destinations;
}

public void setDestinations(
List<AssignedJob> destinations) {
    /** Sets the downstream instances that this plan's senders deliver data to. */
    public void setDestinations(List<AssignedJob> destinations) {
        this.destinations = destinations;
    }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.distribute.worker;

/**
 * A placeholder {@link DistributedPlanWorker} used where an instance slot needs a worker
 * but no real backend is assigned. All address accessors return the all-zero address and
 * {@link #available()} is always false.
 */
public class DummyWorker implements DistributedPlanWorker {
    /** Shared singleton; the class is stateless, so one instance suffices. */
    public static final DummyWorker INSTANCE = new DummyWorker();

    private static final long DUMMY_ID = 0;
    private static final String DUMMY_HOST = "0.0.0.0";
    private static final String DUMMY_ADDRESS = "0.0.0.0:0";
    private static final int DUMMY_PORT = 0;

    private DummyWorker() {
        // Singleton: use INSTANCE.
    }

    @Override
    public long id() {
        return DUMMY_ID;
    }

    @Override
    public String address() {
        return DUMMY_ADDRESS;
    }

    @Override
    public String host() {
        return DUMMY_HOST;
    }

    @Override
    public int port() {
        return DUMMY_PORT;
    }

    @Override
    public String brpcAddress() {
        return DUMMY_ADDRESS;
    }

    @Override
    public int brpcPort() {
        return DUMMY_PORT;
    }

    @Override
    public boolean available() {
        // A dummy worker can never execute anything.
        return false;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ public interface AssignedJob {
ScanSource getScanSource();

String toString(boolean showUnassignedJob);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;

import org.apache.doris.common.util.ListUtil;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.ScanNode;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Supplier;

/** BucketScanSource */
public class BucketScanSource extends ScanSource {
Expand All @@ -35,8 +38,22 @@ public class BucketScanSource extends ScanSource {
// 2. bucket 1 use OlapScanNode(tableName=`tbl`) to scan with tablet: [tablet 10002, tablet 10004]
public final Map<Integer, Map<ScanNode, ScanRanges>> bucketIndexToScanNodeToTablets;

private final Supplier<Integer> bucketNum;

public BucketScanSource(Map<Integer, Map<ScanNode, ScanRanges>> bucketIndexToScanNodeToTablets) {
this.bucketIndexToScanNodeToTablets = bucketIndexToScanNodeToTablets;

this.bucketNum = Suppliers.memoize(() -> bucketIndexToScanNodeToTablets.values()
.stream()
.flatMap(scanNodeToRanges -> scanNodeToRanges.keySet().stream())
.filter(OlapScanNode.class::isInstance)
.map(scanNode -> ((OlapScanNode) scanNode).getBucketNum())
.reduce(Integer::max)
.orElse(1));
}

    /** Returns the memoized bucket count computed in the constructor. */
    public int bucketNum() {
        return bucketNum.get();
    }

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public RowBatch getNext() throws Exception {
}

public boolean isEof() {
return coordinatorContext.asQueryProcessor().isEof();
return coordinatorContext.asQueryProcessor().isEos();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

Expand Down Expand Up @@ -83,7 +85,12 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext, SqlPip

List<AssignedJob> topInstances = topFragment.getInstanceJobs();
List<ResultReceiver> receivers = Lists.newArrayListWithCapacity(topInstances.size());
Map<Long, AssignedJob> distinctWorkerJobs = Maps.newLinkedHashMap();
for (AssignedJob topInstance : topInstances) {
distinctWorkerJobs.putIfAbsent(topInstance.getAssignedWorker().id(), topInstance);
}

for (AssignedJob topInstance : distinctWorkerJobs.values()) {
DistributedPlanWorker topWorker = topInstance.getAssignedWorker();
TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort());
receivers.add(
Expand All @@ -104,24 +111,21 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext, SqlPip
return new QueryProcessor(coordinatorContext, sqlPipelineTask, receivers);
}

public boolean isEof() {
public boolean isEos() {
return runningReceivers.isEmpty();
}

public RowBatch getNext() throws UserException, TException, RpcException {
if (runningReceivers.isEmpty()) {
throw new UserException("There is no receiver.");
}

ResultReceiver receiver = runningReceivers.get(receiverOffset);
Status status = new Status();
RowBatch resultBatch = receiver.getNext(status);
if (!status.ok()) {
LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
DebugUtil.printId(coordinatorContext.queryId), status.getErrorMsg());
}
coordinatorContext.updateStatusIfOk(status);

Status copyStatus = coordinatorContext.updateStatusIfOk(status);
Status copyStatus = coordinatorContext.readCloneStatus();
if (!copyStatus.ok()) {
if (Strings.isNullOrEmpty(copyStatus.getErrorMsg())) {
copyStatus.rewriteErrorMsg();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
Expand Down Expand Up @@ -81,8 +82,6 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh

for (int recvrId = 0; recvrId < currentFragmentPlan.getInstanceJobs().size(); recvrId++) {
AssignedJob instanceJob = currentFragmentPlan.getInstanceJobs().get(recvrId);
// Suggestion: Do not modify currentFragmentParam out of the `fragmentToThriftIfAbsent` method,
// except add instanceParam into local_params
TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
currentFragmentPlan, instanceJob, workerToCurrentFragment,
exchangeSenderNum, fileScanRangeParams,
Expand All @@ -92,7 +91,6 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh
List<TPipelineInstanceParams> instancesParams = currentFragmentParam.getLocalParams();
currentFragmentParam.getShuffleIdxToInstanceIdx().put(recvrId, instancesParams.size());
currentFragmentParam.getPerNodeSharedScans().putAll(instanceParam.getPerNodeSharedScans());
currentFragmentParam.setNumBuckets(0);

instancesParams.add(instanceParam);
}
Expand Down Expand Up @@ -254,7 +252,15 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
params.setWorkloadGroups(coordinatorContext.getWorkloadGroups());

params.setFileScanParams(fileScanRangeParamsMap);
// params.setNumBuckets(fragment.getBucketNum());

if (fragmentPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob) {
int bucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
.getOlapScanNodes()
.get(0)
.getBucketNum();
params.setNumBuckets(bucketNum);
}

params.setPerNodeSharedScans(new LinkedHashMap<>());
// if (ignoreDataDistribution) {
// params.setParallelInstances(parallelTasksNum);
Expand Down Expand Up @@ -315,14 +321,17 @@ private static void setDefaultScanSourceParam(DefaultScanSource defaultScanSourc

private static void setBucketScanSourceParam(BucketScanSource bucketScanSource, TPipelineInstanceParams params) {
Map<Integer, List<TScanRangeParams>> scanNodeIdToScanRanges = Maps.newLinkedHashMap();
Map<Integer, Boolean> perNodeSharedScans = Maps.newLinkedHashMap();
for (Map<ScanNode, ScanRanges> scanNodeToRanges : bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
for (Entry<ScanNode, ScanRanges> kv2 : scanNodeToRanges.entrySet()) {
int scanNodeId = kv2.getKey().getId().asInt();
perNodeSharedScans.put(scanNodeId, false);
List<TScanRangeParams> scanRanges = scanNodeIdToScanRanges.computeIfAbsent(scanNodeId, ArrayList::new);
List<TScanRangeParams> currentScanRanges = kv2.getValue().params;
scanRanges.addAll(currentScanRanges);
}
}
params.setPerNodeScanRanges(scanNodeIdToScanRanges);
params.setPerNodeSharedScans(perNodeSharedScans);
}
}

0 comments on commit a81b969

Please sign in to comment.