From a81b9695bc626b64a299b283b10b96826df318df Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 26 Sep 2024 17:56:44 +0800 Subject: [PATCH] 1. fix distinct multi receivers 2. support bucket shuffle join --- .../plans/distribute/DistributePlanner.java | 38 +++++++++++- .../distribute/PipelineDistributedPlan.java | 3 +- .../plans/distribute/worker/DummyWorker.java | 59 +++++++++++++++++++ .../distribute/worker/job/AssignedJob.java | 1 + .../worker/job/BucketScanSource.java | 17 ++++++ .../apache/doris/qe/NereidsCoordinator.java | 2 +- .../doris/qe/runtime/QueryProcessor.java | 16 +++-- .../doris/qe/runtime/ThriftPlansBuilder.java | 17 ++++-- 8 files changed, 139 insertions(+), 14 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index ad3241431508df1..e9987cbc5cf5d9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -18,20 +18,27 @@ package org.apache.doris.nereids.trees.plans.distribute; import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DummyWorker; import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBuilder; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder; import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -103,7 +110,36 @@ private void linkPipelinePlan(PipelineDistributedPlan receiverPlan, PipelineDist private void linkBucketShuffleJoinPlan( PipelineDistributedPlan joinSide, PipelineDistributedPlan shuffleSide, boolean useLocalShuffle) { - // joinSide.getFragmentJob().getFragment().getBucketNum() + UnassignedScanBucketOlapTableJob bucketJob = (UnassignedScanBucketOlapTableJob) joinSide.getFragmentJob(); + int bucketNum = bucketJob.getOlapScanNodes().get(0).getBucketNum(); + List instancePerBucket = sortInstanceByBuckets(joinSide, bucketNum); + shuffleSide.setDestinations(instancePerBucket); + + // joinSide.getFragmentJob().getFragment().getBucketNum(); + // linkAllInstances(joinSide, shuffleSide); + } + + private List sortInstanceByBuckets(PipelineDistributedPlan plan, int bucketNum) { + AssignedJob[] instances = new AssignedJob[bucketNum]; + for (AssignedJob instanceJob : plan.getInstanceJobs()) { + BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource(); + for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) { + instances[bucketIndex] = instanceJob; + } + } + + for (int i = 0; i < instances.length; i++) { + if (instances[i] == null) { + instances[i] = new StaticAssignedJob( + i, + new TUniqueId(-1, -1), + plan.getFragmentJob(), + DummyWorker.INSTANCE, + new DefaultScanSource(ImmutableMap.of()) + ); + } + } + return Arrays.asList(instances); } private void linkNormalPlans( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java index 78e579dd6ce2de0..7164d3f24235de4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java @@ -56,8 +56,7 @@ public List getDestinations() { return destinations; } - public void setDestinations( - List destinations) { + public void setDestinations(List destinations) { this.destinations = destinations; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java new file mode 100644 index 000000000000000..4e5c60b885b4993 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DummyWorker.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker; + +public class DummyWorker implements DistributedPlanWorker { + public static final DummyWorker INSTANCE = new DummyWorker(); + + private DummyWorker() {} + + @Override + public long id() { + return 0; + } + + @Override + public String address() { + return "0.0.0.0:0"; + } + + @Override + public String host() { + return "0.0.0.0"; + } + + @Override + public int port() { + return 0; + } + + @Override + public String brpcAddress() { + return "0.0.0.0:0"; + } + + @Override + public int brpcPort() { + return 0; + } + + @Override + public boolean available() { + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java index f9f6b9dea1451b8..d74626b4889468a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java @@ -36,4 +36,5 @@ public interface AssignedJob { ScanSource getScanSource(); String toString(boolean showUnassignedJob); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java index 33d066a02b9fcb4..d8ea00355a9d66b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java @@ -18,8 +18,10 @@ package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.common.util.ListUtil; +import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.ScanNode; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -27,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Supplier; /** BucketScanSource */ public class BucketScanSource extends ScanSource { @@ -35,8 +38,22 @@ public class BucketScanSource extends ScanSource { // 2. bucket 1 use OlapScanNode(tableName=`tbl`) to scan with tablet: [tablet 10002, tablet 10004] public final Map> bucketIndexToScanNodeToTablets; + private final Supplier bucketNum; + public BucketScanSource(Map> bucketIndexToScanNodeToTablets) { this.bucketIndexToScanNodeToTablets = bucketIndexToScanNodeToTablets; + + this.bucketNum = Suppliers.memoize(() -> bucketIndexToScanNodeToTablets.values() + .stream() + .flatMap(scanNodeToRanges -> scanNodeToRanges.keySet().stream()) + .filter(OlapScanNode.class::isInstance) + .map(scanNode -> ((OlapScanNode) scanNode).getBucketNum()) + .reduce(Integer::max) + .orElse(1)); + } + + public int bucketNum() { + return bucketNum.get(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java index 043f67825e3e498..2972bbdf4a26a05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java @@ -164,7 +164,7 @@ public RowBatch getNext() throws Exception { } public boolean isEof() { - return coordinatorContext.asQueryProcessor().isEof(); + return coordinatorContext.asQueryProcessor().isEos(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java index 52af42941bf0113..fcee71dc878d4d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java @@ -36,11 +36,13 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; @@ -83,7 +85,12 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext, SqlPip List topInstances = topFragment.getInstanceJobs(); List receivers = Lists.newArrayListWithCapacity(topInstances.size()); + Map distinctWorkerJobs = Maps.newLinkedHashMap(); for (AssignedJob topInstance : topInstances) { + distinctWorkerJobs.putIfAbsent(topInstance.getAssignedWorker().id(), topInstance); + } + + for (AssignedJob topInstance : distinctWorkerJobs.values()) { DistributedPlanWorker topWorker = topInstance.getAssignedWorker(); TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort()); receivers.add( @@ -104,15 +111,11 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext, SqlPip return new QueryProcessor(coordinatorContext, sqlPipelineTask, receivers); } - public boolean isEof() { + public boolean isEos() { return runningReceivers.isEmpty(); } public RowBatch getNext() throws UserException, TException, RpcException { - if (runningReceivers.isEmpty()) { - throw new UserException("There is no receiver."); - } - ResultReceiver receiver = runningReceivers.get(receiverOffset); Status status = new Status(); RowBatch resultBatch = receiver.getNext(status); @@ -120,8 +123,9 @@ public RowBatch getNext() throws UserException, TException, RpcException { LOG.warn("Query {} coordinator get next fail, {}, need cancel.", DebugUtil.printId(coordinatorContext.queryId), status.getErrorMsg()); } + coordinatorContext.updateStatusIfOk(status); - Status copyStatus = coordinatorContext.updateStatusIfOk(status); + Status copyStatus = coordinatorContext.readCloneStatus(); if (!copyStatus.ok()) { if (Strings.isNullOrEmpty(copyStatus.getErrorMsg())) { copyStatus.rewriteErrorMsg(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index af2946d5c64742b..9827d8d8f789cb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges; import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; @@ -81,8 +82,6 @@ private static Map plansToTh for (int recvrId = 0; recvrId < currentFragmentPlan.getInstanceJobs().size(); recvrId++) { AssignedJob instanceJob = currentFragmentPlan.getInstanceJobs().get(recvrId); - // Suggestion: Do not modify currentFragmentParam out of the `fragmentToThriftIfAbsent` method, - // except add instanceParam into local_params TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent( currentFragmentPlan, instanceJob, workerToCurrentFragment, exchangeSenderNum, fileScanRangeParams, @@ -92,7 +91,6 @@ private static Map plansToTh List instancesParams = currentFragmentParam.getLocalParams(); currentFragmentParam.getShuffleIdxToInstanceIdx().put(recvrId, instancesParams.size()); currentFragmentParam.getPerNodeSharedScans().putAll(instanceParam.getPerNodeSharedScans()); - currentFragmentParam.setNumBuckets(0); instancesParams.add(instanceParam); } @@ -254,7 +252,15 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent( params.setWorkloadGroups(coordinatorContext.getWorkloadGroups()); params.setFileScanParams(fileScanRangeParamsMap); - // params.setNumBuckets(fragment.getBucketNum()); + + if (fragmentPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob) { + int bucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob()) + .getOlapScanNodes() + .get(0) + .getBucketNum(); + params.setNumBuckets(bucketNum); + } + params.setPerNodeSharedScans(new LinkedHashMap<>()); // if (ignoreDataDistribution) { // params.setParallelInstances(parallelTasksNum); @@ -315,14 +321,17 @@ private static void setDefaultScanSourceParam(DefaultScanSource defaultScanSourc private static void setBucketScanSourceParam(BucketScanSource bucketScanSource, TPipelineInstanceParams params) { Map> scanNodeIdToScanRanges = Maps.newLinkedHashMap(); + Map perNodeSharedScans = Maps.newLinkedHashMap(); for (Map scanNodeToRanges : bucketScanSource.bucketIndexToScanNodeToTablets.values()) { for (Entry kv2 : scanNodeToRanges.entrySet()) { int scanNodeId = kv2.getKey().getId().asInt(); + perNodeSharedScans.put(scanNodeId, false); List scanRanges = scanNodeIdToScanRanges.computeIfAbsent(scanNodeId, ArrayList::new); List currentScanRanges = kv2.getValue().params; scanRanges.addAll(currentScanRanges); } } params.setPerNodeScanRanges(scanNodeIdToScanRanges); + params.setPerNodeSharedScans(perNodeSharedScans); } }