From 53c21e1ab03ac447603b156945204681cad0205f Mon Sep 17 00:00:00 2001 From: gaojun Date: Mon, 17 Jul 2023 23:22:16 +0800 Subject: [PATCH 1/6] Improve S3File Source & S3File Sink document --- docs/en/connector-v2/sink/S3File.md | 224 +++++++++++++----- docs/en/connector-v2/source/S3File.md | 323 +++++++++++++++----------- 2 files changed, 346 insertions(+), 201 deletions(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index dcaee7338fe..5774fb976f8 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -1,24 +1,17 @@ # S3File -> S3 file sink connector +> S3 File Sink Connector -## Description - -Output data to aws s3 file system. - -:::tip - -If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. - -If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. - -To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. +## Support Those Engines -::: +> Spark
+> Flink
+> SeaTunnel Zeta
-## Key features +## Key Features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` @@ -30,60 +23,106 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] json - [x] excel -## Options - -| name | type | required | default value | remarks | -|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| bucket | string | yes | - | | -| fs.s3a.endpoint | string | yes | - | | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | -| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| compress_codec | string | no | none | | -| common-options | object | no | - | | -| max_rows_in_memory | int | no | - | Only used when file_format is excel. | -| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | - -### path [string] - -The target dir path is required. - -### bucket [string] - -The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. - -### fs.s3a.endpoint [string] - -fs s3a endpoint +## Description -### fs.s3a.aws.credentials.provider [string] +Output data to aws s3 file system. -The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. +## Supported DataSource Info -More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) +| Datasource | Supported Versions | +|------------|--------------------| +| S3 | current | -### access_key [string] +## Database Dependency -The access key of s3 file system. 
If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)

-### access_secret [string]

-The access secret of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html)

## Data Type Mapping

If you write to the `csv` or `text` file type, all columns will be converted to strings.

### Orc File Type

| SeaTunnel Data type | Orc Data type         |
|---------------------|-----------------------|
| STRING              | STRING                |
| BOOLEAN             | BOOLEAN               |
| TINYINT             | BYTE                  |
| SMALLINT            | SHORT                 |
| INT                 | INT                   |
| BIGINT              | LONG                  |
| FLOAT               | FLOAT                 |
| DOUBLE              | DOUBLE                |
| DECIMAL             | DECIMAL               |
| BYTES               | BINARY                |
| DATE                | DATE                  |
| TIME<br/>TIMESTAMP  | TIMESTAMP             |
| ROW                 | STRUCT                |
| NULL                | UNSUPPORTED DATA TYPE |
| ARRAY               | LIST                  |
| Map                 | Map                   |

### Parquet File Type

| SeaTunnel Data type | Parquet Data type     |
|---------------------|-----------------------|
| STRING              | STRING                |
| BOOLEAN             | BOOLEAN               |
| TINYINT             | INT_8                 |
| SMALLINT            | INT_16                |
| INT                 | INT32                 |
| BIGINT              | INT64                 |
| FLOAT               | FLOAT                 |
| DOUBLE              | DOUBLE                |
| DECIMAL             | DECIMAL               |
| BYTES               | BINARY                |
| DATE                | DATE                  |
| TIME<br/>
TIMESTAMP | TIMESTAMP_MILLIS | +| ROW | GroupType | +| NULL | UNSUPPORTED DATA TYPE | +| ARRAY | LIST | +| Map | Map | + +## Sink Options + + +| name | type | required | default value | Description | +|----------------------------------|---------|----------|-------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| bucket | string | yes | - | | +| fs.s3a.endpoint | string | yes | - | | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. | +| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | +| max_rows_in_memory | int | no | - | Only used when file_format is excel. | +| sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | + + | ### hadoop_s3_properties [map] If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) @@ -95,6 +134,7 @@ hadoop_s3_properties { } ``` + ### custom_filename [boolean] Whether custom the filename @@ -208,6 +248,70 @@ Writer the sheet of the workbook ## Example +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to S3File Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target s3 dir will also create a file and all of the data in write in it. +> Before run this job, you need create s3 path: /seatunnel/text. 
And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + S3File { + bucket = "s3a://seatunnel-test" + tmp_path = "/tmp/seatunnel" + path="/seatunnel/text" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + filename_time_format = "yyyy.MM.dd" + sink_columns = ["name","age"] + is_enable_transaction=true + hadoop_s3_properties { + "fs.s3a.buffer.dir" = "/data/st_test/s3a" + "fs.s3a.fast.upload.buffer" = "disk" + } + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + For text file format with `have_partition` and `custom_filename` and `sink_columns` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` ```hocon diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index f58a1a6bc36..81c42238dc5 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -1,22 +1,14 @@ # S3File -> S3 file source connector +> S3 File Source Connector -## Description - -Read data from aws s3 file system. - -:::tip - -If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. - -If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. +## Support Those Engines -To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. - -::: +> Spark
+> Flink
+> SeaTunnel Zeta
-## Key features +## Key Features - [x] [batch](../../concept/connector-v2-features.md) - [ ] [stream](../../concept/connector-v2-features.md) @@ -35,103 +27,31 @@ Read all the data in a split in a pollNext call. What splits are read will be sa - [x] json - [x] excel -## Options - -| name | type | required | default value | -|---------------------------------|---------|----------|-------------------------------------------------------| -| path | string | yes | - | -| file_format_type | string | yes | - | -| bucket | string | yes | - | -| fs.s3a.endpoint | string | yes | - | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | -| read_columns | list | no | - | -| access_key | string | no | - | -| access_secret | string | no | - | -| hadoop_s3_properties | map | no | - | -| delimiter | string | no | \001 | -| parse_partition_from_path | boolean | no | true | -| date_format | string | no | yyyy-MM-dd | -| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | -| time_format | string | no | HH:mm:ss | -| skip_header_row_number | long | no | 0 | -| schema | config | no | - | -| common-options | | no | - | -| sheet_name | string | no | - | - -### path [string] - -The source file path. - -### fs.s3a.endpoint [string] - -fs s3a endpoint - -### fs.s3a.aws.credentials.provider [string] - -The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. - -More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) - -### delimiter [string] - -Field delimiter, used to tell connector how to slice and dice fields when reading text files - -default `\001`, the same as hive's default delimiter - -### parse_partition_from_path [boolean] - -Control whether parse the partition keys and values from file path - -For example if you read a file from path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` - -Every record data from file will be added these two fields: - -| name | age | -|---------------|-----| -| tyrantlucifer | 26 | - -Tips: **Do not define partition fields in schema option** - -### date_format [string] - -Date type format, used to tell connector how to convert string to date, supported as the following formats: - -`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` - -default `yyyy-MM-dd` - -### datetime_format [string] - -Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats: - -`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` - -default `yyyy-MM-dd HH:mm:ss` - -### time_format [string] - -Time type format, used to tell connector how to convert string to time, supported as the following formats: - -`HH:mm:ss` `HH:mm:ss.SSS` +## Description -default `HH:mm:ss` +Read data from aws s3 file system. -### skip_header_row_number [long] +## Supported DataSource Info -Skip the first few lines, but only for the txt and csv. +| Datasource | Supported versions | +|------------|--------------------| +| S3 | current | -For example, set like following: +## Dependency -`skip_header_row_number = 2` +> If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
-then SeaTunnel will skip the first 2 lines from source files +> If you use SeaTunnel Zeta, It automatically integrated the hadoop jar when you download and install SeaTunnel Zeta. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this.
+To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. -### file_format_type [string] +## Data Type Mapping -File type, supported as the following file types: +Data type mapping is related to the type of file being read, We supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` +### JSON File Type + If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. For example: @@ -173,7 +93,8 @@ connector will generate data as the following: |------|-------------|---------| | 200 | get success | true | -If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. + +### Text Or CSV File Type If you assign file type to `text` `csv`, you can choose to specify the schema information or not. @@ -214,61 +135,107 @@ connector will generate data as the following: |---------------|-----|--------| | tyrantlucifer | 26 | male | -### bucket [string] - -The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. - -### access_key [string] - -The access key of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) - -### access_secret [string] - -The access secret of s3 file system. If this parameter is not set, please confirm that the credential provider chain can be authenticated correctly, you could check this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) -### hadoop_s3_properties [map] +### Orc File Type -If you need to add a other option, you could add it here and refer to this [hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) - -``` -hadoop_s3_properties { - "xxx" = "xxx" - } -``` - -### schema [config] - -#### fields [Config] - -The schema of upstream data. +If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. -### read_columns [list] -The read column list of the data source, user can use it to implement field projection. +| Orc Data type | SeaTunnel Data type | +|----------------------------------|----------------------------------------------------------------| +| BOOLEAN | BOOLEAN | +| INT | INT | +| BYTE | BYTE | +| SHORT | SHORT | +| LONG | LONG | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| BINARY | BINARY | +| STRING
VARCHAR
CHAR
| STRING                                                          |
| DATE                             | LOCAL_DATE_TYPE                                                 |
| TIMESTAMP                        | LOCAL_DATE_TIME_TYPE                                            |
| DECIMAL                          | DECIMAL                                                         |
| LIST(STRING)                     | STRING_ARRAY_TYPE                                               |
| LIST(BOOLEAN)                    | BOOLEAN_ARRAY_TYPE                                              |
| LIST(TINYINT)                    | BYTE_ARRAY_TYPE                                                 |
| LIST(SMALLINT)                   | SHORT_ARRAY_TYPE                                                |
| LIST(INT)                        | INT_ARRAY_TYPE                                                  |
| LIST(BIGINT)                     | LONG_ARRAY_TYPE                                                 |
| LIST(FLOAT)                      | FLOAT_ARRAY_TYPE                                                |
| LIST(DOUBLE)                     | DOUBLE_ARRAY_TYPE                                               |
| Map                              | MapType, the K and V types are mapped to the corresponding SeaTunnel types |
| STRUCT                           | SeaTunnelRowType                                                |

### Parquet File Type

If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can read the schema of the upstream data automatically.

| Parquet Data type         | SeaTunnel Data type |
|---------------------------|---------------------|
| INT_8                     | BYTE                |
| INT_16                    | SHORT               |
| DATE                      | DATE                |
| TIMESTAMP_MILLIS          | TIMESTAMP           |
| INT64                     | LONG                |
| INT96                     | TIMESTAMP           |
| BINARY                    | BYTES               |
| FLOAT                     | FLOAT               |
| DOUBLE                    | DOUBLE              |
| BOOLEAN                   | BOOLEAN             |
| FIXED_LEN_BYTE_ARRAY      | TIMESTAMP<br/>
DECIMAL | +| DECIMAL | DECIMAL | +| LIST(STRING) | STRING_ARRAY_TYPE | +| LIST(BOOLEAN) | BOOLEAN_ARRAY_TYPE | +| LIST(TINYINT) | BYTE_ARRAY_TYPE | +| LIST(SMALLINT) | SHORT_ARRAY_TYPE | +| LIST(INT) | INT_ARRAY_TYPE | +| LIST(BIGINT) | LONG_ARRAY_TYPE | +| LIST(FLOAT) | FLOAT_ARRAY_TYPE | +| LIST(DOUBLE) | DOUBLE_ARRAY_TYPE | +| Map | MapType, This type of K and V will transform to SeaTunnel type | +| STRUCT | SeaTunnelRowType | -### common options -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. +## Options -### sheet_name [string] +| name | type | required | default value | Description | +|---------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | The s3 path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option | +| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` | +| bucket | string | yes | - | The bucket address of s3 file system, for example: `s3n://seatunnel-test`, if you use `s3a` protocol, this parameter should be `s3a://seatunnel-test`. | +| fs.s3a.endpoint | string | yes | - | fs s3a endpoint | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate s3a. We only support `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` now. More information about the credential provider you can see [Hadoop AWS Document](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Simple_name.2Fsecret_credentials_with_SimpleAWSCredentialsProvider.2A) | +| read_columns | list | no | - | The read column list of the data source, user can use it to implement field projection. The file type supported column projection as the following shown: `text` `csv` `parquet` `orc` `json` `excel` . If the user wants to use this feature when reading `text` `json` `csv` files, the "schema" option must be configured. | +| access_key | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | +| access_secret | string | no | - | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider ` | +| hadoop_s3_properties | map | no | - | If you need to add other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) | +| delimiter | string | no | \001 | Field delimiter, used to tell connector how to slice and dice fields when reading text files. Default `\001`, the same as hive's default delimiter. | +| parse_partition_from_path | boolean | no | true | Control whether parse the partition keys and values from file path. For example if you read a file from path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`. 
Every record data from file will be added these two fields: name="tyrantlucifer", age=16 | +| date_format | string | no | yyyy-MM-dd | Date type format, used to tell connector how to convert string to date, supported as the following formats:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. default `yyyy-MM-dd` | +| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` | +| time_format | string | no | HH:mm:ss | Time type format, used to tell connector how to convert string to time, supported as the following formats:`HH:mm:ss` `HH:mm:ss.SSS` | +| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv. For example, set like following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files | +| schema | config | no | - | The schema of upstream data. | +| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | +| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. | -Reader the sheet of the workbook,Only used when file_format is excel. ## Example -```hocon +1. In this example, We read data from s3 path `s3a://seatunnel-test/seatunnel/text` and the file type is orc in this path. + We use `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` to authentication so `access_key` and `secret_key` is required. + All columns in the file will be read and send to sink. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} +source { S3File { path = "/seatunnel/text" fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" @@ -278,9 +245,31 @@ Reader the sheet of the workbook,Only used when file_format is excel. bucket = "s3a://seatunnel-test" file_format_type = "orc" } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} +sink { + jdbc { + url = "jdbc:mysql://localhost:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} ``` + +2. Use `InstanceProfileCredentialsProvider` to authentication + The file type in S3 is json, so need config schema option. + ```hocon S3File { @@ -299,6 +288,58 @@ Reader the sheet of the workbook,Only used when file_format is excel. ``` +3. Use `InstanceProfileCredentialsProvider` to authentication + The file type in S3 is json and has five fields (`id`, `name`, `age`, `sex`, `type`), so need config schema option. + In this job, we only need send `id` and `name` column to mysql. 
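   A minimal sketch of just the projection-related part of the `S3File` source block in the full job config below (field names and types are the ones used in this example): `read_columns` selects the two columns to emit, and because the file type is `json` the `schema` block still has to describe the fields in the file.

```hocon
# Sketch only: the projection fragment of the S3File source block shown in the full config below.
S3File {
  # ... bucket / endpoint / credentials as in the full example ...
  file_format_type = "json"
  # Only `id` and `name` are sent downstream.
  read_columns = ["id", "name"]
  # For json (and text/csv) sources, the schema option must still describe the file's fields.
  schema {
    fields {
      id = int
      name = string
      age = int
      sex = int
      type = string
    }
  }
}
```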
+ + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + S3File { + path = "/seatunnel/json" + bucket = "s3a://seatunnel-test" + fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn" + fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" + file_format_type = "json" + read_columns = ["id", "name"] + schema { + fields { + id = int + name = string + age = int + sex = int + type = string + } + } + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306/test" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} +``` + ## Changelog ### 2.3.0-beta 2022-10-20 From db883b0b578b65f558db2d3120b78de18de9aa71 Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 25 Jul 2023 10:42:56 +0800 Subject: [PATCH 2/6] Update docs/en/connector-v2/sink/S3File.md Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com> --- docs/en/connector-v2/sink/S3File.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 5774fb976f8..d0f8f3e0d36 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -110,7 +110,7 @@ If write to `csv`, `text` file type, All column will be string. | field_delimiter | string | no | '\001' | Only used when file_format is text | | row_delimiter | string | no | "\n" | Only used when file_format is text | | have_partition | boolean | no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | +| partition_by | array | no | - | Only used when have_partition is true | | partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | | is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | From b60a78a9936abc699c34966f00e615f4d25a646a Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 25 Jul 2023 10:43:04 +0800 Subject: [PATCH 3/6] Update docs/en/connector-v2/sink/S3File.md Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com> --- docs/en/connector-v2/sink/S3File.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index d0f8f3e0d36..edd842d6642 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -111,7 +111,7 @@ If write to `csv`, `text` file type, All column will be string. | row_delimiter | string | no | "\n" | Only used when file_format is text | | have_partition | boolean | no | false | Whether you need processing partitions. 
| | partition_by | array | no | - | Only used when have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | | is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | From a99ad9c5c350de5f0d52d77432e435b924cefbd6 Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 25 Jul 2023 10:43:11 +0800 Subject: [PATCH 4/6] Update docs/en/connector-v2/sink/S3File.md Co-authored-by: TaoZex <45089228+TaoZex@users.noreply.github.com> --- docs/en/connector-v2/sink/S3File.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index edd842d6642..0892dc6a48a 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -112,7 +112,7 @@ If write to `csv`, `text` file type, All column will be string. | have_partition | boolean | no | false | Whether you need processing partitions. | | partition_by | array | no | - | Only used when have_partition is true | | partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true | | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | From bd79b74349ac71733045b886ab179407c4ddf716 Mon Sep 17 00:00:00 2001 From: gaojun Date: Thu, 10 Aug 2023 13:39:52 +0800 Subject: [PATCH 5/6] add all type in example FakeSource --- docs/en/connector-v2/sink/S3File.md | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 5d91175d4a7..4bb670ae38c 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -37,8 +37,8 @@ Output data to aws s3 file system. > If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. > -> If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. -> To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-1.11.271.jar in ${SEATUNNEL_HOME}/lib dir. +> If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under `${SEATUNNEL_HOME}/lib` to confirm this. +> To use this connector you need put `hadoop-aws-3.1.4.jar` and `aws-java-sdk-bundle-1.11.271.jar` in `${SEATUNNEL_HOME}/lib` dir. 
## Data Type Mapping @@ -263,8 +263,21 @@ source { row.num = 16 schema = { fields { - name = "string" - age = "int" + c_map = "map>" + c_array = "array" + name = string + c_boolean = boolean + age = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(16, 1)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp } } } From a6b447e17c7175a356edbfb6247795650b6ded01 Mon Sep 17 00:00:00 2001 From: liuli Date: Fri, 11 Aug 2023 11:42:48 +0800 Subject: [PATCH 6/6] update sink to console --- docs/en/connector-v2/source/S3File.md | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index d9b4d8825eb..54124a37038 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -247,16 +247,7 @@ transform { } sink { - jdbc { - url = "jdbc:mysql://localhost:3306/test" - driver = "com.mysql.cj.jdbc.Driver" - user = "root" - password = "123456" - # Automatically generate sql statements based on database table names - generate_sink_sql = true - database = test - table = test_table - } + Console {} } ``` @@ -319,16 +310,7 @@ transform { } sink { - jdbc { - url = "jdbc:mysql://localhost:3306/test" - driver = "com.mysql.cj.jdbc.Driver" - user = "root" - password = "123456" - # Automatically generate sql statements based on database table names - generate_sink_sql = true - database = test - table = test_table - } + Console {} } ```