Skip to content

Commit

Permalink
[FLINK-34910] Fix optimizing window join (apache#24549)
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys authored Mar 21, 2024
1 parent 8ee552a commit 709bf93
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalConstantTableFunctionScanRule;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rex.RexUtil;
import org.immutables.value.Value;

/**
* Rule that rewrites {@link org.apache.calcite.rel.core.Join} on {@link
* org.apache.calcite.rel.core.TableFunctionScan} to {@link org.apache.calcite.rel.core.Correlate}.
*
* <p>Note: The rule was implemented so that we can apply {@link
* StreamPhysicalConstantTableFunctionScanRule} later.
*/
@Value.Enclosing
public class JoinTableFunctionScanToCorrelateRule
Expand Down Expand Up @@ -68,6 +74,12 @@ public interface Config extends RelRule.Config {
b2.operand(
LogicalTableFunctionScan
.class)
.predicate(
scan ->
!RexUtil
.containsInputRef(
scan
.getCall()))
.noInputs()))
.description("JoinTableFunctionScanToCorrelateRule")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,6 +2009,50 @@ Calc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, windo
]]>
</Resource>
</TestCase>
<TestCase name="testWindowJoinWithoutProjections">
<Resource name="sql">
<![CDATA[
SELECT *
FROM
TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS L
JOIN
TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS R
ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7], a0=[$8], b0=[$9], c0=[$10], rowtime0=[$11], proctime0=[$12], window_start0=[$13], window_end0=[$14], window_time0=[$15])
+- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner])
:- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
: +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
: +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
: +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time, a0, b0, c0, rowtime0, PROCTIME_MATERIALIZE(proctime0) AS proctime0, window_start0, window_end0, window_time0])
+- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, rowtime, proctime, window_start, window_end, window_time, a0, b0, c0, rowtime0, proctime0, window_start0, window_end0, window_time0])
:- Exchange(distribution=[hash[a]])
: +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
: +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
+- Exchange(distribution=[hash[a]])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testSimplifyTumbleWindowTVFBeforeWindowJoinWithLeftCalc">
<Resource name="sql">
<![CDATA[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ class WindowJoinTest extends TableTestBase {
util.verifyRelPlan(sql)
}

/**
 * Regression test for FLINK-34910 (optimizing a window join).
 *
 * Joins two TUMBLE windowing table-valued functions (15-minute windows on `rowtime`, over
 * MyTable and MyTable2) directly with `SELECT *`, i.e. without any projection on either join
 * input or on top of the join. The join condition matches on `window_start`, `window_end` and
 * the column `a`.
 *
 * NOTE(review): the query is expected to be planned as a WindowJoin; confirm against the
 * recorded plan for this test case in the accompanying XML plan resource.
 */
@Test
def testWindowJoinWithoutProjections(): Unit = {
val sql =
"""
|SELECT *
|FROM
| TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS L
|JOIN
| TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS R
|ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
""".stripMargin
// Hand the query to the test utility's relational-plan verification.
util.verifyRelPlan(sql)
}

@Test
def testUnsupportedWindowTVF_TumbleOnProctime(): Unit = {
val sql =
Expand Down

0 comments on commit 709bf93

Please sign in to comment.