From 26d1be46531919dcd00d02eded6795035b342e13 Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Thu, 23 Jan 2025 20:21:30 +0800 Subject: [PATCH 1/5] [FLINK-37087][doc] Add docs for alter materialized table as query --- .../table/materialized-table/statements.md | 69 +++++++++++++++++- .../table/materialized-table/statements.md | 73 ++++++++++++++++++- 2 files changed, 135 insertions(+), 7 deletions(-) diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index acb4f2c677206..3df4ddcca64c3 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -254,16 +254,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full ## 限制 - 不支持显式指定列名 -- 不支持修改查询语句 - 不支持在 select 查询语句中引用临时表、临时视图或临时函数 # ALTER MATERIALIZED TABLE ``` -ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec] +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name + SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | + REFRESH [PARTITION partition_spec] | + AS ``` -`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。 +`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新,以及修改物化表的查询定义。 ## SUSPEND @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 注意 - REFRESH 操作会启动批作业来刷新表的数据。 +## AS +```sql +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS +``` + +`AS ` 用于修改物化表的查询定义。它会更新物化表刷新任务中使用的查询,并基于更新后的查询推导出新的 `schema` ,从而调整表的 `schema` 。但该操作不会直接影响现有数据。 + +具体修改流程取决于物化表的刷新模式: + +**全量模式:** + +1. 更新物化表的 `schema` 和查询定义。 +2. 下次刷新任务启动时,将使用新的查询刷新: +- 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 +- 否则,将刷新整个表的数据。 + +**持续模式:** + +1. 暂停当前的实时刷新任务。 +2. 更新物化表的 `schema` 和查询定义。 +3. 启动新的实时任务以刷新物化表: +- 新的刷新任务会从头开始,而不是从之前的状态继续。 +- 数据源的起始消费位置会由到连接器的默认实现或查询中设置的 `option hint` 决定) + +**示例:** + +```sql +-- 原始物化表定义 +CREATE MATERIALIZED TABLE my_materialized_table + FRESHNESS = INTERVAL '10' SECOND + AS + SELECT + user_id, + COUNT(*) AS event_count, + SUM(amount) AS total_amount + FROM + kafka_catalog.db1.events + WHERE + event_type = 'purchase' + GROUP BY + user_id; + +-- 修改现有物化表的查询 +ALTER MATERIALIZED TABLE my_materialized_table +AS SELECT + user_id, + COUNT(*) AS event_count, + SUM(amount) AS total_amount, + AVG(amount) AS avg_amount -- 在末尾添加新的可为空列 +FROM + kafka_catalog.db1.events +WHERE + event_type = 'purchase' +GROUP BY + user_id; +``` + +注意 +- Schema 修改仅支持在原表 schema 末尾添加 `nullable` 列。 +- 在持续模式下,新的刷新任务不会从原来的刷新任务的状态恢复。这可能会导致短暂的数据重复或丢失。 + # DROP MATERIALIZED TABLE ```text diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index 593f7eed9baa8..aaccae79c3263 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -180,7 +180,7 @@ CREATE MATERIALIZED TABLE my_materialized_table ## AS -This clause is used to define the query for populating materialized view data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}). +This clause is used to define the query for populating materialized table data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}). **Example:** @@ -255,16 +255,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full ## Limitations - Does not support explicitly specifying columns -- Does not support modifying query statements - Does not support referring to temporary tables, temporary views, or temporary functions in the select query # ALTER MATERIALIZED TABLE ``` -ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec] +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name + SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | + REFRESH [PARTITION partition_spec] | + AS ``` -`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes. +`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes, and modify the query definition of materialized tables. ## SUSPEND @@ -326,6 +328,69 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 Note - The REFRESH operation will start a Flink batch job to refresh the materialized table data. +## AS + +```sql +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS +``` + +The `AS ` clause allows you to modify the query definition of a materialized table. It updates the query used by the materialized table refresh job and infers a new schema based on the updated query to adjust the table’s schema. However, this operation does not directly affect existing data. + +The modification process depends on the refresh mode of the materialized table: + +**Full mode:** + +1. Update the schema and query definition of the materialized table. +2. During the next refresh job, the table is refreshed using the new query definition: +- If the table is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed. +- Otherwise, the entire table will be refreshed. + +**Continuous mode:** + +1. Pause the current continuous refresh jobs. +2. Update the `schema` and `query definition` of the materialized table. +3. Start a new continuous refresh job to refresh the materialized table: +- The new refresh job starts from the beginning and does not restore from the previous state. +- The starting consumption position of the data source is determined by the connector’s default implementation or the option hint specified in the query. + +**Example:** + +```sql +-- Definition of origin materialized table +CREATE MATERIALIZED TABLE my_materialized_table + FRESHNESS = INTERVAL '10' SECOND + AS + SELECT + user_id, + COUNT(*) AS event_count, + SUM(amount) AS total_amount + FROM + kafka_catalog.db1.events + WHERE + event_type = 'purchase' + GROUP BY + user_id; + +-- Modify the query definition of materialized table +ALTER MATERIALIZED TABLE my_materialized_table + AS + SELECT + user_id, + COUNT(*) AS event_count, + SUM(amount) AS total_amount, + AVG(amount) AS avg_amount -- Add a new nullable column at the end + FROM + kafka_catalog.db1.events + WHERE + event_type = 'purchase' + GROUP BY + user_id; +``` + +Note +- Schema modification only supports adding `nullable` columns at the end of the original table's schema. +- In continuous mode, the new refresh job will not restore from the state of the original refresh job. This may result in temporary data duplication or loss. + # DROP MATERIALIZED TABLE ```text From 03e6b1ab9eb348168077f54a549328c930de2981 Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Sun, 26 Jan 2025 14:51:48 +0800 Subject: [PATCH 2/5] Update docs/content/docs/dev/table/materialized-table/statements.md Co-authored-by: Ron --- docs/content/docs/dev/table/materialized-table/statements.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index aaccae79c3263..2d15714f17c3b 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -388,7 +388,7 @@ ALTER MATERIALIZED TABLE my_materialized_table ``` Note -- Schema modification only supports adding `nullable` columns at the end of the original table's schema. +- Schema evolution currently only supports adding `nullable` columns to the end of the original table's schema. - In continuous mode, the new refresh job will not restore from the state of the original refresh job. This may result in temporary data duplication or loss. # DROP MATERIALIZED TABLE From 0a9b2d8d56fc31d947fa4218aa75131f7c7057e2 Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Sun, 26 Jan 2025 15:14:01 +0800 Subject: [PATCH 3/5] address the comments --- .../dev/table/materialized-table/statements.md | 14 +++++++------- .../dev/table/materialized-table/statements.md | 16 ++++++++-------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index 3df4ddcca64c3..2cfafb42f1eba 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -333,14 +333,14 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS ``` -`AS ` 用于修改物化表的查询定义。它会更新物化表刷新任务中使用的查询,并基于更新后的查询推导出新的 `schema` ,从而调整表的 `schema` 。但该操作不会直接影响现有数据。 +`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询派生的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 具体修改流程取决于物化表的刷新模式: **全量模式:** 1. 更新物化表的 `schema` 和查询定义。 -2. 下次刷新任务启动时,将使用新的查询刷新: +2. 在刷新任务下次触发执行时,将使用新的查询定义刷新数据: - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 - 否则,将刷新整个表的数据。 @@ -348,9 +348,9 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS 注意 -- Schema 修改仅支持在原表 schema 末尾添加 `nullable` 列。 -- 在持续模式下,新的刷新任务不会从原来的刷新任务的状态恢复。这可能会导致短暂的数据重复或丢失。 +- Schema 演进当前仅支持在原表 schema 尾部添加`可空列`。 +- 在持续模式下,新的流式任务不会从原来的流式任务的状态恢复。这可能会导致短暂的数据重复或丢失。 # DROP MATERIALIZED TABLE diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index 2d15714f17c3b..9923e661fe5dc 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -334,24 +334,24 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS ``` -The `AS ` clause allows you to modify the query definition of a materialized table. It updates the query used by the materialized table refresh job and infers a new schema based on the updated query to adjust the table’s schema. However, this operation does not directly affect existing data. +The `AS ` clause allows you to modify the query definition for refreshing materialized table. It will first evolve the table's schema using the schema derived from the new query and then use the new query to refresh the table data. It is important to emphasize that, by default, this does not impact historical data. The modification process depends on the refresh mode of the materialized table: **Full mode:** -1. Update the schema and query definition of the materialized table. -2. During the next refresh job, the table is refreshed using the new query definition: -- If the table is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed. -- Otherwise, the entire table will be refreshed. +1. Update the `schema` and `query definition` of the materialized table. +2. The table is refreshed using the new query definition when the next refresh job is triggered: +- If it is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed. +- Otherwise, the table will be overwritten entirely. **Continuous mode:** -1. Pause the current continuous refresh jobs. +1. Pause the current running refresh job. 2. Update the `schema` and `query definition` of the materialized table. -3. Start a new continuous refresh job to refresh the materialized table: +3. Start a new refresh job to refresh the materialized table: - The new refresh job starts from the beginning and does not restore from the previous state. -- The starting consumption position of the data source is determined by the connector’s default implementation or the option hint specified in the query. +- The starting offset of the data source is determined by the connector’s default implementation or the `option hint` specified in the query. **Example:** From e84aa86216890a988102069276ffeb9745574fb0 Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Thu, 6 Feb 2025 13:08:32 +0800 Subject: [PATCH 4/5] address comments --- .../dev/table/materialized-table/statements.md | 18 +++++++++--------- .../dev/table/materialized-table/statements.md | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index 2cfafb42f1eba..8e6fc3cb6bac4 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -333,24 +333,24 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS ``` -`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询派生的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 +`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 具体修改流程取决于物化表的刷新模式: **全量模式:** 1. 更新物化表的 `schema` 和查询定义。 -2. 在刷新任务下次触发执行时,将使用新的查询定义刷新数据: -- 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 -- 否则,将刷新整个表的数据。 +2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据: + - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 + - 否则,将刷新整个表的数据。 **持续模式:** -1. 暂停当前的实时刷新任务。 +1. 暂停当前的流式刷新作业。 2. 更新物化表的 `schema` 和查询定义。 3. 启动新的流式任务以刷新物化表: -- 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。 -- 数据源的起始位点会由到连接器的默认实现或查询中设置的 `option hint` 决定。 + - 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。 + - 数据源的起始位点会由到连接器的默认实现或查询中设置的 [dynamic hint]({{< ref "docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) 决定。 **示例:** @@ -376,7 +376,7 @@ AS SELECT user_id, COUNT(*) AS event_count, SUM(amount) AS total_amount, - AVG(amount) AS avg_amount -- 在末尾添加新的可为空列 + AVG(amount) AS avg_amount -- 在末尾追加新的可为空列 FROM kafka_catalog.db1.events WHERE @@ -386,7 +386,7 @@ GROUP BY ``` 注意 -- Schema 演进当前仅支持在原表 schema 尾部添加`可空列`。 +- Schema 演进当前仅支持在原表 schema 尾部追加`可空列`。 - 在持续模式下,新的流式任务不会从原来的流式任务的状态恢复。这可能会导致短暂的数据重复或丢失。 # DROP MATERIALIZED TABLE diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index 9923e661fe5dc..a5c7745397747 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -342,16 +342,16 @@ The modification process depends on the refresh mode of the materialized table: 1. Update the `schema` and `query definition` of the materialized table. 2. The table is refreshed using the new query definition when the next refresh job is triggered: -- If it is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed. -- Otherwise, the table will be overwritten entirely. + - If it is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed. + - Otherwise, the table will be overwritten entirely. **Continuous mode:** 1. Pause the current running refresh job. 2. Update the `schema` and `query definition` of the materialized table. 3. Start a new refresh job to refresh the materialized table: -- The new refresh job starts from the beginning and does not restore from the previous state. -- The starting offset of the data source is determined by the connector’s default implementation or the `option hint` specified in the query. + - The new refresh job starts from the beginning and does not restore from the previous state. + - The starting offset of the data source is determined by the connector’s default implementation or the [dynamic hint]({{< ref "docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) specified in the query. **Example:** From ddbcfb31b076f354bdb3ad9bf8733e0dc0d4a03b Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Sat, 8 Feb 2025 15:21:00 +0800 Subject: [PATCH 5/5] address comment --- .../docs/dev/table/materialized-table/statements.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index 8e6fc3cb6bac4..6e0e641c96828 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -348,8 +348,8 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS }}#dynamic-table-options) 决定。 **示例:** @@ -387,7 +387,7 @@ GROUP BY 注意 - Schema 演进当前仅支持在原表 schema 尾部追加`可空列`。 -- 在持续模式下,新的流式任务不会从原来的流式任务的状态恢复。这可能会导致短暂的数据重复或丢失。 +- 在持续模式下,新的流式作业不会从原来的流式作业的状态恢复。这可能会导致短暂的数据重复或丢失。 # DROP MATERIALIZED TABLE