From 709bf93534fcdfd2b4452667af450f1748bf1ccc Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Thu, 21 Mar 2024 17:04:22 +0100 Subject: [PATCH] [FLINK-34910] Fix optimizing window join (#24549) --- .../JoinTableFunctionScanToCorrelateRule.java | 12 +++++ .../plan/stream/sql/join/WindowJoinTest.xml | 44 +++++++++++++++++++ .../plan/stream/sql/join/WindowJoinTest.scala | 14 ++++++ 3 files changed, 70 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java index b14054906c129..bd5b783b200c4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java @@ -18,16 +18,22 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalConstantTableFunctionScanRule; + import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalTableFunctionScan; +import org.apache.calcite.rex.RexUtil; import org.immutables.value.Value; /** * Rule that rewrites {@link org.apache.calcite.rel.core.Join} on {@link * org.apache.calcite.rel.core.TableFunctionScan} to {@link org.apache.calcite.rel.core.Correlate}. + * + *

Note: The rule was implemented so that we can apply {@link + * StreamPhysicalConstantTableFunctionScanRule} later. */ @Value.Enclosing public class JoinTableFunctionScanToCorrelateRule @@ -68,6 +74,12 @@ public interface Config extends RelRule.Config { b2.operand( LogicalTableFunctionScan .class) + .predicate( + scan -> + !RexUtil + .containsInputRef( + scan + .getCall())) .noInputs())) .description("JoinTableFunctionScanToCorrelateRule") .build(); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml index 12cb68df0eeac..a0a733641c374 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml @@ -2009,6 +2009,50 @@ Calc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, windo ]]> + + + + + + + + + + +