[Feature][Connector-Paimon] Support dynamic bucket splitting improves Paimon writing efficiency (#7335)
hawk9821 committed Sep 20, 2024
1 parent 4a2e272 commit bc0326c
Showing 41 changed files with 1,477 additions and 78 deletions.
47 changes: 46 additions & 1 deletion docs/en/connector-v2/sink/Paimon.md
@@ -43,7 +43,7 @@ libfb303-xxx.jar
| data_save_mode | Enum | No | APPEND_DATA | The data save mode |
| paimon.table.primary-keys | String | No | - | Default comma-separated list of columns (primary key) that identify a row in tables. (Note: the partition fields must be included in the primary-key fields.) |
| paimon.table.partition-keys | String | No | - | Default comma-separated list of partition fields to use when creating tables. |
| paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions). |
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |

@@ -241,6 +241,51 @@ sink {
}
```

### Write to a dynamic bucket table

Dynamic bucket mode writes to a single table and only takes effect on primary-key tables with `bucket = -1` set in the Paimon write properties (`paimon.table.write-props`).

#### Core options

See the [Paimon dynamic bucket documentation](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket).

| name                           | type | required | default value | description                                    |
|--------------------------------|------|----------|---------------|------------------------------------------------|
| dynamic-bucket.target-row-num  | long | yes      | 2000000L      | Controls the target number of rows per bucket. |
| dynamic-bucket.initial-buckets | int  | no       |               | Controls the number of initialized buckets.    |

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="seatunnel"
    table="role"
    paimon.table.write-props = {
      # -1 enables dynamic bucket mode (primary-key tables only)
      bucket = -1
      # target number of rows per bucket before a new bucket is opened
      dynamic-bucket.target-row-num = 50000
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
```
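
With `bucket = -1`, the sink resolves the target bucket for every row at write time: the Paimon sink writer detects that the table is in dynamic bucket mode and asks a bucket assigner, initialized with the writer's subtask index and parallelism, which bucket each row belongs to (see the `PaimonSinkWriter` and `PaimonBucketAssigner` changes in this commit).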

### Multiple tables

#### Example 1
112 changes: 97 additions & 15 deletions docs/zh/connector-v2/sink/Paimon.md
@@ -30,22 +30,21 @@ libfb303-xxx.jar

## Connector options

| Name                        | Type   | Required | Default value                | Description                                                                                                                                           |
|-----------------------------|--------|----------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse                   | String | Yes      | -                            | Paimon warehouse path                                                                                                                                   |
| catalog_type                | String | No       | filesystem                   | Paimon catalog type; currently supports filesystem and hive                                                                                             |
| catalog_uri                 | String | No       | -                            | URI of the Paimon catalog; only required when catalog_type is hive                                                                                      |
| database                    | String | Yes      | -                            | Database name                                                                                                                                           |
| table                       | String | Yes      | -                            | Table name                                                                                                                                              |
| hdfs_site_path              | String | No       | -                            | Path to the hdfs-site.xml file                                                                                                                          |
| schema_save_mode            | Enum   | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode                                                                                                                                        |
| data_save_mode              | Enum   | No       | APPEND_DATA                  | Data save mode                                                                                                                                          |
| paimon.table.primary-keys   | String | No       | -                            | Comma-separated list of primary-key columns (note: partition fields must be included in the primary-key fields)                                        |
| paimon.table.partition-keys | String | No       | -                            | Comma-separated list of partition fields                                                                                                                |
| paimon.table.write-props    | Map    | No       | -                            | Properties passed through to Paimon table initialization, [reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions)   |
| paimon.hadoop.conf          | Map    | No       | -                            | Properties in Hadoop conf                                                                                                                               |
| paimon.hadoop.conf-path     | String | No       | -                            | Directory used to load the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' configuration files                                                       |

## Examples

@@ -241,8 +240,53 @@ sink {
}
```

### Single dynamic bucket Paimon table

This only takes effect on primary-key tables with `bucket = -1` specified.

#### Core options: see the [official Paimon documentation](https://paimon.apache.org/docs/master/primary-key-table/data-distribution/#dynamic-bucket)

| Name                           | Type | Required | Default value | Description                                        |
|--------------------------------|------|----------|---------------|----------------------------------------------------|
| dynamic-bucket.target-row-num  | long | yes      | 2000000L      | Controls the number of rows written to one bucket. |
| dynamic-bucket.initial-buckets | int  | no       |               | Controls the number of initialized buckets.        |

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="seatunnel"
    table="role"
    paimon.table.write-props = {
      # -1 enables dynamic bucket mode (primary-key tables only)
      bucket = -1
      # target number of rows per bucket before a new bucket is opened
      dynamic-bucket.target-row-num = 50000
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
```

### Multiple tables

#### Example 1

```hocon
env {
parallelism = 1
@@ -272,3 +316,41 @@ sink {
}
```

#### Example 2

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword
    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}
transform {
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="${schema_name}_test"
    table="${table_name}_test"
  }
}
```
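
Note: `${schema_name}` and `${table_name}` in the sink above are placeholders that are substituted per upstream table, so each table from `table_list` is written into its own Paimon table; this relies on SeaTunnel's multi-table sink placeholder replacement.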
@@ -25,18 +25,21 @@
/** The default {@link SinkWriter.Context} implement class. */
public class DefaultSinkWriterContext implements SinkWriter.Context {
private final int subtask;
private final int numberOfParallelSubtasks;
private final EventListener eventListener;

public DefaultSinkWriterContext(int subtask) {
this(subtask, new DefaultEventProcessor());
public DefaultSinkWriterContext(int subtask, int parallelism) {
this(subtask, parallelism, new DefaultEventProcessor());
}

public DefaultSinkWriterContext(String jobId, int subtask) {
this(subtask, new DefaultEventProcessor(jobId));
public DefaultSinkWriterContext(String jobId, int subtask, int parallelism) {
this(subtask, parallelism, new DefaultEventProcessor(jobId));
}

public DefaultSinkWriterContext(int subtask, EventListener eventListener) {
public DefaultSinkWriterContext(
int subtask, int numberOfParallelSubtasks, EventListener eventListener) {
this.subtask = subtask;
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.eventListener = eventListener;
}

@@ -45,6 +48,10 @@ public int getIndexOfSubtask() {
return subtask;
}

public int getNumberOfParallelSubtasks() {
return numberOfParallelSubtasks;
}

@Override
public MetricsContext getMetricsContext() {
// TODO Waiting for Flink and Spark to implement MetricsContext
@@ -92,6 +92,11 @@ interface Context extends Serializable {
/** @return The index of this subtask. */
int getIndexOfSubtask();

/** @return parallelism of this writer. */
default int getNumberOfParallelSubtasks() {
return 1;
}

/** @return metricsContext of this reader. */
MetricsContext getMetricsContext();

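
The new default `getNumberOfParallelSubtasks()` gives a sink writer access to the writer parallelism alongside the existing subtask index. A minimal sketch of how writer-side code might combine the two to decide which keys a subtask owns (the class name and the hash-modulo ownership rule are illustrative assumptions, not code from this commit):

```java
// Illustrative sketch only: combines the writer context's subtask index and
// parallelism to decide which keys this subtask is responsible for. The class
// name and the hash-modulo ownership rule are assumptions made for this example.
public class HypotheticalKeyOwnership {
    private final int subtaskIndex;  // e.g. context.getIndexOfSubtask()
    private final int parallelism;   // e.g. context.getNumberOfParallelSubtasks()

    public HypotheticalKeyOwnership(int subtaskIndex, int parallelism) {
        this.subtaskIndex = subtaskIndex;
        this.parallelism = parallelism;
    }

    /** Returns true if this subtask owns the given key. */
    public boolean owns(Object key) {
        return Math.floorMod(key.hashCode(), parallelism) == subtaskIndex;
    }
}
```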
@@ -71,7 +71,7 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWri
int index = context.getIndexOfSubtask() * replicaNum + i;
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index, context)));
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
}
@@ -100,11 +100,12 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
if (state.isEmpty()) {
writers.put(
sinkIdentifier,
sink.createWriter(new SinkContextProxy(index, context)));
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
} else {
writers.put(
sinkIdentifier,
sink.restoreWriter(new SinkContextProxy(index, context), state));
sink.restoreWriter(
new SinkContextProxy(index, replicaNum, context), state));
}
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
@@ -25,10 +25,13 @@ public class SinkContextProxy implements SinkWriter.Context {

private final int index;

private final int replicaNum;

private final SinkWriter.Context context;

public SinkContextProxy(int index, SinkWriter.Context context) {
public SinkContextProxy(int index, int replicaNum, SinkWriter.Context context) {
this.index = index;
this.replicaNum = replicaNum;
this.context = context;
}

@@ -37,6 +40,11 @@ public int getIndexOfSubtask() {
return index;
}

@Override
public int getNumberOfParallelSubtasks() {
return context.getNumberOfParallelSubtasks() * replicaNum;
}

@Override
public MetricsContext getMetricsContext() {
return context.getMetricsContext();
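
The proxy scales the reported parallelism by `replicaNum` because each physical subtask hosts `replicaNum` logical writers. For example, with a physical parallelism of 2 and `replicaNum = 3`, the multi-table sink creates writers with indices `subtaskIndex * replicaNum + i`, i.e. 0 through 5, and the proxy reports `2 * 3 = 6` parallel subtasks, so index and parallelism stay consistent for downstream uses such as bucket assignment.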
@@ -45,11 +45,12 @@ void testBelongsToSubtask() {
}

void check(JobContext jobContext) {
DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(Integer.MAX_VALUE);
DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(Integer.MAX_VALUE, 1);
Xid xid1 = xidGenerator.generateXid(jobContext, dc1, System.currentTimeMillis());
Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext, dc1));
Assertions.assertFalse(
xidGenerator.belongsToSubtask(xid1, jobContext, new DefaultSinkWriterContext(2)));
xidGenerator.belongsToSubtask(
xid1, jobContext, new DefaultSinkWriterContext(2, 1)));
Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, new JobContext(), dc1));
}
}
@@ -27,13 +27,15 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -79,7 +81,11 @@ public class PaimonSinkWriter

private final JobContext jobContext;

private TableSchema tableSchema;
private final TableSchema tableSchema;

private final PaimonBucketAssigner bucketAssigner;

private final boolean dynamicBucket;

public PaimonSinkWriter(
Context context,
@@ -97,6 +103,14 @@ public PaimonSinkWriter(
this.context = context;
this.jobContext = jobContext;
this.tableSchema = ((FileStoreTable) table).schema();
this.bucketAssigner =
new PaimonBucketAssigner(
table,
this.context.getNumberOfParallelSubtasks(),
this.context.getIndexOfSubtask());
BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
this.dynamicBucket =
BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode;
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
}

@@ -139,7 +153,12 @@ public void write(SeaTunnelRow element) throws IOException {
try {
PaimonSecurityContext.runSecured(
() -> {
tableWrite.write(rowData);
if (dynamicBucket) {
int bucket = bucketAssigner.assign(rowData);
tableWrite.write(rowData, bucket);
} else {
tableWrite.write(rowData);
}
return null;
});
} catch (Exception e) {
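
The writer only calls `bucketAssigner.assign(rowData)` when the table's bucket mode is dynamic; otherwise it keeps the original fixed-bucket write path. The real `PaimonBucketAssigner` introduced by this commit builds on Paimon's own bucket index, so the following self-contained sketch is only an illustration of the general idea under assumed names and simplified rules: rows with the same key reuse their bucket, each subtask only assigns buckets it owns (`bucket % parallelism == subtaskIndex`), and a new bucket is opened once the current one reaches the configured target row count (`dynamic-bucket.target-row-num`).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: NOT the connector's PaimonBucketAssigner.
// Shows the general shape of dynamic bucket assignment per subtask.
public class SimpleDynamicBucketAssigner {
    private final int numParallelSubtasks;
    private final int subtaskIndex;
    private final long targetRowsPerBucket;
    // Remembers which bucket each key hash was routed to, so updates/deletes
    // for the same primary key always land in the same bucket.
    private final Map<Integer, Integer> keyHashToBucket = new HashMap<>();
    // Row count per bucket, used to decide when to open a new bucket.
    private final Map<Integer, Long> bucketRowCount = new HashMap<>();

    public SimpleDynamicBucketAssigner(
            int numParallelSubtasks, int subtaskIndex, long targetRowsPerBucket) {
        this.numParallelSubtasks = numParallelSubtasks;
        this.subtaskIndex = subtaskIndex;
        this.targetRowsPerBucket = targetRowsPerBucket;
    }

    /** Assigns a bucket for the given primary-key hash. */
    public int assign(int keyHash) {
        Integer existing = keyHashToBucket.get(keyHash);
        if (existing != null) {
            bucketRowCount.merge(existing, 1L, Long::sum);
            return existing; // same key -> same bucket
        }
        // Scan the buckets owned by this subtask (bucket % parallelism == subtaskIndex)
        // and pick the first one with spare capacity; otherwise open a new one.
        for (int bucket = subtaskIndex; ; bucket += numParallelSubtasks) {
            long count = bucketRowCount.getOrDefault(bucket, 0L);
            if (count < targetRowsPerBucket) {
                bucketRowCount.put(bucket, count + 1);
                keyHashToBucket.put(keyHash, bucket);
                return bucket;
            }
        }
    }
}
```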