From d729fcba4cd06d348d9714b0af01962423f47dd3 Mon Sep 17 00:00:00 2001
From: monster <60029759+MonsterChenzhuo@users.noreply.github.com>
Date: Mon, 17 Jul 2023 17:49:47 +0800
Subject: [PATCH 01/43] [Feature][connector-v2][mongodbcdc]Support source
mongodb cdc (#4923)
---
LICENSE | 3 +-
config/plugin_config | 1 +
docs/en/connector-v2/source/MongoDB-CDC.md | 282 +++++++++
plugin-mapping.properties | 1 +
.../IncrementalSourceStreamFetcher.java | 14 +-
.../connector-cdc-mongodb/pom.xml | 87 +++
.../cdc/mongodb/MongodbIncrementalSource.java | 132 +++++
.../MongodbIncrementalSourceFactory.java | 104 ++++
.../mongodb/config/MongodbSourceConfig.java | 122 ++++
.../config/MongodbSourceConfigProvider.java | 175 ++++++
.../mongodb/config/MongodbSourceOptions.java | 255 ++++++++
.../exception/MongodbConnectorException.java | 28 +
.../internal/MongodbClientProvider.java | 45 ++
...MongoDBConnectorDeserializationSchema.java | 553 ++++++++++++++++++
.../mongodb/sender/SerializableFunction.java | 24 +
.../source/dialect/MongodbDialect.java | 146 +++++
.../source/fetch/MongodbFetchTaskContext.java | 210 +++++++
.../source/fetch/MongodbScanFetchTask.java | 253 ++++++++
.../source/fetch/MongodbStreamFetchTask.java | 359 ++++++++++++
.../source/offset/ChangeStreamDescriptor.java | 73 +++
.../source/offset/ChangeStreamOffset.java | 94 +++
.../offset/ChangeStreamOffsetFactory.java | 62 ++
.../splitters/MongodbChunkSplitter.java | 45 ++
.../splitters/SampleBucketSplitStrategy.java | 139 +++++
.../splitters/ShardedSplitStrategy.java | 108 ++++
.../source/splitters/SingleSplitStrategy.java | 54 ++
.../source/splitters/SplitContext.java | 93 +++
.../source/splitters/SplitStrategy.java | 57 ++
.../splitters/SplitVectorSplitStrategy.java | 119 ++++
.../cdc/mongodb/utils/BsonUtils.java | 325 ++++++++++
.../cdc/mongodb/utils/ChunkUtils.java | 45 ++
.../utils/CollectionDiscoveryUtils.java | 171 ++++++
.../cdc/mongodb/utils/MongodbRecordUtils.java | 165 ++++++
.../cdc/mongodb/utils/MongodbUtils.java | 407 +++++++++++++
.../cdc/mongodb/utils/ResumeToken.java | 80 +++
.../MongodbIncrementalSourceFactoryTest.java | 30 +
.../cdc/mysql/testutils/UniqueDatabase.java | 10 +-
seatunnel-connectors-v2/connector-cdc/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-cdc-mongodb-e2e/pom.xml | 71 +++
.../test/java/mongodb/MongoDBContainer.java | 240 ++++++++
.../src/test/java/mongodb/MongodbCDCIT.java | 266 +++++++++
.../src/test/resources/ddl/inventory.js | 24 +
.../src/test/resources/ddl/inventoryDDL.js | 32 +
.../src/test/resources/ddl/mongodb_cdc.sql | 32 +
.../test/resources/docker/mongodb/random.key | 34 ++
.../test/resources/docker/mongodb/setup.js | 39 ++
.../src/test/resources/log4j2-test.properties | 29 +
.../test/resources/mongodbcdc_to_mysql.conf | 59 ++
.../seatunnel-connector-v2-e2e/pom.xml | 1 +
50 files changed, 5696 insertions(+), 9 deletions(-)
create mode 100644 docs/en/connector-v2/source/MongoDB-CDC.md
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/exception/MongodbConnectorException.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/internal/MongodbClientProvider.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/SerializableFunction.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbScanFetchTask.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamDescriptor.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffset.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/offset/ChangeStreamOffsetFactory.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/MongodbChunkSplitter.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/SampleBucketSplitStrategy.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/ShardedSplitStrategy.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/SingleSplitStrategy.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/SplitContext.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/SplitStrategy.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/splitters/SplitVectorSplitStrategy.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/BsonUtils.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ChunkUtils.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/CollectionDiscoveryUtils.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java
create mode 100644 seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventory.js
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryDDL.js
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/mongodb_cdc.sql
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/docker/mongodb/random.key
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/docker/mongodb/setup.js
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/log4j2-test.properties
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
diff --git a/LICENSE b/LICENSE
index bd06a03806b3..adabba50de63 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,6 +219,7 @@ seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/se
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java from https://github.com/debezium/debezium
+seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
@@ -239,4 +240,4 @@ seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser
-seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
\ No newline at end of file
+seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
diff --git a/config/plugin_config b/config/plugin_config
index 98c9fa8c2c69..95b952b31bf1 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -24,6 +24,7 @@ connector-amazondynamodb
connector-assert
connector-cassandra
connector-cdc-mysql
+connector-cdc-mongodb
connector-cdc-sqlserver
connector-clickhouse
connector-datahub
diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md b/docs/en/connector-v2/source/MongoDB-CDC.md
new file mode 100644
index 000000000000..cb7c2f32acf5
--- /dev/null
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -0,0 +1,282 @@
+# MongoDB CDC
+
+> MongoDB CDC source connector
+
+## Support Those Engines
+
+> SeaTunnel Zeta
+
+## Key Features
+
+- [ ] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+The MongoDB CDC connector allows for reading snapshot data and incremental data from MongoDB database.
+
+## Supported DataSource Info
+
+In order to use the Mongodb CDC connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central repository.
+
+| Datasource | Supported Versions | Dependency |
+|------------|--------------------|-------------------------------------------------------------------------------------------------------------------|
+| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-cdc-mongodb) |
+
+## Availability Settings
+
+1.MongoDB version: MongoDB version >= 4.0.
+
+2.Cluster deployment: replica sets or sharded clusters.
+
+3.Storage Engine: WiredTiger Storage Engine.
+
+4.Permissions:changeStream and read
+
+```shell
+use admin;
+db.createRole(
+ {
+ role: "strole",
+ privileges: [{
+ resource: { db: "", collection: "" },
+ actions: [
+ "splitVector",
+ "listDatabases",
+ "listCollections",
+ "collStats",
+ "find",
+ "changeStream" ]
+ }],
+ roles: [
+ { role: 'read', db: 'config' }
+ ]
+ }
+);
+
+db.createUser(
+ {
+ user: 'stuser',
+ pwd: 'stpw',
+ roles: [
+ { role: 'strole', db: 'admin' }
+ ]
+ }
+);
+```
+
+## Data Type Mapping
+
+The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.
+
+| MongoDB BSON type | Seatunnel Data type |
+|-------------------|---------------------|
+| ObjectId | STRING |
+| String | STRING |
+| Boolean | BOOLEAN |
+| Binary | BINARY |
+| Int32 | INTEGER |
+| Int64 | BIGINT |
+| Double | DOUBLE |
+| Decimal128 | DECIMAL |
+| Date | Date |
+| Timestamp | Timestamp |
+| Object | ROW |
+| Array | ARRAY |
+
+For specific types in MongoDB, we use Extended JSON format to map them to Seatunnel STRING type.
+
+| MongoDB BSON type | Seatunnel STRING |
+|-------------------|----------------------------------------------------------------------------------------------|
+| Symbol | {"_value": {"$symbol": "12"}} |
+| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
+| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
+| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
+
+**Tips**
+
+> 1.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).
+
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|------------------------------------|--------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
+| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
+| password | String | No | - | Password to be used when connecting to MongoDB. |
+| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. |
+| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. |
+| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. |
+| batch.size | Long | No | 1024 | The cursor batch size. |
+| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
+| poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking for new results on the change stream. |
+| heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
+| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size mb of incremental snapshot. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
+### Tips:
+
+> 1.If the collection changes at a slow pace, it is strongly recommended to set an appropriate value greater than 0 for the heartbeat.interval.ms parameter. When we recover a Seatunnel job from a checkpoint or savepoint, the heartbeat events can push the resumeToken forward to avoid its expiration.
+> 2.MongoDB has a limit of 16MB for a single document. Change documents include additional information, so even if the original document is not larger than 15MB, the change document may exceed the 16MB limit, resulting in the termination of the Change Stream operation.
+> 3.It is recommended to use immutable shard keys. In MongoDB, shard keys allow modifications after transactions are enabled, but changing the shard key can cause frequent shard migrations, resulting in additional performance overhead. Additionally, modifying the shard key can also cause the Update Lookup feature to become ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.
+
+## How to Create a MongoDB CDC Data Synchronization Jobs
+
+### CDC Data Print to Client
+
+The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and prints it on the local client:
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory"]
+ collection = ["inventory.products"]
+ username = stuser
+ password = stpw
+ schema = {
+ fields {
+ "_id" : string,
+ "name" : string,
+ "description" : string,
+ "weight" : string
+ }
+ }
+ }
+}
+
+# Console printing of the read Mongodb data
+sink {
+ Console {
+ parallelism = 1
+ }
+}
+```
+
+## CDC Data Write to MysqlDB
+
+The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and write to mysql database:
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory"]
+ collection = ["inventory.products"]
+ username = stuser
+ password = stpw
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:mysql://mysql_cdc_e2e:3306"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "st_user"
+ password = "seatunnel"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = mongodb_cdc
+ table = products
+ primary_keys = ["_id"]
+ }
+}
+```
+
+## Multi-table Synchronization
+
+The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client:
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ database = ["inventory","crm"]
+ collection = ["inventory.products","crm.test"]
+ username = stuser
+ password = stpw
+ }
+}
+
+# Console printing of the read Mongodb data
+sink {
+ Console {
+ parallelism = 1
+ }
+}
+```
+
+### Tips:
+
+> 1.The cdc synchronization of multiple library tables cannot specify the schema, and can only output json data downstream.
+> This is because MongoDB does not provide metadata information for querying, so if you want to support multiple tables, all tables can only be read as one structure.
+
+## Regular Expression Matching for Multiple Tables
+
+The following example demonstrates how to create a data synchronization job that through regular expression read the data of multiple library tables mongodb and prints it on the local client:
+
+| Matching example | Expressions | | Describe |
+|------------------|-------------|---|----------------------------------------------------------------------------------------|
+| Prefix matching | ^(test).* | | Match the database name or table name with the prefix test, such as test1, test2, etc. |
+| Suffix matching | .*[p$] | | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
+
+```hocon
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ execution.checkpoint.interval = 5000
+}
+
+source {
+ MongoDB-CDC {
+ hosts = "mongo0:27017"
+ # So this example is used (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5.
+ database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
+ collection = ["(t[5-8]|tt)"]
+ username = stuser
+ password = stpw
+ }
+}
+
+# Console printing of the read Mongodb data
+sink {
+ Console {
+ parallelism = 1
+ }
+}
+```
+
+## Changelog
+
+- [Feature]Add MongoDB CDC Source Connector([4923](https://github.com/apache/seatunnel/pull/4923))
+
+### next version
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index de6593b4523c..3943159aefaf 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -99,6 +99,7 @@ seatunnel.sink.Doris = connector-doris
seatunnel.source.Maxcompute = connector-maxcompute
seatunnel.sink.Maxcompute = connector-maxcompute
seatunnel.source.MySQL-CDC = connector-cdc-mysql
+seatunnel.source.MongoDB-CDC = connector-cdc-mongodb
seatunnel.sink.S3Redshift = connector-s3-redshift
seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 5257064dc1fe..c4d5fdfd68fe 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -28,7 +28,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
-import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -39,8 +38,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;
-
/**
* Fetcher to fetch data from table split, the split is the incremental split {@link
* IncrementalSplit}.
@@ -150,11 +147,14 @@ public void close() {
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
- TableId tableId = getTableId(sourceRecord);
+ // TODO: The sourceRecord from MongoDB CDC and MySQL CDC are inconsistent. For
+ // compatibility, the getTableId method is commented out for now.
+ // TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
- log.trace(
- "The table {} is not support exactly-once, so ignore the watermark check",
- tableId);
+ // log.trace(
+ // "The table {} is not support exactly-once, so ignore the
+ // watermark check",
+ // tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
new file mode 100644
index 000000000000..e22560ed0383
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ connector-cdc
+ ${revision}
+
+ connector-cdc-mongodb
+ SeaTunnel : Connectors V2 : CDC : Mongodb
+
+
+ 4.7.1
+ 1.11.1
+ 1.10.1
+ 4.13.2
+
+
+
+
+ org.apache.seatunnel
+ connector-cdc-base
+ ${project.version}
+ compile
+
+
+ io.debezium
+ debezium-connector-mongodb
+ ${debezium.version}
+ compile
+
+
+ org.mongodb.kafka
+ mongo-kafka-connect
+ ${mongo-kafka-connect.version}
+
+
+ org.mongodb
+ mongodb-driver-sync
+
+
+ org.apache.kafka
+ connect-api
+
+
+ org.apache.avro
+ avro
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.mongodb
+ mongodb-driver-sync
+ ${mongo.driver.version}
+
+
+ junit
+ junit
+ ${junit.vserion}
+ test
+
+
+
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
new file mode 100644
index 000000000000..41191cfa52ba
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;
+
+import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
+
+import javax.annotation.Nonnull;
+
+import java.util.Optional;
+
+@NoArgsConstructor
+@AutoService(SeaTunnelSource.class)
+public class MongodbIncrementalSource extends IncrementalSource
+ implements SupportParallelism {
+
+ static final String IDENTIFIER = "MongoDB-CDC";
+
+ public MongodbIncrementalSource(
+ ReadonlyConfig options, SeaTunnelDataType dataType) {
+ super(options, dataType);
+ }
+
+ @Override
+ public Option getStartupModeOption() {
+ return MongodbSourceOptions.STARTUP_MODE;
+ }
+
+ @Override
+ public Option getStopModeOption() {
+ return MongodbSourceOptions.STOP_MODE;
+ }
+
+ @Override
+ public String getPluginName() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public SourceConfig.Factory createSourceConfigFactory(
+ @Nonnull ReadonlyConfig config) {
+ MongodbSourceConfigProvider.Builder builder =
+ MongodbSourceConfigProvider.newBuilder()
+ .hosts(config.get(MongodbSourceOptions.HOSTS))
+ .validate();
+ Optional.ofNullable(config.get(MongodbSourceOptions.DATABASE))
+ .ifPresent(builder::databaseList);
+ Optional.ofNullable(config.get(MongodbSourceOptions.COLLECTION))
+ .ifPresent(builder::collectionList);
+ Optional.ofNullable(config.get(MongodbSourceOptions.USERNAME)).ifPresent(builder::username);
+ Optional.ofNullable(config.get(MongodbSourceOptions.PASSWORD)).ifPresent(builder::password);
+ Optional.ofNullable(config.get(MongodbSourceOptions.CONNECTION_OPTIONS))
+ .ifPresent(builder::connectionOptions);
+ Optional.ofNullable(config.get(MongodbSourceOptions.BATCH_SIZE))
+ .ifPresent(builder::batchSize);
+ Optional.ofNullable(config.get(MongodbSourceOptions.POLL_MAX_BATCH_SIZE))
+ .ifPresent(builder::pollMaxBatchSize);
+ Optional.ofNullable(config.get(MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS))
+ .ifPresent(builder::pollAwaitTimeMillis);
+ Optional.ofNullable(config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
+ .ifPresent(builder::heartbeatIntervalMillis);
+ Optional.ofNullable(config.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS))
+ .ifPresent(builder::splitMetaGroupSize);
+ Optional.ofNullable(config.get(MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB))
+ .ifPresent(builder::splitSizeMB);
+ Optional.ofNullable(startupConfig).ifPresent(builder::startupOptions);
+ Optional.ofNullable(stopConfig).ifPresent(builder::stopOptions);
+ return builder;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public DebeziumDeserializationSchema createDebeziumDeserializationSchema(
+ ReadonlyConfig config) {
+ SeaTunnelDataType physicalRowType;
+ if (dataType == null) {
+ return (DebeziumDeserializationSchema)
+ new DebeziumJsonDeserializeSchema(
+ config.get(MongodbSourceOptions.DEBEZIUM_PROPERTIES));
+ } else {
+ physicalRowType = dataType;
+ return (DebeziumDeserializationSchema)
+ new MongoDBConnectorDeserializationSchema(physicalRowType, physicalRowType);
+ }
+ }
+
+ @Override
+ public DataSourceDialect createDataSourceDialect(ReadonlyConfig config) {
+ return new MongodbDialect();
+ }
+
+ @Override
+ public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
+ return new ChangeStreamOffsetFactory();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
new file mode 100644
index 000000000000..6215afb74ef0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@AutoService(Factory.class)
+public class MongodbIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable {
+ @Override
+ public String factoryIdentifier() {
+ return MongodbIncrementalSource.IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return MongodbSourceOptions.getBaseRule()
+ .required(
+ MongodbSourceOptions.HOSTS,
+ MongodbSourceOptions.DATABASE,
+ MongodbSourceOptions.COLLECTION)
+ .optional(
+ MongodbSourceOptions.USERNAME,
+ MongodbSourceOptions.PASSWORD,
+ MongodbSourceOptions.CONNECTION_OPTIONS,
+ MongodbSourceOptions.BATCH_SIZE,
+ MongodbSourceOptions.POLL_MAX_BATCH_SIZE,
+ MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS,
+ MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS,
+ MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB,
+ MongodbSourceOptions.STARTUP_MODE,
+ MongodbSourceOptions.STOP_MODE)
+ .build();
+ }
+
+ @Override
+ public Class extends SeaTunnelSource> getSourceClass() {
+ return MongodbIncrementalSource.class;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public
+ TableSource createSource(TableFactoryContext context) {
+ return () -> {
+ SeaTunnelDataType dataType;
+ if (context.getCatalogTables().size() == 1) {
+ dataType =
+ context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
+ } else {
+ Map rowTypeMap = new HashMap<>();
+ for (CatalogTable catalogTable : context.getCatalogTables()) {
+ rowTypeMap.put(
+ catalogTable.getTableId().toTablePath().toString(),
+ catalogTable.getTableSchema().toPhysicalRowDataType());
+ }
+ dataType = new MultipleRowType(rowTypeMap);
+ }
+ return (SeaTunnelSource)
+ new MongodbIncrementalSource<>(context.getOptions(), dataType);
+ };
+ }
+
+ @Override
+ public Result applyTables(@Nonnull TableFactoryContext context) {
+ return Result.of(context.getCatalogTables(), Collections.emptyList());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
new file mode 100644
index 000000000000..049b37db3634
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config;
+
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import java.util.List;
+
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.buildConnectionString;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@EqualsAndHashCode
+public class MongodbSourceConfig implements SourceConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String hosts;
+
+ private final String username;
+
+ private final String password;
+
+ private final List databaseList;
+
+ private final List collectionList;
+
+ private final String connectionString;
+
+ private final int batchSize;
+
+ private final int pollAwaitTimeMillis;
+
+ private final int pollMaxBatchSize;
+
+ private final boolean updateLookup;
+
+ private final StartupConfig startupOptions;
+
+ private final StopConfig stopOptions;
+
+ private final int heartbeatIntervalMillis;
+
+ private final int splitMetaGroupSize;
+
+ private final int splitSizeMB;
+
+ MongodbSourceConfig(
+ String hosts,
+ String username,
+ String password,
+ List databaseList,
+ List collectionList,
+ String connectionOptions,
+ int batchSize,
+ int pollAwaitTimeMillis,
+ int pollMaxBatchSize,
+ boolean updateLookup,
+ StartupConfig startupOptions,
+ StopConfig stopOptions,
+ int heartbeatIntervalMillis,
+ int splitMetaGroupSize,
+ int splitSizeMB) {
+ this.hosts = checkNotNull(hosts);
+ this.username = username;
+ this.password = password;
+ this.databaseList = databaseList;
+ this.collectionList = collectionList;
+ this.connectionString =
+ buildConnectionString(username, password, hosts, connectionOptions)
+ .getConnectionString();
+ this.batchSize = batchSize;
+ this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+ this.pollMaxBatchSize = pollMaxBatchSize;
+ this.updateLookup = updateLookup;
+ this.startupOptions = startupOptions;
+ this.stopOptions = stopOptions;
+ this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ this.splitSizeMB = splitSizeMB;
+ }
+
+ @Override
+ public StartupConfig getStartupConfig() {
+ return startupOptions;
+ }
+
+ @Override
+ public StopConfig getStopConfig() {
+ return stopOptions;
+ }
+
+ @Override
+ public int getSplitSize() {
+ return splitSizeMB;
+ }
+
+ @Override
+ public boolean isExactlyOnce() {
+ return true;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
new file mode 100644
index 000000000000..ebe7af13e0c6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config;
+
+import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
+import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.POLL_MAX_BATCH_SIZE;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class MongodbSourceConfigProvider {
+
+ private MongodbSourceConfigProvider() {}
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder implements SourceConfig.Factory {
+ private String hosts;
+ private String username;
+ private String password;
+ private List databaseList;
+ private List collectionList;
+ private String connectionOptions;
+ private int batchSize = BATCH_SIZE.defaultValue();
+ private int pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
+ private int pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
+ private StartupConfig startupOptions;
+ private StopConfig stopOptions;
+ private int heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
+ private int splitMetaGroupSize = 2;
+ private int splitSizeMB = INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
+
+ public Builder hosts(String hosts) {
+ this.hosts = hosts;
+ return this;
+ }
+
+ public Builder connectionOptions(String connectionOptions) {
+ this.connectionOptions = connectionOptions;
+ return this;
+ }
+
+ public Builder username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Builder password(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public Builder databaseList(List databases) {
+ this.databaseList = databases;
+ return this;
+ }
+
+ public Builder collectionList(List collections) {
+ this.collectionList = collections;
+ return this;
+ }
+
+ public Builder batchSize(int batchSize) {
+ checkArgument(batchSize >= 0);
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public Builder pollAwaitTimeMillis(int pollAwaitTimeMillis) {
+ checkArgument(pollAwaitTimeMillis > 0);
+ this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+ return this;
+ }
+
+ public Builder pollMaxBatchSize(int pollMaxBatchSize) {
+ checkArgument(pollMaxBatchSize > 0);
+ this.pollMaxBatchSize = pollMaxBatchSize;
+ return this;
+ }
+
+ public Builder startupOptions(StartupConfig startupOptions) {
+ this.startupOptions = Objects.requireNonNull(startupOptions);
+ if (startupOptions.getStartupMode() != StartupMode.INITIAL
+ && startupOptions.getStartupMode() != StartupMode.TIMESTAMP) {
+ throw new MongodbConnectorException(
+ ILLEGAL_ARGUMENT,
+ "Unsupported startup mode " + startupOptions.getStartupMode());
+ }
+ return this;
+ }
+
+ public Builder stopOptions(StopConfig stopOptions) {
+ this.stopOptions = Objects.requireNonNull(stopOptions);
+ if (stopOptions.getStopMode() != StopMode.NEVER) {
+ throw new MongodbConnectorException(
+ ILLEGAL_ARGUMENT,
+ String.format("The %s mode is not supported.", stopOptions.getStopMode()));
+ }
+ return this;
+ }
+
+ public Builder heartbeatIntervalMillis(int heartbeatIntervalMillis) {
+ checkArgument(heartbeatIntervalMillis >= 0);
+ this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+ return this;
+ }
+
+ public Builder splitSizeMB(int splitSizeMB) {
+ checkArgument(splitSizeMB > 0);
+ this.splitSizeMB = splitSizeMB;
+ return this;
+ }
+
+ public Builder splitMetaGroupSize(int splitMetaGroupSize) {
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ return this;
+ }
+
+ public Builder validate() {
+ checkNotNull(hosts, "hosts must be provided");
+ return this;
+ }
+
+ @Override
+ public MongodbSourceConfig create(int subtask) {
+ boolean updateLookup = true;
+ return new MongodbSourceConfig(
+ hosts,
+ username,
+ password,
+ databaseList,
+ collectionList,
+ connectionOptions,
+ batchSize,
+ pollAwaitTimeMillis,
+ pollMaxBatchSize,
+ updateLookup,
+ startupOptions,
+ stopOptions,
+ heartbeatIntervalMillis,
+ splitMetaGroupSize,
+ splitSizeMB);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
new file mode 100644
index 000000000000..df73772e0718
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import org.bson.BsonDouble;
+import org.bson.json.JsonMode;
+import org.bson.json.JsonWriterSettings;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MongodbSourceOptions extends SourceOptions {
+
+ public static final String ENCODE_VALUE_FIELD = "_value";
+
+ public static final String CLUSTER_TIME_FIELD = "clusterTime";
+
+ public static final String TS_MS_FIELD = "ts_ms";
+
+ public static final String SOURCE_FIELD = "source";
+
+ public static final String SNAPSHOT_FIELD = "snapshot";
+
+ public static final String FALSE_FALSE = "false";
+
+ public static final String OPERATION_TYPE_INSERT = "insert";
+
+ public static final String SNAPSHOT_TRUE = "true";
+
+ public static final String ID_FIELD = "_id";
+
+ public static final String DOCUMENT_KEY = "documentKey";
+
+ public static final String NS_FIELD = "ns";
+
+ public static final String OPERATION_TYPE = "operationType";
+
+ public static final String TIMESTAMP_FIELD = "timestamp";
+
+ public static final String RESUME_TOKEN_FIELD = "resumeToken";
+
+ public static final String FULL_DOCUMENT = "fullDocument";
+
+ public static final String DB_FIELD = "db";
+
+ public static final String COLL_FIELD = "coll";
+
+ public static final int FAILED_TO_PARSE_ERROR = 9;
+
+ public static final int UNAUTHORIZED_ERROR = 13;
+
+ public static final int ILLEGAL_OPERATION_ERROR = 20;
+
+ public static final int UNKNOWN_FIELD_ERROR = 40415;
+
+ public static final String DROPPED_FIELD = "dropped";
+
+ public static final String MAX_FIELD = "max";
+
+ public static final String MIN_FIELD = "min";
+
+ public static final String ADD_NS_FIELD_NAME = "_ns_";
+
+ public static final String UUID_FIELD = "uuid";
+
+ public static final String SHARD_FIELD = "shard";
+
+ public static final String DIALECT_NAME = "MongoDB";
+
+ public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0d);
+
+ public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS =
+ JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();
+
+ public static final String OUTPUT_SCHEMA =
+ "{"
+ + " \"name\": \"ChangeStream\","
+ + " \"type\": \"record\","
+ + " \"fields\": ["
+ + " { \"name\": \"_id\", \"type\": \"string\" },"
+ + " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"source\","
+ + " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"ts_ms\", \"type\": \"long\"},"
+ + " {\"name\": \"snapshot\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"ts_ms\", \"type\": [\"long\", \"null\"]},"
+ + " { \"name\": \"ns\","
+ + " \"type\": [{\"name\": \"ns\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"db\", \"type\": \"string\"},"
+ + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"to\","
+ + " \"type\": [{\"name\": \"to\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"db\", \"type\": \"string\"},"
+ + " {\"name\": \"coll\", \"type\": [\"string\", \"null\"] } ]"
+ + " }, \"null\" ] },"
+ + " { \"name\": \"documentKey\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"updateDescription\","
+ + " \"type\": [{\"name\": \"updateDescription\", \"type\": \"record\", \"fields\": ["
+ + " {\"name\": \"updatedFields\", \"type\": [\"string\", \"null\"]},"
+ + " {\"name\": \"removedFields\","
+ + " \"type\": [{\"type\": \"array\", \"items\": \"string\"}, \"null\"]"
+ + " }] }, \"null\"] },"
+ + " { \"name\": \"clusterTime\", \"type\": [\"string\", \"null\"] },"
+ + " { \"name\": \"txnNumber\", \"type\": [\"long\", \"null\"]},"
+ + " { \"name\": \"lsid\", \"type\": [{\"name\": \"lsid\", \"type\": \"record\","
+ + " \"fields\": [ {\"name\": \"id\", \"type\": \"string\"},"
+ + " {\"name\": \"uid\", \"type\": \"string\"}] }, \"null\"] }"
+ + " ]"
+ + "}";
+
+ public static final Option HOSTS =
+ Options.key("hosts")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The comma-separated list of hostname and port pairs of the MongoDB servers. "
+ + "eg. localhost:27017,localhost:27018");
+
+ public static final Option USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the database user to be used when connecting to MongoDB. "
+ + "This is required only when MongoDB is configured to use authentication.");
+
+ public static final Option PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to be used when connecting to MongoDB. "
+ + "This is required only when MongoDB is configured to use authentication.");
+
+ public static final Option> DATABASE =
+ Options.key("database")
+ .listType()
+ .noDefaultValue()
+ .withDescription("Name of the database to watch for changes.");
+
+ public static final Option> COLLECTION =
+ Options.key("collection")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the collection in the database to watch for changes.");
+
+ public static final Option CONNECTION_OPTIONS =
+ Options.key("connection.options")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The ampersand-separated MongoDB connection options. "
+ + "eg. replicaSet=test&connectTimeoutMS=300000");
+
+ public static final Option BATCH_SIZE =
+ Options.key("batch.size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("The cursor batch size. Defaults to 1024.");
+
+ public static final Option POLL_MAX_BATCH_SIZE =
+ Options.key("poll.max.batch.size")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+ "Maximum number of change stream documents "
+ + "to include in a single batch when polling for new data. "
+ + "This setting can be used to limit the amount of data buffered internally in the connector. "
+ + "Defaults to 1024.");
+
+ public static final Option POLL_AWAIT_TIME_MILLIS =
+ Options.key("poll.await.time.ms")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "The amount of time to wait before checking for new results on the change stream."
+ + "Defaults: 1000.");
+
+ public static final Option HEARTBEAT_INTERVAL_MILLIS =
+ Options.key("heartbeat.interval.ms")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "The length of time in milliseconds between sending heartbeat messages."
+ + "Heartbeat messages contain the post batch resume token and are sent when no source records "
+ + "have been published in the specified interval. This improves the resumability of the connector "
+ + "for low volume namespaces. Use 0 to disable. Defaults to 0.");
+
+ public static final Option INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB =
+ Options.key("incremental.snapshot.chunk.size.mb")
+ .intType()
+ .defaultValue(64)
+ .withDescription(
+ "The chunk size mb of incremental snapshot. Defaults to 64mb.");
+
+ public static final Option