Skip to content

Commit

Permalink
remote table
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 31, 2024
1 parent 2d70281 commit e4cea15
Showing 1 changed file with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,36 +103,36 @@ private Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier() {
}

private Map<Worker, Map<Integer, Map<ScanNode, ScanRanges>>> selectForBucket(
UnassignedJob unassignedJob, List<ScanNode> olapScanNodes,
UnassignedJob unassignedJob, List<ScanNode> scanNodes,
BiFunction<ScanNode, Integer, List<TScanRangeLocations>> bucketScanRangeSupplier,
Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier) {
Map<Worker, Map<Integer, Map<ScanNode, ScanRanges>>> assignment = Maps.newLinkedHashMap();

Map<Integer, Long> bucketIndexToBytes =
computeEachBucketScanBytes(unassignedJob.getFragment(), olapScanNodes, bucketBytesSupplier);
computeEachBucketScanBytes(unassignedJob.getFragment(), scanNodes, bucketBytesSupplier);

ScanNode firstOlapScanNode = olapScanNodes.get(0);
ScanNode firstScanNode = scanNodes.get(0);
for (Entry<Integer, Long> kv : bucketIndexToBytes.entrySet()) {
Integer bucketIndex = kv.getKey();
long allScanNodeScanBytesInOneBucket = kv.getValue();

List<TScanRangeLocations> allPartitionTabletsInOneBucketInFirstTable
= bucketScanRangeSupplier.apply(firstOlapScanNode, bucketIndex);
= bucketScanRangeSupplier.apply(firstScanNode, bucketIndex);
SelectResult replicaAndWorker = selectScanReplicaAndMinWorkloadWorker(
allPartitionTabletsInOneBucketInFirstTable.get(0), allScanNodeScanBytesInOneBucket);
Worker selectedWorker = replicaAndWorker.selectWorker;
long workerId = selectedWorker.id();
for (ScanNode olapScanNode : olapScanNodes) {
for (ScanNode scanNode : scanNodes) {
List<TScanRangeLocations> allPartitionTabletsInOneBucket
= bucketScanRangeSupplier.apply(olapScanNode, bucketIndex);
= bucketScanRangeSupplier.apply(scanNode, bucketIndex);
List<Pair<TScanRangeParams, Long>> selectedReplicasInOneBucket = filterReplicaByWorkerInBucket(
olapScanNode, workerId, bucketIndex, allPartitionTabletsInOneBucket
scanNode, workerId, bucketIndex, allPartitionTabletsInOneBucket
);
Map<Integer, Map<ScanNode, ScanRanges>> bucketIndexToScanNodeToTablets
= assignment.computeIfAbsent(selectedWorker, worker -> Maps.newLinkedHashMap());
Map<ScanNode, ScanRanges> scanNodeToScanRanges = bucketIndexToScanNodeToTablets
.computeIfAbsent(bucketIndex, bucket -> Maps.newLinkedHashMap());
ScanRanges scanRanges = scanNodeToScanRanges.computeIfAbsent(olapScanNode, node -> new ScanRanges());
ScanRanges scanRanges = scanNodeToScanRanges.computeIfAbsent(scanNode, node -> new ScanRanges());
for (Pair<TScanRangeParams, Long> replica : selectedReplicasInOneBucket) {
TScanRangeParams replicaParam = replica.first;
Long scanBytes = replica.second;
Expand Down Expand Up @@ -214,7 +214,7 @@ private List<OlapScanNode> filterOlapScanNodes(List<ScanNode> scanNodes) {
}

private List<Pair<TScanRangeParams, Long>> filterReplicaByWorkerInBucket(
ScanNode olapScanNode, long filterWorkerId, int bucketIndex,
ScanNode scanNode, long filterWorkerId, int bucketIndex,
List<TScanRangeLocations> allPartitionTabletsInOneBucket) {
List<Pair<TScanRangeParams, Long>> selectedReplicasInOneBucket = Lists.newArrayList();
for (TScanRangeLocations onePartitionOneTabletLocation : allPartitionTabletsInOneBucket) {
Expand All @@ -225,7 +225,7 @@ private List<Pair<TScanRangeParams, Long>> filterReplicaByWorkerInBucket(
if (replicaLocation.getBackendId() == filterWorkerId) {
TScanRangeParams scanReplicaParams =
buildScanReplicaParams(onePartitionOneTabletLocation, replicaLocation);
Long replicaSize = ((OlapScanNode) olapScanNode).getTabletSingleReplicaSize(tabletId);
Long replicaSize = ((OlapScanNode) scanNode).getTabletSingleReplicaSize(tabletId);
selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize));
break;
}
Expand All @@ -245,15 +245,14 @@ private List<Pair<TScanRangeParams, Long>> filterReplicaByWorkerInBucket(
}

private Map<Integer, Long> computeEachBucketScanBytes(
PlanFragment fragment, List<ScanNode> olapScanNodes,
PlanFragment fragment, List<ScanNode> scanNodes,
Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier) {
Map<Integer, Long> bucketIndexToBytes = Maps.newLinkedHashMap();
for (ScanNode olapScanNode : olapScanNodes) {
Map<Integer, Long> bucketSeq2Bytes = bucketBytesSupplier.apply(olapScanNode);
// Set<Entry<Integer, Long>> bucketSeq2Bytes = olapScanNode.bucketSeq2Bytes.entrySet();
for (ScanNode scanNode : scanNodes) {
Map<Integer, Long> bucketSeq2Bytes = bucketBytesSupplier.apply(scanNode);
if (!bucketIndexToBytes.isEmpty() && bucketIndexToBytes.size() != bucketSeq2Bytes.size()) {
throw new IllegalStateException("Illegal fragment " + fragment.getFragmentId()
+ ", every OlapScanNode should has same bucket num");
+ ", every ScanNode should has same bucket num");
}

for (Entry<Integer, Long> bucketSeq2Byte : bucketSeq2Bytes.entrySet()) {
Expand Down

0 comments on commit e4cea15

Please sign in to comment.