From ff0974954f2fc9669105af61eb8c227c923c90cf Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 6 Jun 2024 23:19:52 +0800 Subject: [PATCH] fix --- .../nereids/worker/job/BucketScanSource.java | 50 ++++++++++++++++++- .../nereids/worker/job/DefaultScanSource.java | 25 +++++++++- .../doris/nereids/worker/job/ScanSource.java | 6 +-- 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java index bd2d37dd2b4fdde..b05df060ea42942 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java @@ -17,8 +17,15 @@ package org.apache.doris.nereids.worker.job; +import org.apache.doris.common.util.ListUtil; import org.apache.doris.planner.ScanNode; +import com.clearspring.analytics.util.Lists; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,10 +41,51 @@ public BucketScanSource(Map> bucketIndexToSca } @Override - int maxParallel(ScanNode scanNode) { + public int maxParallel(ScanNode scanNode) { return bucketIndexToScanNodeToTablets.size(); } + @Override + public List parallelize(ScanNode scanNode, int instanceNum) { + // collect the scan ranges about current scan nodes + List> bucketIndexToScanRanges + = Lists.newArrayList(getBucketIndexToScanRanges(scanNode).entrySet()); + + // split to some instance scan sources + List>> scanBucketsPerInstance + = ListUtil.splitBySize(bucketIndexToScanRanges, instanceNum); + + // rebuild BucketScanSource for each instance + ImmutableList.Builder instancesScanSource = ImmutableList.builder(); + for (List> oneInstanceScanBuckets : scanBucketsPerInstance) { + ImmutableMap.Builder> bucketsScanSources = ImmutableMap.builder(); + for (Entry bucketIndexToScanRange : oneInstanceScanBuckets) { + Integer bucketIndex = bucketIndexToScanRange.getKey(); + ScanRanges scanRanges = bucketIndexToScanRange.getValue(); + bucketsScanSources.put(bucketIndex, ImmutableMap.of(scanNode, scanRanges)); + } + + instancesScanSource.add(new BucketScanSource( + bucketsScanSources.build() + )); + } + return instancesScanSource.build(); + } + + public Map getBucketIndexToScanRanges(ScanNode scanNode) { + Map bucketIndexToScanRanges = Maps.newLinkedHashMap(); + for (Entry> entry : bucketIndexToScanNodeToTablets.entrySet()) { + Integer bucketIndex = entry.getKey(); + Map scanNodeToScanRanges = entry.getValue(); + ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode); + if (scanRanges != null) { + bucketIndexToScanRanges.put(bucketIndex, scanRanges); + } + } + + return bucketIndexToScanRanges; + } + /** toString */ public void toString(StringBuilder str, String prefix) { int i = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java index 1be94aba953b9e5..fd7741ae855fac8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java @@ -19,6 +19,10 @@ import org.apache.doris.planner.ScanNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,7 +38,7 @@ public DefaultScanSource(Map scanNodeToScanRanges) { } @Override - int maxParallel(ScanNode scanNode) { + public int maxParallel(ScanNode scanNode) { ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode); if (scanRanges != null) { return scanRanges.params.size(); @@ -42,6 +46,25 @@ int maxParallel(ScanNode scanNode) { return 0; } + @Override + public List parallelize(ScanNode scanNode, int instanceNum) { + ScanRanges scanRanges = scanNodeToScanRanges.get(scanNode); + if (scanRanges == null) { + return ImmutableList.of(); + } + + List scanRangesPerInstance = scanRanges.split(instanceNum); + + ImmutableList.Builder instancesSource + = ImmutableList.builderWithExpectedSize(scanRangesPerInstance.size()); + for (ScanRanges oneInstanceScanRanges : scanRangesPerInstance) { + instancesSource.add( + new DefaultScanSource(ImmutableMap.of(scanNode, oneInstanceScanRanges)) + ); + } + return instancesSource.build(); + } + @Override public void toString(StringBuilder str, String prefix) { toString(scanNodeToScanRanges, str, prefix); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java index 6ec612f97894236..b1414409341c3dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java @@ -30,9 +30,9 @@ public String toString() { return str.toString(); } - abstract int maxParallel(ScanNode scanNode); + public abstract int maxParallel(ScanNode scanNode); - abstract List parallelize(ScanNode scanNode, int instanceNum); + public abstract List parallelize(ScanNode scanNode, int instanceNum); - abstract void toString(StringBuilder str, String prefix); + public abstract void toString(StringBuilder str, String prefix); }