Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jul 10, 2024
1 parent 08a0213 commit ab3ecd0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ public List<AssignedJob> computeAssignedJobs(DistributedPlanWorkerManager worker
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSource = multipleMachinesParallelization(
workerManager, inputJobs);

return insideMachineParallelization(workerToScanSource, inputJobs, workerManager);
List<AssignedJob> assignedJobs = insideMachineParallelization(workerToScanSource, inputJobs, workerManager);

return fillUpAssignedJobs(assignedJobs, workerManager, inputJobs);
}

protected List<AssignedJob> fillUpAssignedJobs(
List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
return assignedJobs;
}

protected abstract Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesParallelization(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@ protected Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesPara
}

@Override
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
ListMultimap<ExchangeNode, AssignedJob> inputJobs, DistributedPlanWorkerManager workerManager) {
List<AssignedJob> assignedJobs = super.insideMachineParallelization(
workerToScanRanges, inputJobs, workerManager);
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
if (assignedJobs.isEmpty()) {
return fillUpSingleEmptyInstance(workerManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ protected List<AssignedJob> insideMachineParallelization(
// instance 5: olapScanNode1: ScanRanges([tablet_10007])
// ],
// }
List<AssignedJob> assignedJobs = super.insideMachineParallelization(
workerToScanRanges, inputJobs, workerManager);
return super.insideMachineParallelization(workerToScanRanges, inputJobs, workerManager);
}

@Override
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
if (assignedJobs.isEmpty()) {
// the tablets have pruned, so no assignedJobs,
// we should allocate an instance of it,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ protected Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesPara
}

@Override
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
ListMultimap<ExchangeNode, AssignedJob> inputJobs, DistributedPlanWorkerManager workerManager) {
List<AssignedJob> assignedJobs = super.insideMachineParallelization(
workerToScanRanges, inputJobs, workerManager);
protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
if (assignedJobs.isEmpty()) {
// the file scan have pruned, so no assignedJobs,
// we should allocate an instance of it,
Expand Down

0 comments on commit ab3ecd0

Please sign in to comment.