fix multicastsink
924060929 committed Oct 14, 2024
1 parent 9c430a6 commit 01aef13
Showing 4 changed files with 125 additions and 56 deletions.
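
At a high level, this commit replaces the single destination list that a PipelineDistributedPlan carried with a per-DataSink mapping, so each DataStreamSink branch of a MultiCastDataSink gets its own receiver instances, and it switches the exchange-node inputs multimap from a ListMultimap to an insertion-ordered SetMultimap. The sketch below illustrates the shape of the new per-sink mapping; it uses simplified, made-up stand-in types (Sink, Instance, PerSinkDestinationsSketch), not the actual Doris classes.

```java
// Minimal sketch of the core data-structure change: receiver instances are
// tracked per sink instead of as one flat list per fragment, so each branch
// of a multicast sink can carry its own destinations.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerSinkDestinationsSketch {
    // Hypothetical stand-ins for Doris' DataSink and AssignedJob.
    record Sink(int exchNodeId) {}
    record Instance(String id) {}

    // One receiver list per sink, kept in insertion order.
    private final Map<Sink, List<Instance>> destinations = new LinkedHashMap<>();

    void addDestinations(Sink sink, List<Instance> receivers) {
        destinations.put(sink, receivers);
    }

    public static void main(String[] args) {
        PerSinkDestinationsSketch plan = new PerSinkDestinationsSketch();
        plan.addDestinations(new Sink(3), List.of(new Instance("instance-1")));
        plan.addDestinations(new Sink(5), List.of(new Instance("instance-2")));
        // Two multicast branches, two independent receiver lists.
        System.out.println(plan.destinations.size()); // prints 2
    }
}
```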
@@ -31,16 +31,20 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;

import java.util.Arrays;
import java.util.List;
@@ -80,15 +84,14 @@ private FragmentIdMapping<DistributedPlan> buildDistributePlans(
UnassignedJob fragmentJob = idToUnassignedJobs.get(fragmentId);
List<AssignedJob> instanceJobs = idToAssignedJobs.get(fragmentId);

ListMultimap<ExchangeNode, DistributedPlan> exchangeNodeToChildren = ArrayListMultimap.create();
SetMultimap<ExchangeNode, DistributedPlan> exchangeNodeToChildren = LinkedHashMultimap.create();
for (PlanFragment childFragment : fragment.getChildren()) {
if (childFragment instanceof MultiCastPlanFragment) {
for (ExchangeNode exchangeNode : ((MultiCastPlanFragment) childFragment).getDestNodeList()) {
if (exchangeNode.getFragment() == fragment) {
exchangeNodeToChildren.put(
exchangeNode, idToDistributedPlans.get(childFragment.getFragmentId())
);
break;
}
}
} else {
@@ -137,7 +140,19 @@ private void linkPipelinePlan(
if (receiveSideIsBucketShuffleJoinSide) {
receiverInstances = getDestinationsByBuckets(receiverPlan, receiverInstances);
}
senderPlan.setDestinations(receiverInstances);

DataSink sink = senderPlan.getFragmentJob().getFragment().getSink();
if (sink instanceof MultiCastDataSink) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) sink;
for (DataStreamSink realSink : multiCastDataSink.getDataStreamSinks()) {
if (realSink.getExchNodeId() == linkNode.getId()) {
senderPlan.addDestinations(realSink, receiverInstances);
break;
}
}
} else {
senderPlan.addDestinations(sink, receiverInstances);
}
}

private List<AssignedJob> getDestinationsByBuckets(
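In the multicast branch above, linkPipelinePlan now looks up the DataStreamSink of the MultiCastDataSink whose exchange node id matches the exchange node being linked, and registers the receiver instances against that sink only. A rough sketch of that lookup, using simplified stand-in types (StreamSink, branchFor) rather than the Doris API:

```java
// Hypothetical sketch: a multicast sink fans out through several stream sinks,
// and receivers must be attached to the branch whose exchange node id matches
// the exchange node linking sender and receiver.
import java.util.List;
import java.util.Optional;

final class MulticastBranchLookupSketch {
    record StreamSink(int exchNodeId) {}

    static Optional<StreamSink> branchFor(List<StreamSink> branches, int linkExchNodeId) {
        return branches.stream()
                .filter(b -> b.exchNodeId() == linkExchNodeId)
                .findFirst();
    }

    public static void main(String[] args) {
        List<StreamSink> branches = List.of(new StreamSink(3), new StreamSink(5));
        // Receivers for the exchange node with id 5 belong to the second branch only.
        System.out.println(branchFor(branches, 5)); // Optional[StreamSink[exchNodeId=5]]
    }
}
```
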
@@ -21,7 +21,7 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.planner.ExchangeNode;

import com.google.common.collect.ListMultimap;
import com.google.common.collect.SetMultimap;

import java.util.List;
import java.util.Objects;
@@ -30,9 +30,9 @@
@lombok.Getter
public abstract class DistributedPlan extends AbstractTreeNode<DistributedPlan> {
protected final UnassignedJob fragmentJob;
protected final ListMultimap<ExchangeNode, DistributedPlan> inputs;
protected final SetMultimap<ExchangeNode, DistributedPlan> inputs;

public DistributedPlan(UnassignedJob fragmentJob, ListMultimap<ExchangeNode, DistributedPlan> inputs) {
public DistributedPlan(UnassignedJob fragmentJob, SetMultimap<ExchangeNode, DistributedPlan> inputs) {
this.fragmentJob = Objects.requireNonNull(fragmentJob, "fragmentJob can not be null");
this.inputs = Objects.requireNonNull(inputs, "inputs can not be null");
}
@@ -41,7 +41,7 @@ public UnassignedJob getFragmentJob() {
return fragmentJob;
}

public ListMultimap<ExchangeNode, DistributedPlan> getInputs() {
public SetMultimap<ExchangeNode, DistributedPlan> getInputs() {
return inputs;
}

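The inputs multimap above moves from ListMultimap to a SetMultimap backed by LinkedHashMultimap. Together with the removal of the early break in the plan builder, a multicast child can now be registered under every exchange node that belongs to its parent fragment, while duplicate (exchangeNode, child) pairs collapse into a single, insertion-ordered entry. A small illustration of that behavior, assuming Guava on the classpath (the keys and values here are just strings):

```java
// Duplicate (key, value) pairs collapse to one entry, and iteration follows
// insertion order, unlike ArrayListMultimap, which would keep duplicates.
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.SetMultimap;

public class InputsMultimapSketch {
    public static void main(String[] args) {
        SetMultimap<String, String> inputs = LinkedHashMultimap.create();
        inputs.put("exchangeNode#3", "childPlan#1");
        inputs.put("exchangeNode#3", "childPlan#1"); // duplicate registration is collapsed
        inputs.put("exchangeNode#5", "childPlan#1");
        System.out.println(inputs.get("exchangeNode#3").size()); // 1
        System.out.println(inputs.keySet());                     // [exchangeNode#3, exchangeNode#5]
    }
}
```
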
@@ -21,13 +21,15 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.thrift.TExplainLevel;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -36,29 +38,29 @@
public class PipelineDistributedPlan extends DistributedPlan {
protected final List<AssignedJob> instanceJobs;
    // currently, we only support all instances of the same fragment reusing the same destination
private List<AssignedJob> destinations;
private Map<DataSink, List<AssignedJob>> destinations;

public PipelineDistributedPlan(
UnassignedJob fragmentJob,
List<AssignedJob> instanceJobs,
ListMultimap<ExchangeNode, DistributedPlan> inputs) {
SetMultimap<ExchangeNode, DistributedPlan> inputs) {
super(fragmentJob, inputs);
this.instanceJobs = Utils.fastToImmutableList(
Objects.requireNonNull(instanceJobs, "instanceJobs can not be null")
);
this.destinations = ImmutableList.of();
this.destinations = Maps.newLinkedHashMap();
}

public List<AssignedJob> getInstanceJobs() {
return instanceJobs;
}

public List<AssignedJob> getDestinations() {
public Map<DataSink, List<AssignedJob>> getDestinations() {
return destinations;
}

public void setDestinations(List<AssignedJob> destinations) {
this.destinations = destinations;
public void addDestinations(DataSink sink, List<AssignedJob> destinations) {
this.destinations.put(sink, destinations);
}

@Override
@@ -82,11 +84,19 @@ public String toString(int displayFragmentId) {
);

AtomicInteger bucketNum = new AtomicInteger(0);
String destinationStr = destinations.stream()
.map(destination -> " "
+ "#" + bucketNum.getAndIncrement() + ": "
+ DebugUtil.printId(destination.instanceId()))
.collect(Collectors.joining(",\n"));
String destinationStr = destinations.entrySet()
.stream()
.map(kv -> {
String str = kv.getValue()
.stream()
.map(destination -> " "
+ "#" + bucketNum.getAndIncrement() + ": "
+ DebugUtil.printId(destination.instanceId()))
.collect(Collectors.joining(",\n"));
return " Exchange " + kv.getKey().getExchNodeId().asInt()
+ ": [" + (str.isEmpty() ? "" : "\n" + str + "\n ") + "]";
})
.collect(Collectors.joining(",\n"));
return "PipelineDistributedPlan(\n"
+ " id: " + displayFragmentId + ",\n"
+ " parallel: " + instanceJobs.size() + ",\n"
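The toString change above renders receiver instances grouped per exchange node rather than as one flat list. The following simplified sketch shows roughly what the grouped rendering looks like; the exchange ids and instance names are made up and the exact explain formatting may differ:

```java
// One "Exchange <id>" section per sink, with a running bucket counter across
// all receivers, mirroring the grouped rendering in the diff above.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class DestinationStringSketch {
    public static void main(String[] args) {
        Map<Integer, List<String>> destinations = new LinkedHashMap<>();
        destinations.put(3, List.of("instance-a", "instance-b"));
        destinations.put(5, List.of("instance-c"));

        AtomicInteger bucketNum = new AtomicInteger(0);
        String rendered = destinations.entrySet().stream()
                .map(kv -> {
                    String body = kv.getValue().stream()
                            .map(d -> "      #" + bucketNum.getAndIncrement() + ": " + d)
                            .collect(Collectors.joining(",\n"));
                    return "    Exchange " + kv.getKey()
                            + ": [" + (body.isEmpty() ? "" : "\n" + body + "\n    ") + "]";
                })
                .collect(Collectors.joining(",\n"));
        System.out.println(rendered);
    }
}
```
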
@@ -30,7 +30,10 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
@@ -100,7 +103,6 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh
Map<Integer, TFileScanRangeParams> fileScanRangeParams = computeFileScanRangeParams(currentFragmentPlan);
Map<Integer, Integer> exchangeSenderNum = computeExchangeSenderNum(currentFragmentPlan);
Map<DistributedPlanWorker, TPipelineFragmentParams> workerToCurrentFragment = Maps.newLinkedHashMap();
List<TPlanFragmentDestination> destinations = destinationToThrift(currentFragmentPlan);

for (int instanceNumInCurrentFragment = 0;
instanceNumInCurrentFragment < currentFragmentPlan.getInstanceJobs().size();
@@ -109,7 +111,7 @@ private static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToTh
TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
currentFragmentPlan, instanceJob, workerToCurrentFragment,
exchangeSenderNum, fileScanRangeParams,
workerProcessInstanceNum, destinations, coordinatorContext);
workerProcessInstanceNum, coordinatorContext);

TPipelineInstanceParams instanceParam = instanceToThrift(
currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
@@ -232,32 +234,59 @@ private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistribute
return senderNum;
}

private static List<TPlanFragmentDestination> destinationToThrift(PipelineDistributedPlan plan) {
List<AssignedJob> destinationJobs = plan.getDestinations();
List<TPlanFragmentDestination> destinations = Lists.newArrayListWithCapacity(destinationJobs.size());
for (int receiverId = 0; receiverId < destinationJobs.size(); receiverId++) {
AssignedJob destinationJob = destinationJobs.get(receiverId);
DistributedPlanWorker worker = destinationJob.getAssignedWorker();
String host = worker.host();
int port = worker.port();
int brpcPort = worker.brpcPort();

TPlanFragmentDestination destination = new TPlanFragmentDestination();
destination.setServer(new TNetworkAddress(host, port));
destination.setBrpcServer(new TNetworkAddress(host, brpcPort));
destination.setFragmentInstanceId(destinationJob.instanceId());
destinations.add(destination);
private static void setMultiCastDestinationThrift(PipelineDistributedPlan fragmentPlan) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink();
List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations();

List<DataStreamSink> dataStreamSinks = multiCastDataSink.getDataStreamSinks();
for (int i = 0; i < dataStreamSinks.size(); i++) {
DataStreamSink realSink = dataStreamSinks.get(i);
List<TPlanFragmentDestination> destinations = destinationList.get(i);
for (Entry<DataSink, List<AssignedJob>> kv : fragmentPlan.getDestinations().entrySet()) {
DataSink sink = kv.getKey();
if (sink == realSink) {
List<AssignedJob> destInstances = kv.getValue();
for (AssignedJob destInstance : destInstances) {
destinations.add(instanceToDestination(destInstance));
}
break;
}
}
}
}

private static List<TPlanFragmentDestination> nonMultiCastDestinationToThrift(PipelineDistributedPlan plan) {
Map<DataSink, List<AssignedJob>> destinationsMapping = plan.getDestinations();
List<TPlanFragmentDestination> destinations = Lists.newArrayList();
if (!destinationsMapping.isEmpty()) {
List<AssignedJob> destinationJobs = destinationsMapping.entrySet().iterator().next().getValue();
for (AssignedJob destinationJob : destinationJobs) {
destinations.add(instanceToDestination(destinationJob));
}
}
return destinations;
}

private static TPlanFragmentDestination instanceToDestination(AssignedJob instance) {
DistributedPlanWorker worker = instance.getAssignedWorker();
String host = worker.host();
int port = worker.port();
int brpcPort = worker.brpcPort();

TPlanFragmentDestination destination = new TPlanFragmentDestination();
destination.setServer(new TNetworkAddress(host, port));
destination.setBrpcServer(new TNetworkAddress(host, brpcPort));
destination.setFragmentInstanceId(instance.instanceId());
return destination;
}

private static TPipelineFragmentParams fragmentToThriftIfAbsent(
PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams,
Map<Integer, Integer> exchangeSenderNum,
Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap,
Multiset<DistributedPlanWorker> workerProcessInstanceNum,
List<TPlanFragmentDestination> destinations, SqlCoordinatorContext coordinatorContext) {
SqlCoordinatorContext coordinatorContext) {
DistributedPlanWorker worker = assignedJob.getAssignedWorker();
return workerToFragmentParams.computeIfAbsent(worker, w -> {
PlanFragment fragment = fragmentPlan.getFragmentJob().getFragment();
@@ -278,7 +307,15 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(

params.setNeedWaitExecutionTrigger(coordinatorContext.twoPhaseExecution);
params.setPerExchNumSenders(exchangeSenderNum);
params.setDestinations(destinations);

List<TPlanFragmentDestination> nonMultiCastDestinations;
if (fragment.getSink() instanceof MultiCastDataSink) {
nonMultiCastDestinations = Lists.newArrayList();
setMultiCastDestinationThrift(fragmentPlan);
} else {
nonMultiCastDestinations = nonMultiCastDestinationToThrift(fragmentPlan);
}
params.setDestinations(nonMultiCastDestinations);

int instanceNumInThisFragment = fragmentPlan.getInstanceJobs().size();
params.setNumSenders(instanceNumInThisFragment);
@@ -467,24 +504,31 @@ private static void filterInstancesWhichReceiveDataFromRemote(
return;
}
PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan) inputPlans.iterator().next();
Set<AssignedJob> destinations = Sets.newLinkedHashSet(firstInputPlan.getDestinations());
PlanFragment receiverFragment = receivePlan.getFragmentJob().getFragment();
Map<DataSink, List<AssignedJob>> sinkToDestInstances = firstInputPlan.getDestinations();
for (Entry<DataSink, List<AssignedJob>> kv : sinkToDestInstances.entrySet()) {
DataSink senderSink = kv.getKey();
if (senderSink.getFragment() == receiverFragment) {
Set<AssignedJob> destinations = Sets.newLinkedHashSet(kv.getValue());
Map<Long, AtomicInteger> backendIdToInstanceCount = Maps.newLinkedHashMap();
List<AssignedJob> instanceJobs = receivePlan.getInstanceJobs();
for (AssignedJob instanceJob : instanceJobs) {
if (!destinations.contains(instanceJob)) {
// the non-first-local-shuffle instances per host
// and non-first-share-broadcast-hash-table instances per host
                        // do not need to receive data from other fragments, so we will skip them
continue;
}

Map<Long, AtomicInteger> backendIdToInstanceCount = Maps.newLinkedHashMap();
List<AssignedJob> instanceJobs = receivePlan.getInstanceJobs();
for (AssignedJob instanceJob : instanceJobs) {
if (!destinations.contains(instanceJob)) {
// the non-first-local-shuffle instances per host
// and non-first-share-broadcast-hash-table instances per host
                // do not need to receive data from other fragments, so we will skip them
continue;
AtomicInteger instanceCount = backendIdToInstanceCount.computeIfAbsent(
instanceJob.getAssignedWorker().id(),
(backendId) -> new AtomicInteger()
);
int instanceIdInThisBackend = instanceCount.getAndIncrement();
computeFn.accept(instanceJob, instanceIdInThisBackend);
}
break;
}

AtomicInteger instanceCount = backendIdToInstanceCount.computeIfAbsent(
instanceJob.getAssignedWorker().id(),
(backendId) -> new AtomicInteger()
);
int instanceIdInThisBackend = instanceCount.getAndIncrement();
computeFn.accept(instanceJob, instanceIdInThisBackend);
}
}
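
For multicast fragments, the thrift builder above fills each per-branch destination list of the MultiCastDataSink from the receivers registered for exactly that DataStreamSink (matched by object identity), while non-multicast fragments keep a single destination list. A simplified, hypothetical sketch of the per-branch filling, with stand-in types instead of the Doris and thrift classes:

```java
// Branch i of a multicast sink owns destinationList.get(i); that list is filled
// only from the receivers registered for that same sink object.
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class MulticastDestinationFillSketch {
    record Sink(String name) {}

    public static void main(String[] args) {
        Sink branch0 = new Sink("toReceiverFragmentA");
        Sink branch1 = new Sink("toReceiverFragmentB");
        List<Sink> branches = List.of(branch0, branch1);

        // Per-branch destination lists, parallel to `branches`.
        List<List<String>> destinationList = List.of(new ArrayList<>(), new ArrayList<>());

        // Receivers registered per sink by the distributed planner.
        Map<Sink, List<String>> perSinkReceivers = new IdentityHashMap<>();
        perSinkReceivers.put(branch0, List.of("instance-1", "instance-2"));
        perSinkReceivers.put(branch1, List.of("instance-3"));

        for (int i = 0; i < branches.size(); i++) {
            List<String> receivers = perSinkReceivers.get(branches.get(i));
            if (receivers != null) {
                destinationList.get(i).addAll(receivers);
            }
        }
        System.out.println(destinationList); // [[instance-1, instance-2], [instance-3]]
    }
}
```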


