From 6ab1b6f87f952f81ddf71bec75b32cc3bf6c178b Mon Sep 17 00:00:00 2001 From: 924060929 Date: Wed, 18 Dec 2024 16:29:12 +0800 Subject: [PATCH] fix --- .../worker/job/AbstractUnassignedScanJob.java | 3 +- .../worker/job/LocalShuffleAssignedJob.java | 2 +- .../job/UnassignedScanBucketOlapTableJob.java | 30 ++++++++++--------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index cdc7e670e89ac01..5f5afb741226f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -87,11 +87,12 @@ protected List insideMachineParallelization( ScanSource scanSource = entry.getValue().scanSource; // usually, its tablets num, or buckets num - int scanSourceMaxParallel = Math.max(scanSource.maxParallel(scanNodes), 1); + int scanSourceMaxParallel = scanSource.maxParallel(scanNodes); // now we should compute how many instances to process the data, // for example: two instances int instanceNum = degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel); + if (useLocalShuffleToAddParallel) { assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java index 2ba269a5a7b89fb..9184893aba287f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java @@ -50,7 +50,7 @@ protected Map extraInfo() { @Override protected String formatScanSourceString() { if (receiveDataFromLocal) { - return "read data from first instance of " + getAssignedWorker(); + return "read data from other instances"; } else { return super.formatScanSourceString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index da361b8e8c53ec3..8a0f18c00406720 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -170,26 +170,28 @@ protected List insideMachineParallelization( protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances, ConnectContext context, DistributedPlanWorker worker) { // only generate one instance to scan all data, in this step - List assignJoinBuckets = scanSource.parallelize( - scanNodes, 1 - ); + List assignJoinBuckets = scanSource.parallelize(scanNodes, instanceNum); - // one scan range generate multiple instances, - // different instances reference the same scan source int shareScanId = shareScanIdGenerator.getAndIncrement(); - - Set firstInstanceAssignedJoinBuckets - = ((BucketScanSource) assignJoinBuckets.get(0)).bucketIndexToScanNodeToTablets.keySet(); - BucketScanSource shareScanSource = (BucketScanSource) scanSource; - ScanSource emptyShareScanSource = shareScanSource.newEmpty(); + for (int i = 0; i < assignJoinBuckets.size(); i++) { + BucketScanSource assignedJoinBucket = (BucketScanSource) assignJoinBuckets.get(i); + LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( + instances.size(), shareScanId, false, + context.nextInstanceId(), this, worker, + assignedJoinBucket, + Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet()) + ); + instances.add(instance); + } - for (int i = 0; i < instanceNum; i++) { + ScanSource emptyShareScanSource = shareScanSource.newEmpty(); + for (int i = assignJoinBuckets.size(); i < instanceNum; i++) { LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( - instances.size(), shareScanId, i > 0, + instances.size(), shareScanId, true, context.nextInstanceId(), this, worker, - i == 0 ? shareScanSource : emptyShareScanSource, - i == 0 ? Utils.fastToImmutableSet(firstInstanceAssignedJoinBuckets) : ImmutableSet.of() + emptyShareScanSource, + ImmutableSet.of() ); instances.add(instance); }