Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 6, 2024
1 parent 94ef2fe commit 072eff2
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public List<AssignedJob> computeAssignedJobs(
// When group by cardinality is smaller than number of backend, only some backends always
// process while other has no data to process.
// So we shuffle instances to make different backends handle different queries.
List<Worker> shuffleWorkersInBiggestParallelChildFragment = shuffleWorkers(biggestParallelChildFragment);
List<Worker> shuffleWorkersInBiggestParallelChildFragment = distinctShuffleWorkers(biggestParallelChildFragment);
Function<Integer, Worker> workerSelector = instanceIndex -> {
int selectIndex = instanceIndex % shuffleWorkersInBiggestParallelChildFragment.size();
return shuffleWorkersInBiggestParallelChildFragment.get(selectIndex);
Expand Down Expand Up @@ -114,7 +114,7 @@ i, selectedWorker, new DefaultScanSource(ImmutableMap.of())
return instances.build();
}

private List<Worker> shuffleWorkers(List<AssignedJob> instances) {
private List<Worker> distinctShuffleWorkers(List<AssignedJob> instances) {
Set<Worker> candidateWorkerSet = Sets.newLinkedHashSet();
for (AssignedJob instance : instances) {
candidateWorkerSet.add(instance.getAssignedWorker());
Expand Down

0 comments on commit 072eff2

Please sign in to comment.