diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java index 99fac0be36add7..cfc4478b9fc882 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java @@ -103,36 +103,36 @@ private Function> bucketBytesSupplier() { } private Map>> selectForBucket( - UnassignedJob unassignedJob, List olapScanNodes, + UnassignedJob unassignedJob, List scanNodes, BiFunction> bucketScanRangeSupplier, Function> bucketBytesSupplier) { Map>> assignment = Maps.newLinkedHashMap(); Map bucketIndexToBytes = - computeEachBucketScanBytes(unassignedJob.getFragment(), olapScanNodes, bucketBytesSupplier); + computeEachBucketScanBytes(unassignedJob.getFragment(), scanNodes, bucketBytesSupplier); - ScanNode firstOlapScanNode = olapScanNodes.get(0); + ScanNode firstScanNode = scanNodes.get(0); for (Entry kv : bucketIndexToBytes.entrySet()) { Integer bucketIndex = kv.getKey(); long allScanNodeScanBytesInOneBucket = kv.getValue(); List allPartitionTabletsInOneBucketInFirstTable - = bucketScanRangeSupplier.apply(firstOlapScanNode, bucketIndex); + = bucketScanRangeSupplier.apply(firstScanNode, bucketIndex); SelectResult replicaAndWorker = selectScanReplicaAndMinWorkloadWorker( allPartitionTabletsInOneBucketInFirstTable.get(0), allScanNodeScanBytesInOneBucket); Worker selectedWorker = replicaAndWorker.selectWorker; long workerId = selectedWorker.id(); - for (ScanNode olapScanNode : olapScanNodes) { + for (ScanNode scanNode : scanNodes) { List allPartitionTabletsInOneBucket - = bucketScanRangeSupplier.apply(olapScanNode, bucketIndex); + = bucketScanRangeSupplier.apply(scanNode, bucketIndex); List> selectedReplicasInOneBucket = filterReplicaByWorkerInBucket( - olapScanNode, workerId, bucketIndex, allPartitionTabletsInOneBucket + scanNode, workerId, bucketIndex, allPartitionTabletsInOneBucket ); Map> bucketIndexToScanNodeToTablets = assignment.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap()); Map scanNodeToScanRanges = bucketIndexToScanNodeToTablets .computeIfAbsent(bucketIndex, bucket -> Maps.newLinkedHashMap()); - ScanRanges scanRanges = scanNodeToScanRanges.computeIfAbsent(olapScanNode, node -> new ScanRanges()); + ScanRanges scanRanges = scanNodeToScanRanges.computeIfAbsent(scanNode, node -> new ScanRanges()); for (Pair replica : selectedReplicasInOneBucket) { TScanRangeParams replicaParam = replica.first; Long scanBytes = replica.second; @@ -214,7 +214,7 @@ private List filterOlapScanNodes(List scanNodes) { } private List> filterReplicaByWorkerInBucket( - ScanNode olapScanNode, long filterWorkerId, int bucketIndex, + ScanNode scanNode, long filterWorkerId, int bucketIndex, List allPartitionTabletsInOneBucket) { List> selectedReplicasInOneBucket = Lists.newArrayList(); for (TScanRangeLocations onePartitionOneTabletLocation : allPartitionTabletsInOneBucket) { @@ -225,7 +225,7 @@ private List> filterReplicaByWorkerInBucket( if (replicaLocation.getBackendId() == filterWorkerId) { TScanRangeParams scanReplicaParams = buildScanReplicaParams(onePartitionOneTabletLocation, replicaLocation); - Long replicaSize = ((OlapScanNode) olapScanNode).getTabletSingleReplicaSize(tabletId); + Long replicaSize = ((OlapScanNode) scanNode).getTabletSingleReplicaSize(tabletId); selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize)); break; } @@ -245,15 +245,14 @@ private List> filterReplicaByWorkerInBucket( } private Map computeEachBucketScanBytes( - PlanFragment fragment, List olapScanNodes, + PlanFragment fragment, List scanNodes, Function> bucketBytesSupplier) { Map bucketIndexToBytes = Maps.newLinkedHashMap(); - for (ScanNode olapScanNode : olapScanNodes) { - Map bucketSeq2Bytes = bucketBytesSupplier.apply(olapScanNode); - // Set> bucketSeq2Bytes = olapScanNode.bucketSeq2Bytes.entrySet(); + for (ScanNode scanNode : scanNodes) { + Map bucketSeq2Bytes = bucketBytesSupplier.apply(scanNode); if (!bucketIndexToBytes.isEmpty() && bucketIndexToBytes.size() != bucketSeq2Bytes.size()) { throw new IllegalStateException("Illegal fragment " + fragment.getFragmentId() - + ", every OlapScanNode should has same bucket num"); + + ", every ScanNode should has same bucket num"); } for (Entry bucketSeq2Byte : bucketSeq2Bytes.entrySet()) {