Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 6, 2024
1 parent f8fd405 commit ff09749
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@

package org.apache.doris.nereids.worker.job;

import org.apache.doris.common.util.ListUtil;
import org.apache.doris.planner.ScanNode;

import com.clearspring.analytics.util.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

Expand All @@ -34,10 +41,51 @@ public BucketScanSource(Map<Integer, Map<ScanNode, ScanRanges>> bucketIndexToSca
}

@Override
int maxParallel(ScanNode scanNode) {
public int maxParallel(ScanNode scanNode) {
return bucketIndexToScanNodeToTablets.size();
}

@Override
public List<ScanSource> parallelize(ScanNode scanNode, int instanceNum) {
// collect the scan ranges about current scan nodes
List<Entry<Integer, ScanRanges>> bucketIndexToScanRanges
= Lists.newArrayList(getBucketIndexToScanRanges(scanNode).entrySet());

// split to some instance scan sources
List<List<Entry<Integer, ScanRanges>>> scanBucketsPerInstance
= ListUtil.splitBySize(bucketIndexToScanRanges, instanceNum);

// rebuild BucketScanSource for each instance
ImmutableList.Builder<ScanSource> instancesScanSource = ImmutableList.builder();
for (List<Entry<Integer, ScanRanges>> oneInstanceScanBuckets : scanBucketsPerInstance) {
ImmutableMap.Builder<Integer, Map<ScanNode, ScanRanges>> bucketsScanSources = ImmutableMap.builder();
for (Entry<Integer, ScanRanges> bucketIndexToScanRange : oneInstanceScanBuckets) {
Integer bucketIndex = bucketIndexToScanRange.getKey();
ScanRanges scanRanges = bucketIndexToScanRange.getValue();
bucketsScanSources.put(bucketIndex, ImmutableMap.of(scanNode, scanRanges));
}

instancesScanSource.add(new BucketScanSource(
bucketsScanSources.build()
));
}
return instancesScanSource.build();
}

public Map<Integer, ScanRanges> getBucketIndexToScanRanges(ScanNode scanNode) {
Map<Integer, ScanRanges> bucketIndexToScanRanges = Maps.newLinkedHashMap();
for (Entry<Integer, Map<ScanNode, ScanRanges>> entry : bucketIndexToScanNodeToTablets.entrySet()) {
Integer bucketIndex = entry.getKey();
Map<ScanNode, ScanRanges> scanNodeToScanRanges = entry.getValue();
ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode);
if (scanRanges != null) {
bucketIndexToScanRanges.put(bucketIndex, scanRanges);
}
}

return bucketIndexToScanRanges;
}

/** toString */
public void toString(StringBuilder str, String prefix) {
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import org.apache.doris.planner.ScanNode;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

Expand All @@ -34,14 +38,33 @@ public DefaultScanSource(Map<ScanNode, ScanRanges> scanNodeToScanRanges) {
}

@Override
int maxParallel(ScanNode scanNode) {
public int maxParallel(ScanNode scanNode) {
ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode);
if (scanRanges != null) {
return scanRanges.params.size();
}
return 0;
}

@Override
public List<ScanSource> parallelize(ScanNode scanNode, int instanceNum) {
ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode);
if (scanRanges == null) {
return ImmutableList.of();
}

List<ScanRanges> scanRangesPerInstance = scanRanges.split(instanceNum);

ImmutableList.Builder<ScanSource> instancesSource
= ImmutableList.builderWithExpectedSize(scanRangesPerInstance.size());
for (ScanRanges oneInstanceScanRanges : scanRangesPerInstance) {
instancesSource.add(
new DefaultScanSource(ImmutableMap.of(scanNode, oneInstanceScanRanges))
);
}
return instancesSource.build();
}

@Override
public void toString(StringBuilder str, String prefix) {
toString(scanNodeToScanRanges, str, prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public String toString() {
return str.toString();
}

abstract int maxParallel(ScanNode scanNode);
public abstract int maxParallel(ScanNode scanNode);

abstract List<ScanSource> parallelize(ScanNode scanNode, int instanceNum);
public abstract List<ScanSource> parallelize(ScanNode scanNode, int instanceNum);

abstract void toString(StringBuilder str, String prefix);
public abstract void toString(StringBuilder str, String prefix);
}

0 comments on commit ff09749

Please sign in to comment.