Commit b3587ef: fix

924060929 committed Jan 3, 2025
1 parent 52d8f6f
Showing 10 changed files with 75 additions and 55 deletions.
8 changes: 4 additions & 4 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -263,10 +263,10 @@ struct AggregateFunctionCollectListData<StringRef, HasLimit> {
         }
         max_size = rhs.max_size;
 
-        data->insert_range_from(
-                *rhs.data, 0,
-                std::min(assert_cast<size_t, TypeCheckOnRelease::DISABLE>(max_size - size()),
-                         rhs.size()));
+        data->insert_range_from(*rhs.data, 0,
+                                std::min(assert_cast<size_t, TypeCheckOnRelease::DISABLE>(
+                                                 static_cast<size_t>(max_size - size())),
+                                         rhs.size()));
     } else {
         data->insert_range_from(*rhs.data, 0, rhs.size());
     }
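The hunk above caps the merged collect list at max_size; the fix adds an explicit static_cast<size_t> around max_size - size(), presumably so the subtraction is made unsigned-safe before the narrowing assert_cast. A minimal Java sketch of the same bounded-merge rule (a hypothetical illustration, not code from this commit):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a size-capped collect list: merging copies at most
// min(maxSize - size(), rhs.size()) elements, the rule the C++ hunk implements.
final class BoundedCollectList<T> {
    private final List<T> data = new ArrayList<>();
    private final int maxSize;

    BoundedCollectList(int maxSize) {
        this.maxSize = maxSize;
    }

    void add(T value) {
        if (data.size() < maxSize) {
            data.add(value);
        }
    }

    void merge(BoundedCollectList<T> rhs) {
        // Compute the remaining capacity first, then copy at most that many elements.
        // The C++ fix makes the analogous subtraction explicitly unsigned-safe by
        // applying static_cast<size_t> before the narrowing assert_cast.
        int remaining = Math.max(maxSize - data.size(), 0);
        int toCopy = Math.min(remaining, rhs.data.size());
        data.addAll(rhs.data.subList(0, toCopy));
    }

    @Override
    public String toString() {
        return data.toString();
    }
}
```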
@@ -26,6 +26,8 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
@@ -184,7 +186,7 @@ private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
         boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
                 .anyMatch(LocalShuffleAssignedJob.class::isInstance);
         if (useLocalShuffle) {
-            return getFirstInstancePerShareScan(receiverPlan);
+            return getLocalShuffleRemoteReceiverJob(receiverPlan);
         } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
             return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else {
@@ -196,14 +198,26 @@ private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];
         for (AssignedJob instanceJob : unsorted) {
-            BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
-            for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
-                if (instances[bucketIndex] != null) {
-                    throw new IllegalStateException(
-                            "Multi instances scan same buckets: " + instances[bucketIndex] + " and " + instanceJob
-                    );
-                }
-                instances[bucketIndex] = instanceJob;
-            }
+            if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
+                LocalShuffleBucketJoinAssignedJob localShuffleJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
+                for (Integer bucketIndex : localShuffleJob.getAssignedJoinBucketIndexes()) {
+                    if (instances[bucketIndex] != null) {
+                        throw new IllegalStateException(
+                                "Multi instances scan same buckets: " + instances[bucketIndex] + " and " + instanceJob
+                        );
+                    }
+                    instances[bucketIndex] = instanceJob;
+                }
+            } else {
+                BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
+                for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
+                    if (instances[bucketIndex] != null) {
+                        throw new IllegalStateException(
+                                "Multi instances scan same buckets: " + instances[bucketIndex] + " and " + instanceJob
+                        );
+                    }
+                    instances[bucketIndex] = instanceJob;
+                }
+            }
         }
 
@@ -221,12 +235,19 @@ private List<AssignedJob> sortDestinationInstancesByBuckets(
         return Arrays.asList(instances);
     }
 
-    private List<AssignedJob> getFirstInstancePerShareScan(PipelineDistributedPlan plan) {
+    private List<AssignedJob> getLocalShuffleRemoteReceiverJob(PipelineDistributedPlan plan) {
         List<AssignedJob> canReceiveDataFromRemote = Lists.newArrayListWithCapacity(plan.getInstanceJobs().size());
         for (AssignedJob instanceJob : plan.getInstanceJobs()) {
             LocalShuffleAssignedJob localShuffleJob = (LocalShuffleAssignedJob) instanceJob;
-            if (!localShuffleJob.receiveDataFromLocal) {
-                canReceiveDataFromRemote.add(localShuffleJob);
-            }
+            if (localShuffleJob instanceof LocalShuffleBucketJoinAssignedJob) {
+                LocalShuffleBucketJoinAssignedJob bucketJob = (LocalShuffleBucketJoinAssignedJob) localShuffleJob;
+                if (!bucketJob.getAssignedJoinBucketIndexes().isEmpty()) {
+                    canReceiveDataFromRemote.add(localShuffleJob);
+                }
+            } else {
+                if (!instanceJob.getScanSource().isEmpty()) {
+                    canReceiveDataFromRemote.add(localShuffleJob);
+                }
+            }
         }
         return canReceiveDataFromRemote;
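Together, the two hunks above replace the old receiveDataFromLocal flag with checks derived from instance state. A condensed restatement of the new receiver filter, as a sketch that assumes the job classes from this diff rather than quoting them verbatim:

```java
// Sketch: an instance may receive remote exchange data only if it owns work.
static boolean canReceiveDataFromRemote(LocalShuffleAssignedJob job) {
    if (job instanceof LocalShuffleBucketJoinAssignedJob) {
        // bucket-join local shuffle: a destination only when join buckets are assigned
        return !((LocalShuffleBucketJoinAssignedJob) job).getAssignedJoinBucketIndexes().isEmpty();
    }
    // plain local shuffle: a destination only when its scan source is non-empty
    return !job.getScanSource().isEmpty();
}
```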
@@ -91,7 +91,7 @@ protected List<AssignedJob> insideMachineParallelization(
 
         // now we should compute how many instances to process the data,
         // for example: two instances
-        int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
+        int instanceNum = degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel);
 
         if (useLocalShuffleToAddParallel) {
             assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
@@ -129,7 +129,7 @@ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List<
     protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
         // only generate one instance to scan all data, in this step
-        List<ScanSource> instanceToScanRanges = scanSource.parallelize(scanNodes, 1);
+        List<ScanSource> assignedJoinBuckets = scanSource.parallelize(scanNodes, instanceNum);
 
         // when data not big, but aggregation too slow, we will use 1 instance to scan data,
         // and use more instances (to ***add parallel***) to process aggregate.
@@ -144,23 +144,23 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
         // |(share scan node, instance1 will scan all data and local shuffle to other local instances |
         // | to parallel compute this data)                                                           |
         // +------------------------------------------------------------------------------------------------+
-        ScanSource shareScanSource = instanceToScanRanges.get(0);
+        ScanSource shareScanSource = assignedJoinBuckets.get(0);
 
         // one scan range generate multiple instances,
         // different instances reference the same scan source
         int shareScanId = shareScanIdGenerator.getAndIncrement();
         ScanSource emptyShareScanSource = shareScanSource.newEmpty();
         for (int i = 0; i < instanceNum; i++) {
             LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
-                    i == 0 ? shareScanSource : emptyShareScanSource
+                    instances.size(), shareScanId, context.nextInstanceId(), this, worker,
+                    // only the first instance needs to scan data
+                    i == 0 ? scanSource : emptyShareScanSource
             );
             instances.add(instance);
         }
     }
 
-    protected int degreeOfParallelism(int maxParallel) {
+    protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
         if (!fragment.getDataPartition().isPartitioned()) {
             return 1;
@@ -179,6 +179,10 @@ protected int degreeOfParallelism(int maxParallel) {
             }
         }
 
+        if (useLocalShuffleToAddParallel) {
+            return Math.max(fragment.getParallelExecNum(), 1);
+        }
+
         // the scan instance num should not larger than the tablets num
         return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
     }
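With this change, local shuffle decouples compute parallelism from scan parallelism: instanceNum follows the fragment's parallel exec num instead of being capped by the tablet count. A self-contained sketch of the resulting decision ladder (simplified; the parameter names are assumptions, and the real method also inspects the scan nodes):

```java
final class ParallelismSketch {
    static int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel,
            boolean isPartitioned, int parallelExecNum) {
        if (!isPartitioned) {
            return 1; // unpartitioned (gather) fragment: a single instance
        }
        if (useLocalShuffleToAddParallel) {
            // local shuffle: instance count follows the fragment's parallel exec num,
            // regardless of how few tablets there are to scan
            return Math.max(parallelExecNum, 1);
        }
        // otherwise never run more scan instances than the tablets allow
        return Math.min(maxParallel, Math.max(parallelExecNum, 1));
    }

    public static void main(String[] args) {
        System.out.println(degreeOfParallelism(3, true, true, 8));  // 8: local shuffle adds parallelism
        System.out.println(degreeOfParallelism(3, false, true, 8)); // 3: capped by scan parallelism
    }
}
```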
@@ -31,28 +31,17 @@
  */
 public class LocalShuffleAssignedJob extends StaticAssignedJob {
     public final int shareScanId;
-    public final boolean receiveDataFromLocal;
 
     public LocalShuffleAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal, TUniqueId instanceId,
+            int indexInUnassignedJob, int shareScanId, TUniqueId instanceId,
             UnassignedJob unassignedJob,
             DistributedPlanWorker worker, ScanSource scanSource) {
         super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource);
         this.shareScanId = shareScanId;
-        this.receiveDataFromLocal = receiveDataFromLocal;
     }
 
     @Override
     protected Map<String, String> extraInfo() {
         return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId));
     }
-
-    @Override
-    protected String formatScanSourceString() {
-        if (receiveDataFromLocal) {
-            return "read data from first instance of " + getAssignedWorker();
-        } else {
-            return super.formatScanSourceString();
-        }
-    }
 }
@@ -30,19 +30,19 @@ public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob {
     private volatile Set<Integer> assignedJoinBucketIndexes;
 
     public LocalShuffleBucketJoinAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal,
+            int indexInUnassignedJob, int shareScanId,
             TUniqueId instanceId, UnassignedJob unassignedJob,
             DistributedPlanWorker worker, ScanSource scanSource,
             Set<Integer> assignedJoinBucketIndexes) {
-        super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource);
+        super(indexInUnassignedJob, shareScanId, instanceId, unassignedJob, worker, scanSource);
         this.assignedJoinBucketIndexes = Utils.fastToImmutableSet(assignedJoinBucketIndexes);
     }
 
     public Set<Integer> getAssignedJoinBucketIndexes() {
         return assignedJoinBucketIndexes;
     }
 
-    public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+    public synchronized void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
         this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
                 .addAll(assignedJoinBucketIndexes)
                 .addAll(joinBucketIndexes)
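The added synchronized pairs with the volatile field holding an immutable set: readers take a lock-free snapshot while writers serialize and publish a fresh set, so concurrent addAssignedJoinBucketIndexes calls cannot lose updates. A minimal sketch of this copy-on-write pattern (illustrative, not Doris code):

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;

final class CopyOnWriteIndexes {
    private volatile Set<Integer> indexes = ImmutableSet.of();

    Set<Integer> get() {
        return indexes; // lock-free read of an immutable snapshot
    }

    synchronized void addAll(Set<Integer> more) {
        // without `synchronized`, two writers could both read the old set and
        // each publish a new set missing the other's additions
        indexes = ImmutableSet.<Integer>builder()
                .addAll(indexes)
                .addAll(more)
                .build(); // the volatile write publishes the new snapshot atomically
    }
}
```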
@@ -54,14 +54,15 @@ public List<AssignedJob> computeAssignedJobs(
 
         DefaultScanSource shareScan = new DefaultScanSource(ImmutableMap.of());
         LocalShuffleAssignedJob receiveDataFromRemote = new LocalShuffleAssignedJob(
-                0, 0, false,
-                connectContext.nextInstanceId(), this, selectedWorker, shareScan);
+                0, 0, connectContext.nextInstanceId(),
+                this, selectedWorker, shareScan
+        );
 
         instances.add(receiveDataFromRemote);
         for (int i = 1; i < expectInstanceNum; ++i) {
             LocalShuffleAssignedJob receiveDataFromLocal = new LocalShuffleAssignedJob(
-                    i, 0, true,
-                    connectContext.nextInstanceId(), this, selectedWorker, shareScan);
+                    i, 0, connectContext.nextInstanceId(), this, selectedWorker, shareScan
+            );
             instances.add(receiveDataFromLocal);
         }
         return instances.build();
@@ -38,6 +38,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -184,13 +185,23 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
             Set<Integer> assignedJoinBuckets
                     = ((BucketScanSource) assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
             LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker,
                     i == 0 ? shareScanSource : emptyShareScanSource,
                     Utils.fastToImmutableSet(assignedJoinBuckets)
             );
             instances.add(instance);
         }
 
+        for (int i = assignJoinBuckets.size(); i < instanceNum; ++i) {
+            LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker, emptyShareScanSource,
+                    // these instances do not need to join, because no bucket is assigned to them
+                    ImmutableSet.of()
+            );
+            instances.add(instance);
+        }
     }
 
     private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
@@ -256,7 +267,7 @@ private List<AssignedJob> fillUpInstances(List<AssignedJob> instances) {
             if (!mergedBucketsInSameWorkerInstance) {
                 fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
                         newInstances.size(), shareScanIdGenerator.getAndIncrement(),
-                        false, context.nextInstanceId(), this, worker, scanSource,
+                        context.nextInstanceId(), this, worker, scanSource,
                         assignedJoinBuckets
                 );
             }
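The new loop pads the instance list up to instanceNum with bucket-join jobs that own no buckets; combined with the receiver filter in an earlier hunk, such padded instances are never chosen as remote-exchange destinations. A hypothetical sketch of the padding scheme:

```java
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch (not Doris code): the first bucketGroups.size() instances each
// take one group of join buckets; the remainder up to instanceNum get an empty set
// and exist only to consume the local shuffle.
final class BucketPaddingDemo {
    static List<Set<Integer>> assign(List<Set<Integer>> bucketGroups, int instanceNum) {
        List<Set<Integer>> perInstance = new ArrayList<>(bucketGroups);
        for (int i = bucketGroups.size(); i < instanceNum; i++) {
            perInstance.add(ImmutableSet.of()); // padded instance: no join buckets
        }
        return perInstance;
    }

    public static void main(String[] args) {
        List<Set<Integer>> groups = List.of(ImmutableSet.of(0, 2), ImmutableSet.of(1, 3));
        System.out.println(assign(groups, 4)); // [[0, 2], [1, 3], [], []]
    }
}
```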
@@ -90,11 +90,6 @@ protected int degreeOfParallelism() {
         }
 
         // TODO: check nested loop join do right outer / semi / anti join
-        PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot()).second;
-        // when we use nested loop join do right outer / semi / anti join, the instance must be 1.
-        if (leftMostNode.getNumInstances() == 1) {
-            expectInstanceNum = 1;
-        }
         return expectInstanceNum;
     }
 
@@ -153,14 +148,12 @@ private List<AssignedJob> buildInstancesWithLocalShuffle(
 
             DefaultScanSource shareScanSource = new DefaultScanSource(ImmutableMap.of());
 
-            boolean receiveDataFromLocal = false;
             for (Integer indexInFragment : indexesInFragment) {
                 LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                        indexInFragment, shareScanId, receiveDataFromLocal, connectContext.nextInstanceId(),
+                        indexInFragment, shareScanId, connectContext.nextInstanceId(),
                         this, worker, shareScanSource
                 );
                 instances.add(instance);
-                receiveDataFromLocal = true;
             }
             shareScanId++;
         }
@@ -182,7 +175,7 @@ private List<DistributedPlanWorker> distinctShuffleWorkers(List<AssignedJob> ins
     private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
         PlanNode childPlan = plan;
         PlanNode fatherPlan = null;
-        while (childPlan.getChildren().size() != 0 && !(childPlan instanceof ExchangeNode)) {
+        while (!childPlan.getChildren().isEmpty() && !(childPlan instanceof ExchangeNode)) {
             fatherPlan = childPlan;
             childPlan = childPlan.getChild(0);
         }
@@ -1810,7 +1810,8 @@ protected void computeFragmentHosts() throws Exception {
             exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
         }
         // when we use nested loop join do right outer / semi / anti join, the instance must be 1.
-        if (leftMostNode.getNumInstances() == 1) {
+        boolean isNereids = context != null && context.getState().isNereids();
+        if (!isNereids && leftMostNode.getNumInstances() == 1) {
             exchangeInstances = 1;
         }
         // Using serial source means a serial source operator will be used in this fragment (e.g. data will be
@@ -427,7 +427,7 @@ private static void setScanSourceParam(
             TPipelineInstanceParams instanceParams) {
 
         boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
-        if (isLocalShuffle && ((LocalShuffleAssignedJob) instance).receiveDataFromLocal) {
+        if (isLocalShuffle && instance.getScanSource().isEmpty()) {
             // save thrift rpc message size, don't need perNodeScanRanges,
             // but the perNodeScanRanges is required rpc field
             instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
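After this commit, "receives data only from local shuffle" is no longer a constructor flag but is inferred from an empty scan source, which is why the builder above can skip populating perNodeScanRanges. A one-method sketch of the convention (assuming the types used in this diff; not a verbatim excerpt):

```java
// Sketch: a local-shuffle instance with nothing to scan is fed purely by the
// scanning instance on the same worker, so it needs no scan ranges in its
// thrift params.
static boolean receivesDataFromLocal(AssignedJob instance) {
    return instance instanceof LocalShuffleAssignedJob
            && instance.getScanSource().isEmpty();
}
```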
