Skip to content

Commit

Permalink
[FLINK-37087][doc] Add docs for alter materialized table as query
Browse files Browse the repository at this point in the history
  • Loading branch information
hackergin committed Jan 24, 2025
1 parent 5749a07 commit 38fd3d0
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 7 deletions.
69 changes: 66 additions & 3 deletions docs/content.zh/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full

## 限制
- 不支持显式指定列名
- 不支持修改查询语句
- 不支持在 select 查询语句中引用临时表、临时视图或临时函数

# ALTER MATERIALIZED TABLE

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] |
REFRESH [PARTITION partition_spec] |
AS <select_statement>
```

`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。
`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新,以及修改物化表的查询定义


## SUSPEND
Expand Down Expand Up @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28
<span class="label label-danger">注意</span>
- REFRESH 操作会启动批作业来刷新表的数据。

## AS <select_statement>
```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement>
```

`AS <select_statement>` 用于修改物化表的查询定义。它会更新物化表刷新任务中使用的查询,并基于更新后的查询推导出新的 `schema` ,从而调整表的 `schema` 。但该操作不会直接影响现有数据。

具体修改流程取决于物化表的刷新模式:

**全量模式:**

1. 更新物化表的 `schema` 和查询定义。
2. 下次刷新任务启动时,将使用新的查询刷新:
- 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。
- 否则,将刷新整个表的数据。

**持续模式:**

1. 暂停当前的实时刷新任务。
2. 更新物化表的 `schema` 和查询定义。
3. 启动新的实时任务以刷新物化表:
- 新的刷新任务会从头开始,而不是从之前的状态继续。
- 数据源的起始消费位置会由到连接器的默认实现或查询中设置的 `option hint` 决定)

**示例:**

```sql
-- 原始物化表定义
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;

-- 修改现有物化表的查询
ALTER MATERIALIZED TABLE my_materialized_table
AS SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount -- 在末尾添加新的可为空列
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;
```

<span class="label label-danger">注意</span>
- Schema 修改仅支持在原表 schema 末尾添加 `nullable` 列。
- 在持续模式下,新的刷新任务不会从原来的刷新任务的状态恢复。这可能会导致短暂的数据重复或丢失。

# DROP MATERIALIZED TABLE

```text
Expand Down
73 changes: 69 additions & 4 deletions docs/content/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ CREATE MATERIALIZED TABLE my_materialized_table

## AS <select_statement>

This clause is used to define the query for populating materialized view data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}).
This clause is used to define the query for populating materialized table data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}).

**Example:**

Expand Down Expand Up @@ -255,16 +255,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
## Limitations

- Does not support explicitly specifying columns
- Does not support modifying query statements
- Does not support referring to temporary tables, temporary views, or temporary functions in the select query

# ALTER MATERIALIZED TABLE

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] |
REFRESH [PARTITION partition_spec] |
AS <select_statement>
```

`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes.
`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes, and modify the query definition of materialized tables.

## SUSPEND

Expand Down Expand Up @@ -326,6 +328,69 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28
<span class="label label-danger">Note</span>
- The REFRESH operation will start a Flink batch job to refresh the materialized table data.

## AS <select_statement>

```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement>
```

The `AS <select_statement>` clause allows you to modify the query definition of a materialized table. It updates the query used by the materialized table refresh job and infers a new schema based on the updated query to adjust the table’s schema. However, this operation does not directly affect existing data.

The modification process depends on the refresh mode of the materialized table:

**Full mode:**

1. Update the schema and query definition of the materialized table.
2. During the next refresh task, the table is refreshed using the new query definition:
- If the table is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed.
- Otherwise, the entire table will be refreshed.

**Continuous mode:**

1. Pause the current continuous refresh jobs.
2. Update the `schema` and `query definition` of the materialized table.
3. Start a new continuous refresh job to refresh the materialized table:
- The new refresh task starts from the beginning and does not resume from the previous state.
- The starting consumption position of the data source is determined by the connector’s default implementation or the option hint specified in the query.

**Example:**

```sql
-- Definition of origin materialized table
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;

-- Modify the query definition of materialized table
ALTER MATERIALIZED TABLE my_materialized_table
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount -- Add a new nullable column at the end
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;
```

<span class="label label-danger">Note</span>
- Schema modification only supports adding `nullable` columns at the end of the original table's schema.
- In continuous mode, the new refresh job will not resume from the state of the original refresh job. This may result in temporary data duplication or loss.

# DROP MATERIALIZED TABLE

```text
Expand Down

0 comments on commit 38fd3d0

Please sign in to comment.