From bb60019769740614b1d9b03b5986fce69255bb65 Mon Sep 17 00:00:00 2001
From: zhouyao
Date: Fri, 18 Aug 2023 18:35:39 +0800
Subject: [PATCH 1/2] [Doc][Iceberg] Improved iceberg documentation

---
 docs/en/connector-v2/source/Iceberg.md        | 218 ++++++++----------
 .../iceberg/config/CommonConfig.java          |   7 -
 2 files changed, 98 insertions(+), 127 deletions(-)

diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md
index 6a42ee0ddd3..d96830f27a1 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -2,9 +2,15 @@
 
 > Apache Iceberg source connector
 
-## Description
+## Support Iceberg Version
 
-Source connector for Apache Iceberg. It can support batch and stream mode.
+- 0.14.0
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink
+> SeaTunnel Zeta
 ## Key features
 
@@ -22,9 +28,15 @@ Source connector for Apache Iceberg. It can support batch and stream mode.
 
 - [x] hadoop(2.7.1 , 2.7.5 , 3.1.3)
 - [x] hive(2.3.9 , 3.1.2)
 
-## Options
-
-| name                     | type    | required | default value        |
-|--------------------------|---------|----------|----------------------|
-| catalog_name             | string  | yes      | -                    |
-| catalog_type             | string  | yes      | -                    |
-| uri                      | string  | no       | -                    |
-| warehouse                | string  | yes      | -                    |
-| namespace                | string  | yes      | -                    |
-| table                    | string  | yes      | -                    |
-| schema                   | config  | no       | -                    |
-| case_sensitive           | boolean | no       | false                |
-| start_snapshot_timestamp | long    | no       | -                    |
-| start_snapshot_id        | long    | no       | -                    |
-| end_snapshot_id          | long    | no       | -                    |
-| use_snapshot_id          | long    | no       | -                    |
-| use_snapshot_timestamp   | long    | no       | -                    |
-| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT |
-| common-options           |         | no       | -                    |
-
-### catalog_name [string]
-
-User-specified catalog name.
-
-### catalog_type [string]
-
-The optional values are:
-- hive: The hive metastore catalog.
-- hadoop: The hadoop catalog.
-
-### uri [string]
-
-The Hive metastore’s thrift URI.
-
-### warehouse [string]
-
-The location to store metadata files and data files.
-
-### namespace [string]
-
-The iceberg database name in the backend catalog.
-
-### table [string]
-
-The iceberg table name in the backend catalog.
-
-### case_sensitive [boolean]
-
-If data columns where selected via schema [config], controls whether the match to the schema will be done with case sensitivity.
-
-### schema [config]
+## Description
 
-#### fields [Config]
+Source connector for Apache Iceberg. It can support batch and stream mode.
 
-Use projection to select data columns and columns order.
+## Database Dependency
 
-e.g.
+> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec and flink-shaded-hadoop-2 in the project pom file is set to `provided`. If you use the Flink engine, you may first need to add the following JAR packages to the /lib directory; if you are using the Spark engine and it is already integrated with Hadoop, you do not need to add them:
 
 ```
-schema {
-  fields {
-    f2 = "boolean"
-    f1 = "bigint"
-    f3 = "int"
-    f4 = "bigint"
-  }
-}
+flink-shaded-hadoop-x-xxx.jar
+hive-exec-xxx.jar
+libfb303-xxx.jar
 ```
 
-### start_snapshot_id [long]
-
-Instructs this scan to look for changes starting from a particular snapshot (exclusive).
-
-### start_snapshot_timestamp [long]
-
-Instructs this scan to look for changes starting from the most recent snapshot for the table as of the timestamp. timestamp – the timestamp in millis since the Unix epoch
-
-### end_snapshot_id [long]
-
-Instructs this scan to look for changes up to a particular snapshot (inclusive).
-
-### use_snapshot_id [long]
-
-Instructs this scan to look for use the given snapshot ID.
-
-### use_snapshot_timestamp [long]
-
-Instructs this scan to look for use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch
-
-### stream_scan_strategy [enum]
-
-Starting strategy for stream mode execution, Default to use `FROM_LATEST_SNAPSHOT` if don’t specify any value.
-The optional values are:
-- TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.
-- FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.
-- FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.
-- FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. <br/>
-- FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive.
-
-### common options
-
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
-
-## Example
-
-simple
+> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you also need to add that JAR package manually.
+
+## Data Type Mapping
+
+| Iceberg Data type | SeaTunnel Data type |
+|-------------------|---------------------|
+| BOOLEAN           | BOOLEAN             |
+| INTEGER           | INT                 |
+| LONG              | BIGINT              |
+| FLOAT             | FLOAT               |
+| DOUBLE            | DOUBLE              |
+| DATE              | DATE                |
+| TIME              | TIME                |
+| TIMESTAMP         | TIMESTAMP           |
+| STRING            | STRING              |
+| FIXED<br/>BINARY  | BYTES               |
+| DECIMAL           | DECIMAL             |
+| STRUCT            | ROW                 |
+| LIST              | ARRAY               |
+| MAP               | MAP                 |
+
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|--------------------------|---------|----------|----------------------|--------------------------------------------------------------------------------------------------|
+| catalog_name | string | yes | - | User-specified catalog name. |
+| catalog_type | string | yes | - | The optional values are: hive (the Hive metastore catalog), hadoop (the Hadoop catalog). |
+| uri | string | no | - | The Hive metastore’s thrift URI. |
+| warehouse | string | yes | - | The location to store metadata files and data files. |
+| namespace | string | yes | - | The Iceberg database name in the backend catalog. |
+| table | string | yes | - | The Iceberg table name in the backend catalog. |
+| schema | config | no | - | Use projection to select data columns and the column order. |
+| case_sensitive | boolean | no | false | If data columns are selected via the schema config, this controls whether the match against the schema is case sensitive. |
+| start_snapshot_timestamp | long | no | - | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the timestamp.<br/>The timestamp is in millis since the Unix epoch. |
+| start_snapshot_id | long | no | - | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
+| end_snapshot_id | long | no | - | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
+| use_snapshot_id | long | no | - | Instructs this scan to use the given snapshot ID. |
+| use_snapshot_timestamp | long | no | - | Instructs this scan to use the most recent snapshot as of the given timestamp, in millis since the Unix epoch. |
+| stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | The starting strategy for stream mode execution. Defaults to `FROM_LATEST_SNAPSHOT` if no value is specified. The optional values are:<br/>TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.<br/>FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.<br/>FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.<br/>FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive.<br/>FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. See the streaming sketch under Task Example below. |
+| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
+## Task Example
+
+### Simple:
 
 ```hocon
+env {
+  execution.parallelism = 2
+  job.mode = "BATCH"
+}
+
 source {
   Iceberg {
+    schema {
+      fields {
+        f2 = "boolean"
+        f1 = "bigint"
+        f3 = "int"
+        f4 = "bigint"
+        f5 = "float"
+        f6 = "double"
+        f7 = "date"
+        f9 = "timestamp"
+        f10 = "timestamp"
+        f11 = "string"
+        f12 = "bytes"
+        f13 = "bytes"
+        f14 = "decimal(19,9)"
+        f15 = "array<int>"
+        f16 = "map<string, int>"
+      }
+    }
     catalog_name = "seatunnel"
     catalog_type = "hadoop"
-    warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
-    namespace = "your_iceberg_database"
-    table = "your_iceberg_table"
+    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
+    namespace = "database1"
+    table = "source"
+    result_table_name = "iceberg"
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {
+    source_table_name = "iceberg"
   }
 }
 ```
 
-Or
+### Hive Catalog:
 
 ```hocon
 source {
@@ -156,7 +148,7 @@ source {
   }
 }
 ```
 
-column projection
+### Column Projection:
 
 ```hocon
 source {
@@ -179,20 +171,6 @@ source {
   }
 }
 ```
 
-:::tip
-
-In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec and flink-shaded-hadoop-2 in the project pom file are provided, so if you use the Flink engine, first you may need to add the following Jar packages to /lib directory, if you are using the Spark engine and integrated with Hadoop, then you do not need to add the following Jar packages.
-
-:::
-
-```
-flink-shaded-hadoop-x-xxx.jar
-hive-exec-xxx.jar
-libfb303-xxx.jar
-```
-
-Some versions of the hive-exec package do not have libfb303-xxx.jar, so you also need to manually import the Jar package. <br/>
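+### Snapshot Read:
+
+A minimal sketch of pinning the scan to one snapshot with `use_snapshot_id`, reusing the Hadoop catalog from the simple example above; the snapshot id and all names here are illustrative placeholders, not values from a real table:
+
+```hocon
+source {
+  Iceberg {
+    catalog_name = "seatunnel"
+    catalog_type = "hadoop"
+    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
+    namespace = "database1"
+    table = "source"
+    # Illustrative snapshot id; take a real one from the table's snapshot history
+    use_snapshot_id = 117872
+  }
+}
+```
+
+### Streaming:
+
+A minimal sketch of an incremental read in stream mode, assuming `job.mode = "STREAMING"` in the env block; with `FROM_EARLIEST_SNAPSHOT` the scan starts at the earliest snapshot and then follows newly committed ones:
+
+```hocon
+env {
+  execution.parallelism = 2
+  job.mode = "STREAMING"
+}
+
+source {
+  Iceberg {
+    catalog_name = "seatunnel"
+    catalog_type = "hadoop"
+    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
+    namespace = "database1"
+    table = "source"
+    # Start from the earliest snapshot, then read each newly committed snapshot
+    stream_scan_strategy = "FROM_EARLIEST_SNAPSHOT"
+    result_table_name = "iceberg"
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "iceberg"
+  }
+}
+```
+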
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
index b82d21a706e..a370ab78198 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
@@ -26,7 +26,6 @@
 import lombok.ToString;
 
 import java.io.Serializable;
-import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -80,12 +79,6 @@ public class CommonConfig implements Serializable {
                     .defaultValue(false)
                     .withDescription(" the iceberg case_sensitive");
 
-    public static final Option<List<String>> KEY_FIELDS =
-            Options.key("fields")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription(" the iceberg table fields");
-
     private String catalogName;
     private IcebergCatalogType catalogType;
     private String uri;

From e45eed62d0a7cb6748d43ef4baa43203f9512fa6 Mon Sep 17 00:00:00 2001
From: zhouyao
Date: Sun, 20 Aug 2023 19:15:33 +0800
Subject: [PATCH 2/2] [Doc][Iceberg] Improved iceberg documentation

---
 docs/en/connector-v2/source/Iceberg.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md
index d96830f27a1..b6d3924b95f 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -32,6 +32,14 @@ Source connector for Apache Iceberg. It can support batch and stream mode.
 
+## Supported DataSource Info
+
+| Datasource | Dependency          | Maven                                                                      |
+|------------|---------------------|----------------------------------------------------------------------------|
+| Iceberg    | flink-shaded-hadoop | [Download](https://mvnrepository.com/search?q=flink-shaded-hadoop-)        |
+| Iceberg    | hive-exec           | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)   |
+| Iceberg    | libfb303            | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303)  |
+
 ## Database Dependency
 
 > In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec and flink-shaded-hadoop-2 in the project pom file is set to `provided`. If you use the Flink engine, you may first need to add the following JAR packages to the /lib directory; if you are using the Spark engine and it is already integrated with Hadoop, you do not need to add them: