diff --git a/docs/en/connector-v2/sink/Iceberg.md b/docs/en/connector-v2/sink/Iceberg.md
new file mode 100644
index 000000000000..0ca271a60279
--- /dev/null
+++ b/docs/en/connector-v2/sink/Iceberg.md
@@ -0,0 +1,184 @@
+# Apache Iceberg
+
+> Apache Iceberg sink connector
+
+## Support Iceberg Version
+
+- 1.4.2
+
+## Support Those Engines
+
+> Spark
+> Flink
+> SeaTunnel Zeta
+
+## Description
+
+Sink connector for Apache Iceberg. It supports CDC mode, automatic table creation, and table schema evolution.
+
+## Supported DataSource Info
+
+| Datasource | Dependent | Maven |
+|------------|-----------|---------------------------------------------------------------------------|
+| Iceberg | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec) |
+| Iceberg | libfb303 | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |
+
+## Database Dependency
+
+> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to `provided`, so if you use the Flink engine you may first need to add the following Jar packages to the /lib directory. If you are using the Spark engine and it is integrated with Hadoop, you do not need to add the following Jar packages.
+
+```
+hive-exec-xxx.jar
+libfb303-xxx.jar
+```
+
+> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.
+
+## Data Type Mapping
+
+| SeaTunnel Data type | Iceberg Data type |
+|---------------------|-------------------|
+| BOOLEAN | BOOLEAN |
+| INT | INTEGER |
+| BIGINT | LONG |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DATE | DATE |
+| TIME | TIME |
+| TIMESTAMP | TIMESTAMP |
+| STRING | STRING |
+| BYTES               | FIXED<br/>BINARY  |
+| DECIMAL | DECIMAL |
+| ROW | STRUCT |
+| ARRAY | LIST |
+| MAP | MAP |
+
+## Sink Options
+
+| Name | Type | Required | Default | Description |
+|----------------------------------------|---------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| catalog_name                           | string  | yes      | default | User-specified catalog name. Default is `default`. |
+| namespace                              | string  | yes      | default | The Iceberg database name in the backend catalog. Default is `default`. |
+| table                                  | string  | yes      | -       | The Iceberg table name in the backend catalog. |
+| iceberg.catalog.config                 | map     | yes      | -       | Properties used to initialize the Iceberg catalog; the supported keys are listed in https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java |
+| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration |
+| iceberg.hadoop-conf-path | string | no | - | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. |
+| case_sensitive                         | boolean | no       | false   | If data columns were selected via schema [config], controls whether the match to the schema is done with case sensitivity. |
+| iceberg.table.write-props              | map     | no       | -       | Properties passed through to Iceberg writer initialization; these take precedence. Examples are 'write.format.default' and 'write.target-file-size-bytes'; the full list of settings can be found in 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java'. |
+| iceberg.table.auto-create-enabled      | boolean | no       | false   | Set to true to automatically create the destination table; false otherwise. |
+| iceberg.table.auto-create-props        | map     | no       | -       | Table properties applied by Iceberg when the destination table is created automatically. |
+| iceberg.table.schema-evolution-enabled | boolean | no       | false   | Set to true to let Iceberg tables support schema evolution during synchronization. |
+| iceberg.table.primary-keys | string | no | - | Default comma-separated list of columns that identify a row in tables (primary key) |
+| iceberg.table.partition-keys | string | no | - | Default comma-separated list of partition fields to use when creating tables |
+| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to `true` to enable upsert mode, default is `false` |
+
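+Map-typed options are written as nested HOCON objects. As a minimal sketch, `iceberg.table.auto-create-props` (not shown in the examples below) could carry standard Iceberg table properties; the property names come from Iceberg's TableProperties and the values here are purely illustrative:
+
+```hocon
+iceberg.table.auto-create-props={
+  "format-version"="2"
+  "write.parquet.compression-codec"="zstd"
+}
+```
+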
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "customers_mysql_cdc_iceberg"
+ server-id = 5652
+ username = "st_user"
+ password = "seatunnel"
+ table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+ }
+}
+
+transform {
+}
+
+sink {
+ Iceberg {
+ catalog_name="seatunnel_test"
+ iceberg.catalog.config={
+ "type"="hadoop"
+ "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
+ }
+ namespace="seatunnel_namespace"
+ table="iceberg_sink_table"
+ iceberg.table.write-props={
+ write.format.default="parquet"
+ write.target-file-size-bytes=536870912
+ }
+ iceberg.table.auto-create-enabled="true"
+ iceberg.table.primary-keys="id"
+ iceberg.table.partition-keys="f_datetime"
+ iceberg.table.upsert-mode-enabled=true
+ iceberg.table.schema-evolution-enabled=true
+ case_sensitive=true
+ }
+}
+```
+
+### Hive Catalog:
+
+```hocon
+sink {
+ Iceberg {
+ catalog_name="seatunnel_test"
+ iceberg.catalog.config={
+ type = "hive"
+ uri = "thrift://localhost:9083"
+ warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
+ }
+ namespace="seatunnel_namespace"
+ table="iceberg_sink_table"
+ iceberg.table.write-props={
+ write.format.default="parquet"
+ write.target-file-size-bytes=536870912
+ }
+ iceberg.table.auto-create-enabled="true"
+ iceberg.table.primary-keys="id"
+ iceberg.table.partition-keys="f_datetime"
+ iceberg.table.upsert-mode-enabled=true
+ iceberg.table.schema-evolution-enabled=true
+ case_sensitive=true
+ }
+}
+```
+
+### Hadoop Catalog:
+
+```hocon
+sink {
+ Iceberg {
+ catalog_name="seatunnel_test"
+ iceberg.catalog.config={
+ type = "hadoop"
+ warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
+ }
+ namespace="seatunnel_namespace"
+ table="iceberg_sink_table"
+ iceberg.table.write-props={
+ write.format.default="parquet"
+ write.target-file-size-bytes=536870912
+ }
+ iceberg.table.auto-create-enabled="true"
+ iceberg.table.primary-keys="id"
+ iceberg.table.partition-keys="f_datetime"
+ iceberg.table.upsert-mode-enabled=true
+ iceberg.table.schema-evolution-enabled=true
+ case_sensitive=true
+ }
+}
+
+```
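+
+### Hadoop Catalog with Hadoop configuration:
+
+The `hadoop.config` and `iceberg.hadoop-conf-path` options from the Sink Options table are not used in the examples above. Below is a minimal sketch of how they can be combined with a Hadoop catalog; the config directory, property values, and cluster name are placeholders to adapt to your environment:
+
+```hocon
+sink {
+  Iceberg {
+    catalog_name="seatunnel_test"
+    iceberg.catalog.config={
+      type = "hadoop"
+      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
+    }
+    # Directory containing core-site.xml, hdfs-site.xml and hive-site.xml (placeholder path)
+    iceberg.hadoop-conf-path="/etc/hadoop/conf"
+    # Properties passed through to the Hadoop configuration (illustrative value)
+    hadoop.config={
+      "fs.defaultFS"="hdfs://your_cluster"
+    }
+    namespace="seatunnel_namespace"
+    table="iceberg_sink_table"
+  }
+}
+```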
+
+## Changelog
+
+### 2.3.4-SNAPSHOT 2024-01-18
+
+- Add Iceberg Sink Connector
+
+### next version
+
diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md
index a01833f066bb..8fb296467a88 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -4,7 +4,7 @@
## Support Iceberg Version
-- 0.14.0
+- 1.4.2
## Support Those Engines
@@ -34,18 +34,16 @@ Source connector for Apache Iceberg. It can support batch and stream mode.
## Supported DataSource Info
-| Datasource | Dependent | Maven |
-|------------|---------------------|---------------------------------------------------------------------------|
-| Iceberg | flink-shaded-hadoop | [Download](https://mvnrepository.com/search?q=flink-shaded-hadoop-) |
-| Iceberg | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec) |
-| Iceberg | libfb303 | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |
+| Datasource | Dependent | Maven |
+|------------|-----------|---------------------------------------------------------------------------|
+| Iceberg | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec) |
+| Iceberg | libfb303 | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |
## Database Dependency
-> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec and flink-shaded-hadoop-2 in the project pom file are provided, so if you use the Flink engine, first you may need to add the following Jar packages to /lib directory, if you are using the Spark engine and integrated with Hadoop, then you do not need to add the following Jar packages.
+> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to `provided`, so if you use the Flink engine you may first need to add the following Jar packages to the /lib directory. If you are using the Spark engine and it is integrated with Hadoop, you do not need to add the following Jar packages.
```
-flink-shaded-hadoop-x-xxx.jar
hive-exec-xxx.jar
libfb303-xxx.jar
```
@@ -76,11 +74,11 @@ libfb303-xxx.jar
| Name | Type | Required | Default | Description |
|--------------------------|---------|----------|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name | string | yes | - | User-specified catalog name. |
-| catalog_type | string | yes | - | The optional values are: hive(The hive metastore catalog),hadoop(The hadoop catalog) |
-| uri | string | no | - | The Hive metastore’s thrift URI. |
-| warehouse | string | yes | - | The location to store metadata files and data files. |
| namespace | string | yes | - | The iceberg database name in the backend catalog. |
| table | string | yes | - | The iceberg table name in the backend catalog. |
+| iceberg.catalog.config | map | yes | - | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file:"https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
+| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration |
+| iceberg.hadoop-conf-path | string | no | - | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. |
| schema | config | no | - | Use projection to select data columns and columns order. |
| case_sensitive | boolean | no | false | If data columns where selected via schema [config], controls whether the match to the schema will be done with case sensitivity. |
| start_snapshot_timestamp | long | no | - | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the timestamp. timestamp – the timestamp in millis since the Unix epoch |
@@ -123,8 +121,10 @@ source {
}
}
catalog_name = "seatunnel"
- catalog_type = "hadoop"
- warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
+ iceberg.catalog.config={
+ type = "hadoop"
+ warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
+ }
namespace = "database1"
table = "source"
result_table_name = "iceberg"
@@ -147,9 +147,13 @@ sink {
source {
Iceberg {
catalog_name = "seatunnel"
+ iceberg.catalog.config={
+ type = "hive"
+ uri = "thrift://localhost:9083"
+ warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
+ }
- catalog_type = "hive"
- uri = "thrift://localhost:9083"
- warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
+
namespace = "your_iceberg_database"
table = "your_iceberg_table"
}
@@ -162,8 +166,10 @@ source {
source {
Iceberg {
catalog_name = "seatunnel"
- catalog_type = "hadoop"
- warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
+ iceberg.catalog.config={
+ type = "hadoop"
+ warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
+ }
namespace = "your_iceberg_database"
table = "your_iceberg_table"
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c5ccae20a222..9e3769550556 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -72,6 +72,7 @@ seatunnel.sink.Sentry = connector-sentry
seatunnel.source.MongoDB = connector-mongodb
seatunnel.sink.MongoDB = connector-mongodb
seatunnel.source.Iceberg = connector-iceberg
+seatunnel.sink.Iceberg = connector-iceberg
seatunnel.source.InfluxDB = connector-influxdb
seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
diff --git a/pom.xml b/pom.xml
index 274fe05c7eba..3c9865273fb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,6 @@
1.2.17
1.2.3
1.2
- 0.13.1
1.13.6
1.15.3
2.4.0
@@ -87,6 +86,7 @@
2.13.3
1.18.24
1.20
+ 1.11.1
false
false
false
@@ -105,7 +105,6 @@
2.11.0
4.4
3.3.0
- 1.20
1.8.0
provided
provided
diff --git a/release-note.md b/release-note.md
index 831018d27329..ac84ca90aca5 100644
--- a/release-note.md
+++ b/release-note.md
@@ -187,6 +187,7 @@
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
+- [Connector-V2] [Iceberg] Support Iceberg sink (#6198)
### Zeta(ST-Engine)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index a4f082a3eae2..d9947ec324cc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -69,8 +69,18 @@
org.glassfish.jersey.core
*
+
+
+ com.github.luben
+ zstd-jni
+
+
+ com.github.luben
+ zstd-jni
+ 1.5.5-5
+
@@ -84,6 +94,10 @@
io.debezium
debezium-embedded
+
+ com.github.luben
+ zstd-jni
+
com.zaxxer
HikariCP
diff --git a/seatunnel-connectors-v2/connector-iceberg/pom.xml b/seatunnel-connectors-v2/connector-iceberg/pom.xml
index d1689e8ef9e2..309900b09c77 100644
--- a/seatunnel-connectors-v2/connector-iceberg/pom.xml
+++ b/seatunnel-connectors-v2/connector-iceberg/pom.xml
@@ -30,12 +30,23 @@
SeaTunnel : Connectors V2 : Iceberg
- 0.14.0
- 1.12.3
+ 1.4.2
+ 1.13.1
1.11.3
2.3.9
+ connector-iceberg
+
+
+
+ com.github.luben
+ zstd-jni
+ 1.5.5-5
+
+
+
+
org.apache.seatunnel
@@ -48,6 +59,13 @@
iceberg-core
${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-common
+ ${iceberg.version}
+
+
org.apache.iceberg
iceberg-api
@@ -119,13 +137,15 @@
- org.apache.flink
- flink-shaded-hadoop-2
+ org.apache.seatunnel
+ seatunnel-hadoop3-3.1.4-uber
+ ${project.version}
+ optional
provided
org.apache.avro
- *
+ avro
@@ -186,10 +206,50 @@
junit
junit
- ${junit4.version}
+ 4.13.2
test
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ shade
+
+ package
+
+
+
+ org.apache.avro
+
+ ${seatunnel.shade.package}.${connector.name}.org.apache.avro
+
+
+ org.apache.orc
+ ${seatunnel.shade.package}.${connector.name}.org.apache.orc
+
+
+ org.apache.parquet
+
+ ${seatunnel.shade.package}.${connector.name}.org.apache.parquet
+
+
+ shaded.parquet
+
+ ${seatunnel.shade.package}.${connector.name}.shaded.parquet
+
+
+
+
+
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
index 83a406897657..554099ef7b8c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
@@ -17,72 +17,108 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
-import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
-import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.common.DynClasses;
+import org.apache.iceberg.common.DynMethods;
-import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+@Slf4j
public class IcebergCatalogFactory implements Serializable {
private static final long serialVersionUID = -6003040601422350869L;
+ private static final List<String> HADOOP_CONF_FILES =
+ ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+ private CommonConfig config;
- private final String catalogName;
- private final IcebergCatalogType catalogType;
- private final String warehouse;
- private final String uri;
+ public IcebergCatalogFactory(CommonConfig config) {
+ this.config = config;
+ }
- public IcebergCatalogFactory(
- @NonNull String catalogName,
- @NonNull IcebergCatalogType catalogType,
- @NonNull String warehouse,
- String uri) {
- this.catalogName = catalogName;
- this.catalogType = catalogType;
- this.warehouse = warehouse;
- this.uri = uri;
+ public Catalog loadCatalog() {
+ // When using the SeaTunnel engine, set the context class loader to prevent class loading failures
+ Thread.currentThread().setContextClassLoader(IcebergCatalogFactory.class.getClassLoader());
+ return CatalogUtil.buildIcebergCatalog(
+ config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
}
- public Catalog create() {
- Configuration conf = new Configuration();
- SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
- Map properties = new HashMap<>();
- properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
+ /**
+ * Load the Hadoop configuration through reflection so that Hadoop remains an optional dependency.
+ *
+ * @param config common connector config carrying the Hadoop properties and optional config directory
+ * @return a Hadoop configuration object, or null if Hadoop is not on the classpath
+ */
+ private Object loadHadoopConfig(CommonConfig config) {
+ Class<?> configClass =
+ DynClasses.builder()
+ .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
+ .orNull()
+ .build();
+ if (configClass == null) {
+ configClass =
+ DynClasses.builder()
+ .impl("org.apache.hadoop.conf.Configuration")
+ .orNull()
+ .build();
+ }
- switch (catalogType) {
- case HADOOP:
- return hadoop(catalogName, serializableConf, properties);
- case HIVE:
- properties.put(CatalogProperties.URI, uri);
- return hive(catalogName, serializableConf, properties);
- default:
- throw new IcebergConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- String.format("Unsupported catalogType: %s", catalogType));
+ if (configClass == null) {
+ log.info("Hadoop not found on classpath, not creating Hadoop config");
+ return null;
}
- }
- private static Catalog hadoop(
- String catalogName, SerializableConfiguration conf, Map properties) {
- return CatalogUtil.loadCatalog(
- HadoopCatalog.class.getName(), catalogName, properties, conf.get());
- }
+ try {
+ Object result = configClass.getDeclaredConstructor().newInstance();
+ DynMethods.BoundMethod addResourceMethod =
+ DynMethods.builder("addResource").impl(configClass, URL.class).build(result);
+ DynMethods.BoundMethod setMethod =
+ DynMethods.builder("set")
+ .impl(configClass, String.class, String.class)
+ .build(result);
- private static Catalog hive(
- String catalogName, SerializableConfiguration conf, Map properties) {
- return CatalogUtil.loadCatalog(
- HiveCatalog.class.getName(), catalogName, properties, conf.get());
+ // load any config files in the specified config directory
+ String hadoopConfPath = config.getHadoopConfPath();
+ if (hadoopConfPath != null) {
+ HADOOP_CONF_FILES.forEach(
+ confFile -> {
+ Path path = Paths.get(hadoopConfPath, confFile);
+ if (Files.exists(path)) {
+ try {
+ addResourceMethod.invoke(path.toUri().toURL());
+ } catch (IOException e) {
+ log.warn(
+ "Error adding Hadoop resource {}, resource was not added",
+ path,
+ e);
+ }
+ }
+ });
+ }
+ config.getHadoopProps().forEach(setMethod::invoke);
+ log.info("Hadoop config initialized: {}", configClass.getName());
+ return result;
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ log.warn(
+ "Hadoop found on classpath but could not create config, proceeding without config",
+ e);
+ }
+ return null;
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
index 554dd0bd7599..591c2450d0fa 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg;
-import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Table;
@@ -31,16 +31,13 @@
import java.io.IOException;
import java.io.Serializable;
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
-
public class IcebergTableLoader implements Closeable, Serializable {
private static final long serialVersionUID = 9061073826700804273L;
private final IcebergCatalogFactory icebergCatalogFactory;
private final String tableIdentifierStr;
-
- private Catalog catalog;
+ private transient Catalog catalog;
public IcebergTableLoader(
@NonNull IcebergCatalogFactory icebergCatalogFactory,
@@ -49,14 +46,24 @@ public IcebergTableLoader(
this.tableIdentifierStr = tableIdentifier.toString();
}
- public void open() {
- catalog = CachingCatalog.wrap(icebergCatalogFactory.create());
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ public TableIdentifier getTableIdentifier() {
+ return TableIdentifier.parse(tableIdentifierStr);
+ }
+
+ public IcebergTableLoader open() {
+ catalog = CachingCatalog.wrap(icebergCatalogFactory.loadCatalog());
+ return this;
}
public Table loadTable() {
TableIdentifier tableIdentifier = TableIdentifier.parse(tableIdentifierStr);
- checkArgument(
- catalog.tableExists(tableIdentifier), "Illegal source table: " + tableIdentifier);
+ if (catalog == null) {
+ open();
+ }
return catalog.loadTable(tableIdentifier);
}
@@ -67,16 +74,10 @@ public void close() throws IOException {
}
}
- public static IcebergTableLoader create(SourceConfig sourceConfig) {
- IcebergCatalogFactory catalogFactory =
- new IcebergCatalogFactory(
- sourceConfig.getCatalogName(),
- sourceConfig.getCatalogType(),
- sourceConfig.getWarehouse(),
- sourceConfig.getUri());
+ public static IcebergTableLoader create(CommonConfig config) {
+ IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(config);
return new IcebergTableLoader(
catalogFactory,
- TableIdentifier.of(
- Namespace.of(sourceConfig.getNamespace()), sourceConfig.getTable()));
+ TableIdentifier.of(Namespace.of(config.getNamespace()), config.getTable()));
}
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
index 2f893da092bf..a7503e6e3062 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
@@ -17,19 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.config.ConfigRuntimeException;
import lombok.Getter;
import lombok.ToString;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
-import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
-import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HIVE;
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@Getter
@@ -40,19 +39,13 @@ public class CommonConfig implements Serializable {
public static final Option<String> KEY_CATALOG_NAME =
Options.key("catalog_name")
.stringType()
- .noDefaultValue()
+ .defaultValue("default")
.withDescription(" the iceberg catalog name");
- public static final Option KEY_CATALOG_TYPE =
- Options.key("catalog_type")
- .enumType(IcebergCatalogType.class)
- .noDefaultValue()
- .withDescription(" the iceberg catalog type");
-
public static final Option<String> KEY_NAMESPACE =
Options.key("namespace")
.stringType()
- .noDefaultValue()
+ .defaultValue("default")
.withDescription(" the iceberg namespace");
public static final Option<String> KEY_TABLE =
@@ -61,17 +54,25 @@ public class CommonConfig implements Serializable {
.noDefaultValue()
.withDescription(" the iceberg table");
- public static final Option KEY_URI =
- Options.key("uri")
- .stringType()
+ public static final Option