From 304a8fcaf929280f95f7d5500bd8dc144e16dc8f Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 17 Oct 2024 21:44:51 +0800 Subject: [PATCH] fix bucket_seq_to_instance_index --- .../doris/qe/runtime/ThriftPlansBuilder.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index b18404fb9194311..9bf1014b223f041 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -472,12 +472,16 @@ private static Map computeBucketIdToInstanceId(PipelineDistrib } Map bucketIdToInstanceId = Maps.newLinkedHashMap(); - filterInstancesWhichReceiveDataFromRemote(receivePlan, (instanceJob, instanceIdInThisBackend) -> { - BucketScanSource scanSource = (BucketScanSource) instanceJob.getScanSource(); - for (Integer bucketIndex : scanSource.bucketIndexToScanNodeToTablets.keySet()) { - bucketIdToInstanceId.put(bucketIndex, instanceIdInThisBackend); + for (AssignedJob instanceJob : instanceJobs) { + if (instanceJob instanceof LocalShuffleAssignedJob + && ((LocalShuffleAssignedJob) instanceJob).receiveDataFromLocal) { + continue; } - }); + BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource(); + for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) { + bucketIdToInstanceId.put(bucketIndex, instanceJob.indexInUnassignedJob()); + } + } return bucketIdToInstanceId; }