Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37087][doc] Add docs for alter materialized table as query #26064

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions docs/content.zh/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full

## 限制
- 不支持显式指定列名
- 不支持修改查询语句
- 不支持在 select 查询语句中引用临时表、临时视图或临时函数

# ALTER MATERIALIZED TABLE

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] |
REFRESH [PARTITION partition_spec] |
AS <select_statement>
```

`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。
`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新,以及修改物化表的查询定义


## SUSPEND
Expand Down Expand Up @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28
<span class="label label-danger">注意</span>
- REFRESH 操作会启动批作业来刷新表的数据。

## AS <select_statement>
```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement>
```

`AS <select_statement>` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。

具体修改流程取决于物化表的刷新模式:

**全量模式:**

1. 更新物化表的 `schema` 和查询定义。
2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据:
- 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。
- 否则,将刷新整个表的数据。

**持续模式:**

1. 暂停当前的流式刷新作业。
2. 更新物化表的 `schema` 和查询定义。
3. 启动新的流式作业以刷新物化表:
- 新的流式作业会从头开始,而不会从之前的流式作业状态恢复。
- 数据源的起始位点会由到连接器的默认实现或查询中设置的 [dynamic hint]({{< ref "docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) 决定。

**示例:**

```sql
-- 原始物化表定义
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;

-- 修改现有物化表的查询
ALTER MATERIALIZED TABLE my_materialized_table
AS SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount -- 在末尾追加新的可为空列
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;
```

<span class="label label-danger">注意</span>
- Schema 演进当前仅支持在原表 schema 尾部追加`可空列`。
- 在持续模式下,新的流式作业不会从原来的流式作业的状态恢复。这可能会导致短暂的数据重复或丢失。

# DROP MATERIALIZED TABLE

```text
Expand Down
73 changes: 69 additions & 4 deletions docs/content/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ CREATE MATERIALIZED TABLE my_materialized_table

## AS <select_statement>

This clause is used to define the query for populating materialized view data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}).
This clause is used to define the query for populating materialized table data. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}).

**Example:**

Expand Down Expand Up @@ -255,16 +255,18 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
## Limitations

- Does not support explicitly specifying columns
- Does not support modifying query statements
- Does not support referring to temporary tables, temporary views, or temporary functions in the select query

# ALTER MATERIALIZED TABLE

```
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] |
REFRESH [PARTITION partition_spec] |
AS <select_statement>
```

`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes.
`ALTER MATERIALIZED TABLE` is used to manage materialized tables. This command allows users to suspend and resume refresh pipeline of materialized tables and manually trigger data refreshes, and modify the query definition of materialized tables.

## SUSPEND

Expand Down Expand Up @@ -326,6 +328,69 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28
<span class="label label-danger">Note</span>
- The REFRESH operation will start a Flink batch job to refresh the materialized table data.

## AS <select_statement>

```sql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement>
```

The `AS <select_statement>` clause allows you to modify the query definition for refreshing materialized table. It will first evolve the table's schema using the schema derived from the new query and then use the new query to refresh the table data. It is important to emphasize that, by default, this does not impact historical data.

The modification process depends on the refresh mode of the materialized table:

**Full mode:**
hackergin marked this conversation as resolved.
Show resolved Hide resolved

1. Update the `schema` and `query definition` of the materialized table.
2. The table is refreshed using the new query definition when the next refresh job is triggered:
- If it is a partitioned table and [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) is correctly set, only the latest partition will be refreshed.
- Otherwise, the table will be overwritten entirely.

**Continuous mode:**
hackergin marked this conversation as resolved.
Show resolved Hide resolved

1. Pause the current running refresh job.
2. Update the `schema` and `query definition` of the materialized table.
3. Start a new refresh job to refresh the materialized table:
- The new refresh job starts from the beginning and does not restore from the previous state.
- The starting offset of the data source is determined by the connector’s default implementation or the [dynamic hint]({{< ref "docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) specified in the query.

**Example:**

```sql
-- Definition of origin materialized table
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;

-- Modify the query definition of materialized table
ALTER MATERIALIZED TABLE my_materialized_table
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount -- Add a new nullable column at the end
FROM
kafka_catalog.db1.events
WHERE
event_type = 'purchase'
GROUP BY
user_id;
```

<span class="label label-danger">Note</span>
- Schema evolution currently only supports adding `nullable` columns to the end of the original table's schema.
- In continuous mode, the new refresh job will not restore from the state of the original refresh job. This may result in temporary data duplication or loss.

# DROP MATERIALIZED TABLE

```text
Expand Down