diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index e59a1289b10c4..ae2caa7aad89a 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -85,7 +85,7 @@
table.optimizer.reuse-sink-enabled Batch Streaming |
- false |
+ true |
Boolean |
When it is true, the optimizer will try to find out duplicated table sinks and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true. |
diff --git a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
index 39230d129216b..afee14890fa60 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql_multi/statement_set.q
@@ -73,9 +73,9 @@ INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output

-| result |


+| result |

| == Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
+- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
@@ -87,21 +87,17 @@ LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+ +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
== Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+ +- Reused(reference_id=[1])
|


1 row in set
!ok
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index cd9a33aca583c..486fe3d43cc98 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -105,19 +105,15 @@ LogicalSink(table=[default_catalog.default_database.StreamingTable], fields=[EXP
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
+ +- Values(type=[RecordType(INTEGER EXPR$0, VARCHAR(11) EXPR$1)], tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])
== Optimized Execution Plan ==
-Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
-
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EXPR$1])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[EXPR$0, EXPR$1])
+ :- Values(tuples=[[{ 1, _UTF-16LE'Hello World' }, { 2, _UTF-16LE'Hi' }, { 2, _UTF-16LE'Hi' }, { 3, _UTF-16LE'Hello' }, { 3, _UTF-16LE'World' }, { 4, _UTF-16LE'ADD' }, { 5, _UTF-16LE'LINE' }]])(reuse_id=[1])
+ +- Reused(reference_id=[1])
!ok
EXECUTE STATEMENT SET BEGIN
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 23d15d0f978d8..62d085d5cd6e3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -109,7 +109,7 @@ public class OptimizerConfigOptions {
public static final ConfigOption TABLE_OPTIMIZER_REUSE_SINK_ENABLED =
key("table.optimizer.reuse-sink-enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription(
"When it is true, the optimizer will try to find out duplicated table sinks and "
+ "reuse them. This works only when "
diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
index edee1984dddbd..d4e6a87a3d1c1 100644
--- a/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
+++ b/flink-table/flink-table-planner/src/test/resources/explain/testStatementSetExecutionExplain.out
@@ -9,17 +9,15 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[first])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.MySink], fields=[last])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
-
-Sink(table=[default_catalog.default_database.MySink], fields=[first])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
++- Union(all=[true], union=[last])
+ :- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
== Physical Execution Plan ==
{
@@ -51,17 +49,6 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
- }, {
- "id" : ,
- "type" : "StreamingFileWriter",
- "pact" : "Operator",
- "contents" : "StreamingFileWriter",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "FORWARD",
- "side" : "second"
- } ]
}, {
"id" : ,
"type" : "Source: Custom File source",
@@ -100,14 +87,7 @@ Sink(table=[default_catalog.default_database.MySink], fields=[first])
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
- } ]
- }, {
- "id" : ,
- "type" : "end: Writer",
- "pact" : "Operator",
- "contents" : "end: Writer",
- "parallelism" : 1,
- "predecessors" : [ {
+ }, {
"id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index 9813db3c568d7..1ba58f298ae78 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -157,11 +157,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
@@ -196,13 +195,10 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
@@ -221,10 +217,9 @@ LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 513a568b5aaf8..bd5a6a20be2d1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -367,32 +367,31 @@ LogicalSink(table=[default_catalog.default_database.sink2], fields=[number, word
(word, 'a')], changelogMode=[I,UB,UA])
- +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
- :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
- : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[word]], changelogMode=[I])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
- +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
- +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[word]], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
-
-Sink(table=[default_catalog.default_database.sink1], fields=[number, word], changelogMode=[NONE])
-+- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
- +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
- :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
- : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[word]], changelogMode=[I])
- : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
- +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
- +- Calc(select=[number, word], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[word]], changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
++- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[number, word], where=[>(word, 'a')], changelogMode=[I,UB,UA])
+ : +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ : :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ : : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ : : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+ : +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+ +- Calc(select=[number, word], where=[<(word, 'a')], changelogMode=[I,UB,UA])
+ +- Union(all=[true], union=[number, word], changelogMode=[I,UB,UA])
+ :- Calc(select=[+(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ : +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
+ +- Calc(select=[-(number, 1) AS number, word], changelogMode=[I,UB,UA])
+ +- Calc(select=[number, word], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[word], select=[word, SUM(number) AS number], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[word]], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I])
Sink(table=[default_catalog.default_database.sink2], fields=[number, word], changelogMode=[NONE])
+- Calc(select=[number, word], changelogMode=[I,UB,UA])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index e71a198ba6e45..b18b5a61bcf22 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -129,16 +129,13 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
@@ -179,15 +176,12 @@ LogicalSink(table=[default_catalog.default_database.t_sink], fields=[out])
@@ -252,15 +246,12 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
@@ -286,17 +277,14 @@ LogicalSink(table=[default_catalog.default_database.t_keyed_sink], fields=[name,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 9f1e3b17485d5..ca722ace65392 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2209,22 +2209,19 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
@@ -2254,24 +2251,21 @@ LogicalSink(table=[default_catalog.default_database.s1], fields=[window_start, w
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 6055fcb363b81..ce02921776621 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -584,10 +584,7 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) {
val tableResult = stmtSet.execute()
// only check the schema
- checkInsertTableResult(
- tableResult,
- "default_catalog.default_database.MySink_1",
- "default_catalog.default_database.MySink_2")
+ checkInsertTableResult(tableResult, "default_catalog.default_database.MySink_1")
}
@TestTemplate