Skip to content

Commit

Permalink
[Feature][connector-v2][mongodbcdc]Support source mongodb cdc (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored and liunaijie committed Jul 19, 2023
1 parent 40bbb04 commit eef4e59
Show file tree
Hide file tree
Showing 50 changed files with 5,696 additions and 9 deletions.
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/se
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java from https://github.com/debezium/debezium
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
Expand All @@ -239,4 +240,4 @@ seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ connector-amazondynamodb
connector-assert
connector-cassandra
connector-cdc-mysql
connector-cdc-mongodb
connector-cdc-sqlserver
connector-clickhouse
connector-datahub
Expand Down
282 changes: 282 additions & 0 deletions docs/en/connector-v2/source/MongoDB-CDC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
# MongoDB CDC

> MongoDB CDC source connector
## Support Those Engines

> SeaTunnel Zeta<br/>
## Key Features

- [ ] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)

## Description

The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB database.

## Supported DataSource Info

In order to use the Mongodb CDC connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
|------------|--------------------|-------------------------------------------------------------------------------------------------------------------|
| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-cdc-mongodb) |

## Availability Settings

1.MongoDB version: MongoDB version >= 4.0.

2.Cluster deployment: replica sets or sharded clusters.

3.Storage Engine: WiredTiger Storage Engine.

4.Permissions:changeStream and read

```shell
use admin;
db.createRole(
{
role: "strole",
privileges: [{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
{ role: 'read', db: 'config' }
]
}
);

db.createUser(
{
user: 'stuser',
pwd: 'stpw',
roles: [
{ role: 'strole', db: 'admin' }
]
}
);
```
## Data Type Mapping
The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.
| MongoDB BSON type | Seatunnel Data type |
|-------------------|---------------------|
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | Date |
| Timestamp | Timestamp |
| Object | ROW |
| Array | ARRAY |
For specific types in MongoDB, we use Extended JSON format to map them to Seatunnel STRING type.
| MongoDB BSON type | Seatunnel STRING |
|-------------------|----------------------------------------------------------------------------------------------|
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
**Tips**
> 1.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).<br/>
## Source Options
| Name | Type | Required | Default | Description |
|------------------------------------|--------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
| password | String | No | - | Password to be used when connecting to MongoDB. |
| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. |
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. |
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
| poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size mb of incremental snapshot. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
### Tips:
> 1.If the collection changes at a slow pace, it is strongly recommended to set an appropriate value greater than 0 for the heartbeat.interval.ms parameter. When we recover a Seatunnel job from a checkpoint or savepoint, the heartbeat events can push the resumeToken forward to avoid its expiration.<br/>
> 2.MongoDB has a limit of 16MB for a single document. Change documents include additional information, so even if the original document is not larger than 15MB, the change document may exceed the 16MB limit, resulting in the termination of the Change Stream operation.<br/>
> 3.It is recommended to use immutable shard keys. In MongoDB, shard keys allow modifications after transactions are enabled, but changing the shard key can cause frequent shard migrations, resulting in additional performance overhead. Additionally, modifying the shard key can also cause the Update Lookup feature to become ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.<br/>
## How to Create a MongoDB CDC Data Synchronization Jobs
### CDC Data Print to Client
The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and prints it on the local client:
```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
```
## CDC Data Write to MysqlDB
The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and write to mysql database:
```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
}
}
sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"
generate_sink_sql = true
# You need to configure both database and table
database = mongodb_cdc
table = products
primary_keys = ["_id"]
}
}
```
## Multi-table Synchronization
The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client:
```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory","crm"]
collection = ["inventory.products","crm.test"]
username = stuser
password = stpw
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
```
### Tips:
> 1.The cdc synchronization of multiple library tables cannot specify the schema, and can only output json data downstream.
> This is because MongoDB does not provide metadata information for querying, so if you want to support multiple tables, all tables can only be read as one structure.
## Regular Expression Matching for Multiple Tables
The following example demonstrates how to create a data synchronization job that through regular expression read the data of multiple library tables mongodb and prints it on the local client:
| Matching example | Expressions | | Describe |
|------------------|-------------|---|----------------------------------------------------------------------------------------|
| Prefix matching | ^(test).* | | Match the database name or table name with the prefix test, such as test1, test2, etc. |
| Suffix matching | .*[p$] | | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "mongo0:27017"
# So this example is used (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5.
database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
collection = ["(t[5-8]|tt)"]
username = stuser
password = stpw
}
}
# Console printing of the read Mongodb data
sink {
Console {
parallelism = 1
}
}
```
## Changelog
- [Feature]Add MongoDB CDC Source Connector([4923](https://github.com/apache/seatunnel/pull/4923))
### next version
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ seatunnel.sink.Doris = connector-doris
seatunnel.source.Maxcompute = connector-maxcompute
seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.sink.S3Redshift = connector-s3-redshift
seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand All @@ -39,8 +38,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;

/**
* Fetcher to fetch data from table split, the split is the incremental split {@link
* IncrementalSplit}.
Expand Down Expand Up @@ -150,11 +147,14 @@ public void close() {
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
TableId tableId = getTableId(sourceRecord);
// TODO: The sourceRecord from MongoDB CDC and MySQL CDC are inconsistent. For
// compatibility, the getTableId method is commented out for now.
// TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
log.trace(
"The table {} is not support exactly-once, so ignore the watermark check",
tableId);
// log.trace(
// "The table {} is not support exactly-once, so ignore the
// watermark check",
// tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
Expand Down
Loading

0 comments on commit eef4e59

Please sign in to comment.