diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html index e59a1289b10c4..ae2caa7aad89a 100644 --- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html +++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html @@ -85,7 +85,7 @@
table.optimizer.reuse-sink-enabled

Batch Streaming - false + true Boolean When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true. diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q index 39230d129216b..afee14890fa60 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q +++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q @@ -73,9 +73,9 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); END; !output| result || result || == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) +- LogicalProject(EXPR$0=[$0], EXPR$1=[$1]) @@ -87,21 +87,17 @@ LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP == Optimized Physical Plan == Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) - -Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) == Optimized Execution Plan == -Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) - Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Reused(reference_id=[1]) - -Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Reused(reference_id=[1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) + +- Reused(reference_id=[1]) |row in set !ok diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q index cd9a33aca583c..486fe3d43cc98 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q @@ -105,19 +105,15 @@ LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP == Optimized Physical Plan == Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) - -Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) + +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]]) == Optimized Execution Plan == -Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) - Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Reused(reference_id=[1]) - -Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1]) -+- Reused(reference_id=[1]) ++- Union(all=[true], union=[EXPR$0, EXPR$1]) + :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1]) + +- Reused(reference_id=[1]) !ok EXECUTE STATEMENT SET BEGIN diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index 23d15d0f978d8..62d085d5cd6e3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -109,7 +109,7 @@ public class OptimizerConfigOptions { public static final ConfigOption TABLE_OPTIMIZER_REUSE_SINK_ENABLED = key("table.optimizer.reuse-sink-enabled") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( "When it is true, the optimizer will try to find out duplicated table sinks and " + "reuse them. This works only when " diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out index edee1984dddbd..d4e6a87a3d1c1 100644 --- a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out +++ b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out @@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[first]) == Optimized Physical Plan == Sink(table=[default_catalog.default_database.MySink], fields=[last]) -+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last]) - -Sink(table=[default_catalog.default_database.MySink], fields=[first]) -+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first]) ++- Union(all=[true], union=[last]) + :- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first]) == Optimized Execution Plan == Sink(table=[default_catalog.default_database.MySink], fields=[last]) -+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last]) - -Sink(table=[default_catalog.default_database.MySink], fields=[first]) -+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first]) ++- Union(all=[true], union=[last]) + :- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first]) == Physical Execution Plan == { @@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first]) "ship_strategy" : "FORWARD", "side" : "second" } ] - }, { - "id" : , - "type" : "StreamingFileWriter", - "pact" : "Operator", - "contents" : "StreamingFileWriter", - "parallelism" : 1, - "predecessors" : [ { - "id" : , - "ship_strategy" : "FORWARD", - "side" : "second" - } ] }, { "id" : , "type" : "Source: Custom File source", @@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first]) "id" : , "ship_strategy" : "FORWARD", "side" : "second" - } ] - }, { - "id" : , - "type" : "end: Writer", - "pact" : "Operator", - "contents" : "end: Writer", - "parallelism" : 1, - "predecessors" : [ { + }, { "id" : , "ship_strategy" : "FORWARD", "side" : "second" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml index 9813db3c568d7..1ba58f298ae78 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml @@ -157,11 +157,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c]) @@ -196,13 +195,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c]) @@ -221,10 +217,9 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml index 513a568b5aaf8..bd5a6a20be2d1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml @@ -367,32 +367,31 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word (word, 'a')], changelogMode=[I,UB,UA]) - +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA]) - :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA]) - : +- Calc(select=[number, word], changelogMode=[I,UB,UA]) - : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) - : +- Exchange(distribution=[hash[word]], changelogMode=[I]) - : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) - +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA]) - +- Calc(select=[number, word], changelogMode=[I,UB,UA]) - +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) - +- Exchange(distribution=[hash[word]], changelogMode=[I]) - +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) - -Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE]) -+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA]) - +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA]) - :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA]) - : +- Calc(select=[number, word], changelogMode=[I,UB,UA]) - : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) - : +- Exchange(distribution=[hash[word]], changelogMode=[I]) - : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) - +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA]) - +- Calc(select=[number, word], changelogMode=[I,UB,UA]) - +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) - +- Exchange(distribution=[hash[word]], changelogMode=[I]) - +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) ++- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA]) + :- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA]) + : +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA]) + : :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA]) + : : +- Calc(select=[number, word], changelogMode=[I,UB,UA]) + : : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) + : : +- Exchange(distribution=[hash[word]], changelogMode=[I]) + : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) + : +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA]) + : +- Calc(select=[number, word], changelogMode=[I,UB,UA]) + : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) + : +- Exchange(distribution=[hash[word]], changelogMode=[I]) + : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) + +- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA]) + +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA]) + :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA]) + : +- Calc(select=[number, word], changelogMode=[I,UB,UA]) + : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) + : +- Exchange(distribution=[hash[word]], changelogMode=[I]) + : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) + +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA]) + +- Calc(select=[number, word], changelogMode=[I,UB,UA]) + +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA]) + +- Exchange(distribution=[hash[word]], changelogMode=[I]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) Sink(table=[default_catalog.default_database.sink2], fields=[number, word], changelogMode=[NONE]) +- Calc(select=[number, word], changelogMode=[I,UB,UA]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml index e71a198ba6e45..b18b5a61bcf22 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml @@ -129,16 +129,13 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, @@ -179,15 +176,12 @@ LogicalSink(table=[default_catalog.default_database.t_sink], fields=[out]) @@ -252,15 +246,12 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, @@ -286,17 +277,14 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name, diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 9f1e3b17485d5..ca722ace65392 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -2209,22 +2209,19 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w @@ -2254,24 +2251,21 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala index 6055fcb363b81..ce02921776621 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala @@ -584,10 +584,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) { val tableResult = stmtSet.execute() // only check the schema - checkInsertTableResult( - tableResult, - "default_catalog.default_database.MySink_1", - "default_catalog.default_database.MySink_2") + checkInsertTableResult(tableResult, "default_catalog.default_database.MySink_1") } @TestTemplate