Skip to content

Commit

Permalink
Merge branch 'dev' into 140_add_s3_improve
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Aug 4, 2023
2 parents a99ad9c + 573306b commit 7a03fe4
Show file tree
Hide file tree
Showing 280 changed files with 16,674 additions and 1,951 deletions.
6 changes: 3 additions & 3 deletions .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ body:

- type: textarea
attributes:
label: Flink or Spark Version
description: Provide Flink or Spark Version.
label: Zeta or Flink or Spark Version
description: Provide Zeta or Flink or Spark Version.
placeholder: >
Please provide the version of Flink or Spark.
Please provide the version of Zeta or Flink or Spark.
validations:
required: false

Expand Down
26 changes: 25 additions & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -736,6 +736,30 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m

jdbc-connectors-it-part-4:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-4)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1C verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-4 -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

kafka-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
Expand Down
11 changes: 11 additions & 0 deletions .github/workflows/documents.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,14 @@ jobs:
cd seatunnel-website
npm install
npm run build
code-style:
name: Code style
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Check code style
run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean spotless:check
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/se
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java from https://github.com/debezium/debezium
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
Expand All @@ -239,4 +240,4 @@ seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ connector-amazondynamodb
connector-assert
connector-cassandra
connector-cdc-mysql
connector-cdc-mongodb
connector-cdc-sqlserver
connector-clickhouse
connector-datahub
Expand Down
107 changes: 107 additions & 0 deletions docs/en/connector-v2/formats/debezium-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Debezium Format

Changelog-Data-Capture Format: Serialization Schema Format: Deserialization Schema

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred.

Seatunnel supports to interpret Debezium JSON messages as INSERT/UPDATE/DELETE messages into seatunnel system. This is useful in many cases to leverage this feature, such as

synchronizing incremental data from databases to other systems
auditing logs
real-time materialized views on databases
temporal join changing history of a database table and so on.

Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel asDebezium JSON messages, and emit to storage like Kafka.

# Format Options

| option | default | required | Description |
|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
| format | (none) | yes | Specify what format to use, here should be 'debezium_json'. |
| debezium-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |

# How to use Debezium format

## Kafka uses example

Debezium provides a unified format for changelog, here is a simple example for an update operation captured from a MySQL products table:

```bash
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter ",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter ",
"weight": 5.17
},
"source": {
"version": "1.1.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1589362330000,
"snapshot": "false",
"db": "inventory",
"table": "products",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 2090,
"row": 0,
"thread": 2,
"query": null
},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
```

Note: please refer to Debezium documentation about the meaning of each fields.

The MySQL products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15.
Assuming the messages have been synchronized to Kafka topic products_binlog, then we can use the following Seatunnel conf to consume this topic and interpret the change events by Debezium format.

```bash
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "products_binlog"
result_table_name = "kafka_name"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
}
format = debezium_json
}

}

transform {
}

sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "consume-binlog"
format = debezium_json
}
}
```

Loading

0 comments on commit 7a03fe4

Please sign in to comment.