Skip to content

Commit

Permalink
fix bucket_seq_to_instance_index
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 17, 2024
1 parent 775c14c commit 304a8fc
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,16 @@ private static Map<Integer, Integer> computeBucketIdToInstanceId(PipelineDistrib
}

Map<Integer, Integer> bucketIdToInstanceId = Maps.newLinkedHashMap();
filterInstancesWhichReceiveDataFromRemote(receivePlan, (instanceJob, instanceIdInThisBackend) -> {
BucketScanSource scanSource = (BucketScanSource) instanceJob.getScanSource();
for (Integer bucketIndex : scanSource.bucketIndexToScanNodeToTablets.keySet()) {
bucketIdToInstanceId.put(bucketIndex, instanceIdInThisBackend);
for (AssignedJob instanceJob : instanceJobs) {
if (instanceJob instanceof LocalShuffleAssignedJob
&& ((LocalShuffleAssignedJob) instanceJob).receiveDataFromLocal) {
continue;
}
});
BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
bucketIdToInstanceId.put(bucketIndex, instanceJob.indexInUnassignedJob());
}
}
return bucketIdToInstanceId;
}

Expand Down

0 comments on commit 304a8fc

Please sign in to comment.