Commit 6ab1b6f

fix
924060929 committed Dec 18, 2024
1 parent 3d5053b commit 6ab1b6f
Showing 3 changed files with 19 additions and 16 deletions.
File 1 of 3:
@@ -87,11 +87,12 @@ protected List<AssignedJob> insideMachineParallelization(
ScanSource scanSource = entry.getValue().scanSource;

// usually, it's the tablets num, or buckets num
int scanSourceMaxParallel = Math.max(scanSource.maxParallel(scanNodes), 1);
int scanSourceMaxParallel = scanSource.maxParallel(scanNodes);

// now we should compute how many instances to process the data,
// for example: two instances
int instanceNum = degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel);

if (useLocalShuffleToAddParallel) {
assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
} else {
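For context, degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel) decides how many instances process the data, bounded by how far the scan source can actually be split (its tablet or bucket count). Below is a minimal, self-contained sketch of that clamping idea; the extra expectedParallelism parameter and the exact rules are assumptions for illustration, not the real Doris method, which takes only the two arguments shown in the hunk and presumably reads the expected parallelism from its enclosing job state.

// Sketch only: a hypothetical, simplified version of the clamping idea.
public final class ParallelismSketch {
    static int degreeOfParallelism(int scanSourceMaxParallel, int expectedParallelism,
            boolean useLocalShuffleToAddParallel) {
        if (useLocalShuffleToAddParallel) {
            // extra instances can still help: they consume locally shuffled
            // data even if they own no scan range themselves
            return Math.max(expectedParallelism, 1);
        }
        // without local shuffle every instance must own a scan range,
        // so the tablet/bucket count is a hard upper bound
        return Math.max(Math.min(expectedParallelism, scanSourceMaxParallel), 1);
    }

    public static void main(String[] args) {
        // e.g. 3 buckets, 8 requested instances
        System.out.println(degreeOfParallelism(3, 8, false)); // 3 instances
        System.out.println(degreeOfParallelism(3, 8, true));  // 8 instances
    }
}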
File 2 of 3:
@@ -50,7 +50,7 @@ protected Map<String, String> extraInfo() {
@Override
protected String formatScanSourceString() {
if (receiveDataFromLocal) {
return "read data from first instance of " + getAssignedWorker();
return "read data from other instances";
} else {
return super.formatScanSourceString();
}
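This hunk only changes the human-readable scan-source description: an instance that receives data via local shuffle no longer claims to read from the "first instance" of its worker, since with the accompanying change below several instances may scan data. A small sketch of the overriding pattern follows; everything except receiveDataFromLocal and formatScanSourceString is an assumption for illustration.

// Sketch of the formatting override with assumed surrounding members.
class LocalShuffleJobSketch {
    boolean receiveDataFromLocal;
    String ownScanRangesText = "scanRanges: [...]";

    String formatScanSourceString() {
        if (receiveDataFromLocal) {
            // this instance owns no scan ranges; it consumes data produced
            // by the scanning instances on the same worker
            return "read data from other instances";
        }
        // otherwise describe the instance's own scan ranges as usual
        return ownScanRangesText;
    }
}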
File 3 of 3:
@@ -170,26 +170,28 @@ protected List<AssignedJob> insideMachineParallelization(
protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// only generate one instance to scan all data, in this step
List<ScanSource> assignJoinBuckets = scanSource.parallelize(
scanNodes, 1
);
List<ScanSource> assignJoinBuckets = scanSource.parallelize(scanNodes, instanceNum);

// one scan range generates multiple instances,
// different instances reference the same scan source
int shareScanId = shareScanIdGenerator.getAndIncrement();

Set<Integer> firstInstanceAssignedJoinBuckets
= ((BucketScanSource) assignJoinBuckets.get(0)).bucketIndexToScanNodeToTablets.keySet();

BucketScanSource shareScanSource = (BucketScanSource) scanSource;
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
for (int i = 0; i < assignJoinBuckets.size(); i++) {
BucketScanSource assignedJoinBucket = (BucketScanSource) assignJoinBuckets.get(i);
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
instances.size(), shareScanId, false,
context.nextInstanceId(), this, worker,
assignedJoinBucket,
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
);
instances.add(instance);
}

for (int i = 0; i < instanceNum; i++) {
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
for (int i = assignJoinBuckets.size(); i < instanceNum; i++) {
LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
instances.size(), shareScanId, i > 0,
instances.size(), shareScanId, true,
context.nextInstanceId(), this, worker,
i == 0 ? shareScanSource : emptyShareScanSource,
i == 0 ? Utils.fastToImmutableSet(firstInstanceAssignedJoinBuckets) : ImmutableSet.of()
emptyShareScanSource,
ImmutableSet.of()
);
instances.add(instance);
}
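Taken together, the new loop structure appears to give each of the first assignJoinBuckets.size() instances its own disjoint group of buckets to scan (third constructor argument false), and then pads up to instanceNum with instances that get an empty scan source and receiveDataFromLocal = true, so they only consume locally shuffled data. A simplified, self-contained sketch of that strategy is below; the Instance record and parallelize helper are invented stand-ins for LocalShuffleBucketJoinAssignedJob and ScanSource.parallelize, not the real classes.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: a simplified model of the assignment strategy above.
public final class LocalShuffleAssignmentSketch {

    record Instance(int indexInWorker, int shareScanId,
            boolean receiveDataFromLocal, Set<Integer> assignedBuckets) { }

    // split bucket indexes into at most instanceNum non-empty groups (round robin)
    static List<Set<Integer>> parallelize(Set<Integer> buckets, int instanceNum) {
        List<Set<Integer>> groups = new ArrayList<>();
        int groupNum = Math.min(instanceNum, buckets.size());
        for (int g = 0; g < groupNum; g++) {
            groups.add(new TreeSet<>());
        }
        int next = 0;
        for (Integer bucket : buckets) {
            groups.get(next++ % groupNum).add(bucket);
        }
        return groups;
    }

    static List<Instance> assignLocalShuffleJobs(Set<Integer> buckets, int instanceNum, int shareScanId) {
        List<Instance> instances = new ArrayList<>();
        List<Set<Integer>> assignJoinBuckets = parallelize(buckets, instanceNum);

        // each bucket group gets a real scanning instance
        for (Set<Integer> group : assignJoinBuckets) {
            instances.add(new Instance(instances.size(), shareScanId, false, group));
        }
        // the remaining instances own no buckets; they only consume the
        // locally shuffled data produced by the scanning instances
        for (int i = assignJoinBuckets.size(); i < instanceNum; i++) {
            instances.add(new Instance(instances.size(), shareScanId, true, Set.of()));
        }
        return instances;
    }

    public static void main(String[] args) {
        // e.g. 3 buckets but 5 requested instances:
        // 3 scanning instances + 2 local-shuffle-only instances
        assignLocalShuffleJobs(Set.of(0, 1, 2), 5, 7).forEach(System.out::println);
    }
}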
