diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index 7ac12bf9d60..5c822fe3b27 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -282,6 +282,47 @@ sink { } ``` +### Single dynamic bucket table with Paimon write props, operating on a primary key table with bucket = -1 + +#### Core options: [reference](https://paimon.apache.org/docs/0.8/primary-key-table/data-distribution/#dynamic-bucket) + +| name | type | required | default values | Description | +|--------------------------------|------|----------|----------------|------------------------------------------------| +| dynamic-bucket.target-row-num | long | yes | 2000000L | Controls the target row number for one bucket. | +| dynamic-bucket.initial-buckets | int | no | | Controls the number of initialized buckets. | + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + table-names = ["seatunnel.role"] + } +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="seatunnel" + table="role" + paimon.table.write-props = { + bucket = -1 + dynamic-bucket.target-row-num = 50000 + } + paimon.table.partition-keys = "dt" + paimon.table.primary-keys = "pk_id,dt" + } +} +``` + ### Multiple table #### example1 diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md index 3f00036d9eb..279450970b1 100644 --- a/docs/zh/connector-v2/sink/Paimon.md +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -30,21 +30,20 @@ libfb303-xxx.jar ## 连接器选项 -| 名称 | 类型 | 是否必须 | 默认值 | 描述 | -|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|---| -| warehouse | 字符串 | 是 | - | Paimon warehouse路径 | -| catalog_type | 字符串 | 否 | filesystem | Paimon的catalog类型,目前支持filesystem和hive | -| catalog_uri | 字符串 | 否 | - | Paimon catalog的uri,仅当catalog_type为hive时需要配置 | | -| database | 字符串 | 是 | - | 数据库名称 | -| table | 字符串 | 是 | - | 表名 | -| hdfs_site_path | 字符串 | 否 | - | hdfs-site.xml文件路径 | -| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 | -| data_save_mode | 枚举 | 否 | APPEND_DATA | 数据保存模式 | -| paimon.table.primary-keys | 字符串 | 否 | - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) | -| paimon.table.partition-keys | 字符串 | 否 | - | 分区字段列表,多字段使用逗号分隔 | -| paimon.table.write-props | Map | 否 | - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/0.8/maintenance/configurations/#coreoptions) | -| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 | -| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|---------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------| +| warehouse | 字符串 | 是 | | Paimon warehouse路径 | +| catalog_type | 字符串 | 否 | filesystem | Paimon的catalog类型,目前支持filesystem和hive | +| catalog_uri | 字符串 | 否 | | Paimon catalog的uri,仅当catalog_type为hive时需要配置 | +| database | 字符串 | 是 | | 数据库名称 | +| table | 字符串 | 是 | | 表名 | +| hdfs_site_path | 字符串 | 否 | | hdfs-site.xml文件路径 | +| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 | +| data_save_mode | 枚举 | 否 | APPEND_DATA | 数据保存模式 | +| paimon.table.primary-keys | 字符串 | 否 | | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) |
+| paimon.table.partition-keys | 字符串 | 否 | | 分区字段列表,多字段使用逗号分隔 | +| paimon.table.write-props | Map | 否 | | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/0.8/maintenance/configurations/#coreoptions) | +| paimon.hadoop.conf | Map | 否 | | Hadoop配置文件属性信息 | +| paimon.hadoop.conf-path | 字符串 | 否 | | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | ## 示例 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java index 785f1065dd4..8d204d82f69 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java @@ -92,6 +92,11 @@ interface Context extends Serializable { /** @return The index of this subtask. */ int getIndexOfSubtask(); + /** @return parallelism of this writer. */ + default int getNumberOfParallelSubtasks() { + return 0; + } + /** @return metricsContext of this reader. */ MetricsContext getMetricsContext(); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java index 3a97bb27bc9..9d64559c2e5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java @@ -37,6 +37,11 @@ public int getIndexOfSubtask() { return index; } + @Override + public int getNumberOfParallelSubtasks() { + return context.getNumberOfParallelSubtasks(); + } + @Override public MetricsContext getMetricsContext() { return context.getMetricsContext(); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java index acadf99990b..d58ab65cf70 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssigner; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState; import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil; @@ -34,6 +35,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; @@ -79,7 +81,11 @@ public class PaimonSinkWriter private final JobContext jobContext; - private TableSchema tableSchema; + private final TableSchema tableSchema; + + private final PaimonBucketAssigner bucketAssigner; + + private final boolean dynamicBucket; public PaimonSinkWriter( Context context,
@@ -97,6 +103,14 @@ public PaimonSinkWriter( this.context = context; this.jobContext = jobContext; this.tableSchema = ((FileStoreTable) table).schema(); + this.bucketAssigner = + new PaimonBucketAssigner( + table, + this.context.getNumberOfParallelSubtasks(), + this.context.getIndexOfSubtask()); + BucketMode bucketMode = ((FileStoreTable) table).bucketMode(); + this.dynamicBucket = + BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode; PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration); } @@ -139,7 +153,12 @@ public void write(SeaTunnelRow element) throws IOException { try { PaimonSecurityContext.runSecured( () -> { - tableWrite.write(rowData); + if (dynamicBucket) { + int bucket = bucketAssigner.assign(rowData); + tableWrite.write(rowData, bucket); + } else { + tableWrite.write(rowData); + } return null; }); } catch (Exception e) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java new file mode 100644 index 00000000000..4f5f681fff3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.Projection; +import org.apache.paimon.crosspartition.IndexBootstrap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.SimpleHashBucketAssigner; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.RowPartitionKeyExtractor; + +import java.io.IOException; + +public class PaimonBucketAssigner { + + private final RowPartitionKeyExtractor extractor; + + private final Projection bucketKeyProjection; + + private final SimpleHashBucketAssigner simpleHashBucketAssigner; + + private final TableSchema schema; + + public PaimonBucketAssigner(Table table, int numAssigners, int assignId) { + FileStoreTable fileStoreTable = (FileStoreTable) table; + this.schema = fileStoreTable.schema(); + this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema()); + this.bucketKeyProjection = + CodeGenUtils.newProjection( + fileStoreTable.schema().logicalRowType(), + fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys())); + long dynamicBucketTargetRowNum = + ((FileStoreTable) table).coreOptions().dynamicBucketTargetRowNum(); + this.simpleHashBucketAssigner = + new SimpleHashBucketAssigner(numAssigners, assignId, dynamicBucketTargetRowNum); + loadBucketIndex(fileStoreTable, numAssigners, assignId); + } + + private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, int assignId) { + IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable); + try (RecordReader<InternalRow> recordReader = + indexBootstrap.bootstrap(numAssigners, assignId)) { + RecordReaderIterator<InternalRow> readerIterator = + new RecordReaderIterator<>(recordReader); + while (readerIterator.hasNext()) { + InternalRow row = readerIterator.next(); + assign(row); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int assign(InternalRow rowData) { + int hash; + if (CollectionUtils.isEmpty(this.schema.bucketKeys())) { + hash = extractor.trimmedPrimaryKey(rowData).hashCode(); + } else { + hash = bucketKeyProjection.apply(rowData).hashCode(); + } + return Math.abs( + this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash)); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java new file mode 100644 index 00000000000..4bcb909a814 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssignerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.RowPartitionKeyExtractor; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class PaimonBucketAssignerTest { + + private Table table; + private static final String TABLE_NAME = "default_table"; + private static final String DATABASE_NAME = "default_database"; + + @BeforeEach + public void before() throws Exception { + boolean isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + Options options = new Options(); + if (isWindows) { + options.set("warehouse", "C:/Users/" + System.getProperty("user.name") + "/tmp/paimon"); + } else { + options.set("warehouse", "file:///tmp/paimon"); + } + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + catalog.createDatabase(DATABASE_NAME, true); + Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME); + if (!catalog.tableExists(identifier)) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("id", DataTypes.INT(), "primary key"); + schemaBuilder.column("name", DataTypes.STRING(), "name"); + schemaBuilder.primaryKey("id"); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("dynamic-bucket.target-row-num", "20"); + Schema schema = schemaBuilder.build(); + catalog.createTable(identifier, schema, false); + } + table = catalog.getTable(identifier); + } + + @Test + public void bucketAssigner() { + FileStoreTable fileStoreTable = (FileStoreTable) table; + RowPartitionKeyExtractor keyExtractor = + new RowPartitionKeyExtractor(fileStoreTable.schema()); + PaimonBucketAssigner paimonBucketAssigner = new PaimonBucketAssigner(fileStoreTable, 1, 0); + Map<Integer, Integer> bucketInformation = new HashMap<>(); + for (int i = 0; i < 50; i++) { + GenericRow row = GenericRow.of(i, BinaryString.fromString(String.valueOf(i))); + int assign = paimonBucketAssigner.assign(row); + int hashCode = keyExtractor.trimmedPrimaryKey(row).hashCode(); + bucketInformation.put(hashCode, assign); + } + List<Integer> bucketSize = + bucketInformation.values().stream().distinct().collect(Collectors.toList()); + Assertions.assertEquals(3, bucketSize.size()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 4b1d7dd86ce..5b1c45172f4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -611,7 +611,7 @@ private Identifier getIdentifier(String dbName, String tbName) { private Catalog getCatalog() { Options options = new Options(); if (isWindows) { - options.set("warehouse", "file://" + CATALOG_DIR_WIN); + options.set("warehouse", CATALOG_DIR_WIN); } else { options.set("warehouse", "file://" + CATALOG_DIR); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java new file mode 100644 index 00000000000..6d74669ae38 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.core.starter.utils.CompressionUtils; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.commons.compress.archivers.ArchiveException; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.crosspartition.IndexBootstrap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.TimestampType; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Spark and Flink engines can not auto create the Paimon table on worker nodes in local files (e.g. Flink TM) via the savemode feature, which can lead to errors") +@Slf4j +public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements TestResource { + + private static String CATALOG_ROOT_DIR = "/tmp/"; + private static final String NAMESPACE = "paimon"; + private static final String NAMESPACE_TAR = "paimon.tar.gz"; + private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/"; + private String CATALOG_ROOT_DIR_WIN = "C:/Users/"; + private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; + private boolean isWindows; + + @BeforeAll + @Override + public void startUp() throws Exception { + this.isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + System.getProperty("user.name") + "/tmp/"; + CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; + } + + @AfterAll + @Override + public void tearDown() throws Exception {} + + @TestTemplate + public void testWriteAndReadPaimon(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult textWriteResult = + container.executeJob("/fake_to_dynamic_bucket_paimon_case1.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + Container.ExecResult readResult = container.executeJob("/paimon_to_assert.conf"); + Assertions.assertEquals(0, readResult.getExitCode()); + Container.ExecResult readProjectionResult = + container.executeJob("/paimon_projection_to_assert.conf"); +
Assertions.assertEquals(0, readProjectionResult.getExitCode()); + } + + @TestTemplate + public void testBucketCount(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult textWriteResult = + container.executeJob("/fake_to_dynamic_bucket_paimon_case2.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // copy paimon to local + container.executeExtraCommands(containerExtendedFactory); + FileStoreTable table = + (FileStoreTable) getTable("default", "st_test_2"); + IndexBootstrap indexBootstrap = new IndexBootstrap(table); + List<String> fieldNames = + IndexBootstrap.bootstrapType(table.schema()).getFieldNames(); + int bucketIndexOf = fieldNames.indexOf("_BUCKET"); + Set<Integer> bucketList = new HashSet<>(); + try (RecordReader<InternalRow> recordReader = + indexBootstrap.bootstrap(1, 0)) { + recordReader.forEachRemaining( + row -> bucketList.add(row.getInt(bucketIndexOf))); + } + Assertions.assertEquals(2, bucketList.size()); + }); + } + + @TestTemplate + public void testCDCWrite(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult textWriteResult = + container.executeJob("/fake_cdc_to_dynamic_bucket_paimon_case.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(30L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + // copy paimon to local + container.executeExtraCommands(containerExtendedFactory); + FileStoreTable table = + (FileStoreTable) getTable("default", "st_test_3"); + List<DataField> fields = table.schema().fields(); + for (DataField field : fields) { + if (field.name().equalsIgnoreCase("one_time")) { + Assertions.assertEquals( + 0, ((TimestampType) field.type()).getPrecision()); + } + if (field.name().equalsIgnoreCase("two_time")) { + Assertions.assertEquals( + 3, ((TimestampType) field.type()).getPrecision()); + } + if (field.name().equalsIgnoreCase("three_time")) { + Assertions.assertEquals( + 6, ((TimestampType) field.type()).getPrecision()); + } + if (field.name().equalsIgnoreCase("four_time")) { + Assertions.assertEquals( + 9, ((TimestampType) field.type()).getPrecision()); + } + } + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + List<PaimonRecord> result = new ArrayList<>(); + try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) { + reader.forEachRemaining( + row -> + result.add( + new PaimonRecord( + row.getLong(0), + row.getString(1).toString(), + row.getTimestamp(2, 0), + row.getTimestamp(3, 3), + row.getTimestamp(4, 6), + row.getTimestamp(5, 9)))); + } + Assertions.assertEquals(2, result.size()); + for (PaimonRecord paimonRecord : result) { + Assertions.assertEquals( + "2024-03-10T10:00:12", paimonRecord.oneTime.toString()); + Assertions.assertEquals( + "2024-03-10T10:00:00.123", paimonRecord.twoTime.toString()); + Assertions.assertEquals( + "2024-03-10T10:00:00.123456", + paimonRecord.threeTime.toString()); + Assertions.assertEquals( + "2024-03-10T10:00:00.123456789", + paimonRecord.fourTime.toString()); + } + }); + } + + protected final ContainerExtendedFactory containerExtendedFactory = + container -> { + if (isWindows) { + FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR); + FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar"); +
FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN); + } else { + FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR); + FileUtils.createNewDir(CATALOG_DIR); + } + + container.execInContainer( + "sh", + "-c", + "cd " + + CATALOG_ROOT_DIR + + " && tar -czvf " + + NAMESPACE_TAR + + " " + + NAMESPACE); + container.copyFileFromContainer( + CATALOG_ROOT_DIR + NAMESPACE_TAR, + (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) + NAMESPACE_TAR); + if (isWindows) { + extractFilesWin(); + } else { + extractFiles(); + } + }; + + private void extractFiles() { + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command( + "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " + NAMESPACE_TAR); + try { + Process process = processBuilder.start(); + // wait command completed + int exitCode = process.waitFor(); + if (exitCode == 0) { + log.info("Extract files successful."); + } else { + log.error("Extract files failed with exit code " + exitCode); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + private void extractFilesWin() { + try { + CompressionUtils.unGzip( + new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new File(CATALOG_ROOT_DIR_WIN)); + CompressionUtils.unTar( + new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new File(CATALOG_ROOT_DIR_WIN)); + } catch (IOException | ArchiveException e) { + throw new RuntimeException(e); + } + } + + protected Table getTable(String dbName, String tbName) { + Options options = new Options(); + if (isWindows) { + options.set("warehouse", CATALOG_DIR_WIN); + } else { + options.set("warehouse", "file://" + CATALOG_DIR); + } + try { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + return catalog.getTable(Identifier.create(dbName, tbName)); + } catch (Catalog.TableNotExistException e) { + // do something + throw new RuntimeException("table not exist"); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf new file mode 100644 index 00000000000..f9993fe33f9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + columns = [ + { + name = pk_id + type = bigint + nullable = false + comment = "primary key id" + }, + { + name = name + type = "string" + nullable = true + comment = "name" + }, + { + name = one_time + type = timestamp + nullable = false + comment = "one time" + columnScale = 0 + }, + { + name = two_time + type = timestamp + nullable = false + comment = "two time" + columnScale = 3 + }, + { + name = three_time + type = timestamp + nullable = false + comment = "three time" + columnScale = 6 + }, + { + name = four_time + type = timestamp + nullable = false + comment = "four time" + columnScale = 9 + } + ] + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = INSERT + fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = INSERT + fields = [3, "C", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + }, + { + kind = DELETE + fields = [2, "B", "2024-03-10T10:00:12", "2024-03-10T10:00:00.123", "2024-03-10T10:00:00.123456", "2024-03-10T10:00:00.123456789"] + } + ] + } +} + +transform { + +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "default" + table = "st_test_3" + paimon.table.write-props = { + bucket = -1 + dynamic-bucket.target-row-num = 50000 + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf new file mode 100644 index 00000000000..e238f560440 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + row.num = 100000 + schema = { + fields { + pk_id = int + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + result_table_name = "fake" + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "default" + table = "st_test" + paimon.table.write-props = { + bucket = -1 + dynamic-bucket.target-row-num = 50000 + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf new file mode 100644 index 00000000000..338e624d047 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + row.num = 100000 + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "default" + table = "st_test_2" + paimon.table.write-props = { + bucket = -1 + dynamic-bucket.target-row-num = 50000 + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java index 747198d3eb8..77329290f50 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SinkWriterContext.java @@ -21,23 +21,38 @@ import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.sink.SinkWriter; +import com.google.common.base.Preconditions; + public class SinkWriterContext implements SinkWriter.Context { private static final long serialVersionUID = -3082515319043725121L; - private final int indexID; + private final int indexOfSubtask; + private final int numberOfParallelSubtasks; private final MetricsContext metricsContext; private final EventListener eventListener; public SinkWriterContext( - int indexID, MetricsContext metricsContext, EventListener eventListener) { - this.indexID = indexID; + int numberOfParallelSubtasks, + int indexOfSubtask, + MetricsContext metricsContext, + EventListener eventListener) { + Preconditions.checkArgument( + numberOfParallelSubtasks >= 1, "Parallelism must be a positive number."); + Preconditions.checkArgument( + indexOfSubtask >= 0, "Task index must be a non-negative number."); + this.numberOfParallelSubtasks = numberOfParallelSubtasks; + this.indexOfSubtask = indexOfSubtask; this.metricsContext = metricsContext; this.eventListener = eventListener; } @Override public int getIndexOfSubtask() { - return indexID; + return indexOfSubtask; + } + + public int getNumberOfParallelSubtasks() { + return numberOfParallelSubtasks; } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index cacaa75aaef..9699f0fe3c4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -288,13 +288,21 @@ public void restoreState(List actionStateList) throws Except sinkAction .getSink() .createWriter( - new SinkWriterContext(indexID, metricsContext, eventListener)); + new SinkWriterContext( + sinkAction.getParallelism(), + indexID, + metricsContext, + eventListener)); } else { this.writer = sinkAction .getSink() .restoreWriter( - new SinkWriterContext(indexID, metricsContext, eventListener), + new SinkWriterContext( + sinkAction.getParallelism(), + indexID, + metricsContext, + eventListener), states); } if (this.writer instanceof 
SupportResourceShare) {
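
The `getNumberOfParallelSubtasks()` default added to `SinkWriter.Context` pairs with the existing `getIndexOfSubtask()`, letting a writer claim a deterministic share of work across its parallel instances. A minimal sketch of that pattern, assuming only the two context methods shown in the diff (the `ShardedBucketOwner` class is a hypothetical illustration, not part of the SeaTunnel API):

```java
import org.apache.seatunnel.api.sink.SinkWriter;

// Hypothetical helper: decides which buckets this subtask is responsible for,
// using only the two accessors exposed by SinkWriter.Context in this change.
public class ShardedBucketOwner {
    private final int parallelism;
    private final int subtaskIndex;

    public ShardedBucketOwner(SinkWriter.Context context) {
        this.parallelism = context.getNumberOfParallelSubtasks();
        this.subtaskIndex = context.getIndexOfSubtask();
    }

    /** A bucket belongs to exactly one subtask: bucket % parallelism == subtaskIndex. */
    public boolean owns(int bucket) {
        return parallelism > 0 && bucket % parallelism == subtaskIndex;
    }
}
```

Note that the interface default returns 0, so the `parallelism > 0` guard matters on runtimes that have not overridden the new method.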
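`PaimonBucketAssigner` replays the table's existing index through `IndexBootstrap` before the first write, so keys already in the table keep their buckets, while new keys fill the current bucket until it reaches `dynamic-bucket.target-row-num` and then open the next bucket owned by this subtask (`assignId`, `assignId + parallelism`, and so on). A self-contained sketch of that assignment behavior, assuming a single partition and pre-hashed keys; it mirrors the semantics of Paimon's `SimpleHashBucketAssigner` but is not the real implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of dynamic bucket assignment: stable buckets for known key
// hashes, capacity-bounded buckets for new ones, bucket ids striped across subtasks.
public class DynamicBucketAssignerSketch {
    private final int numAssigners;   // writer parallelism
    private final int assignId;       // index of this subtask
    private final long targetRowNum;  // dynamic-bucket.target-row-num
    private final Map<Integer, Integer> hashToBucket = new HashMap<>();
    private final Map<Integer, Long> bucketRows = new HashMap<>();
    private int currentBucket;

    public DynamicBucketAssignerSketch(int numAssigners, int assignId, long targetRowNum) {
        this.numAssigners = numAssigners;
        this.assignId = assignId;
        this.targetRowNum = targetRowNum;
        this.currentBucket = assignId; // first bucket owned by this subtask
    }

    public int assign(int keyHash) {
        Integer known = hashToBucket.get(keyHash);
        if (known != null) {
            return known; // existing keys always land in their original bucket
        }
        if (bucketRows.getOrDefault(currentBucket, 0L) >= targetRowNum) {
            currentBucket += numAssigners; // current bucket is full, open the next owned one
        }
        bucketRows.merge(currentBucket, 1L, Long::sum);
        hashToBucket.put(keyHash, currentBucket);
        return currentBucket;
    }
}
```

With `numAssigners = 1`, `assignId = 0`, `targetRowNum = 20` and 50 distinct keys, this produces buckets 0, 1 and 2 — the same three buckets `PaimonBucketAssignerTest` asserts.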
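The expected bucket counts in the tests follow directly from the target row count: with all keys distinct, buckets fill sequentially, so roughly ceil(rows / target-row-num) buckets appear. A quick check of both assertions (50 keys at a 20-row target in the unit test; 100000 generated rows at a 50000-row target in `fake_to_dynamic_bucket_paimon_case2.conf`):

```java
public class BucketCountCheck {
    // Expected number of buckets when every key is distinct: ceil(rows / targetRowNum).
    static long expectedBuckets(long rows, long targetRowNum) {
        return (rows + targetRowNum - 1) / targetRowNum;
    }

    public static void main(String[] args) {
        System.out.println(expectedBuckets(50, 20));          // 3 -> PaimonBucketAssignerTest
        System.out.println(expectedBuckets(100_000, 50_000)); // 2 -> testBucketCount
    }
}
```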