From 01aef1331881dfabfb0c3e065bceae02bd02c65b Mon Sep 17 00:00:00 2001
From: 924060929 <924060929@qq.com>
Date: Mon, 14 Oct 2024 22:16:02 +0800
Subject: [PATCH] fix multicastsink

---
 .../plans/distribute/DistributePlanner.java   |  23 +++-
 .../plans/distribute/DistributedPlan.java     |   8 +-
 .../distribute/PipelineDistributedPlan.java   |  36 ++++--
 .../doris/qe/runtime/ThriftPlansBuilder.java  | 114 ++++++++++++------
 4 files changed, 125 insertions(+), 56 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index eaf302f374d566e..159c91ee55be9ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -31,16 +31,20 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.MultiCastPlanFragment;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.thrift.TUniqueId;

-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;

 import java.util.Arrays;
 import java.util.List;
@@ -80,7 +84,7 @@ private FragmentIdMapping<DistributedPlan> buildDistributePlans(
             UnassignedJob fragmentJob = idToUnassignedJobs.get(fragmentId);
             List<AssignedJob> instanceJobs = idToAssignedJobs.get(fragmentId);

-            ListMultimap<ExchangeNode, DistributedPlan> exchangeNodeToChildren = ArrayListMultimap.create();
+            SetMultimap<ExchangeNode, DistributedPlan> exchangeNodeToChildren = LinkedHashMultimap.create();
             for (PlanFragment childFragment : fragment.getChildren()) {
                 if (childFragment instanceof MultiCastPlanFragment) {
                     for (ExchangeNode exchangeNode : ((MultiCastPlanFragment) childFragment).getDestNodeList()) {
@@ -88,7 +92,6 @@ private FragmentIdMapping<DistributedPlan> buildDistributePlans(
                             exchangeNodeToChildren.put(
                                     exchangeNode, idToDistributedPlans.get(childFragment.getFragmentId())
                             );
-                            break;
                         }
                     }
                 } else {
@@ -137,7 +140,19 @@ private void linkPipelinePlan(
         if (receiveSideIsBucketShuffleJoinSide) {
             receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances);
         }
-        senderPlan.setDestinations(receiverInstances);
+
+        DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
+        if (sink instanceof MultiCastDataSink) {
+            MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink;
+            for (DataStreamSink realSink : multiCastDataSink.getDataStreamSinks()) {
+                if (realSink.getExchNodeId() == linkNode.getId()) {
+                    senderPlan.addDestinations(realSink, receiverInstances);
+                    break;
+                }
+            }
+        } else {
+            senderPlan.addDestinations(sink, receiverInstances);
+        }
     }

     private List<AssignedJob> getDestinationsByBuckets(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java
index f787d34bb3df7c5..5b934bf7bfe534e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java
@@ -21,7 +21,7 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.planner.ExchangeNode;

-import com.google.common.collect.ListMultimap;
+import com.google.common.collect.SetMultimap;

 import java.util.List;
 import java.util.Objects;
@@ -30,9 +30,9 @@
 @lombok.Getter
 public abstract class DistributedPlan extends AbstractTreeNode<DistributedPlan> {
     protected final UnassignedJob fragmentJob;
-    protected final ListMultimap<ExchangeNode, DistributedPlan> inputs;
+    protected final SetMultimap<ExchangeNode, DistributedPlan> inputs;

-    public DistributedPlan(UnassignedJob fragmentJob, ListMultimap<ExchangeNode, DistributedPlan> inputs) {
+    public DistributedPlan(UnassignedJob fragmentJob, SetMultimap<ExchangeNode, DistributedPlan> inputs) {
         this.fragmentJob = Objects.requireNonNull(fragmentJob, "fragmentJob can not be null");
         this.inputs = Objects.requireNonNull(inputs, "inputs can not be null");
     }
@@ -41,7 +41,7 @@ public UnassignedJob getFragmentJob() {
         return fragmentJob;
     }

-    public ListMultimap<ExchangeNode, DistributedPlan> getInputs() {
+    public SetMultimap<ExchangeNode, DistributedPlan> getInputs() {
         return inputs;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java
index da50708912bbf0e..93348f7d746569b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java
@@ -21,13 +21,15 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.thrift.TExplainLevel;

-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;

 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -36,29 +38,29 @@
 public class PipelineDistributedPlan extends DistributedPlan {
     protected final List<AssignedJob> instanceJobs;
     // current, we only support all instances of the same fragment reuse the same destination
-    private List<AssignedJob> destinations;
+    private Map<DataSink, List<AssignedJob>> destinations;

     public PipelineDistributedPlan(
             UnassignedJob fragmentJob,
             List<AssignedJob> instanceJobs,
-            ListMultimap<ExchangeNode, DistributedPlan> inputs) {
+            SetMultimap<ExchangeNode, DistributedPlan> inputs) {
         super(fragmentJob, inputs);
         this.instanceJobs = Utils.fastToImmutableList(
                 Objects.requireNonNull(instanceJobs, "instanceJobs can not be null")
         );
-        this.destinations = ImmutableList.of();
+        this.destinations = Maps.newLinkedHashMap();
     }

     public List<AssignedJob> getInstanceJobs() {
         return instanceJobs;
     }

-    public List<AssignedJob> getDestinations() {
+    public Map<DataSink, List<AssignedJob>> getDestinations() {
         return destinations;
     }

-    public void setDestinations(List<AssignedJob> destinations) {
-        this.destinations = destinations;
+    public void addDestinations(DataSink sink, List<AssignedJob> destinations) {
+        this.destinations.put(sink, destinations);
     }

     @Override
@@ -82,11 +84,19 @@ public String toString(int displayFragmentId) {
         );

         AtomicInteger bucketNum = new AtomicInteger(0);
-        String destinationStr = destinations.stream()
-                .map(destination -> "  "
-                        + "#" + bucketNum.getAndIncrement() + ": "
-                        + DebugUtil.printId(destination.instanceId()))
-                .collect(Collectors.joining(",\n"));
+        String destinationStr = destinations.entrySet()
+                .stream()
+                .map(kv -> {
+                    String str = kv.getValue()
+                            .stream()
+                            .map(destination -> "      "
+                                    + "#" + bucketNum.getAndIncrement() + ": "
+                                    + DebugUtil.printId(destination.instanceId()))
+                            .collect(Collectors.joining(",\n"));
+                    return "    Exchange " + kv.getKey().getExchNodeId().asInt()
+                            + ": [" + (str.isEmpty() ? "" : "\n" + str + "\n    ") + "]";
+                })
+                .collect(Collectors.joining(",\n"));
         return "PipelineDistributedPlan(\n"
                 + "  id: " + displayFragmentId + ",\n"
                 + "  parallel: " + instanceJobs.size() + ",\n"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index d0de12c422b5a7f..f010d7baaae850f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -30,7 +30,10 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
 import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
@@ -100,7 +103,6 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParams> plansToThrift(
         Map<Integer, TFileScanRangeParams> fileScanRangeParams = computeFileScanRangeParams(currentFragmentPlan);
         Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(currentFragmentPlan);
         Map<DistributedPlanWorker, TPipelineFragmentParams> workerToCurrentFragment = Maps.newLinkedHashMap();
-        List<TPlanFragmentDestination> destinations = destinationToThrift(currentFragmentPlan);

         for (int instanceNumInCurrentFragment = 0;
                 instanceNumInCurrentFragment < currentFragmentPlan.getInstanceJobs().size();
@@ -109,7 +111,7 @@
             TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
                     currentFragmentPlan, instanceJob, workerToCurrentFragment,
                     exchangeSenderNum, fileScanRangeParams,
-                    workerProcessInstanceNum, destinations, coordinatorContext);
+                    workerProcessInstanceNum, coordinatorContext);

             TPipelineInstanceParams instanceParam = instanceToThrift(
                     currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
@@ -232,32 +234,59 @@ private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistributedPlan distributedPlan) {
         return senderNum;
     }

-    private static List<TPlanFragmentDestination> destinationToThrift(PipelineDistributedPlan plan) {
-        List<AssignedJob> destinationJobs = plan.getDestinations();
-        List<TPlanFragmentDestination> destinations = Lists.newArrayListWithCapacity(destinationJobs.size());
-        for (int receiverId = 0; receiverId < destinationJobs.size(); receiverId++) {
-            AssignedJob destinationJob = destinationJobs.get(receiverId);
-            DistributedPlanWorker worker = destinationJob.getAssignedWorker();
-            String host = worker.host();
-            int port = worker.port();
-            int brpcPort = worker.brpcPort();
-
-            TPlanFragmentDestination destination = new TPlanFragmentDestination();
-            destination.setServer(new TNetworkAddress(host, port));
-            destination.setBrpcServer(new TNetworkAddress(host, brpcPort));
-            destination.setFragmentInstanceId(destinationJob.instanceId());
-            destinations.add(destination);
+    private static void setMultiCastDestinationThrift(PipelineDistributedPlan fragmentPlan) {
+        MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink();
+        List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations();
+
+        List<DataStreamSink> dataStreamSinks = multiCastDataSink.getDataStreamSinks();
+        for (int i = 0; i < dataStreamSinks.size(); i++) {
+            DataStreamSink realSink = dataStreamSinks.get(i);
+            List<TPlanFragmentDestination> destinations = destinationList.get(i);
+            for (Entry<DataSink, List<AssignedJob>> kv : fragmentPlan.getDestinations().entrySet()) {
+                DataSink sink = kv.getKey();
+                if (sink == realSink) {
+                    List<AssignedJob> destInstances = kv.getValue();
+                    for (AssignedJob destInstance : destInstances) {
+                        destinations.add(instanceToDestination(destInstance));
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    private static List<TPlanFragmentDestination> nonMultiCastDestinationToThrift(PipelineDistributedPlan plan) {
+        Map<DataSink, List<AssignedJob>> destinationsMapping = plan.getDestinations();
+        List<TPlanFragmentDestination> destinations = Lists.newArrayList();
+        if (!destinationsMapping.isEmpty()) {
+            List<AssignedJob> destinationJobs = destinationsMapping.entrySet().iterator().next().getValue();
+            for (AssignedJob destinationJob : destinationJobs) {
+                destinations.add(instanceToDestination(destinationJob));
+            }
         }
         return destinations;
     }
+
+    private static TPlanFragmentDestination instanceToDestination(AssignedJob instance) {
+        DistributedPlanWorker worker = instance.getAssignedWorker();
+        String host = worker.host();
+        int port = worker.port();
+        int brpcPort = worker.brpcPort();
+
+        TPlanFragmentDestination destination = new TPlanFragmentDestination();
+        destination.setServer(new TNetworkAddress(host, port));
+        destination.setBrpcServer(new TNetworkAddress(host, brpcPort));
+        destination.setFragmentInstanceId(instance.instanceId());
+        return destination;
+    }

     private static TPipelineFragmentParams fragmentToThriftIfAbsent(
             PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
             Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams,
             Map<Integer, Integer> exchangeSenderNum,
             Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap,
             Multiset<DistributedPlanWorker> workerProcessInstanceNum,
-            List<TPlanFragmentDestination> destinations, SqlCoordinatorContext coordinatorContext) {
+            SqlCoordinatorContext coordinatorContext) {
         DistributedPlanWorker worker = assignedJob.getAssignedWorker();
         return workerToFragmentParams.computeIfAbsent(worker, w -> {
             PlanFragment fragment = fragmentPlan.getFragmentJob().getFragment();
@@ -278,7 +307,15 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(

             params.setNeedWaitExecutionTrigger(coordinatorContext.twoPhaseExecution);
             params.setPerExchNumSenders(exchangeSenderNum);
-            params.setDestinations(destinations);
+
+            List<TPlanFragmentDestination> nonMultiCastDestinations;
+            if (fragment.getSink() instanceof MultiCastDataSink) {
+                nonMultiCastDestinations = Lists.newArrayList();
+                setMultiCastDestinationThrift(fragmentPlan);
+            } else {
+                nonMultiCastDestinations = nonMultiCastDestinationToThrift(fragmentPlan);
+            }
+            params.setDestinations(nonMultiCastDestinations);

             int instanceNumInThisFragment = fragmentPlan.getInstanceJobs().size();
             params.setNumSenders(instanceNumInThisFragment);
@@ -467,24 +504,31 @@ private static void filterInstancesWhichReceiveDataFromRemote(
             return;
         }

         PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan) inputPlans.iterator().next();
-        Set<AssignedJob> destinations = Sets.newLinkedHashSet(firstInputPlan.getDestinations());
+        PlanFragment receiverFragment = receivePlan.getFragmentJob().getFragment();
+        Map<DataSink, List<AssignedJob>> sinkToDestInstances = firstInputPlan.getDestinations();
+        for (Entry<DataSink, List<AssignedJob>> kv : sinkToDestInstances.entrySet()) {
+            DataSink senderSink = kv.getKey();
+            if (senderSink.getFragment() == receiverFragment) {
+                Set<AssignedJob> destinations = Sets.newLinkedHashSet(kv.getValue());
+                Map<Long, AtomicInteger> backendIdToInstanceCount = Maps.newLinkedHashMap();
+                List<AssignedJob> instanceJobs = receivePlan.getInstanceJobs();
+                for (AssignedJob instanceJob : instanceJobs) {
+                    if (!destinations.contains(instanceJob)) {
+                        // the non-first-local-shuffle instances per host
+                        // and non-first-share-broadcast-hash-table instances per host
+                        // are not need receive data from other fragments, so we will skip it
+                        continue;
+                    }

-        Map<Long, AtomicInteger> backendIdToInstanceCount = Maps.newLinkedHashMap();
-        List<AssignedJob> instanceJobs = receivePlan.getInstanceJobs();
-        for (AssignedJob instanceJob : instanceJobs) {
-            if (!destinations.contains(instanceJob)) {
-                // the non-first-local-shuffle instances per host
-                // and non-first-share-broadcast-hash-table instances per host
-                // are not need receive data from other fragments, so we will skip it
-                continue;
+                    AtomicInteger instanceCount = backendIdToInstanceCount.computeIfAbsent(
+                            instanceJob.getAssignedWorker().id(),
+                            (backendId) -> new AtomicInteger()
+                    );
+                    int instanceIdInThisBackend = instanceCount.getAndIncrement();
+                    computeFn.accept(instanceJob, instanceIdInThisBackend);
+                }
+                break;
             }
-
-            AtomicInteger instanceCount = backendIdToInstanceCount.computeIfAbsent(
-                    instanceJob.getAssignedWorker().id(),
-                    (backendId) -> new AtomicInteger()
-            );
-            int instanceIdInThisBackend = instanceCount.getAndIncrement();
-            computeFn.accept(instanceJob, instanceIdInThisBackend);
         }
     }
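
Note (illustrative, not part of the patch): the change replaces the sender fragment's
single destination list (PipelineDistributedPlan.setDestinations) with a map keyed by
DataSink (addDestinations), so each DataStreamSink of a MultiCastDataSink carries its
own receiver list when the thrift params are built. The sketch below shows that idea
with simplified stand-in types (Sink, Instance, MulticastDestinationsSketch are all
hypothetical names, not the real Doris classes); it is a minimal model under those
assumptions, not the actual implementation.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class MulticastDestinationsSketch {
        // Stand-in for DataStreamSink: one per consumer exchange node.
        record Sink(int exchNodeId) {}

        // Stand-in for AssignedJob: one fragment instance on a backend.
        record Instance(String id) {}

        public static void main(String[] args) {
            Sink toExchange5 = new Sink(5);
            Sink toExchange7 = new Sink(7);

            // Keyed by sink, each consumer keeps its own destination instances,
            // mirroring PipelineDistributedPlan.addDestinations in the patch.
            Map<Sink, List<Instance>> destinations = new LinkedHashMap<>();
            destinations.put(toExchange5, List.of(new Instance("be1-i0"), new Instance("be2-i0")));
            destinations.put(toExchange7, List.of(new Instance("be3-i0")));

            // Thrift building can then emit each sink's receivers independently,
            // as setMultiCastDestinationThrift does for the real plan.
            for (Map.Entry<Sink, List<Instance>> kv : destinations.entrySet()) {
                System.out.println("exchange " + kv.getKey().exchNodeId() + " -> " + kv.getValue());
            }
        }
    }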