[Feature][Transform-v2] Add metadata transform
hawk9821 committed Oct 29, 2024
1 parent 50113e7 commit 5bebabe
Showing 36 changed files with 1,719 additions and 14 deletions.
82 changes: 82 additions & 0 deletions docs/en/transform-v2/metadata.md
@@ -0,0 +1,82 @@
# Metadata

> Metadata transform plugin

## Description

The Metadata transform plugin adds metadata fields to each row of data.

## Available Metadata

|    Key    | DataType | Description                                                                                      |
|:---------:|:--------:|:-------------------------------------------------------------------------------------------------|
| database  | string   | Name of the database that contains the row.                                                      |
| table     | string   | Name of the table that contains the row.                                                         |
| rowKind   | string   | The type of change to the row (INSERT, UPDATE_BEFORE, UPDATE_AFTER or DELETE).                   |
| ts_ms     | Long     | The time at which the connector processed the event.                                             |
| delay     | Long     | The difference between the data extraction time and the database change time.                    |
| partition | string   | The partition field(s) of the table that contains the row; multiple values are joined with `,`.  |

## Options

|      name       | type | required | default value | Description                                                               |
|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
| metadata_fields | map  | yes      |               | A mapping between metadata fields and their corresponding output fields. |

### metadata_fields [map]

A mapping between metadata fields and their respective output fields.

```hocon
metadata_fields {
  database = c_database
  table = c_table
  rowKind = c_rowKind
  ts_ms = c_ts_ms
  delay = c_delay
}
```

## Examples

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc"
    server-id = 5652
    username = "root"
    password = "zdyk_Dev@2024"
    table-names = ["source.user"]
    base-url = "jdbc:mysql://172.16.17.123:3306/source"
  }
}

transform {
  Metadata {
    metadata_fields {
      database = database
      table = table
      rowKind = rowKind
      ts_ms = ts_ms
      delay = delay
    }
    result_table_name = "trans_result"
  }
}

sink {
  Console {
    source_table_name = "trans_result"
  }
}
```
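To rename the metadata columns on output, drop the mapping from the `metadata_fields` section above into the same job; the sketch below shows only the transform block (source and sink stay as in the example, with the sink reading from `trans_result`):

```hocon
transform {
  Metadata {
    metadata_fields {
      database = c_database
      table = c_table
      rowKind = c_rowKind
      ts_ms = c_ts_ms
      delay = c_delay
    }
    result_table_name = "trans_result"
  }
}
```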

82 changes: 82 additions & 0 deletions docs/zh/transform-v2/metadata.md
@@ -0,0 +1,82 @@
# Metadata

> Metadata transform plugin

## Description

The Metadata transform plugin adds metadata fields to each row of data.

## Available Metadata

|    Key    | DataType | Description                                                                                      |
|:---------:|:--------:|:-------------------------------------------------------------------------------------------------|
| database  | string   | Name of the database that contains the row.                                                      |
| table     | string   | Name of the table that contains the row.                                                         |
| rowKind   | string   | The type of change to the row (INSERT, UPDATE_BEFORE, UPDATE_AFTER or DELETE).                   |
| ts_ms     | Long     | The time at which the connector processed the event.                                             |
| delay     | Long     | The difference between the data extraction time and the database change time.                    |
| partition | string   | The partition field(s) of the table that contains the row; multiple values are joined with `,`.  |

## Options

|      name       | type | required | default value | Description                                                               |
|:---------------:|------|:--------:|:-------------:|---------------------------------------------------------------------------|
| metadata_fields | map  | yes      | -             | A mapping between metadata fields and their corresponding output fields. |

### metadata_fields [map]

A mapping between metadata fields and their corresponding output fields.

```hocon
metadata_fields {
  database = c_database
  table = c_table
  rowKind = c_rowKind
  ts_ms = c_ts_ms
  delay = c_delay
}
```

## Examples

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc"
    server-id = 5652
    username = "root"
    password = "zdyk_Dev@2024"
    table-names = ["source.user"]
    base-url = "jdbc:mysql://172.16.17.123:3306/source"
  }
}

transform {
  Metadata {
    metadata_fields {
      database = database
      table = table
      rowKind = rowKind
      ts_ms = ts_ms
      delay = delay
    }
    result_table_name = "trans_result"
  }
}

sink {
  Console {
    source_table_name = "trans_result"
  }
}
```

1 change: 1 addition & 0 deletions plugin-mapping.properties
@@ -152,3 +152,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
seatunnel.transform.Metadata = seatunnel-transforms-v2
@@ -30,18 +30,44 @@ public enum CommonOptions {
    /**
     * The key of {@link Column#getOptions()} to specify the column value is a json format string.
     */
    JSON("Json"),
    JSON("Json", 0),
    /** The key of {@link Column#getOptions()} to specify the column value is a metadata field. */
    METADATA("Metadata"),
    METADATA("Metadata", 0),
    /**
     * The key of {@link SeaTunnelRow#getOptions()} to store the partition value of the row value.
     */
    PARTITION("Partition"),
    ;
    PARTITION("Partition", 1),
    /**
     * The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE value of the row value.
     */
    DATABASE("Database", 1),
    /** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value of the row value. */
    TABLE("Table", 1),
    /**
     * The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND value of the row value.
     */
    ROW_KIND("RowKind", 1),
    /**
     * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME value of the row value.
     */
    EVENT_TIME("EventTime", 1),
    /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. */
    DELAY("Delay", 1);

    private final String name;
    private final int supportMetadataTrans;

    CommonOptions(String name) {
    CommonOptions(String name, int supportMetadataTrans) {
        this.name = name;
        this.supportMetadataTrans = supportMetadataTrans;
    }

    public static CommonOptions fromName(String name) {
        for (CommonOptions option : CommonOptions.values()) {
            if (option.getName().equals(name)) {
                return option;
            }
        }
        throw new IllegalArgumentException("Unknown option name: " + name);
    }
}
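As a rough usage sketch (not part of this diff), the new `fromName` lookup and the `supportMetadataTrans` flag could be exercised as below; the getters (`getName`, `getSupportMetadataTrans`) are the ones this commit already relies on, and the class name here is purely illustrative:

```java
// A rough usage sketch, not part of this commit; "CommonOptionsSketch" is an illustrative name.
import org.apache.seatunnel.api.table.type.CommonOptions;

public class CommonOptionsSketch {
    public static void main(String[] args) {
        // Resolve an option from its display name; unknown names throw IllegalArgumentException.
        CommonOptions option = CommonOptions.fromName("RowKind");

        // Entries flagged with 1 (Partition, Database, Table, RowKind, EventTime, Delay)
        // are row-level metadata keys that the Metadata transform can expose as columns.
        boolean isRowMetadata = option.getSupportMetadataTrans() == 1;

        System.out.println(option.getName() + " usable as metadata: " + isRowMetadata);
    }
}
```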
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import org.apache.seatunnel.api.table.catalog.TablePath;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION;

public class MetadataUtil {

    public static final List<String> METADATA_FIELDS;

    static {
        METADATA_FIELDS = new ArrayList<>();
        Stream.of(CommonOptions.values())
                .filter(v -> v.getSupportMetadataTrans() == 1)
                .map(CommonOptions::getName)
                .forEach(METADATA_FIELDS::add);
    }

    public static void setDelay(SeaTunnelRow row, Long delay) {
        row.getOptions().put(DELAY.getName(), delay);
    }

    public static void setPartition(SeaTunnelRow row, String[] partition) {
        row.getOptions().put(PARTITION.getName(), partition);
    }

    public static void setEventTime(SeaTunnelRow row, Long eventTime) {
        row.getOptions().put(EVENT_TIME.getName(), eventTime);
    }

    public static Long getDelay(SeaTunnelRow row) {
        return (Long) row.getOptions().get(DELAY.getName());
    }

    public static String getDatabase(SeaTunnelRow row) {
        TablePath tablePath = TablePath.of(row.getTableId());
        return tablePath.getDatabaseName();
    }

    public static String getTable(SeaTunnelRow row) {
        TablePath tablePath = TablePath.of(row.getTableId());
        return tablePath.getTableName();
    }

    public static String getRowKind(SeaTunnelRow row) {
        return row.getRowKind().shortString();
    }

    public static String getPartitionStr(SeaTunnelRow row) {
        Object partition = row.getOptions().get(PARTITION.getName());
        return Objects.nonNull(partition) ? String.join(",", (String[]) partition) : null;
    }

    public static String[] getPartition(SeaTunnelRow row) {
        return (String[]) row.getOptions().get(PARTITION.getName());
    }

    public static Long getEventTime(SeaTunnelRow row) {
        return (Long) row.getOptions().get(EVENT_TIME.getName());
    }

    public static boolean isMetadataField(String fieldName) {
        return METADATA_FIELDS.contains(fieldName);
    }
}
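Below is a minimal, illustrative sketch (not part of this commit) of how a connector could stamp these options onto a row and how a downstream step, such as the Metadata transform, could read them back; the table id, partition values, and delay are made up:

```java
// Illustrative only: round-tripping row-level metadata through the new MetadataUtil helpers.
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public class MetadataUtilSketch {
    public static void main(String[] args) {
        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "Alice"});
        row.setTableId("source.user"); // hypothetical "database.table" id
        row.setRowKind(RowKind.INSERT);

        MetadataUtil.setEventTime(row, System.currentTimeMillis()); // when the connector processed the event
        MetadataUtil.setDelay(row, 250L);                           // extraction time minus change time
        MetadataUtil.setPartition(row, new String[] {"dt=2024-10-29", "region=eu"});

        // Downstream (e.g. in the Metadata transform) the values can be read back:
        System.out.println(MetadataUtil.getDatabase(row));         // source
        System.out.println(MetadataUtil.getTable(row));            // user
        System.out.println(MetadataUtil.getRowKind(row));          // short string for INSERT
        System.out.println(MetadataUtil.getDelay(row));            // 250
        System.out.println(MetadataUtil.getPartitionStr(row));     // dt=2024-10-29,region=eu
        System.out.println(MetadataUtil.isMetadataField("Delay")); // true
    }
}
```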
@@ -34,16 +34,18 @@ public final class SeaTunnelRow implements Serializable {
    /** The array to store the actual internal format values. */
    private final Object[] fields;

    private volatile int size;

    private Map<String, Object> options;

    private volatile int size;

    public SeaTunnelRow(int arity) {
        this.fields = new Object[arity];
        this.options = new HashMap<>();
    }

    public SeaTunnelRow(Object[] fields) {
        this.fields = fields;
        this.options = new HashMap<>();
    }

    public void setField(int pos, Object value) {
@@ -62,10 +62,9 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
    }

    /**
     * Return the timestamp when the change event is produced in MySQL.
     *
     * <p>The field `source.ts_ms` in {@link SourceRecord} data struct is the time when the change
     * event is operated in MySQL.
     * In the source object, ts_ms indicates the time that the change was made in the database. By
     * comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can
     * determine the lag between the source database update and Debezium.
     */
    public static Long getMessageTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
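To make the ts_ms comparison in that comment concrete, here is a tiny sketch with made-up timestamps; the resulting difference is what the CDC deserializer below stores on the row via `MetadataUtil.setDelay`:

```java
// Made-up timestamps, for illustration only.
public class TsLagSketch {
    public static void main(String[] args) {
        long sourceTsMs = 1_730_160_000_000L; // payload.source.ts_ms: change time in the database
        long tsMs = 1_730_160_000_250L;       // payload.ts_ms: when Debezium/the connector processed it
        long lagMs = tsMs - sourceTsMs;       // 250 ms; stored on the row as the "delay" metadata
        System.out.println("lag = " + lagMs + " ms");
    }
}
```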
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -173,26 +174,39 @@ private void deserializeDataChangeRecord(SourceRecord record, Collector<SeaTunne
        } else {
            converters = tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
        }

        Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
        long delay = -1L;
        if (fetchTimestamp != null && messageTimestamp != null) {
            delay = fetchTimestamp - messageTimestamp;
        }
        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
            SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
            insert.setRowKind(RowKind.INSERT);
            insert.setTableId(tableId);
            MetadataUtil.setDelay(insert, delay);
            MetadataUtil.setEventTime(insert, fetchTimestamp);
            collector.collect(insert);
        } else if (operation == Envelope.Operation.DELETE) {
            SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
            delete.setRowKind(RowKind.DELETE);
            delete.setTableId(tableId);
            MetadataUtil.setDelay(delete, delay);
            MetadataUtil.setEventTime(delete, fetchTimestamp);
            collector.collect(delete);
        } else if (operation == Envelope.Operation.UPDATE) {
            SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            before.setTableId(tableId);
            MetadataUtil.setDelay(before, delay);
            MetadataUtil.setEventTime(before, fetchTimestamp);
            collector.collect(before);

            SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
            after.setRowKind(RowKind.UPDATE_AFTER);
            after.setTableId(tableId);
            MetadataUtil.setDelay(after, delay);
            MetadataUtil.setEventTime(after, fetchTimestamp);
            collector.collect(after);
        } else {
            log.warn("Received {} operation, skip", operation);
