From e007125bab33503cc02f73ecc39d69df63e9e99a Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 27 Jun 2024 19:40:55 +0800 Subject: [PATCH] fix by comments --- .../apache/doris/common/profile/Profile.java | 1 - .../ChildrenPropertiesRegulator.java | 8 ++-- .../expressions/functions/table/Numbers.java | 10 ---- .../functions/table/TableValuedFunction.java | 4 ++ .../distribute}/BucketSpecifyInstances.java | 6 +-- .../distribute}/DefaultSpecifyInstances.java | 6 +-- .../plans/distribute/DistributePlanner.java | 8 ++-- .../plans/distribute/DistributedPlan.java | 2 +- .../distribute}/NereidsSpecifyInstances.java | 16 +++---- .../distribute/PipelineDistributedPlan.java | 4 +- .../distribute}/worker/BackendWorker.java | 4 +- .../worker/BackendWorkerManager.java | 6 +-- .../worker/DistributedPlanWorker.java} | 10 ++-- .../worker/LoadBalanceScanWorkerSelector.java | 46 +++++++++---------- .../worker/ScanWorkerSelector.java | 12 ++--- .../distribute}/worker/WorkerManager.java | 6 +-- .../distribute}/worker/WorkerScanRanges.java | 8 ++-- .../plans/distribute}/worker/Workload.java | 2 +- .../worker/job/AbstractUnassignedJob.java | 2 +- .../worker/job/AbstractUnassignedScanJob.java | 24 +++++----- .../distribute}/worker/job/AssignedJob.java | 6 +-- .../worker/job/AssignedJobBuilder.java | 4 +- .../worker/job/BucketScanSource.java | 2 +- .../worker/job/CustomAssignmentJob.java | 2 +- .../worker/job/DefaultScanSource.java | 2 +- .../worker/job/LocalShuffleAssignedJob.java | 6 +-- .../distribute}/worker/job/ScanRange.java | 2 +- .../distribute}/worker/job/ScanRanges.java | 2 +- .../distribute}/worker/job/ScanSource.java | 2 +- .../distribute}/worker/job/Splittable.java | 2 +- .../worker/job/StaticAssignedJob.java | 10 ++-- ...ssignedGatherScanMultiRemoteTablesJob.java | 10 ++-- .../distribute}/worker/job/UnassignedJob.java | 8 ++-- .../worker/job/UnassignedJobBuilder.java | 6 +-- .../job/UnassignedQueryConstantJob.java | 8 ++-- .../job/UnassignedScanBucketOlapTableJob.java | 32 ++++++------- .../job/UnassignedScanSingleOlapTableJob.java | 12 ++--- .../UnassignedScanSingleRemoteTableJob.java | 10 ++-- .../worker/job/UnassignedShuffleJob.java | 22 ++++----- .../job/UnassignedSpecifyInstancesJob.java | 6 +-- .../worker/job/UninstancedScanSource.java | 2 +- .../worker/job/WorkerScanSource.java | 8 ++-- .../apache/doris/planner/PlanFragment.java | 3 +- .../apache/doris/qe/NereidsCoordinator.java | 18 ++++---- .../org/apache/doris/qe/SessionVariable.java | 23 +++++----- 45 files changed, 195 insertions(+), 198 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/{planner => nereids/trees/plans/distribute}/BucketSpecifyInstances.java (82%) rename fe/fe-core/src/main/java/org/apache/doris/{planner => nereids/trees/plans/distribute}/DefaultSpecifyInstances.java (82%) rename fe/fe-core/src/main/java/org/apache/doris/{planner => nereids/trees/plans/distribute}/NereidsSpecifyInstances.java (76%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/BackendWorker.java (93%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/BackendWorkerManager.java (92%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{worker/Worker.java => trees/plans/distribute/worker/DistributedPlanWorker.java} (76%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/LoadBalanceScanWorkerSelector.java (88%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/ScanWorkerSelector.java (83%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/WorkerManager.java (83%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/WorkerScanRanges.java (80%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/Workload.java (93%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/AbstractUnassignedJob.java (97%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/AbstractUnassignedScanJob.java (88%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/AssignedJob.java (85%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/AssignedJobBuilder.java (95%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/BucketScanSource.java (98%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/CustomAssignmentJob.java (94%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/DefaultScanSource.java (98%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/LocalShuffleAssignedJob.java (87%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/ScanRange.java (92%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/ScanRanges.java (98%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/ScanSource.java (95%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/Splittable.java (96%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/StaticAssignedJob.java (91%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java (89%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedJob.java (84%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedJobBuilder.java (98%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedQueryConstantJob.java (85%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedScanBucketOlapTableJob.java (89%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedScanSingleOlapTableJob.java (87%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedScanSingleRemoteTableJob.java (83%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedShuffleJob.java (85%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UnassignedSpecifyInstancesJob.java (89%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/UninstancedScanSource.java (94%) rename fe/fe-core/src/main/java/org/apache/doris/nereids/{ => trees/plans/distribute}/worker/job/WorkerScanSource.java (78%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index a1f5e16c1501f1e..b45527f60bd5969 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -111,7 +111,6 @@ public synchronized void updateSummary(long startTime, Map summa summaryInfo.put(SummaryProfile.PHYSICAL_PLAN, builder.toString().replace("\n", "\n ")); - FragmentIdMapping distributedPlans = nereidsPlanner.getDistributedPlans(); if (distributedPlans != null) { summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index fd2ecbb80dbf08a..f4029b3e7d58e21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -214,7 +214,7 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp || joinType == JoinType.FULL_OUTER_JOIN); boolean isSpecInScope = (leftHashSpec.getShuffleType() == ShuffleType.NATURAL || rightHashSpec.getShuffleType() == ShuffleType.NATURAL); - return isJoinTypeInScope && isSpecInScope; + return isJoinTypeInScope && isSpecInScope && !SessionVariable.canUseNereidsDistributePlanner(); } @Override @@ -249,8 +249,7 @@ public Boolean visitPhysicalHashJoin( if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { // check colocate join with scan return true; - } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec) - && !SessionVariable.canUseNereidsDistributePlanner()) { + } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) { // right anti, right outer, full outer join could not do bucket shuffle join // TODO remove this after we refactor coordinator updatedForLeft = Optional.of(calAnotherSideRequired( @@ -307,6 +306,9 @@ public Boolean visitPhysicalHashJoin( && rightHashSpec.getShuffleType() == ShuffleType.NATURAL) { if (SessionVariable.canUseNereidsDistributePlanner()) { // nereids coordinator can exchange left side to right side to do bucket shuffle join + // TODO: maybe we should check if left child is PhysicalDistribute. + // If so add storage bucketed shuffle on left side. Other wise, + // add execution bucketed shuffle on right side. updatedForLeft = Optional.of(calAnotherSideRequired( ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec, (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java index 9be3658e77b42c8..845baa045cc041e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Numbers.java @@ -22,13 +22,11 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.NereidsException; import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Properties; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.BigIntType; -import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Statistics; @@ -95,14 +93,6 @@ public R accept(ExpressionVisitor visitor, C context) { return visitor.visitNumbers(this, context); } - @Override - public PhysicalProperties getPhysicalProperties() { - if (SessionVariable.canUseNereidsDistributePlanner()) { - return PhysicalProperties.ANY; - } - return super.getPhysicalProperties(); - } - @Override public Numbers withChildren(List children) { Preconditions.checkArgument(children().size() == 1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/TableValuedFunction.java index 26602435651fab6..fda5302059831a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/TableValuedFunction.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.DataType; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Statistics; import org.apache.doris.tablefunction.TableValuedFunctionIf; @@ -113,6 +114,9 @@ public boolean nullable() { } public PhysicalProperties getPhysicalProperties() { + if (SessionVariable.canUseNereidsDistributePlanner()) { + return PhysicalProperties.ANY; + } return PhysicalProperties.STORAGE_ANY; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/BucketSpecifyInstances.java similarity index 82% rename from fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/BucketSpecifyInstances.java index 11efafb4dfe4fd1..24c0ddf5509b60f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BucketSpecifyInstances.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/BucketSpecifyInstances.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner; +package org.apache.doris.nereids.trees.plans.distribute; -import org.apache.doris.nereids.worker.job.BucketScanSource; -import org.apache.doris.nereids.worker.job.WorkerScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DefaultSpecifyInstances.java similarity index 82% rename from fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DefaultSpecifyInstances.java index 0c0dc7ff3c61cfd..75d4b24c2d44805 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DefaultSpecifyInstances.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DefaultSpecifyInstances.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner; +package org.apache.doris.nereids.trees.plans.distribute; -import org.apache.doris.nereids.worker.job.DefaultScanSource; -import org.apache.doris.nereids.worker.job.WorkerScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index bad620d189330cb..ceef281c0fc534c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -17,10 +17,10 @@ package org.apache.doris.nereids.trees.plans.distribute; -import org.apache.doris.nereids.worker.job.AssignedJob; -import org.apache.doris.nereids.worker.job.AssignedJobBuilder; -import org.apache.doris.nereids.worker.job.UnassignedJob; -import org.apache.doris.nereids.worker.job.UnassignedJobBuilder; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBuilder; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java index f4bf53cc232ea82..76a48c41cd6339a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributedPlan.java @@ -19,7 +19,7 @@ import org.apache.doris.nereids.trees.AbstractTreeNode; import org.apache.doris.nereids.util.Utils; -import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; import java.util.List; import java.util.Objects; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/NereidsSpecifyInstances.java similarity index 76% rename from fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/NereidsSpecifyInstances.java index 34b67a2bbc5ceb2..8f3068a0e4f43af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/NereidsSpecifyInstances.java @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner; +package org.apache.doris.nereids.trees.plans.distribute; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.job.AssignedJob; -import org.apache.doris.nereids.worker.job.ScanSource; -import org.apache.doris.nereids.worker.job.StaticAssignedJob; -import org.apache.doris.nereids.worker.job.UnassignedJob; -import org.apache.doris.nereids.worker.job.WorkerScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TUniqueId; @@ -46,7 +46,7 @@ public List buildAssignedJobs(UnassignedJob unassignedJob) { ConnectContext context = ConnectContext.get(); for (WorkerScanSource workerToScanSource : workerScanSources) { TUniqueId instanceId = context.nextInstanceId(); - Worker worker = workerToScanSource.worker; + DistributedPlanWorker worker = workerToScanSource.worker; ScanSource scanSource = workerToScanSource.scanSource; StaticAssignedJob assignedJob = new StaticAssignedJob( instanceNum++, instanceId, unassignedJob, worker, scanSource diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java index d268e8da20b32f7..a4c1cd856c03af0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/PipelineDistributedPlan.java @@ -18,8 +18,8 @@ package org.apache.doris.nereids.trees.plans.distribute; import org.apache.doris.nereids.util.Utils; -import org.apache.doris.nereids.worker.job.AssignedJob; -import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; import org.apache.doris.thrift.TExplainLevel; import java.util.List; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java index e64683773b97ee1..702a00dd358d29a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; import org.apache.doris.system.Backend; import java.util.Objects; /** BackendWorker */ -public class BackendWorker implements Worker { +public class BackendWorker implements DistributedPlanWorker { private final Backend backend; public BackendWorker(Backend backend) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorkerManager.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerManager.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorkerManager.java index 1e5a91e29ddca39..7c17a2680d0b446 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/BackendWorkerManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorkerManager.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; import org.apache.doris.catalog.Env; import org.apache.doris.common.NereidsException; @@ -39,7 +39,7 @@ public class BackendWorkerManager implements WorkerManager { }); @Override - public Worker getWorker(long backendId) { + public DistributedPlanWorker getWorker(long backendId) { ImmutableMap backends = this.backends.get(); Backend backend = backends.get(backendId); if (backend == null) { @@ -49,7 +49,7 @@ public Worker getWorker(long backendId) { } @Override - public Worker randomAvailableWorker() { + public DistributedPlanWorker randomAvailableWorker() { try { Reference selectedBackendId = new Reference<>(); ImmutableMap backends = this.backends.get(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java similarity index 76% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java index 8d1179fc1e20124..c86675a6dab27c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Worker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/DistributedPlanWorker.java @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; -/** Worker */ -public interface Worker extends Comparable { +/** + * DistributedPlanWorker: a worker who can execute the assigned job(instance) of the DistributedPlan + */ +public interface DistributedPlanWorker extends Comparable { long id(); // ipv4/ipv6 address @@ -32,7 +34,7 @@ public interface Worker extends Comparable { boolean available(); @Override - default int compareTo(Worker worker) { + default int compareTo(DistributedPlanWorker worker) { return address().compareTo(worker.address()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java similarity index 88% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java index 31da2cddb02f65e..87a6b0088a38b87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/LoadBalanceScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/LoadBalanceScanWorkerSelector.java @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; import org.apache.doris.common.Pair; import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.worker.job.BucketScanSource; -import org.apache.doris.nereids.worker.job.DefaultScanSource; -import org.apache.doris.nereids.worker.job.ScanRanges; -import org.apache.doris.nereids.worker.job.UnassignedJob; -import org.apache.doris.nereids.worker.job.UnassignedScanBucketOlapTableJob; -import org.apache.doris.nereids.worker.job.UninstancedScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UninstancedScanSource; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; @@ -51,7 +51,7 @@ /** LoadBalanceScanWorkerSelector */ public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector { private final BackendWorkerManager workerManager = new BackendWorkerManager(); - private final Map workloads = Maps.newLinkedHashMap(); + private final Map workloads = Maps.newLinkedHashMap(); @Override public WorkerManager getWorkerManager() { @@ -59,10 +59,10 @@ public WorkerManager getWorkerManager() { } @Override - public Worker selectMinWorkloadWorker(List workers) { - Worker minWorkloadWorker = null; + public DistributedPlanWorker selectMinWorkloadWorker(List workers) { + DistributedPlanWorker minWorkloadWorker = null; WorkerWorkload minWorkload = new WorkerWorkload(Integer.MAX_VALUE, Long.MAX_VALUE); - for (Worker worker : workers) { + for (DistributedPlanWorker worker : workers) { WorkerWorkload workload = getWorkload(worker); if (minWorkload.compareTo(workload) > 0) { minWorkloadWorker = worker; @@ -74,8 +74,8 @@ public Worker selectMinWorkloadWorker(List workers) { } @Override - public Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) { - Map workerScanRanges = Maps.newLinkedHashMap(); + public Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) { + Map workerScanRanges = Maps.newLinkedHashMap(); // allScanRangesLocations is all scan ranges in all partition which need to scan List allScanRangesLocations = scanNode.getScanRangeLocations(0); for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) { @@ -97,7 +97,7 @@ public Map selectReplicaAndWorkerWithoutBucket(Sc } @Override - public Map selectReplicaAndWorkerWithBucket( + public Map selectReplicaAndWorkerWithBucket( UnassignedScanBucketOlapTableJob unassignedJob) { PlanFragment fragment = unassignedJob.getFragment(); List scanNodes = unassignedJob.getScanNodes(); @@ -142,11 +142,11 @@ private Function> bucketBytesSupplier() { }; } - private Map selectForBucket( + private Map selectForBucket( UnassignedJob unassignedJob, List scanNodes, BiFunction> bucketScanRangeSupplier, Function> bucketBytesSupplier) { - Map assignment = Maps.newLinkedHashMap(); + Map assignment = Maps.newLinkedHashMap(); Map bucketIndexToBytes = computeEachBucketScanBytes(scanNodes, bucketBytesSupplier); @@ -154,7 +154,7 @@ private Map selectForBucket( Integer bucketIndex = kv.getKey(); long allScanNodeScanBytesInOneBucket = kv.getValue(); - Worker selectedWorker = null; + DistributedPlanWorker selectedWorker = null; for (ScanNode scanNode : scanNodes) { List allPartitionTabletsInOneBucketInOneTable = bucketScanRangeSupplier.apply(scanNode, bucketIndex); @@ -201,12 +201,12 @@ private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker( List replicaLocations = tabletLocation.getLocations(); int replicaNum = replicaLocations.size(); WorkerWorkload minWorkload = new WorkerWorkload(Integer.MAX_VALUE, Long.MAX_VALUE); - Worker minWorkLoadWorker = null; + DistributedPlanWorker minWorkLoadWorker = null; TScanRangeLocation selectedReplicaLocation = null; for (int i = 0; i < replicaNum; i++) { TScanRangeLocation replicaLocation = replicaLocations.get(i); - Worker worker = workerManager.getWorker(replicaLocation.getBackendId()); + DistributedPlanWorker worker = workerManager.getWorker(replicaLocation.getBackendId()); if (!worker.available()) { continue; } @@ -224,7 +224,7 @@ private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker( minWorkload.recordOneScanTask(tabletBytes); ScanRanges scanRanges = new ScanRanges(); TScanRangeParams scanReplicaParams = - ScanWorkerSelector.buildScanReplicaParams(tabletLocation, selectedReplicaLocation); + buildScanReplicaParams(tabletLocation, selectedReplicaLocation); scanRanges.addScanRange(scanReplicaParams, tabletBytes); return new WorkerScanRanges(minWorkLoadWorker, scanRanges); } @@ -241,7 +241,7 @@ private List> filterReplicaByWorkerInBucket( boolean foundTabletInThisWorker = false; for (TScanRangeLocation replicaLocation : onePartitionOneTabletLocation.getLocations()) { if (replicaLocation.getBackendId() == filterWorkerId) { - TScanRangeParams scanReplicaParams = ScanWorkerSelector.buildScanReplicaParams( + TScanRangeParams scanReplicaParams = buildScanReplicaParams( onePartitionOneTabletLocation, replicaLocation); Long replicaSize = ((OlapScanNode) scanNode).getTabletSingleReplicaSize(tabletId); selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize)); @@ -255,7 +255,7 @@ private List> filterReplicaByWorkerInBucket( } } else if (onePartitionOneTabletLocation.getLocations().size() == 1) { TScanRangeLocation replicaLocation = onePartitionOneTabletLocation.getLocations().get(0); - TScanRangeParams scanReplicaParams = ScanWorkerSelector.buildScanReplicaParams( + TScanRangeParams scanReplicaParams = buildScanReplicaParams( onePartitionOneTabletLocation, replicaLocation); Long replicaSize = 0L; selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize)); @@ -280,7 +280,7 @@ private Map computeEachBucketScanBytes( return bucketIndexToBytes; } - private WorkerWorkload getWorkload(Worker worker) { + private WorkerWorkload getWorkload(DistributedPlanWorker worker) { return workloads.computeIfAbsent(worker, w -> new WorkerWorkload()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/ScanWorkerSelector.java similarity index 83% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/ScanWorkerSelector.java index 05f1f46576e8a26..c534cd5455d9ebc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/ScanWorkerSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/ScanWorkerSelector.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; -import org.apache.doris.nereids.worker.job.UnassignedScanBucketOlapTableJob; -import org.apache.doris.nereids.worker.job.UninstancedScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UninstancedScanSource; import org.apache.doris.planner.ScanNode; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; @@ -31,11 +31,11 @@ public interface ScanWorkerSelector { WorkerManager getWorkerManager(); - Worker selectMinWorkloadWorker(List workers); + DistributedPlanWorker selectMinWorkloadWorker(List workers); // for a scan node, select replica for each scan range(denote tablet if the ScanNode is OlapScanNode), // use the replica location to build a worker execute the instance - Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode); + Map selectReplicaAndWorkerWithoutBucket(ScanNode scanNode); // return // key: Worker, the backend which will process this fragment @@ -48,7 +48,7 @@ public interface ScanWorkerSelector { // p1 values[(1), (10)) and p2 values[(10), 11) with integer partition column part, // and distributed by hash(id) buckets 10. And, so, there has 10 buckets from bucket 0 to // bucket 9, and every bucket contains two tablets, because there are two partitions. - Map selectReplicaAndWorkerWithBucket( + Map selectReplicaAndWorkerWithBucket( UnassignedScanBucketOlapTableJob unassignedJob); static TScanRangeParams buildScanReplicaParams( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerManager.java similarity index 83% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerManager.java index 5db890d78227778..6d965495d3c5529 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerManager.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; /** WorkerManager */ public interface WorkerManager { - Worker getWorker(long backendId); + DistributedPlanWorker getWorker(long backendId); - Worker randomAvailableWorker(); + DistributedPlanWorker randomAvailableWorker(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerScanRanges.java similarity index 80% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerScanRanges.java index 7f861d33cfc13c2..25daa29bd870d10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/WorkerScanRanges.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/WorkerScanRanges.java @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; -import org.apache.doris.nereids.worker.job.ScanRanges; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges; import java.util.Objects; /** WorkerScanRange */ public class WorkerScanRanges { - public final Worker worker; + public final DistributedPlanWorker worker; public final ScanRanges scanRanges; - public WorkerScanRanges(Worker worker, ScanRanges scanRanges) { + public WorkerScanRanges(DistributedPlanWorker worker, ScanRanges scanRanges) { this.worker = Objects.requireNonNull(worker, "scanRangeParams can not be null"); this.scanRanges = Objects.requireNonNull(scanRanges, "scanRanges can not be null"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/Workload.java similarity index 93% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/Workload.java index f67c3660c1520ce..faf6ec1c5e824bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/Workload.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/Workload.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker; +package org.apache.doris.nereids.trees.plans.distribute.worker; /** Workload */ public interface Workload extends Comparable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedJob.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedJob.java index 6812480dd8a1615..f53ee6145233797 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedJob.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.nereids.trees.AbstractTreeNode; import org.apache.doris.nereids.util.Utils; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java similarity index 88% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index 8e9d3deb12f4cd4..6242d0ba946d2b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; @@ -47,25 +47,25 @@ public AbstractUnassignedScanJob(PlanFragment fragment, public List computeAssignedJobs(WorkerManager workerManager, ListMultimap inputJobs) { - Map workerToScanSource = multipleMachinesParallelization( + Map workerToScanSource = multipleMachinesParallelization( workerManager, inputJobs); return insideMachineParallelization(workerToScanSource, inputJobs); } - protected abstract Map multipleMachinesParallelization( + protected abstract Map multipleMachinesParallelization( WorkerManager workerManager, ListMultimap inputJobs); protected List insideMachineParallelization( - Map workerToScanRanges, + Map workerToScanRanges, ListMultimap inputJobs) { ConnectContext context = ConnectContext.get(); boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel(workerToScanRanges); int instanceIndexInFragment = 0; List instances = Lists.newArrayList(); - for (Entry entry : workerToScanRanges.entrySet()) { - Worker worker = entry.getKey(); + for (Entry entry : workerToScanRanges.entrySet()) { + DistributedPlanWorker worker = entry.getKey(); // the scanRanges which this worker should scan, // for example: @@ -135,14 +135,14 @@ protected List insideMachineParallelization( return instances; } - protected boolean useLocalShuffleToAddParallel(Map workerToScanRanges) { + protected boolean useLocalShuffleToAddParallel(Map workerToScanRanges) { if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()) { return true; } return parallelTooLittle(workerToScanRanges); } - protected boolean parallelTooLittle(Map workerToScanRanges) { + protected boolean parallelTooLittle(Map workerToScanRanges) { if (this instanceof UnassignedScanBucketOlapTableJob) { return scanRangesToLittle(workerToScanRanges) && bucketsTooLittle(workerToScanRanges); } else if (this instanceof UnassignedScanSingleOlapTableJob @@ -154,7 +154,7 @@ protected boolean parallelTooLittle(Map workerToS } protected boolean scanRangesToLittle( - Map workerToScanRanges) { + Map workerToScanRanges) { ConnectContext context = ConnectContext.get(); int backendNum = workerToScanRanges.size(); for (ScanNode scanNode : scanNodes) { @@ -184,7 +184,7 @@ protected int degreeOfParallelism(int maxParallel) { return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1)); } - protected boolean bucketsTooLittle(Map workerToScanRanges) { + protected boolean bucketsTooLittle(Map workerToScanRanges) { int parallelExecNum = fragment.getParallelExecNum(); for (UninstancedScanSource uninstancedScanSource : workerToScanRanges.values()) { ScanSource scanSource = uninstancedScanSource.scanSource; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java similarity index 85% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java index 96b8c8a51d04c9b..f9f6b9dea1451b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJob.java @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.thrift.TUniqueId; /** @@ -31,7 +31,7 @@ public interface AssignedJob { UnassignedJob unassignedJob(); - Worker getAssignedWorker(); + DistributedPlanWorker getAssignedWorker(); ScanSource getScanSource(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java index 5ab80979b91e8de..147e56dbbef2834 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.BackendWorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragmentId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java index 51f3e9031502811..33d066a02b9fcb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/BucketScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/BucketScanSource.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.common.util.ListUtil; import org.apache.doris.planner.ScanNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/CustomAssignmentJob.java similarity index 94% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/CustomAssignmentJob.java index b1f9d6d8a875a13..dc009dba09acef8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/CustomAssignmentJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/CustomAssignmentJob.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.planner.ExchangeNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/DefaultScanSource.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/DefaultScanSource.java index c763e9fa90b90ca..89e5270801dcb42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/DefaultScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/DefaultScanSource.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.planner.ScanNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java similarity index 87% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java index 3bb6c349ca065c5..50e43fc0282755c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableMap; @@ -31,7 +31,7 @@ public class LocalShuffleAssignedJob extends StaticAssignedJob { public LocalShuffleAssignedJob( int indexInUnassignedJob, int shareScanId, TUniqueId instanceId, UnassignedJob unassignedJob, - Worker worker, ScanSource scanSource) { + DistributedPlanWorker worker, ScanSource scanSource) { super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource); this.shareScanId = shareScanId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRange.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRange.java index a20897eee9426fe..24bf7a6d910a5f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRange.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; /** ScanRange */ public class ScanRange { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRanges.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRanges.java index eddb58f830c7555..2539b8416dea08d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanRanges.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanRanges.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TScanRangeParams; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanSource.java similarity index 95% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanSource.java index 4e8a49bcfafbc5c..b124e14bd73c776 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/ScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/ScanSource.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.planner.ScanNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/Splittable.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/Splittable.java index 0a18d299402159e..269dcedfb868f95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/Splittable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/Splittable.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import com.google.common.base.Preconditions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java similarity index 91% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java index 1e02767644bd013..1a92cf71019e667 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableMap; @@ -32,11 +32,11 @@ public class StaticAssignedJob implements AssignedJob { private final int indexInUnassignedJob; private final UnassignedJob unassignedJob; private final TUniqueId instanceId; - private final Worker worker; + private final DistributedPlanWorker worker; private final ScanSource scanSource; public StaticAssignedJob( - int indexInUnassignedJob, TUniqueId instanceId, UnassignedJob unassignedJob, Worker worker, + int indexInUnassignedJob, TUniqueId instanceId, UnassignedJob unassignedJob, DistributedPlanWorker worker, ScanSource scanSource) { this.indexInUnassignedJob = indexInUnassignedJob; this.instanceId = Objects.requireNonNull(instanceId, "instanceId can not be null"); @@ -61,7 +61,7 @@ public UnassignedJob unassignedJob() { } @Override - public Worker getAssignedWorker() { + public DistributedPlanWorker getAssignedWorker() { return worker; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java index da0bc1b7c3aaa4a..e27a9ebc6b5f1a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.ScanWorkerSelector; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; import org.apache.doris.planner.DataGenScanNode; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; @@ -77,7 +77,7 @@ public List computeAssignedJobs(WorkerManager workerManager, scanNodeToScanRanges.put(scanNode, scanRanges); } - Worker randomWorker = workerManager.randomAvailableWorker(); + DistributedPlanWorker randomWorker = workerManager.randomAvailableWorker(); return ImmutableList.of( assignWorkerAndDataSources(0, context.nextInstanceId(), randomWorker, new DefaultScanSource(scanNodeToScanRanges) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java similarity index 84% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java index 8c88e7edff51807..6536656313ca313 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJob.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.nereids.trees.TreeNode; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; @@ -46,7 +46,7 @@ List computeAssignedJobs( // generate an instance job // e.g. build an instance job by a backends and the replica ids it contains default AssignedJob assignWorkerAndDataSources( - int instanceIndexInFragment, TUniqueId instanceId, Worker worker, ScanSource scanSource) { + int instanceIndexInFragment, TUniqueId instanceId, DistributedPlanWorker worker, ScanSource scanSource) { return new StaticAssignedJob(instanceIndexInFragment, instanceId, this, worker, scanSource); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java index 71ec4823745346d..c5fadc8fb8df578 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; -import org.apache.doris.nereids.worker.LoadBalanceScanWorkerSelector; -import org.apache.doris.nereids.worker.ScanWorkerSelector; +import org.apache.doris.nereids.trees.plans.distribute.worker.LoadBalanceScanWorkerSelector; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.ExchangeNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java similarity index 85% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java index c7371b64fb77dd8..e771a78da0c0d6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedQueryConstantJob.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; @@ -39,7 +39,7 @@ public UnassignedQueryConstantJob(PlanFragment fragment) { @Override public List computeAssignedJobs(WorkerManager workerManager, ListMultimap inputJobs) { - Worker randomWorker = workerManager.randomAvailableWorker(); + DistributedPlanWorker randomWorker = workerManager.randomAvailableWorker(); ConnectContext context = ConnectContext.get(); return ImmutableList.of( new StaticAssignedJob(0, context.nextInstanceId(), this, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index bdc05a8c5abb93d..9fc3ca6773cc98a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import org.apache.doris.analysis.JoinOperator; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; -import org.apache.doris.nereids.worker.ScanWorkerSelector; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.OlapScanNode; @@ -74,7 +74,7 @@ public List getOlapScanNodes() { } @Override - protected Map multipleMachinesParallelization( + protected Map multipleMachinesParallelization( WorkerManager workerManager, ListMultimap inputJobs) { // for every bucket tablet, select its replica and worker. // for example, colocate join: @@ -99,7 +99,7 @@ protected Map multipleMachinesParallelization( @Override protected List insideMachineParallelization( - Map workerToScanRanges, + Map workerToScanRanges, ListMultimap inputJobs) { // separate buckets to instanceNum groups, let one instance process some buckets. // for example, colocate join: @@ -186,12 +186,12 @@ private List fillUpInstances(List instances) { OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0); MaterializedIndex randomPartition = randomPartition(olapScanNode); - ListMultimap missingBuckets = selectWorkerForMissingBuckets( + ListMultimap missingBuckets = selectWorkerForMissingBuckets( olapScanNode, randomPartition, missingBucketIndexes); boolean useLocalShuffle = instances.stream().anyMatch(LocalShuffleAssignedJob.class::isInstance); List newInstances = new ArrayList<>(instances); - for (Entry> workerToBuckets : missingBuckets.asMap().entrySet()) { + for (Entry> workerToBuckets : missingBuckets.asMap().entrySet()) { Map> scanEmptyBuckets = Maps.newLinkedHashMap(); for (Integer bucketIndex : workerToBuckets.getValue()) { Map scanTableWithEmptyData = Maps.newLinkedHashMap(); @@ -202,7 +202,7 @@ private List fillUpInstances(List instances) { } AssignedJob fillUpInstance = null; - Worker worker = workerToBuckets.getKey(); + DistributedPlanWorker worker = workerToBuckets.getKey(); BucketScanSource scanSource = new BucketScanSource(scanEmptyBuckets); if (useLocalShuffle) { // when use local shuffle, we should ensure every backend only process one instance! @@ -277,30 +277,30 @@ private MaterializedIndex randomPartition(OlapScanNode olapScanNode) { return partition.getBaseIndex(); } - private ListMultimap selectWorkerForMissingBuckets( + private ListMultimap selectWorkerForMissingBuckets( OlapScanNode olapScanNode, MaterializedIndex partition, Set selectBucketIndexes) { List tabletIdsInOrder = partition.getTabletIdsInOrder(); - ListMultimap fillUpWorkerToBuckets = ArrayListMultimap.create(); + ListMultimap fillUpWorkerToBuckets = ArrayListMultimap.create(); for (Integer bucketIndex : selectBucketIndexes) { Long tabletIdInBucket = tabletIdsInOrder.get(bucketIndex); Tablet tabletInBucket = partition.getTablet(tabletIdInBucket); - List workers = getWorkersByReplicas(tabletInBucket); + List workers = getWorkersByReplicas(tabletInBucket); if (workers.isEmpty()) { throw new IllegalStateException("Can not found available replica for bucket " + bucketIndex + ", table: " + olapScanNode); } - Worker worker = scanWorkerSelector.selectMinWorkloadWorker(workers); + DistributedPlanWorker worker = scanWorkerSelector.selectMinWorkloadWorker(workers); fillUpWorkerToBuckets.put(worker, bucketIndex); } return fillUpWorkerToBuckets; } - private List getWorkersByReplicas(Tablet tablet) { + private List getWorkersByReplicas(Tablet tablet) { WorkerManager workerManager = scanWorkerSelector.getWorkerManager(); List replicas = tablet.getReplicas(); - List workers = Lists.newArrayListWithCapacity(replicas.size()); + List workers = Lists.newArrayListWithCapacity(replicas.size()); for (Replica replica : replicas) { - Worker worker = workerManager.getWorker(replica.getBackendId()); + DistributedPlanWorker worker = workerManager.getWorker(replica.getBackendId()); if (worker.available()) { workers.add(worker); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java similarity index 87% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleOlapTableJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java index 410a4edd8651ddc..aba257c021dbc93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.ScanWorkerSelector; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; @@ -47,7 +47,7 @@ public UnassignedScanSingleOlapTableJob( } @Override - protected Map multipleMachinesParallelization( + protected Map multipleMachinesParallelization( WorkerManager workerManager, ListMultimap inputJobs) { // for every tablet, select its replica and worker. // for example: @@ -62,7 +62,7 @@ protected Map multipleMachinesParallelization( @Override protected List insideMachineParallelization( - Map workerToScanRanges, + Map workerToScanRanges, ListMultimap inputJobs) { // for each worker, compute how many instances should be generated, and which data should be scanned. // for example: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleRemoteTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java similarity index 83% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleRemoteTableJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java index 4c6025dfa7356f0..beaa44de4bfe45b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanSingleRemoteTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleRemoteTableJob.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.ScanWorkerSelector; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; @@ -46,7 +46,7 @@ public UnassignedScanSingleRemoteTableJob( } @Override - protected Map multipleMachinesParallelization( + protected Map multipleMachinesParallelization( WorkerManager workerManager, ListMultimap inputJobs) { return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(scanNodes.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java similarity index 85% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java index 22131db4c9f9d20..bbe40a3a32a9ada 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; @@ -52,16 +52,16 @@ public List computeAssignedJobs( // When group by cardinality is smaller than number of backend, only some backends always // process while other has no data to process. // So we shuffle instances to make different backends handle different queries. - List shuffleWorkersInBiggestParallelChildFragment + List shuffleWorkersInBiggestParallelChildFragment = distinctShuffleWorkers(biggestParallelChildFragment); - Function workerSelector = instanceIndex -> { + Function workerSelector = instanceIndex -> { int selectIndex = instanceIndex % shuffleWorkersInBiggestParallelChildFragment.size(); return shuffleWorkersInBiggestParallelChildFragment.get(selectIndex); }; return buildInstances(expectInstanceNum, workerSelector); } else { // keep same instance num like child fragment - Function workerSelector = instanceIndex -> { + Function workerSelector = instanceIndex -> { int selectIndex = instanceIndex % biggestParallelChildFragment.size(); return biggestParallelChildFragment.get(selectIndex).getAssignedWorker(); }; @@ -99,11 +99,11 @@ private List getInstancesOfBiggestParallelChildFragment( return biggestParallelChildFragment; } - private List buildInstances(int instanceNum, Function workerSelector) { + private List buildInstances(int instanceNum, Function workerSelector) { ImmutableList.Builder instances = ImmutableList.builderWithExpectedSize(instanceNum); ConnectContext context = ConnectContext.get(); for (int i = 0; i < instanceNum; i++) { - Worker selectedWorker = workerSelector.apply(i); + DistributedPlanWorker selectedWorker = workerSelector.apply(i); AssignedJob assignedJob = assignWorkerAndDataSources( i, context.nextInstanceId(), selectedWorker, new DefaultScanSource(ImmutableMap.of()) ); @@ -112,12 +112,12 @@ private List buildInstances(int instanceNum, Function distinctShuffleWorkers(List instances) { - Set candidateWorkerSet = Sets.newLinkedHashSet(); + private List distinctShuffleWorkers(List instances) { + Set candidateWorkerSet = Sets.newLinkedHashSet(); for (AssignedJob instance : instances) { candidateWorkerSet.add(instance.getAssignedWorker()); } - List candidateWorkers = Lists.newArrayList(candidateWorkerSet); + List candidateWorkers = Lists.newArrayList(candidateWorkerSet); Collections.shuffle(candidateWorkers); return candidateWorkers; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java index 1a877d229c9ce7b..63631acc0ac2433 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedSpecifyInstancesJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedSpecifyInstancesJob.java @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.WorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; -import org.apache.doris.planner.NereidsSpecifyInstances; +import org.apache.doris.nereids.trees.plans.distribute.NereidsSpecifyInstances; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UninstancedScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UninstancedScanSource.java similarity index 94% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UninstancedScanSource.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UninstancedScanSource.java index 110256530b0cbff..cce4c2f25f966a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UninstancedScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UninstancedScanSource.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; import com.google.common.collect.ImmutableMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/WorkerScanSource.java similarity index 78% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/WorkerScanSource.java index 1abb952a06f5ca6..f9c5993a5b5eb34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/WorkerScanSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/WorkerScanSource.java @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.worker.job; +package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; /** WorkerScanSource */ public class WorkerScanSource { - public final Worker worker; + public final DistributedPlanWorker worker; public final S scanSource; - public WorkerScanSource(Worker worker, S scanSource) { + public WorkerScanSource(DistributedPlanWorker worker, S scanSource) { this.worker = worker; this.scanSource = scanSource; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 23b392889dfcff7..8a26ec4af66ac1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -27,7 +27,8 @@ import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.TreeNode; -import org.apache.doris.nereids.worker.job.ScanSource; +import org.apache.doris.nereids.trees.plans.distribute.NereidsSpecifyInstances; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java index 83ef98fdcca9e35..4f5af3762c51af4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java @@ -24,14 +24,14 @@ import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; -import org.apache.doris.nereids.worker.Worker; -import org.apache.doris.nereids.worker.job.AssignedJob; -import org.apache.doris.nereids.worker.job.BucketScanSource; -import org.apache.doris.nereids.worker.job.DefaultScanSource; -import org.apache.doris.nereids.worker.job.LocalShuffleAssignedJob; -import org.apache.doris.nereids.worker.job.ScanRanges; -import org.apache.doris.nereids.worker.job.ScanSource; -import org.apache.doris.nereids.worker.job.UnassignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; @@ -96,7 +96,7 @@ protected void computeFragmentHosts() { } for (AssignedJob instanceJob : instanceJobs) { - Worker worker = instanceJob.getAssignedWorker(); + DistributedPlanWorker worker = instanceJob.getAssignedWorker(); TNetworkAddress address = new TNetworkAddress(worker.host(), worker.port()); FInstanceExecParam instanceExecParam = new FInstanceExecParam( null, address, 0, fragmentExecParams); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 81af1fffce09112..df34e0fc78918b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -18,7 +18,6 @@ package org.apache.doris.qe; import org.apache.doris.analysis.SetVar; -import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; @@ -1250,7 +1249,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { private boolean nereidsStarSchemaSupport = true; @VariableMgr.VarAttr(name = ENABLE_NEREIDS_DISTRIBUTE_PLANNER, needForward = true, - fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL) + fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = { + "使用新的nereids的分布式规划器的开关,这个分布式规划器可以规划出一些更高效的查询计划,比如在某些情况下," + + "可以把左表shuffle到右表去做bucket shuffle join", + "The switch to use new DistributedPlanner of nereids, this planner can planning some " + + "more efficient query plans, e.g. in certain situations, shuffle left side to " + + "right side to do bucket shuffle join" + }) private boolean enableNereidsDistributePlanner = false; @VariableMgr.VarAttr(name = REWRITE_OR_TO_IN_PREDICATE_THRESHOLD, fuzzy = true) @@ -3080,6 +3085,7 @@ public void setEnableNereidsPlanner(boolean enableNereidsPlanner) { /** canUseNereidsDistributePlanner */ public static boolean canUseNereidsDistributePlanner() { + // TODO: support cloud mode if (Config.isCloudMode()) { return false; } @@ -3087,21 +3093,14 @@ public static boolean canUseNereidsDistributePlanner() { if (connectContext == null) { return false; } - if (!connectContext.getState().isNereids()) { - return false; - } StatementContext statementContext = connectContext.getStatementContext(); if (statementContext == null) { return false; } - StatementBase parsedStatement = statementContext.getParsedStatement(); - if (!(parsedStatement instanceof LogicalPlanAdapter)) { - return false; - } - LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStatement).getLogicalPlan(); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) statementContext.getParsedStatement()).getLogicalPlan(); SessionVariable sessionVariable = connectContext.getSessionVariable(); - if (logicalPlan instanceof UnboundResultSink - && sessionVariable.enableNereidsDistributePlanner && sessionVariable.enablePipelineXEngine) { + // TODO: support other sink + if (logicalPlan instanceof UnboundResultSink && sessionVariable.enableNereidsDistributePlanner) { return true; } return false;