Skip to content

Commit

Permalink
[BugFix] Cached fragment misuses exogenous runtime filter (#51150)
Browse files Browse the repository at this point in the history
Signed-off-by: satanson <[email protected]>
(cherry picked from commit 427d13b)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/test/java/com/starrocks/sql/plan/ReplayFromDumpTest.java
  • Loading branch information
satanson authored and mergify[bot] committed Sep 20, 2024
1 parent c715428 commit bc2e16d
Show file tree
Hide file tree
Showing 6 changed files with 1,412 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public boolean normalize() {
// Get leftmost path
List<PlanNode> leftNodesTopDown = Lists.newArrayList();
for (PlanNode currNode = root; currNode != null && currNode.getFragment() == fragment;
currNode = currNode.getChild(0)) {
currNode = currNode.getChild(0)) {
leftNodesTopDown.add(currNode);
}

Expand Down Expand Up @@ -774,15 +774,17 @@ public boolean normalize() {
// Not cacheable unless alien GRF(s) take effects on this PlanFragment.
// The alien GRF(s) mean the GRF(S) that not created by PlanNodes of the subtree rooted at
// the PlanFragment.planRoot.

Set<Integer> grfBuilders =
fragment.getProbeRuntimeFilters().values().stream().filter(RuntimeFilterDescription::isHasRemoteTargets)
.map(RuntimeFilterDescription::getBuildPlanNodeId).collect(Collectors.toSet());
if (!grfBuilders.isEmpty()) {
List<PlanFragment> rightSiblings = Lists.newArrayList();
collectRightSiblingFragments(root, rightSiblings, Sets.newHashSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream().flatMap(
frag -> frag.getBuildRuntimeFilters().values().stream().map(
RuntimeFilterDescription::getBuildPlanNodeId)).collect(Collectors.toSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream()
.flatMap(frag -> frag.getBuildRuntimeFilters().values().stream())
.map(RuntimeFilterDescription::getBuildPlanNodeId)
.collect(Collectors.toSet());
boolean hasAlienGrf = !Sets.difference(grfBuilders, acceptableGrfBuilders).isEmpty();
if (hasAlienGrf) {
return false;
Expand Down
60 changes: 36 additions & 24 deletions fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,17 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
<<<<<<< HEAD
=======
import java.util.function.Consumer;
import java.util.function.Function;
>>>>>>> 427d13b300 ([BugFix] Cached fragment misuses exogenous runtime filter (#51150))
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -146,8 +152,15 @@ public class PlanFragment extends TreeNode<PlanFragment> {
protected boolean assignScanRangesPerDriverSeq = false;
protected boolean withLocalShuffle = false;

<<<<<<< HEAD
protected final Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newTreeMap();
protected final Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newTreeMap();
=======
protected double fragmentCost;

protected Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newHashMap();
protected Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newHashMap();
>>>>>>> 427d13b300 ([BugFix] Cached fragment misuses exogenous runtime filter (#51150))

protected List<Pair<Integer, ColumnDict>> queryGlobalDicts = Lists.newArrayList();
protected List<Pair<Integer, ColumnDict>> loadGlobalDicts = Lists.newArrayList();
Expand Down Expand Up @@ -601,34 +614,33 @@ public boolean isTransferQueryStatisticsWithEveryBatch() {
return transferQueryStatisticsWithEveryBatch;
}

public void collectBuildRuntimeFilters(PlanNode root) {
if (root instanceof ExchangeNode) {
return;
}

if (root instanceof RuntimeFilterBuildNode) {
RuntimeFilterBuildNode rfBuildNode = (RuntimeFilterBuildNode) root;
for (RuntimeFilterDescription description : rfBuildNode.getBuildRuntimeFilters()) {
buildRuntimeFilters.put(description.getFilterId(), description);
}
}

for (PlanNode node : root.getChildren()) {
collectBuildRuntimeFilters(node);
}
public void collectBuildRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.filter(node -> node instanceof RuntimeFilterBuildNode)
.flatMap(node -> ((RuntimeFilterBuildNode) node).getBuildRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
buildRuntimeFilters = filters;
}

public void collectProbeRuntimeFilters(PlanNode root) {
for (RuntimeFilterDescription description : root.getProbeRuntimeFilters()) {
probeRuntimeFilters.put(description.getFilterId(), description);
}
public void collectProbeRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.flatMap(node -> node.getProbeRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
probeRuntimeFilters = filters;
}

if (root instanceof ExchangeNode) {
return;
}
public List<PlanNode> collectNodes() {
List<PlanNode> nodes = Lists.newArrayList();
collectNodesImpl(getPlanRoot(), nodes);
return nodes;
}

for (PlanNode node : root.getChildren()) {
collectProbeRuntimeFilters(node);
private void collectNodesImpl(PlanNode root, List<PlanNode> nodes) {
nodes.add(root);
if (!(root instanceof ExchangeNode)) {
root.getChildren().forEach(child -> collectNodesImpl(child, nodes));
}
}

Expand Down
Loading

0 comments on commit bc2e16d

Please sign in to comment.