From d9b7698f27720467d2d94ec2a99d72c44394164d Mon Sep 17 00:00:00 2001 From: 924060929 Date: Fri, 15 Nov 2024 18:53:34 +0800 Subject: [PATCH] opt --- .../apache/doris/common/profile/Profile.java | 12 +- .../jobs/cascades/CostAndEnforcerJob.java | 83 ++++---- .../ChildrenPropertiesRegulator.java | 188 +++++++++++------- .../worker/job/AbstractUnassignedScanJob.java | 112 ++++++----- .../LocalShuffleBucketJoinAssignedJob.java | 56 ++++++ .../worker/job/StaticAssignedJob.java | 5 + .../job/UnassignedScanBucketOlapTableJob.java | 38 +++- .../runtime/RuntimeFiltersThriftBuilder.java | 15 +- .../doris/qe/runtime/ThriftPlansBuilder.java | 19 +- 9 files changed, 347 insertions(+), 181 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 5381cc26bd225aa..aaa9f8af42714eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -286,12 +286,14 @@ public synchronized void updateSummary(Map summaryInfo, boolean NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner); physicalPlan = nereidsPlanner.getPhysicalPlan(); physicalRelations.addAll(nereidsPlanner.getPhysicalRelations()); - FragmentIdMapping distributedPlans = nereidsPlanner.getDistributedPlans(); - if (distributedPlans != null) { - summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN, + if (profileLevel >= 3) { + FragmentIdMapping distributedPlans = nereidsPlanner.getDistributedPlans(); + if (distributedPlans != null) { + summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN, DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) - .replace("\n", "\n ") - ); + .replace("\n", "\n ") + ); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java index 101b3ca6670671a..96960475e9ad4b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java @@ -234,52 +234,61 @@ public void execute() { * @return false if error occurs, the caller will return. */ private boolean calculateEnforce(List requestChildrenProperties, - List outputChildrenProperties) { + List originOutputChildrenProperties) { + // to ensure distributionSpec has been added sufficiently. // it's certain that lowestCostChildren is equals to arity(). ChildrenPropertiesRegulator regulator = new ChildrenPropertiesRegulator(groupExpression, - lowestCostChildren, outputChildrenProperties, requestChildrenProperties, context); - boolean success = regulator.adjustChildrenProperties(); - if (!success) { + lowestCostChildren, new ArrayList<>(originOutputChildrenProperties), + requestChildrenProperties, context); + List> childrenOutputSpace = regulator.adjustChildrenProperties(); + if (childrenOutputSpace.isEmpty()) { // invalid enforce, return. return false; } - // Not need to do pruning here because it has been done when we get the - // best expr from the child group - ChildOutputPropertyDeriver childOutputPropertyDeriver - = new ChildOutputPropertyDeriver(outputChildrenProperties); - // the physical properties the group expression support for its parent. 
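// ---- reviewer sketch (not part of the patch) ----
// Core idea of this change: adjustChildrenProperties() no longer answers a plain
// yes/no; it returns a *space* of legal children-output alternatives, and
// calculateEnforce() costs each one. A minimal sketch of the new contract, with
// String standing in for PhysicalProperties (all names here are illustrative):
import java.util.List;

final class ChildrenOutputSpaceSketch {
    static boolean calculateEnforce(List<List<String>> childrenOutputSpace) {
        if (childrenOutputSpace.isEmpty()) {
            return false; // no legal regulation exists: prune this candidate
        }
        boolean hasSuccess = false;
        for (List<String> outputChildrenProperties : childrenOutputSpace) {
            // derive the parent's output property, recompute the node cost,
            // roll up each child's lowest cost, record property -> cost mappings
            hasSuccess = true;
        }
        return hasSuccess;
    }
}
// ---- end sketch ----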
- PhysicalProperties outputProperty = childOutputPropertyDeriver.getOutputProperties(getConnectContext(), - groupExpression); - - // update current group statistics and re-compute costs. - if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null) - && groupExpression.getOwnerGroup().getStatistics() == null) { - // if we come here, mean that we have some error in stats calculator and should fix it. - LOG.warn("Nereids try to calculate cost without stats for group expression {}", groupExpression); - return false; - } + boolean hasSuccess = false; + for (List outputChildrenProperties : childrenOutputSpace) { + // Not need to do pruning here because it has been done when we get the + // best expr from the child group + ChildOutputPropertyDeriver childOutputPropertyDeriver + = new ChildOutputPropertyDeriver(outputChildrenProperties); + // the physical properties the group expression support for its parent. + // some cases maybe output some possibilities, for example, shuffle join + // maybe select left shuffle to right, or right shuffle to left, so their + // are 2 output properties possibilities + PhysicalProperties outputProperty + = childOutputPropertyDeriver.getOutputProperties(getConnectContext(), groupExpression); + + // update current group statistics and re-compute costs. + if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null) + && groupExpression.getOwnerGroup().getStatistics() == null) { + // if we come here, mean that we have some error in stats calculator and should fix it. + LOG.warn("Nereids try to calculate cost without stats for group expression {}", groupExpression); + } - // recompute cost after adjusting property - curNodeCost = CostCalculator.calculateCost(getConnectContext(), groupExpression, requestChildrenProperties); - groupExpression.setCost(curNodeCost); - curTotalCost = curNodeCost; - for (int i = 0; i < outputChildrenProperties.size(); i++) { - PhysicalProperties childProperties = outputChildrenProperties.get(i); - curTotalCost = CostCalculator.addChildCost( - getConnectContext(), - groupExpression.getPlan(), - curTotalCost, - groupExpression.child(i).getLowestCostPlan(childProperties).get().first, - i); - } + // recompute cost after adjusting property + curNodeCost = CostCalculator.calculateCost(getConnectContext(), groupExpression, requestChildrenProperties); + groupExpression.setCost(curNodeCost); + curTotalCost = curNodeCost; + for (int i = 0; i < outputChildrenProperties.size(); i++) { + PhysicalProperties childProperties = outputChildrenProperties.get(i); + curTotalCost = CostCalculator.addChildCost( + getConnectContext(), + groupExpression.getPlan(), + curTotalCost, + groupExpression.child(i).getLowestCostPlan(childProperties).get().first, + i); + } - // record map { outputProperty -> outputProperty }, { ANY -> outputProperty }, - recordPropertyAndCost(groupExpression, outputProperty, PhysicalProperties.ANY, outputChildrenProperties); - recordPropertyAndCost(groupExpression, outputProperty, outputProperty, outputChildrenProperties); - enforce(outputProperty, outputChildrenProperties); - return true; + // record map { outputProperty -> outputProperty }, { ANY -> outputProperty }, + recordPropertyAndCost(groupExpression, outputProperty, PhysicalProperties.ANY, outputChildrenProperties); + recordPropertyAndCost(groupExpression, outputProperty, outputProperty, outputChildrenProperties); + enforce(outputProperty, outputChildrenProperties); + + hasSuccess = true; + } + return hasSuccess; } /** diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 9985b9c567f8fe7..b821ff0de87ae04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -65,20 +65,20 @@ * NOTICE: all visitor should call visit(plan, context) at proper place * to process must shuffle except project and filter */ -public class ChildrenPropertiesRegulator extends PlanVisitor { +public class ChildrenPropertiesRegulator extends PlanVisitor>, Void> { private final GroupExpression parent; private final List children; - private final List childrenProperties; + private final List originChildrenProperties; private final List requiredProperties; private final JobContext jobContext; public ChildrenPropertiesRegulator(GroupExpression parent, List children, - List childrenProperties, List requiredProperties, + List originChildrenProperties, List requiredProperties, JobContext jobContext) { this.parent = parent; this.children = children; - this.childrenProperties = childrenProperties; + this.originChildrenProperties = originChildrenProperties; this.requiredProperties = requiredProperties; this.jobContext = jobContext; } @@ -88,34 +88,35 @@ public ChildrenPropertiesRegulator(GroupExpression parent, List * * @return enforce cost. */ - public boolean adjustChildrenProperties() { + public List> adjustChildrenProperties() { return parent.getPlan().accept(this, null); } @Override - public Boolean visit(Plan plan, Void context) { + public List> visit(Plan plan, Void context) { // process must shuffle for (int i = 0; i < children.size(); i++) { - DistributionSpec distributionSpec = childrenProperties.get(i).getDistributionSpec(); + DistributionSpec distributionSpec = originChildrenProperties.get(i).getDistributionSpec(); if (distributionSpec instanceof DistributionSpecMustShuffle) { updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY); } } - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate agg, Void context) { + public List> visitPhysicalHashAggregate( + PhysicalHashAggregate agg, Void context) { if (agg.getGroupByExpressions().isEmpty() && agg.getOutputExpressions().isEmpty()) { - return false; + return ImmutableList.of(); } if (!agg.getAggregateParam().canBeBanned) { - return true; + return ImmutableList.of(originChildrenProperties); } // forbid one phase agg on distribute if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) { // this means one stage gather agg, usually bad pattern - return false; + return ImmutableList.of(); } // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle @@ -123,7 +124,7 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER && requiredProperties.get(0).getDistributionSpec() instanceof DistributionSpecHash && children.get(0).getPlan() instanceof PhysicalDistribute) { - return false; + return ImmutableList.of(); } // agg(group by x)-union all(A, B) @@ -132,7 +133,7 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalUnion && !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) { 
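// ---- reviewer sketch (not part of the patch) ----
// The visitor's List<List<PhysicalProperties>> return value encodes three
// outcomes. Shown in isolation, with P standing in for PhysicalProperties:
import com.google.common.collect.ImmutableList;
import java.util.List;

final class RegulatorResultSketch {
    static <P> List<List<P>> reject() {
        return ImmutableList.of();                         // prune this candidate plan
    }

    static <P> List<List<P>> keepAsIs(List<P> originChildrenProperties) {
        return ImmutableList.of(originChildrenProperties); // accept, children unchanged
    }

    static <P> List<List<P>> alternatives(List<P> altA, List<P> altB) {
        return ImmutableList.of(altA, altB);               // several regulations to cost out
    }
}
// ---- end sketch ----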
- return false; + return ImmutableList.of(); } // forbid multi distinct opt that bad than multi-stage version when multi-stage can be executed in one fragment if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == AggMode.INPUT_TO_RESULT) { @@ -146,7 +147,7 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate .collect(Collectors.toList()); if (multiDistinctions.size() == 1) { Expression distinctChild = multiDistinctions.get(0).child(0); - DistributionSpec childDistribution = childrenProperties.get(0).getDistributionSpec(); + DistributionSpec childDistribution = originChildrenProperties.get(0).getDistributionSpec(); if (distinctChild instanceof SlotReference && childDistribution instanceof DistributionSpecHash) { SlotReference slotReference = (SlotReference) distinctChild; DistributionSpecHash distributionSpecHash = (DistributionSpecHash) childDistribution; @@ -163,7 +164,7 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate if ((!groupByColumns.isEmpty() && distributionSpecHash.satisfy(groupByRequire)) || (groupByColumns.isEmpty() && distributionSpecHash.satisfy(distinctChildRequire))) { if (!agg.mustUseMultiDistinctAgg()) { - return false; + return ImmutableList.of(); } } } @@ -171,40 +172,41 @@ public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate // because the second phase of multi-distinct only have one instance, and it is slow generally. if (agg.getOutputExpressions().size() == 1 && agg.getGroupByExpressions().isEmpty() && !agg.mustUseMultiDistinctAgg()) { - return false; + return ImmutableList.of(); } } } // process must shuffle visit(agg, context); // process agg - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN partitionTopN, Void context) { + public List> visitPhysicalPartitionTopN( + PhysicalPartitionTopN partitionTopN, Void context) { if (partitionTopN.getPhase().isOnePhaseGlobal() && children.get(0).getPlan() instanceof PhysicalDistribute) { // one phase partition topn, if the child is an enforced distribution, discard this // and use two phase candidate. - return false; + return ImmutableList.of(); } else if (partitionTopN.getPhase().isTwoPhaseGlobal() && !(children.get(0).getPlan() instanceof PhysicalDistribute)) { // two phase partition topn, if global's child is not distribution, which means // the local distribution has met final requirement, discard this candidate. 
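// ---- reviewer sketch (not part of the patch) ----
// The two partition-topN branches above act as a deduplication rule: of the
// one-phase and two-phase candidates over the same child, exactly one survives.
// A hypothetical reduction of that rule to a single predicate:
final class PartitionTopnPruneSketch {
    static boolean keepCandidate(boolean onePhase, boolean childIsEnforcedDistribute) {
        // one-phase survives only without an enforced distribute below it;
        // two-phase survives only when the global phase sits on a distribute
        return onePhase ? !childIsEnforcedDistribute : childIsEnforcedDistribute;
    }
}
// ---- end sketch ----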
- return false; + return ImmutableList.of(); } else { visit(partitionTopN, context); - return true; + return ImmutableList.of(originChildrenProperties); } } @Override - public Boolean visitPhysicalFilter(PhysicalFilter filter, Void context) { + public List> visitPhysicalFilter(PhysicalFilter filter, Void context) { // do not process must shuffle if (children.get(0).getPlan() instanceof PhysicalDistribute) { - return false; + return ImmutableList.of(); } - return true; + return ImmutableList.of(originChildrenProperties); } private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) { @@ -268,20 +270,20 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp } @Override - public Boolean visitPhysicalHashJoin( + public List> visitPhysicalHashJoin( PhysicalHashJoin hashJoin, Void context) { Preconditions.checkArgument(children.size() == 2, "children.size() != 2"); - Preconditions.checkArgument(childrenProperties.size() == 2); + Preconditions.checkArgument(originChildrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); // process must shuffle visit(hashJoin, context); // process hash join - DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec(); - DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); + DistributionSpec leftDistributionSpec = originChildrenProperties.get(0).getDistributionSpec(); + DistributionSpec rightDistributionSpec = originChildrenProperties.get(1).getDistributionSpec(); // broadcast do not need regular if (rightDistributionSpec instanceof DistributionSpecReplicated) { - return true; + return ImmutableList.of(originChildrenProperties); } // shuffle @@ -301,7 +303,7 @@ public Boolean visitPhysicalHashJoin( if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) { // check colocate join with scan - return true; + return ImmutableList.of(originChildrenProperties); } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) { // right anti, right outer, full outer join could not do bucket shuffle join // TODO remove this after we refactor coordinator @@ -339,6 +341,24 @@ public Boolean visitPhysicalHashJoin( (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); } else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) { + if (SessionVariable.canUseNereidsDistributePlanner()) { + List shuffleToLeft = Lists.newArrayList(originChildrenProperties); + PhysicalProperties enforceShuffleRight = calAnotherSideRequired( + ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()); + updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft); + + List shuffleToRight = Lists.newArrayList(originChildrenProperties); + PhysicalProperties enforceShuffleLeft = calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec, + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec() + ); + updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight); + return ImmutableList.of(shuffleToLeft, shuffleToRight); + } + // must add enforce because shuffle algorithm is not same between NATURAL and BUCKETED 
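// ---- reviewer sketch (not part of the patch) ----
// Under canUseNereidsDistributePlanner(), the join branch above stops hard-coding
// one shuffle direction: it forks the children properties and returns both, so
// the cost loop in CostAndEnforcerJob picks the cheaper side. The shape of the
// fork (P stands in for PhysicalProperties; the real code also registers an
// enforcer via updateChildEnforceAndCost, which this sketch only hints at):
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;

final class ShuffleDirectionSketch {
    static <P> List<List<P>> bothDirections(List<P> origin, P enforceLeft, P enforceRight) {
        List<P> shuffleToLeft = new ArrayList<>(origin);
        shuffleToLeft.set(1, enforceRight);  // right child re-bucketed to match the left

        List<P> shuffleToRight = new ArrayList<>(origin);
        shuffleToRight.set(0, enforceLeft);  // left child re-bucketed to match the right

        return ImmutableList.of(shuffleToLeft, shuffleToRight);
    }
}
// ---- end sketch ----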
updatedForRight = Optional.of(calAnotherSideRequired( ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec, @@ -349,7 +369,7 @@ public Boolean visitPhysicalHashJoin( if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec, (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())) { - return true; + return ImmutableList.of(originChildrenProperties); } updatedForRight = Optional.of(calAnotherSideRequired( ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec, @@ -362,10 +382,25 @@ public Boolean visitPhysicalHashJoin( // TODO: maybe we should check if left child is PhysicalDistribute. // If so add storage bucketed shuffle on left side. Other wise, // add execution bucketed shuffle on right side. - updatedForLeft = Optional.of(calAnotherSideRequired( + // updatedForLeft = Optional.of(calAnotherSideRequired( + // ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec, + // (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), + // (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + List shuffleToLeft = Lists.newArrayList(originChildrenProperties); + PhysicalProperties enforceShuffleRight = calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()); + updateChildEnforceAndCost(1, enforceShuffleRight, shuffleToLeft); + + List shuffleToRight = Lists.newArrayList(originChildrenProperties); + PhysicalProperties enforceShuffleLeft = calAnotherSideRequired( ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec, (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), - (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec() + ); + updateChildEnforceAndCost(0, enforceShuffleLeft, shuffleToRight); + return ImmutableList.of(shuffleToLeft, shuffleToRight); } else { // legacy coordinator could not do right be selection in this case, // since it always to check the left most node whether olap scan node. 
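// ---- reviewer sketch (not part of the patch) ----
// Two supporting changes make the fork above work. First, the new
// updateChildEnforceAndCost overload threads an explicit list, so each
// alternative mutates its own copy instead of the shared origin. Second
// (further down), the enforcer now carries its own cost via setCost(), so the
// exchange's cost is attributed to the enforcer expression instead of only
// being folded into the running total. Both in miniature, with doubles as Cost:
import java.util.List;

final class EnforceAccountingSketch {
    static <P> double enforceOnCopy(List<P> childrenPropsCopy, int index, P target,
            SketchEnforcer enforcer, double childBestCost) {
        childrenPropsCopy.set(index, target);   // recorded only in this alternative's copy
        double enforceCost = 1.0;               // CostCalculator.calculateCost(enforcer, ...)
        enforcer.setCost(enforceCost);          // the new line in the patch
        return enforceCost + childBestCost;     // CostCalculator.addChildCost(...)
    }

    static final class SketchEnforcer {
        double cost;

        void setCost(double cost) {
            this.cost = cost;
        }
    }
}
// ---- end sketch ----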
@@ -380,7 +415,7 @@ public Boolean visitPhysicalHashJoin( if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec, (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) { - return true; + return ImmutableList.of(originChildrenProperties); } updatedForRight = Optional.of(calAnotherSideRequired( ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, @@ -427,7 +462,7 @@ public Boolean visitPhysicalHashJoin( if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec, (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())) { - return true; + return ImmutableList.of(originChildrenProperties); } if (children.get(0).getPlan() instanceof PhysicalDistribute) { updatedForLeft = Optional.of(calAnotherSideRequired( @@ -445,58 +480,59 @@ public Boolean visitPhysicalHashJoin( updatedForLeft.ifPresent(physicalProperties -> updateChildEnforceAndCost(0, physicalProperties)); updatedForRight.ifPresent(physicalProperties -> updateChildEnforceAndCost(1, physicalProperties)); - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin nestedLoopJoin, - Void context) { + public List> visitPhysicalNestedLoopJoin( + PhysicalNestedLoopJoin nestedLoopJoin, Void context) { Preconditions.checkArgument(children.size() == 2, String.format("children.size() is %d", children.size())); - Preconditions.checkArgument(childrenProperties.size() == 2); + Preconditions.checkArgument(originChildrenProperties.size() == 2); Preconditions.checkArgument(requiredProperties.size() == 2); // process must shuffle visit(nestedLoopJoin, context); // process nlj - DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); + DistributionSpec rightDistributionSpec = originChildrenProperties.get(1).getDistributionSpec(); if (rightDistributionSpec instanceof DistributionSpecStorageGather) { updateChildEnforceAndCost(1, PhysicalProperties.GATHER); } - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalProject(PhysicalProject project, Void context) { + public List> visitPhysicalProject(PhysicalProject project, Void context) { // do not process must shuffle if (children.get(0).getPlan() instanceof PhysicalDistribute) { - return false; + return ImmutableList.of(); } - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) { + public List> visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) { // process must shuffle visit(setOperation, context); // union with only constant exprs list if (children.isEmpty()) { - return true; + return ImmutableList.of(originChildrenProperties); } // process set operation PhysicalProperties requiredProperty = requiredProperties.get(0); DistributionSpec requiredDistributionSpec = requiredProperty.getDistributionSpec(); if (requiredDistributionSpec instanceof DistributionSpecGather) { - for (int i = 0; i < childrenProperties.size(); i++) { - if (childrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecStorageGather) { + for (int i = 0; i < originChildrenProperties.size(); i++) { + if (originChildrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecStorageGather) { 
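// ---- reviewer sketch (not part of the patch) ----
// The set-operation loop below, reduced to a table: children whose rows are
// still distributed the way storage left them get lifted to an execution-level
// spec before the union merges them. Assumption: the STORAGE_* cases listed
// here mirror the instanceof chain in the real code.
final class SetOpAlignSketch {
    enum Spec { STORAGE_GATHER, GATHER, STORAGE_ANY, NATURAL_HASH, EXECUTION_ANY }

    static Spec align(Spec required, Spec child) {
        if (required == Spec.GATHER && child == Spec.STORAGE_GATHER) {
            return Spec.GATHER;        // enforce an execution-level gather
        }
        if (required == Spec.EXECUTION_ANY
                && (child == Spec.STORAGE_ANY || child == Spec.STORAGE_GATHER
                        || child == Spec.GATHER || child == Spec.NATURAL_HASH)) {
            return Spec.EXECUTION_ANY; // re-shuffle storage-bound children
        }
        return child;                  // already compatible
    }
}
// ---- end sketch ----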
updateChildEnforceAndCost(i, PhysicalProperties.GATHER); } } } else if (requiredDistributionSpec instanceof DistributionSpecAny) { - for (int i = 0; i < childrenProperties.size(); i++) { - if (childrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecStorageAny - || childrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecStorageGather - || childrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecGather - || (childrenProperties.get(i).getDistributionSpec() instanceof DistributionSpecHash - && ((DistributionSpecHash) childrenProperties.get(i).getDistributionSpec()) + for (int i = 0; i < originChildrenProperties.size(); i++) { + PhysicalProperties physicalProperties = originChildrenProperties.get(i); + if (physicalProperties.getDistributionSpec() instanceof DistributionSpecStorageAny + || physicalProperties.getDistributionSpec() instanceof DistributionSpecStorageGather + || physicalProperties.getDistributionSpec() instanceof DistributionSpecGather + || (physicalProperties.getDistributionSpec() instanceof DistributionSpecHash + && ((DistributionSpecHash) physicalProperties.getDistributionSpec()) .getShuffleType() == ShuffleType.NATURAL)) { updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY); } @@ -504,8 +540,9 @@ public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void } else if (requiredDistributionSpec instanceof DistributionSpecHash) { // TODO: should use the most common hash spec as basic DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec; - for (int i = 0; i < childrenProperties.size(); i++) { - DistributionSpecHash current = (DistributionSpecHash) childrenProperties.get(i).getDistributionSpec(); + for (int i = 0; i < originChildrenProperties.size(); i++) { + DistributionSpecHash current + = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec(); if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED || !bothSideShuffleKeysAreSameOrder(basic, current, (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), @@ -518,41 +555,42 @@ public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void } } } - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitAbstractPhysicalSort(AbstractPhysicalSort sort, Void context) { + public List> visitAbstractPhysicalSort( + AbstractPhysicalSort sort, Void context) { // process must shuffle visit(sort, context); if (sort.getSortPhase() == SortPhase.GATHER_SORT && sort.child() instanceof PhysicalDistribute) { // forbid gather sort need explicit shuffle - return false; + return ImmutableList.of(); } - return true; + return ImmutableList.of(originChildrenProperties); } @Override - public Boolean visitPhysicalTopN(PhysicalTopN topN, Void context) { + public List> visitPhysicalTopN(PhysicalTopN topN, Void context) { // process must shuffle visit(topN, context); int sortPhaseNum = jobContext.getCascadesContext().getConnectContext().getSessionVariable().sortPhaseNum; // if control sort phase, forbid nothing if (sortPhaseNum == 1 || sortPhaseNum == 2) { - return true; + return ImmutableList.of(originChildrenProperties); } // If child is DistributionSpecGather, topN should forbid two-phase topN if (topN.getSortPhase() == SortPhase.LOCAL_SORT - && childrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE)) { - return false; + && 
originChildrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE)) { + return ImmutableList.of(); } // forbid one step topn with distribute as child if (topN.getSortPhase() == SortPhase.GATHER_SORT && children.get(0).getPlan() instanceof PhysicalDistribute) { - return false; + return ImmutableList.of(); } - return true; + return ImmutableList.of(originChildrenProperties); } /** @@ -645,8 +683,14 @@ private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType, } private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) { + updateChildEnforceAndCost(index, targetProperties, originChildrenProperties); + } + + private void updateChildEnforceAndCost( + int index, PhysicalProperties targetProperties, List childrenProperties) { GroupExpression child = children.get(index); - Pair> lowest = child.getLowestCostTable().get(childrenProperties.get(index)); + Pair> lowest + = child.getLowestCostTable().get(childrenProperties.get(index)); PhysicalProperties output = child.getOutputProperties(childrenProperties.get(index)); DistributionSpec target = targetProperties.getDistributionSpec(); updateChildEnforceAndCost(child, output, target, lowest.first); @@ -668,10 +712,10 @@ private void updateChildEnforceAndCost(GroupExpression child, PhysicalProperties GroupExpression enforcer = target.addEnforcer(child.getOwnerGroup()); child.getOwnerGroup().addEnforcer(enforcer); ConnectContext connectContext = jobContext.getCascadesContext().getConnectContext(); - Cost totalCost = CostCalculator.addChildCost(connectContext, enforcer.getPlan(), - CostCalculator.calculateCost(connectContext, enforcer, Lists.newArrayList(childOutput)), - currentCost, - 0); + Cost enforceCost = CostCalculator.calculateCost(connectContext, enforcer, Lists.newArrayList(childOutput)); + enforcer.setCost(enforceCost); + Cost totalCost = CostCalculator.addChildCost( + connectContext, enforcer.getPlan(), enforceCost, currentCost, 0); if (enforcer.updateLowestCostTable(newOutputProperty, Lists.newArrayList(childOutput), totalCost)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java index ef3236690f1b341..d2fbb9905e1aeae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java @@ -74,8 +74,7 @@ protected List insideMachineParallelization( DistributeContext distributeContext) { ConnectContext context = statementContext.getConnectContext(); - boolean useLocalShuffleToAddParallel = fragment.useSerialSource(ConnectContext.get()); - int instanceIndexInFragment = 0; + boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel(); List instances = Lists.newArrayList(); for (Entry entry : workerToScanRanges.entrySet()) { DistributedPlanWorker worker = entry.getKey(); @@ -94,64 +93,73 @@ protected List insideMachineParallelization( // for example: two instances int instanceNum = degreeOfParallelism(scanSourceMaxParallel); - List instanceToScanRanges; if (useLocalShuffleToAddParallel) { - // only generate one instance to scan all data, in this step - instanceToScanRanges = scanSource.parallelize( - scanNodes, 1 - ); - - // when data not big, but aggregation too slow, we will use 1 instance to scan 
data, - // and use more instances (to ***add parallel***) to process aggregate. - // We call it `ignore data distribution` of `share scan`. Backend will know this instances - // share the same ScanSource, and will not scan same data multiple times. - // - // +-------------------------------- same fragment in one host -------------------------------------+ - // | instance1 instance2 instance3 instance4 | - // | \ \ / / | - // | | - // | OlapScanNode | - // |(share scan node, instance1 will scan all data and local shuffle to other local instances | - // | to parallel compute this data) | - // +------------------------------------------------------------------------------------------------+ - ScanSource shareScanSource = instanceToScanRanges.get(0); - - // one scan range generate multiple instances, - // different instances reference the same scan source - int shareScanId = shareScanIdGenerator.getAndIncrement(); - ScanSource emptyShareScanSource = shareScanSource.newEmpty(); - for (int i = 0; i < instanceNum; i++) { - LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( - instanceIndexInFragment++, shareScanId, i > 0, - context.nextInstanceId(), this, worker, - i == 0 ? shareScanSource : emptyShareScanSource - ); - instances.add(instance); - } + assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker); } else { - // split the scanRanges to some partitions, one partition for one instance - // for example: - // [ - // scan tbl1: [tablet_10001, tablet_10003], // instance 1 - // scan tbl1: [tablet_10002, tablet_10004] // instance 2 - // ] - instanceToScanRanges = scanSource.parallelize( - scanNodes, instanceNum - ); - - for (ScanSource instanceToScanRange : instanceToScanRanges) { - instances.add( - assignWorkerAndDataSources( - instanceIndexInFragment++, context.nextInstanceId(), worker, instanceToScanRange - ) - ); - } + assignedDefaultJobs(scanSource, instanceNum, instances, context, worker); } } return instances; } + protected boolean useLocalShuffleToAddParallel() { + return fragment.useSerialSource(ConnectContext.get()); + } + + protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List instances, + ConnectContext context, DistributedPlanWorker worker) { + // split the scanRanges to some partitions, one partition for one instance + // for example: + // [ + // scan tbl1: [tablet_10001, tablet_10003], // instance 1 + // scan tbl1: [tablet_10002, tablet_10004] // instance 2 + // ] + List instanceToScanRanges = scanSource.parallelize(scanNodes, instanceNum); + + for (ScanSource instanceToScanRange : instanceToScanRanges) { + instances.add( + assignWorkerAndDataSources( + instances.size(), context.nextInstanceId(), worker, instanceToScanRange + ) + ); + } + } + + protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List instances, + ConnectContext context, DistributedPlanWorker worker) { + // only generate one instance to scan all data, in this step + List instanceToScanRanges = scanSource.parallelize(scanNodes, 1); + + // when data not big, but aggregation too slow, we will use 1 instance to scan data, + // and use more instances (to ***add parallel***) to process aggregate. + // We call it `ignore data distribution` of `share scan`. Backend will know this instances + // share the same ScanSource, and will not scan same data multiple times. 
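// ---- reviewer sketch (not part of the patch) ----
// assignLocalShuffleJobs in miniature: parallelize to ONE scan source, then
// stamp out instanceNum jobs that share it. Instance 0 owns the real tablets;
// the rest get an empty ScanSource plus the same shareScanId, which is how the
// backend knows they read the scan's output via local shuffle instead of
// re-scanning. Tablet ids stand in for ScanSource here; names are illustrative:
import java.util.ArrayList;
import java.util.List;

final class ShareScanSketch {
    record SketchJob(int index, int shareScanId, boolean receiveDataFromLocal, List<Long> tablets) {}

    static List<SketchJob> shareScan(List<Long> allTablets, int instanceNum, int shareScanId) {
        List<SketchJob> instances = new ArrayList<>();
        for (int i = 0; i < instanceNum; i++) {
            List<Long> source = (i == 0) ? allTablets : List.of(); // newEmpty() analogue
            instances.add(new SketchJob(instances.size(), shareScanId, i > 0, source));
        }
        return instances;
    }
}
// ---- end sketch ----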
+ // + // +-------------------------------- same fragment in one host -------------------------------------+ + // | instance1 instance2 instance3 instance4 | + // | \ \ / / | + // | | + // | OlapScanNode | + // |(share scan node, instance1 will scan all data and local shuffle to other local instances | + // | to parallel compute this data) | + // +------------------------------------------------------------------------------------------------+ + ScanSource shareScanSource = instanceToScanRanges.get(0); + + // one scan range generate multiple instances, + // different instances reference the same scan source + int shareScanId = shareScanIdGenerator.getAndIncrement(); + ScanSource emptyShareScanSource = shareScanSource.newEmpty(); + for (int i = 0; i < instanceNum; i++) { + LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( + instances.size(), shareScanId, i > 0, + context.nextInstanceId(), this, worker, + i == 0 ? shareScanSource : emptyShareScanSource + ); + instances.add(instance); + } + } + protected int degreeOfParallelism(int maxParallel) { Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive"); if (!fragment.getDataPartition().isPartitioned()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java new file mode 100644 index 000000000000000..443acb50d78c787 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/** LocalShuffleBucketJoinAssignedJob */
+public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob {
+    private volatile Set<Integer> assignedJoinBucketIndexes;
+
+    public LocalShuffleBucketJoinAssignedJob(
+            int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal,
+            TUniqueId instanceId, UnassignedJob unassignedJob,
+            DistributedPlanWorker worker, ScanSource scanSource,
+            Set<Integer> assignedJoinBucketIndexes) {
+        super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource);
+        this.assignedJoinBucketIndexes = Utils.fastToImmutableSet(assignedJoinBucketIndexes);
+    }
+
+    public Set<Integer> getAssignedJoinBucketIndexes() {
+        return assignedJoinBucketIndexes;
+    }
+
+    public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+        this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
+                .addAll(assignedJoinBucketIndexes)
+                .addAll(joinBucketIndexes)
+                .build();
+    }
+
+    @Override
+    protected String formatOtherString() {
+        return ",\n assigned join buckets: " + assignedJoinBucketIndexes;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
index 75849ad8146737b..77494d621ee4f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
@@ -89,6 +89,7 @@ public String toString(boolean showUnassignedJob) {
         }
         return str
+                .append(formatOtherString())
                .append(",\n scanSource: " + formatScanSourceString())
                .append("\n)")
                .toString();
@@ -108,6 +109,10 @@ protected String formatScanSourceString() {
         return scanSourceString.toString();
     }
 
+    protected String formatOtherString() {
+        return "";
+    }
+
     @Override
     public int hashCode() {
         return indexInUnassignedJob;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 70a91ca2b30f326..f90fe7ea6e2c211 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -27,6 +27,7 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
 import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.OlapScanNode;
@@ -164,6 +165,34 @@ protected List<AssignedJob> insideMachineParallelization(
         return assignedJobs;
     }
 
+    @Override
+    protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
+            ConnectContext
context, DistributedPlanWorker worker) { + // only generate one instance to scan all data, in this step + List assignJoinBuckets = scanSource.parallelize( + scanNodes, instanceNum + ); + + // one scan range generate multiple instances, + // different instances reference the same scan source + int shareScanId = shareScanIdGenerator.getAndIncrement(); + + BucketScanSource shareScanSource = (BucketScanSource) scanSource; + ScanSource emptyShareScanSource = shareScanSource.newEmpty(); + + for (int i = 0; i < assignJoinBuckets.size(); i++) { + Set assignedJoinBuckets + = ((BucketScanSource) assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet(); + LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( + instances.size(), shareScanId, i > 0, + context.nextInstanceId(), this, worker, + i == 0 ? shareScanSource : emptyShareScanSource, + Utils.fastToImmutableSet(assignedJoinBuckets) + ); + instances.add(instance); + } + } + private boolean shouldFillUpInstances(List hashJoinNodes) { for (HashJoinNode hashJoinNode : hashJoinNodes) { if (!hashJoinNode.isBucketShuffle()) { @@ -198,6 +227,7 @@ private List fillUpInstances(List instances) { List newInstances = new ArrayList<>(instances); for (Entry> workerToBuckets : missingBuckets.asMap().entrySet()) { Map> scanEmptyBuckets = Maps.newLinkedHashMap(); + Set assignedJoinBuckets = Utils.fastToImmutableSet(workerToBuckets.getValue()); for (Integer bucketIndex : workerToBuckets.getValue()) { Map scanTableWithEmptyData = Maps.newLinkedHashMap(); for (ScanNode scanNode : scanNodes) { @@ -218,12 +248,16 @@ private List fillUpInstances(List instances) { BucketScanSource bucketScanSource = (BucketScanSource) newInstance.getScanSource(); bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets); mergedBucketsInSameWorkerInstance = true; + + LocalShuffleBucketJoinAssignedJob instance = (LocalShuffleBucketJoinAssignedJob) newInstance; + instance.addAssignedJoinBucketIndexes(assignedJoinBuckets); } } if (!mergedBucketsInSameWorkerInstance) { - fillUpInstance = new LocalShuffleAssignedJob( + fillUpInstance = new LocalShuffleBucketJoinAssignedJob( newInstances.size(), shareScanIdGenerator.getAndIncrement(), - false, context.nextInstanceId(), this, worker, scanSource + false, context.nextInstanceId(), this, worker, scanSource, + assignedJoinBuckets ); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java index 42cf08fb2e3b188..47c01ef8eb34a60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java @@ -82,11 +82,13 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam target.address, address -> { TRuntimeFilterTargetParamsV2 params = new TRuntimeFilterTargetParamsV2(); params.target_fragment_instance_addr = address; + params.target_fragment_ids = new ArrayList<>(); + // required field params.target_fragment_instance_ids = new ArrayList<>(); return params; }); - targetParams.target_fragment_instance_ids.add(target.instanceId); + targetParams.target_fragment_ids.add(target.fragmentId); } runtimeFilterParams.putToRidToTargetParamv2( @@ -95,7 +97,8 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam } else { List targetParams = Lists.newArrayList(); for (RuntimeFilterTarget target : 
targets) {
-                targetParams.add(new TRuntimeFilterTargetParams(target.instanceId, target.address));
+                // Instance id makes no sense if this runtime filter doesn't have remote targets.
+                targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(), target.address));
             }
             runtimeFilterParams.putToRidToTargetParam(rf.getFilterId().asInt(), targetParams);
         }
@@ -135,7 +138,7 @@ public static RuntimeFiltersThriftBuilder compute(
             BackendWorker backendWorker = (BackendWorker) instanceJob.getAssignedWorker();
             Backend backend = backendWorker.getBackend();
             targetFragments.add(new RuntimeFilterTarget(
-                    instanceJob.instanceId(),
+                    fragment.getFragmentId().asInt(),
                     new TNetworkAddress(backend.getHost(), backend.getBrpcPort())
             ));
         }
@@ -158,11 +161,11 @@ public static RuntimeFiltersThriftBuilder compute(
     }
 
     public static class RuntimeFilterTarget {
-        public final TUniqueId instanceId;
+        public final int fragmentId;
         public final TNetworkAddress address;
 
-        public RuntimeFilterTarget(TUniqueId instanceId, TNetworkAddress address) {
-            this.instanceId = instanceId;
+        public RuntimeFilterTarget(int fragmentId, TNetworkAddress address) {
+            this.fragmentId = fragmentId;
             this.address = address;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index c04861cbf436936..f0e3febe1928546 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -26,6 +26,7 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
@@ -498,14 +499,18 @@ private static Map<Integer, Integer> computeBucketIdToInstanceId(
             if (instanceJob.getAssignedWorker().id() != worker.id()) {
                 continue;
             }
-            if (instanceJob instanceof LocalShuffleAssignedJob
-                    && ((LocalShuffleAssignedJob) instanceJob).receiveDataFromLocal) {
-                continue;
-            }
+
             Integer instanceIndex = instanceToIndex.get(instanceJob);
-            BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
-            for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
-                bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+            if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
+                LocalShuffleBucketJoinAssignedJob assignedJob = (LocalShuffleBucketJoinAssignedJob) instanceJob;
+                for (Integer bucketIndex : assignedJob.getAssignedJoinBucketIndexes()) {
+                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+                }
+            } else {
+                BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
+                for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
+                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+                }
             }
         }
         return bucketIdToInstanceId;
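// ---- reviewer sketch (not part of the patch) ----
// Net effect of the last two files: runtime filter targets are now addressed by
// fragment id (one entry per fragment per host) instead of per instance, and the
// bucket -> instance mapping prefers the buckets a bucket-join local-shuffle
// instance was *assigned* (which include filled-up empty buckets) over the
// buckets its scan source happens to hold. The mapping logic in miniature:
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BucketToInstanceSketch {
    record Instance(Set<Integer> assignedJoinBuckets, Set<Integer> scannedBuckets, boolean bucketJoin) {}

    static Map<Integer, Integer> bucketIdToInstanceId(List<Instance> instances) {
        Map<Integer, Integer> mapping = new LinkedHashMap<>();
        for (int index = 0; index < instances.size(); index++) {
            Instance instance = instances.get(index);
            Set<Integer> buckets = instance.bucketJoin()
                    ? instance.assignedJoinBuckets()  // covers filled-up empty buckets too
                    : instance.scannedBuckets();
            for (int bucket : buckets) {
                mapping.put(bucket, index);
            }
        }
        return mapping;
    }
}
// ---- end sketch ----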