Merge branch 'apache:dev' into support-spark-streaming
CheneyYin authored Sep 19, 2024
2 parents 37b6533 + 0d12520 commit 7c8db55
Showing 219 changed files with 12,806 additions and 1,253 deletions.
1 change: 1 addition & 0 deletions config/plugin_config
@@ -28,6 +28,7 @@ connector-cdc-mongodb
connector-cdc-sqlserver
connector-cdc-postgres
connector-cdc-oracle
connector-cdc-tidb
connector-clickhouse
connector-datahub
connector-dingtalk
22 changes: 14 additions & 8 deletions docs/en/concept/JobEnvConfig.md
@@ -21,14 +21,26 @@ You can configure whether the task is in batch or stream mode through `job.mode`

### checkpoint.interval

Gets the interval in which checkpoints are periodically scheduled.
Gets the interval (in milliseconds) at which checkpoints are periodically scheduled.

In `STREAMING` mode, checkpoints is required, if you do not set it, it will be obtained from the application configuration file `seatunnel.yaml`. In `BATCH` mode, you can disable checkpoints by not setting this parameter.
In `STREAMING` mode, checkpoints are required; if you do not set this parameter, it will be obtained from the application configuration file `seatunnel.yaml`. In `BATCH` mode, you can disable checkpoints by not setting this parameter. In Zeta `STREAMING` mode, the default value is 30000 milliseconds.

### checkpoint.timeout

The timeout (in milliseconds) for a checkpoint. If the checkpoint is not completed before the timeout, the job will fail. In Zeta, the default value is 30000 milliseconds.
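
As a quick illustration, a minimal `env` block that overrides both checkpoint settings might look like the following sketch (the values are illustrative, not recommendations):

```hocon
env {
  job.mode = "STREAMING"
  # take a checkpoint every 10 seconds instead of the Zeta default of 30000 ms
  checkpoint.interval = 10000
  # fail the job if a single checkpoint does not complete within 60 seconds
  checkpoint.timeout = 60000
}
```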

### parallelism

This parameter configures the parallelism of source and sink.
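
For example, a sketch that runs every source and sink with two parallel subtasks (the value is illustrative):

```hocon
env {
  # applies to any source or sink that does not set its own parallelism
  parallelism = 2
}
```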

### shade.identifier

Specify the encryption method. If you do not need to encrypt or decrypt config files, this option can be ignored.

For more details, you can refer to the documentation [Config Encryption Decryption](../connector-v2/Config-Encryption-Decryption.md).
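
For instance, a sketch enabling the built-in Base64 shade (assuming the `base64` identifier described in the linked documentation; custom identifiers require your own implementation on the classpath):

```hocon
env {
  # decrypt Base64-encoded sensitive values (e.g. usernames and passwords) in the config file
  shade.identifier = "base64"
}
```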

## Zeta Engine Parameter

### job.retry.times

Controls the default number of retry attempts when a job fails. The default value is 3, and it only works in the Zeta engine.
@@ -43,12 +55,6 @@ This parameter is used to specify the location of the savemode when the job is executed.
The default value is `CLUSTER`, which means that the savemode is executed on the cluster. If you want to execute the savemode on the client,
you can set it to `CLIENT`. Please use `CLUSTER` mode whenever possible, because `CLIENT` mode will be removed once `CLUSTER` mode is proven stable.
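
A combined sketch of the Zeta-only options above (the values are illustrative):

```hocon
env {
  # retry a failed job up to 5 times (the default is 3)
  job.retry.times = 5
  # execute savemode on the cluster (the default; prefer this over CLIENT)
  savemode.execute.location = "CLUSTER"
}
```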

### shade.identifier

Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored.

For more details, you can refer to the documentation [Config Encryption Decryption](../connector-v2/Config-Encryption-Decryption.md)

## Flink Engine Parameter

Here are some SeaTunnel parameter names and their corresponding names in Flink (not all of them are listed). Please refer to the official [Flink Documentation](https://flink.apache.org/).
71 changes: 45 additions & 26 deletions docs/en/concept/config.md
@@ -159,6 +159,8 @@ sql = """ select * from "table" """

## Json Format Support

Before writing the config file, please make sure that its name ends with `.json`.

```json

{
@@ -205,8 +207,26 @@ sql = """ select * from "table" """

## Config Variable Substitution

In config file we can define some variables and replace it in run time. **This is only support `hocon` format file**.
In a config file, we can define variables and replace them at runtime. However, note that only HOCON format files are supported.

### Usage of Variables:
- `${varName}`: If the variable is not provided, an exception will be thrown.
- `${varName:default}`: If the variable is not provided, the default value will be used. When a default value is set, the whole expression should be enclosed in double quotes.
- `${varName:}`: If the variable is not provided, an empty string will be used.

If you do not set a variable value through `-i`, you can also pass it by setting a system environment variable; variable substitution supports reading variable values from the environment.
For example, you can set the environment variable in the shell script as follows:
```shell
export varName="value with space"
```
Then you can use the variable in the config file.

If you use a variable without a default value in the config file but do not pass it during execution, an exception will be thrown. Example:
```shell
Caused by: org.apache.seatunnel.core.starter.exception.CommandExecuteException: Variable substitution error: ${resName}_table
```

### Example:
```hocon
env {
job.mode = "BATCH"
@@ -216,24 +236,24 @@
source {
FakeSource {
result_table_name = ${resName}
row.num = ${rowNum}
result_table_name = "${resName:fake_test}_table"
row.num = "${rowNum:50}"
string.template = ${strTemplate}
int.template = [20, 21]
schema = {
fields {
name = ${nameType}
age = "int"
name = "${nameType:string}"
age = ${ageType}
}
}
}
}
transform {
sql {
source_table_name = "fake"
source_table_name = "${resName:fake_test}_table"
result_table_name = "sql"
query = "select * from "${resName}" where name = '"${nameVal}"' "
query = "select * from ${resName:fake_test}_table where name = '${nameVal}' "
}
}
@@ -245,26 +265,24 @@ sink {
password = ${password}
}
}
```

In the above config, we define some variables, like `${rowNum}`, `${resName}`.
We can replace those parameters with this shell command:
In the configuration above, we have defined several variables like `${rowNum}`, `${resName}`. We can replace these parameters using the following shell command:

```shell
./bin/seatunnel.sh -c <this_config_file>
-i jobName='this_is_a_job_name'
-i resName=fake
-i rowNum=10
-i strTemplate=['abc','d~f','hi']
-i nameType=string
-i ageType=int
-i nameVal=abc
-i username=seatunnel=2.3.1
-i password='$a^b%c.d~e0*9('
-m local
```

Then the final submitted config is:
In this case, `resName`, `rowNum`, and `nameType` are not set, so they will take their default values.

The final submitted configuration would be:

```hocon
env {
@@ -275,13 +293,13 @@ env {
source {
FakeSource {
result_table_name = "fake"
row.num = 10
string.template = ["abc","d~f","h i"]
result_table_name = "fake_test_table"
row.num = 50
string.template = ['abc','d~f','hi']
int.template = [20, 21]
schema = {
fields {
name = string
name = "string"
age = "int"
}
}
@@ -290,9 +308,9 @@ source {
transform {
sql {
source_table_name = "fake"
source_table_name = "fake_test_table"
result_table_name = "sql"
query = "select * from fake where name = 'abc' "
query = "select * from fake_test_table where name = 'abc' "
}
}
@@ -302,15 +320,16 @@ sink {
source_table_name = "sql"
username = "seatunnel=2.3.1"
password = "$a^b%c.d~e0*9("
}
}
}
```

Some Notes:
- Quota with `'` if the value has special character such as `(`
- If the replacement variables is in `"` or `'`, like `resName` and `nameVal`, you need add `"`
- The value can't have space `' '`, like `-i jobName='this is a job name' `, this will be replaced to `job.name = "this"`
- If you want to use dynamic parameters, you can use the following format: -i date=$(date +"%Y%m%d").
### Important Notes:
- If a value contains special characters like `(`, enclose it in single quotes (`'`).
- If the substitution variable contains double or single quotes (e.g., `"resName"` or `"nameVal"`), you need to include them with the value.
- The value cannot contain spaces (`' '`). For example, `-i jobName='this is a job name'` will be replaced with `job.name = "this"`. You can use environment variables to pass values with spaces.
- For dynamic parameters, you can use the following format: `-i date=$(date +"%Y%m%d")`.
- System-reserved placeholders cannot be used; they will not be replaced by `-i`. These include: `${database_name}`, `${schema_name}`, `${table_name}`, `${schema_full_name}`, `${table_full_name}`, `${primary_key}`, `${unique_key}`, `${field_names}`. For details, please refer to [Sink Parameter Placeholders](sink-options-placeholders.md).

## What's More

58 changes: 58 additions & 0 deletions docs/en/concept/schema-evolution.md
@@ -0,0 +1,58 @@
# Schema evolution
Schema evolution means that the schema of a data table can change, and the data synchronization task will automatically adapt to the new table structure without any further operations.
Currently, only the `add column`, `drop column`, `rename column`, and `modify column` operations on tables in CDC sources are supported. At the moment, this feature is only supported by the Zeta engine.

## Supported connectors

### Source
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)

### Sink
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)

Note: Schema evolution does not support transforms at the moment.

## Enable schema evolution
Schema evolution is disabled by default in CDC sources. To enable it, configure `debezium.include.schema.changes = true`, which is only supported in MySQL-CDC.

## Examples

### Mysql-CDC -> Jdbc-Mysql
```hocon
env {
# You can set engine configuration here
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}
source {
MySQL-CDC {
server-id = 5652-5657
username = "st_user_source"
password = "mysqlpw"
table-names = ["shop.products"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
}
}
}
sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"
generate_sink_sql = true
database = shop
table = mysql_cdc_e2e_sink_table_with_schema_change_exactly_once
primary_keys = ["id"]
is_exactly_once = true
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
}
```
2 changes: 2 additions & 0 deletions docs/en/concept/sql-config.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# SQL Configuration File

Before writing the SQL config file, please make sure that its name ends with `.sql`.

## Structure of SQL Configuration File

The `SQL` configuration file appears as follows:
2 changes: 1 addition & 1 deletion docs/en/connector-v2/sink/Assert.md
@@ -4,7 +4,7 @@
## Description

A flink sink plugin which can assert illegal data by user defined rules
A sink plugin which can detect illegal data using user-defined rules
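
As a minimal sketch, assuming the `rules` option with `row_rules` and `field_rules` as described in this connector's options, an Assert sink might look like:

```hocon
Assert {
  rules {
    # fail the job if more than 100 rows arrive at the sink
    row_rules = [
      { rule_type = MAX_ROW, rule_value = 100 }
    ]
    # fail the job if the name field is null
    field_rules = [
      {
        field_name = name
        field_type = string
        field_value = [{ rule_type = NOT_NULL }]
      }
    ]
  }
}
```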

## Key Features

42 changes: 42 additions & 0 deletions docs/en/connector-v2/sink/FtpFile.md
@@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method |
| data_save_mode | string | no | APPEND_DATA | Existing data processing method |

### host [string]

@@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.

### schema_save_mode [string]

Existing dir processing method.

- RECREATE_SCHEMA: create the dir when it does not exist; delete it and recreate it when it already exists
- CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir when it does not exist; skip it when it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the dir does not exist
- IGNORE: ignore the handling of the table

### data_save_mode [string]

Existing data processing method.

- DROP_DATA: preserve the dir and delete the data files
- APPEND_DATA: preserve the dir and preserve the data files
- ERROR_WHEN_DATA_EXISTS: report an error when data files already exist

## Example

For text file format simple config
@@ -273,6 +287,34 @@ FtpFile {

```

When the source side has multiple tables and we want different tables to be written to different directories, we can configure it this way:

```hocon
FtpFile {
host = "xxx.xxx.xxx.xxx"
port = 21
user = "username"
password = "password"
path = "/data/ftp/seatunnel/job1/${table_name}"
tmp_path = "/data/ftp/seatunnel/tmp"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
sink_columns = ["name","age"]
filename_time_format = "yyyy.MM.dd"
schema_save_mode=RECREATE_SCHEMA
data_save_mode=DROP_DATA
}
```

## Changelog

### 2.2.0-beta 2022-09-26