Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Dec 18, 2024
1 parent bab8088 commit d78387b
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

/** AbstractUnassignedScanJob */
public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
protected final AtomicInteger shareScanIdGenerator = new AtomicInteger();

public AbstractUnassignedScanJob(StatementContext statementContext, PlanFragment fragment,
List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
super(statementContext, fragment, scanNodes, exchangeToChildJob);
Expand Down Expand Up @@ -148,11 +145,10 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li

// one scan range generate multiple instances,
// different instances reference the same scan source
int shareScanId = shareScanIdGenerator.getAndIncrement();
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
for (int i = 0; i < instanceNum; i++) {
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
instances.size(), shareScanId, i > 0,
instances.size(), i > 0,
context.nextInstanceId(), this, worker,
i == 0 ? shareScanSource : emptyShareScanSource
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,22 @@
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

/**
* LocalShuffleAssignedJob:
* this instance will use ignore_data_distribution function of local shuffle to add parallel
* after scan data
*/
public class LocalShuffleAssignedJob extends StaticAssignedJob {
public final int shareScanId;
public final boolean receiveDataFromLocal;

public LocalShuffleAssignedJob(
int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal, TUniqueId instanceId,
int indexInUnassignedJob, boolean receiveDataFromLocal, TUniqueId instanceId,
UnassignedJob unassignedJob,
DistributedPlanWorker worker, ScanSource scanSource) {
super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource);
this.shareScanId = shareScanId;
this.receiveDataFromLocal = receiveDataFromLocal;
}

@Override
protected Map<String, String> extraInfo() {
return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId));
}

@Override
protected String formatScanSourceString() {
if (receiveDataFromLocal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob {
private volatile Set<Integer> assignedJoinBucketIndexes;

public LocalShuffleBucketJoinAssignedJob(
int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal,
int indexInUnassignedJob, boolean receiveDataFromLocal,
TUniqueId instanceId, UnassignedJob unassignedJob,
DistributedPlanWorker worker, ScanSource scanSource,
Set<Integer> assignedJoinBucketIndexes) {
super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource);
super(indexInUnassignedJob, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource);
this.assignedJoinBucketIndexes = Utils.fastToImmutableSet(assignedJoinBucketIndexes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,12 @@ public List<AssignedJob> computeAssignedJobs(

DefaultScanSource shareScan = new DefaultScanSource(ImmutableMap.of());
LocalShuffleAssignedJob receiveDataFromRemote = new LocalShuffleAssignedJob(
0, 0, false,
connectContext.nextInstanceId(), this, selectedWorker, shareScan);
0, false, connectContext.nextInstanceId(), this, selectedWorker, shareScan);

instances.add(receiveDataFromRemote);
for (int i = 1; i < expectInstanceNum; ++i) {
LocalShuffleAssignedJob receiveDataFromLocal = new LocalShuffleAssignedJob(
i, 0, true,
connectContext.nextInstanceId(), this, selectedWorker, shareScan);
i, true, connectContext.nextInstanceId(), this, selectedWorker, shareScan);
instances.add(receiveDataFromLocal);
}
return instances.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,11 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
// only generate one instance to scan all data, in this step
List<ScanSource> assignJoinBuckets = scanSource.parallelize(scanNodes, instanceNum);

int shareScanId = shareScanIdGenerator.getAndIncrement();
BucketScanSource shareScanSource = (BucketScanSource) scanSource;
for (int i = 0; i < assignJoinBuckets.size(); i++) {
BucketScanSource assignedJoinBucket = (BucketScanSource) assignJoinBuckets.get(i);
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
instances.size(), shareScanId, false,
instances.size(), false,
context.nextInstanceId(), this, worker,
assignedJoinBucket,
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
Expand All @@ -188,7 +187,7 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
for (int i = assignJoinBuckets.size(); i < instanceNum; i++) {
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
instances.size(), shareScanId, true,
instances.size(), true,
context.nextInstanceId(), this, worker,
emptyShareScanSource,
ImmutableSet.of()
Expand Down Expand Up @@ -259,9 +258,8 @@ private List<AssignedJob> fillUpInstances(List<AssignedJob> instances) {
}
if (!mergedBucketsInSameWorkerInstance) {
fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
newInstances.size(), shareScanIdGenerator.getAndIncrement(),
false, context.nextInstanceId(), this, worker, scanSource,
assignedJoinBuckets
newInstances.size(), false, context.nextInstanceId(),
this, worker, scanSource, assignedJoinBuckets
);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ private List<AssignedJob> buildInstancesWithLocalShuffle(
workerToInstanceIds.put(selectedWorker, i);
}

int shareScanId = 0;
for (Entry<DistributedPlanWorker, Collection<Integer>> kv : workerToInstanceIds.asMap().entrySet()) {
DistributedPlanWorker worker = kv.getKey();
Collection<Integer> indexesInFragment = kv.getValue();
Expand All @@ -151,13 +150,12 @@ private List<AssignedJob> buildInstancesWithLocalShuffle(
boolean receiveDataFromLocal = false;
for (Integer indexInFragment : indexesInFragment) {
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
indexInFragment, shareScanId, receiveDataFromLocal, connectContext.nextInstanceId(),
indexInFragment, receiveDataFromLocal, connectContext.nextInstanceId(),
this, worker, shareScanSource
);
instances.add(instance);
receiveDataFromLocal = true;
}
shareScanId++;
}
return instances.build();
}
Expand Down

0 comments on commit d78387b

Please sign in to comment.