From 3a44f7d66c23a4030fb752140963f2734320674c Mon Sep 17 00:00:00 2001 From: 924060929 Date: Wed, 18 Dec 2024 16:36:01 +0800 Subject: [PATCH] fix --- .../worker/job/AbstractUnassignedScanJob.java | 6 +----- .../worker/job/LocalShuffleAssignedJob.java | 13 +------------ .../job/LocalShuffleBucketJoinAssignedJob.java | 4 ++-- .../distribute/worker/job/UnassignedGatherJob.java | 6 ++---- .../job/UnassignedScanBucketOlapTableJob.java | 10 ++++------ .../distribute/worker/job/UnassignedShuffleJob.java | 4 +--- 6 files changed, 11 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index 5f5afb741226f41..51c70798ac8b3c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -35,12 +35,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicInteger; /** AbstractUnassignedScanJob */ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob { - protected final AtomicInteger shareScanIdGenerator = new AtomicInteger(); - public AbstractUnassignedScanJob(StatementContext statementContext, PlanFragment fragment, List scanNodes, ListMultimap exchangeToChildJob) { super(statementContext, fragment, scanNodes, exchangeToChildJob); @@ -148,11 +145,10 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li // one scan range generate multiple instances, // different instances reference the same scan source - int shareScanId = shareScanIdGenerator.getAndIncrement(); ScanSource emptyShareScanSource = shareScanSource.newEmpty(); for (int i = 0; i < instanceNum; i++) { LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( - instances.size(), shareScanId, i > 0, + instances.size(), i > 0, context.nextInstanceId(), this, worker, i == 0 ? shareScanSource : emptyShareScanSource ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java index 9184893aba287f9..3e3bbd92101cffb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java @@ -20,33 +20,22 @@ import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.thrift.TUniqueId; -import com.google.common.collect.ImmutableMap; - -import java.util.Map; - /** * LocalShuffleAssignedJob: * this instance will use ignore_data_distribution function of local shuffle to add parallel * after scan data */ public class LocalShuffleAssignedJob extends StaticAssignedJob { - public final int shareScanId; public final boolean receiveDataFromLocal; public LocalShuffleAssignedJob( - int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal, TUniqueId instanceId, + int indexInUnassignedJob, boolean receiveDataFromLocal, TUniqueId instanceId, UnassignedJob unassignedJob, DistributedPlanWorker worker, ScanSource scanSource) { super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource); - this.shareScanId = shareScanId; this.receiveDataFromLocal = receiveDataFromLocal; } - @Override - protected Map extraInfo() { - return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId)); - } - @Override protected String formatScanSourceString() { if (receiveDataFromLocal) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java index 443acb50d78c787..60c31b2c5e5b82d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java @@ -30,11 +30,11 @@ public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob { private volatile Set assignedJoinBucketIndexes; public LocalShuffleBucketJoinAssignedJob( - int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal, + int indexInUnassignedJob, boolean receiveDataFromLocal, TUniqueId instanceId, UnassignedJob unassignedJob, DistributedPlanWorker worker, ScanSource scanSource, Set assignedJoinBucketIndexes) { - super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource); + super(indexInUnassignedJob, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource); this.assignedJoinBucketIndexes = Utils.fastToImmutableSet(assignedJoinBucketIndexes); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java index 830342514bd387e..2da339514ff044d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java @@ -54,14 +54,12 @@ public List computeAssignedJobs( DefaultScanSource shareScan = new DefaultScanSource(ImmutableMap.of()); LocalShuffleAssignedJob receiveDataFromRemote = new LocalShuffleAssignedJob( - 0, 0, false, - connectContext.nextInstanceId(), this, selectedWorker, shareScan); + 0, false, connectContext.nextInstanceId(), this, selectedWorker, shareScan); instances.add(receiveDataFromRemote); for (int i = 1; i < expectInstanceNum; ++i) { LocalShuffleAssignedJob receiveDataFromLocal = new LocalShuffleAssignedJob( - i, 0, true, - connectContext.nextInstanceId(), this, selectedWorker, shareScan); + i, true, connectContext.nextInstanceId(), this, selectedWorker, shareScan); instances.add(receiveDataFromLocal); } return instances.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index 8a0f18c00406720..4e7622f3252768f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -172,12 +172,11 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li // only generate one instance to scan all data, in this step List assignJoinBuckets = scanSource.parallelize(scanNodes, instanceNum); - int shareScanId = shareScanIdGenerator.getAndIncrement(); BucketScanSource shareScanSource = (BucketScanSource) scanSource; for (int i = 0; i < assignJoinBuckets.size(); i++) { BucketScanSource assignedJoinBucket = (BucketScanSource) assignJoinBuckets.get(i); LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( - instances.size(), shareScanId, false, + instances.size(), false, context.nextInstanceId(), this, worker, assignedJoinBucket, Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet()) @@ -188,7 +187,7 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li ScanSource emptyShareScanSource = shareScanSource.newEmpty(); for (int i = assignJoinBuckets.size(); i < instanceNum; i++) { LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( - instances.size(), shareScanId, true, + instances.size(), true, context.nextInstanceId(), this, worker, emptyShareScanSource, ImmutableSet.of() @@ -259,9 +258,8 @@ private List fillUpInstances(List instances) { } if (!mergedBucketsInSameWorkerInstance) { fillUpInstance = new LocalShuffleBucketJoinAssignedJob( - newInstances.size(), shareScanIdGenerator.getAndIncrement(), - false, context.nextInstanceId(), this, worker, scanSource, - assignedJoinBuckets + newInstances.size(), false, context.nextInstanceId(), + this, worker, scanSource, assignedJoinBuckets ); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java index ebad6ffdefff1c4..f8d326e9eb69a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java @@ -141,7 +141,6 @@ private List buildInstancesWithLocalShuffle( workerToInstanceIds.put(selectedWorker, i); } - int shareScanId = 0; for (Entry> kv : workerToInstanceIds.asMap().entrySet()) { DistributedPlanWorker worker = kv.getKey(); Collection indexesInFragment = kv.getValue(); @@ -151,13 +150,12 @@ private List buildInstancesWithLocalShuffle( boolean receiveDataFromLocal = false; for (Integer indexInFragment : indexesInFragment) { LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( - indexInFragment, shareScanId, receiveDataFromLocal, connectContext.nextInstanceId(), + indexInFragment, receiveDataFromLocal, connectContext.nextInstanceId(), this, worker, shareScanSource ); instances.add(instance); receiveDataFromLocal = true; } - shareScanId++; } return instances.build(); }