[Feature][CONNECTORS-V2-Paimon] Dynamic bucket splitting improves Paimon writing efficiency

hawk9821 committed Aug 23, 2024
1 parent d701c2d commit c93f7b8
Showing 14 changed files with 814 additions and 24 deletions.
41 changes: 41 additions & 0 deletions docs/en/connector-v2/sink/Paimon.md
@@ -282,6 +282,47 @@ sink {
}
```

### Single dynamic bucket table with Paimon write props (primary-key table with bucket = -1)

#### Core options: [reference](https://paimon.apache.org/docs/0.8/primary-key-table/data-distribution/#dynamic-bucket)

| name                           | type | required | default value | description                                     |
|--------------------------------|------|----------|---------------|-------------------------------------------------|
| dynamic-bucket.target-row-num  | long | no       | 2000000L      | Controls the target row number for one bucket.  |
| dynamic-bucket.initial-buckets | int  | no       |               | Controls the number of initialized buckets.     |

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
sink {
  Paimon {
    catalog_name = "seatunnel_test"
    warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
    database = "seatunnel"
    table = "role"
    paimon.table.write-props = {
      bucket = -1
      dynamic-bucket.target-row-num = 50000
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
```
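In this example, `bucket = -1` puts the table into dynamic bucket mode, and `dynamic-bucket.target-row-num = 50000` means a bucket is filled with roughly 50,000 rows before the assigner opens a new one.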

### Multiple tables

#### example1
29 changes: 14 additions & 15 deletions docs/zh/connector-v2/sink/Paimon.md
@@ -30,21 +30,20 @@ libfb303-xxx.jar

## Connector Options

| name                        | type   | required | default value                | description                                                                                                                               |
|-----------------------------|--------|----------|------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse                   | string | yes      | -                            | Paimon warehouse path                                                                                                                     |
| catalog_type                | string | no       | filesystem                   | Catalog type of Paimon; currently filesystem and hive are supported                                                                       |
| catalog_uri                 | string | no       | -                            | URI of the Paimon catalog, required only when catalog_type is hive                                                                        |
| database                    | string | yes      | -                            | Database name                                                                                                                             |
| table                       | string | yes      | -                            | Table name                                                                                                                                |
| hdfs_site_path              | string | no       | -                            | Path of hdfs-site.xml                                                                                                                     |
| schema_save_mode            | enum   | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode                                                                                                                          |
| data_save_mode              | enum   | no       | APPEND_DATA                  | Data save mode                                                                                                                            |
| paimon.table.primary-keys   | string | no       | -                            | Primary key field list; separate composite primary keys with commas (note: partition fields must be included in the primary key fields)  |
| paimon.table.partition-keys | string | no       | -                            | Partition field list; separate multiple fields with commas                                                                                |
| paimon.table.write-props    | Map    | no       | -                            | Properties passed to Paimon table initialization, [reference](https://paimon.apache.org/docs/0.8/maintenance/configurations/#coreoptions) |
| paimon.hadoop.conf          | Map    | no       | -                            | Hadoop configuration file properties                                                                                                      |
| paimon.hadoop.conf-path     | string | no       | -                            | Hadoop configuration directory, used to load 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml'                                         |
| name                       | type   | required | default value                | description                                                                                                                               |
|----------------------------|--------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse                  | string | yes      |                              | Paimon warehouse path                                                                                                                     |
| catalog_type               | string | no       | filesystem                   | Catalog type of Paimon; currently filesystem and hive are supported                                                                       |
| catalog_uri                | string | no       |                              | URI of the Paimon catalog, required only when catalog_type is hive                                                                        |
| database                   | string | yes      |                              | Database name                                                                                                                             |
| table                      | string | yes      |                              | Table name                                                                                                                                |
| hdfs_site_path             | string | no       |                              | Path of hdfs-site.xml                                                                                                                     |
| schema_save_mode           | enum   | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode                                                                                                                          |
| data_save_mode             | enum   | no       | APPEND_DATA                  | Data save mode                                                                                                                            |
| paimon.table.primary-keys  | string | no       |                              | Primary key field list; separate composite primary keys with commas (note: partition fields must be included in the primary key fields)  |
| paimon.table.write-props   | Map    | no       |                              | Properties passed to Paimon table initialization, [reference](https://paimon.apache.org/docs/0.8/maintenance/configurations/#coreoptions) |
| paimon.hadoop.conf         | Map    | no       |                              | Hadoop configuration file properties                                                                                                      |
| paimon.hadoop.conf-path    | string | no       |                              | Hadoop configuration directory, used to load 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml'                                         |

## Examples

@@ -92,6 +92,11 @@ interface Context extends Serializable {
    /** @return The index of this subtask. */
    int getIndexOfSubtask();

    /** @return parallelism of this writer. */
    default int getNumberOfParallelSubtasks() {
        return 0;
    }

    /** @return metricsContext of this writer. */
    MetricsContext getMetricsContext();

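A writer can combine the existing `getIndexOfSubtask()` with the new `getNumberOfParallelSubtasks()` to claim a disjoint share of keyed work; that is what `PaimonSinkWriter` does further down when it constructs `PaimonBucketAssigner`. Below is a minimal sketch of the pattern, assuming the interface shown is `org.apache.seatunnel.api.sink.SinkWriter.Context`; the `SubtaskSharding` helper and its modulo scheme are illustrative only, not part of this commit.

```java
import org.apache.seatunnel.api.sink.SinkWriter;

public final class SubtaskSharding {

    private SubtaskSharding() {}

    /**
     * Returns true if this subtask owns the given key. Hashing modulo the
     * writer parallelism gives every key exactly one owner, so parallel
     * writers never double-process a key. Assumes the runtime overrides the
     * default getNumberOfParallelSubtasks(), i.e. parallelism > 0.
     */
    public static boolean ownsKey(SinkWriter.Context context, Object key) {
        int parallelism = context.getNumberOfParallelSubtasks();
        int subtask = context.getIndexOfSubtask();
        return Math.abs(key.hashCode() % parallelism) == subtask;
    }
}
```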
@@ -37,6 +37,11 @@ public int getIndexOfSubtask() {
        return index;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return context.getNumberOfParallelSubtasks();
    }

    @Override
    public MetricsContext getMetricsContext() {
        return context.getMetricsContext();
@@ -27,13 +27,15 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -79,7 +81,11 @@ public class PaimonSinkWriter

    private final JobContext jobContext;

    private TableSchema tableSchema;
    private final TableSchema tableSchema;

    private final PaimonBucketAssigner bucketAssigner;

    private final boolean dynamicBucket;

    public PaimonSinkWriter(
            Context context,
@@ -97,6 +103,14 @@ public PaimonSinkWriter(
        this.context = context;
        this.jobContext = jobContext;
        this.tableSchema = ((FileStoreTable) table).schema();
        this.bucketAssigner =
                new PaimonBucketAssigner(
                        table,
                        this.context.getNumberOfParallelSubtasks(),
                        this.context.getIndexOfSubtask());
        BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
        this.dynamicBucket =
                BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode;
        PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
    }

@@ -139,7 +153,12 @@ public void write(SeaTunnelRow element) throws IOException {
        try {
            PaimonSecurityContext.runSecured(
                    () -> {
                        tableWrite.write(rowData);
                        if (dynamicBucket) {
                            // Dynamic bucket tables need an explicit bucket id per row.
                            int bucket = bucketAssigner.assign(rowData);
                            tableWrite.write(rowData, bucket);
                        } else {
                            tableWrite.write(rowData);
                        }
                        return null;
                    });
        } catch (Exception e) {
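For reference: as of Paimon 0.8, a primary-key table created with `bucket = -1` reports `BucketMode.DYNAMIC` when the primary key contains the partition key, and `BucketMode.GLOBAL_DYNAMIC` for cross-partition updates (primary key not containing all partition fields). In both modes every row must be written with an explicit bucket id, which is why the writer consults the assigner first.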
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;

import org.apache.commons.collections.CollectionUtils;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.crosspartition.IndexBootstrap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;

import java.io.IOException;

/** Assigns a bucket to each incoming row for Paimon dynamic bucket (bucket = -1) tables. */
public class PaimonBucketAssigner {

    private final RowPartitionKeyExtractor extractor;

    private final Projection bucketKeyProjection;

    private final SimpleHashBucketAssigner simpleHashBucketAssigner;

    private final TableSchema schema;

    public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        this.schema = fileStoreTable.schema();
        this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
        this.bucketKeyProjection =
                CodeGenUtils.newProjection(
                        fileStoreTable.schema().logicalRowType(),
                        fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys()));
        long dynamicBucketTargetRowNum =
                ((FileStoreTable) table).coreOptions().dynamicBucketTargetRowNum();
        this.simpleHashBucketAssigner =
                new SimpleHashBucketAssigner(numAssigners, assignId, dynamicBucketTargetRowNum);
        loadBucketIndex(fileStoreTable, numAssigners, assignId);
    }

    /**
     * Warms up the assigner with this subtask's share of the existing table data, so that keys
     * already written keep their previous bucket assignment across job restarts.
     */
    private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, int assignId) {
        IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
        try (RecordReader<InternalRow> recordReader =
                indexBootstrap.bootstrap(numAssigners, assignId)) {
            RecordReaderIterator<InternalRow> readerIterator =
                    new RecordReaderIterator<>(recordReader);
            while (readerIterator.hasNext()) {
                InternalRow row = readerIterator.next();
                assign(row);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public int assign(InternalRow rowData) {
        // Hash by the trimmed primary key when no explicit bucket key is defined,
        // otherwise by the projected bucket key.
        int hash;
        if (CollectionUtils.isEmpty(this.schema.bucketKeys())) {
            hash = extractor.trimmedPrimaryKey(rowData).hashCode();
        } else {
            hash = bucketKeyProjection.apply(rowData).hashCode();
        }
        return Math.abs(
                this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
    }
}
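To make the policy behind `SimpleHashBucketAssigner` concrete, here is a simplified single-partition model of the documented dynamic bucket behavior. This is an illustrative sketch, not Paimon's actual class (it ignores partitions and the multi-assigner split): a hash stays sticky to the bucket it first received, and new hashes fill the newest bucket until it holds `dynamic-bucket.target-row-num` entries, after which the next bucket is opened.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model class, for illustration only.
class SimplifiedDynamicBucketModel {

    private final Map<Integer, Integer> hashToBucket = new HashMap<>();
    private final long targetRowNum;
    private int currentBucket = 0;
    private long rowsInCurrentBucket = 0;

    SimplifiedDynamicBucketModel(long targetRowNum) {
        this.targetRowNum = targetRowNum;
    }

    int assign(int hash) {
        Integer existing = hashToBucket.get(hash);
        if (existing != null) {
            return existing; // upserts of a known key keep their bucket
        }
        if (rowsInCurrentBucket >= targetRowNum) {
            currentBucket++; // current bucket is full, open the next one
            rowsInCurrentBucket = 0;
        }
        hashToBucket.put(hash, currentBucket);
        rowsInCurrentBucket++;
        return currentBucket;
    }
}
```

With a target of 20, feeding 50 distinct hashes yields buckets 0, 1 and 2, which is what the unit test below asserts.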
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PaimonBucketAssignerTest {

    private Table table;
    private static final String TABLE_NAME = "default_table";
    private static final String DATABASE_NAME = "default_database";

    @BeforeEach
    public void before() throws Exception {
        boolean isWindows =
                System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
        Options options = new Options();
        if (isWindows) {
            options.set("warehouse", "C:/Users/" + System.getProperty("user.name") + "/tmp/paimon");
        } else {
            options.set("warehouse", "file:///tmp/paimon");
        }
        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
        catalog.createDatabase(DATABASE_NAME, true);
        Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
        if (!catalog.tableExists(identifier)) {
            Schema.Builder schemaBuilder = Schema.newBuilder();
            schemaBuilder.column("id", DataTypes.INT(), "primary key");
            schemaBuilder.column("name", DataTypes.STRING(), "name");
            schemaBuilder.primaryKey("id");
            // bucket = -1 puts the table into dynamic bucket mode.
            schemaBuilder.option("bucket", "-1");
            schemaBuilder.option("dynamic-bucket.target-row-num", "20");
            Schema schema = schemaBuilder.build();
            catalog.createTable(identifier, schema, false);
        }
        table = catalog.getTable(identifier);
    }

    @Test
    public void bucketAssigner() {
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        RowPartitionKeyExtractor keyExtractor =
                new RowPartitionKeyExtractor(fileStoreTable.schema());
        PaimonBucketAssigner paimonBucketAssigner = new PaimonBucketAssigner(fileStoreTable, 1, 0);
        Map<Integer, Integer> bucketInformation = new HashMap<>();
        for (int i = 0; i < 50; i++) {
            GenericRow row = GenericRow.of(i, BinaryString.fromString(String.valueOf(i)));
            int assign = paimonBucketAssigner.assign(row);
            int hashCode = keyExtractor.trimmedPrimaryKey(row).hashCode();
            bucketInformation.put(hashCode, assign);
        }
        List<Integer> bucketSize =
                bucketInformation.values().stream().distinct().collect(Collectors.toList());
        Assertions.assertEquals(3, bucketSize.size());
    }
}
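The assertion follows from the table options set up in `before()`: with `dynamic-bucket.target-row-num = 20`, the 50 distinct primary keys fill buckets of 20, 20 and 10 rows, i.e. ceil(50 / 20) = 3 distinct buckets.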
@@ -611,7 +611,7 @@ private Identifier getIdentifier(String dbName, String tbName) {
    private Catalog getCatalog() {
        Options options = new Options();
        if (isWindows) {
            options.set("warehouse", "file://" + CATALOG_DIR_WIN);
            options.set("warehouse", CATALOG_DIR_WIN);
        } else {
            options.set("warehouse", "file://" + CATALOG_DIR);
        }