[Doc][Iceberg] Improved iceberg documentation (#5335)
Co-authored-by: zhouyao <[email protected]>
Carl-Zhou-CN and zhouyao authored Aug 23, 2023
1 parent f684776 commit 659a68a
Showing 2 changed files with 104 additions and 125 deletions.

docs/en/connector-v2/source/Iceberg.md (104 additions, 118 deletions):

> Apache Iceberg source connector

## Support Iceberg Version

- 0.14.0

## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

[… lines collapsed in the diff view …]
- [x] hadoop(2.7.1, 2.7.5, 3.1.3)
- [x] hive(2.3.9, 3.1.2)

## Description

Source connector for Apache Iceberg. It can support batch and stream mode.

## Supported DataSource Info

| Datasource | Dependency          | Maven                                                                      |
|------------|---------------------|----------------------------------------------------------------------------|
| Iceberg    | flink-shaded-hadoop | [Download](https://mvnrepository.com/search?q=flink-shaded-hadoop-)        |
| Iceberg    | hive-exec           | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)   |
| Iceberg    | libfb303            | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303)  |

## Database Dependency

> In order to stay compatible with different versions of Hadoop and Hive, the hive-exec and flink-shaded-hadoop-2 dependencies are marked as provided in the project pom file. If you use the Flink engine, you may first need to add the following Jar packages to the <FLINK_HOME>/lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.

```
flink-shaded-hadoop-x-xxx.jar
hive-exec-xxx.jar
libfb303-xxx.jar
```

> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.

## Data Type Mapping

| Iceberg Data type | SeaTunnel Data type |
|-------------------|---------------------|
| BOOLEAN | BOOLEAN |
| INTEGER | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| STRING | STRING |
| FIXED<br/>BINARY | BYTES |
| DECIMAL | DECIMAL |
| STRUCT | ROW |
| LIST | ARRAY |
| MAP | MAP |

## Source Options

| Name                     | Type    | Required | Default              | Description                                                                                                                                             |
|--------------------------|---------|----------|----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name             | string  | yes      | -                    | User-specified catalog name.                                                                                                                            |
| catalog_type             | string  | yes      | -                    | The optional values are: hive (the Hive metastore catalog) and hadoop (the Hadoop catalog).                                                             |
| uri                      | string  | no       | -                    | The Hive metastore's thrift URI.                                                                                                                        |
| warehouse                | string  | yes      | -                    | The location to store metadata files and data files.                                                                                                    |
| namespace                | string  | yes      | -                    | The Iceberg database name in the backend catalog.                                                                                                       |
| table                    | string  | yes      | -                    | The Iceberg table name in the backend catalog.                                                                                                          |
| schema                   | config  | no       | -                    | Use projection to select data columns and column order.                                                                                                 |
| case_sensitive           | boolean | no       | false                | If data columns are selected via `schema`, controls whether the match to the schema is done case-sensitively.                                           |
| start_snapshot_timestamp | long    | no       | -                    | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in millis since the Unix epoch. |
| start_snapshot_id        | long    | no       | -                    | Instructs this scan to look for changes starting from a particular snapshot (exclusive).                                                                |
| end_snapshot_id          | long    | no       | -                    | Instructs this scan to look for changes up to a particular snapshot (inclusive).                                                                        |
| use_snapshot_id          | long    | no       | -                    | Instructs this scan to use the given snapshot ID.                                                                                                       |
| use_snapshot_timestamp   | long    | no       | -                    | Instructs this scan to use the most recent snapshot as of the given timestamp, in millis since the Unix epoch.                                          |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT | Starting strategy for stream-mode execution; defaults to `FROM_LATEST_SNAPSHOT` if no value is specified. The optional values are:<br/>TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode.<br/>FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive.<br/>FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive.<br/>FROM_SNAPSHOT_ID: start incremental mode from the snapshot with the given ID, inclusive.<br/>FROM_SNAPSHOT_TIMESTAMP: start incremental mode from the snapshot with the given timestamp, inclusive. |
| common-options           |         | no       | -                    | Source plugin common parameters; please refer to [Source Common Options](common-options.md) for details.                                                |
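
The snapshot options make it possible to pin a batch read to a point in the table's history. As a minimal hedged sketch (the catalog settings and the snapshot ID are illustrative placeholders, not values from this commit), a batch job that reads one specific snapshot:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    # Pin the scan to one snapshot instead of the current table state;
    # use_snapshot_timestamp (millis since the Unix epoch) works the same way.
    use_snapshot_id = 123456789
    result_table_name = "iceberg"
  }
}
```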

## Task Example

### Simple:

```hocon
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Iceberg {
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
        f5 = "float"
        f6 = "double"
        f7 = "date"
        f9 = "timestamp"
        f10 = "timestamp"
        f11 = "string"
        f12 = "bytes"
        f13 = "bytes"
        f14 = "decimal(19,9)"
        f15 = "array<int>"
        f16 = "map<string, int>"
      }
    }
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    result_table_name = "iceberg"
  }
}
transform {
}
sink {
  Console {
    source_table_name = "iceberg"
  }
}
```
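
### Streaming (sketch):

The connector also runs in stream mode, controlled by `stream_scan_strategy`. The following is not part of this commit; it is a hedged sketch that reuses the placeholder catalog, namespace, and table names from the Simple example above:

```hocon
env {
  execution.parallelism = 1
  # STREAMING keeps the scan open and follows new snapshots as they commit.
  job.mode = "STREAMING"
}
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    # Read all existing snapshots first, then switch to incremental reads.
    stream_scan_strategy = "FROM_EARLIEST_SNAPSHOT"
    result_table_name = "iceberg"
  }
}
sink {
  Console {
    source_table_name = "iceberg"
  }
}
```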

### Hive Catalog:

```hocon
source {
  # … configuration collapsed in the diff view …
}
```
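
The diff view collapses the body of this example. As a hedged reconstruction (the thrift URI, warehouse path, and names are illustrative placeholders, not recovered from the collapsed lines), a Hive-catalog source typically sets `catalog_type` to `hive` and supplies the metastore `uri`:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    # The Hive metastore catalog needs a thrift URI in addition to the warehouse.
    catalog_type = "hive"
    uri = "thrift://localhost:9083"
    warehouse = "hdfs://your_cluster/data/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    result_table_name = "iceberg"
  }
}
```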

### Column Projection:

```hocon
source {
  # … configuration collapsed in the diff view …
}
```
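
This example's body is collapsed as well. Under the same placeholder assumptions, a projection selects only the listed columns, in the listed order, through the `schema` option described in Source Options:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    # Only f2, f1, f3 and f4 are read, and rows use this column order.
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
    result_table_name = "iceberg"
  }
}
```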


## Changelog

### 2.2.0-beta 2022-09-26
[… remainder of the changelog collapsed in the diff view …]

CommonConfig.java (7 deletions):

The commit also removes the `fields` option, and the `java.util.List` import it required, from the Iceberg connector's `CommonConfig` class:

```java
// Removed import:
import java.util.List;

// Removed option definition:
public static final Option<List<String>> KEY_FIELDS =
        Options.key("fields")
                .listType()
                .noDefaultValue()
                .withDescription(" the iceberg table fields");
```
