Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenqqq11 committed Jun 29, 2023
1 parent 069eb47 commit e29c90f
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 37 deletions.
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/se
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java from https://github.com/debezium/debezium
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java from https://github.com/hazelcast/hazelcast
Expand All @@ -239,4 +240,4 @@ seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLEngine.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
8 changes: 4 additions & 4 deletions docs/en/connector-v2/source/MongoDB-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
## How to Create a MongoDB Data Synchronization Jobs
The following example demonstrates how to create a data synchronization job that reads data from MongoDB and prints it on the local client:
The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and prints it on the local client:
```hocon
env {
Expand Down Expand Up @@ -163,7 +163,7 @@ sink {
}
```
The following example demonstrates how to create a data synchronization job that reads data from MongoDB and cdc write to mysql database:
The following example demonstrates how to create a data synchronization job that reads cdc data from MongoDB and write to mysql database:
```hocon
env {
Expand Down Expand Up @@ -199,7 +199,7 @@ sink {
}
```
The following example demonstrates how to create a data synchronization job that read the data of multiple library tables mongodb and prints it on the local client:
The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client:
```hocon
env {
Expand Down Expand Up @@ -267,7 +267,7 @@ sink {
## Changelog
- Add MongoDB CDC Source Connector
- [Feature]Add MongoDB CDC Source Connector([4923](https://github.com/apache/seatunnel/pull/4923))
### next version
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ public void execute(Context context) throws Exception {
BsonDocument startKey = (BsonDocument) snapshotSplit.getSplitStart()[1];
BsonDocument endKey = (BsonDocument) snapshotSplit.getSplitEnd()[1];
BsonDocument hint = (BsonDocument) snapshotSplit.getSplitStart()[0];

log.info(
"Initializing snapshot split processing: TableId={}, StartKey={}, EndKey={}, Hint={}",
snapshotSplit.getTableId(),
startKey,
endKey,
hint);
return collection
.find()
.min(startKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
<name>SeaTunnel : E2E : Connector V2 : CDC Mongodb</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mysql.version>8.0.16</mysql.version>
</properties>

<dependencies>
Expand All @@ -55,6 +53,19 @@
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
private static final String SINK_SQL = "select name,description,weight from products";

private static final String MYSQL_DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar";
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");
Expand All @@ -107,7 +107,7 @@ private static MySqlContainer createMySqlContainer() {
mySqlContainer.withUsername(MYSQL_USER_NAME);
mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
// For local test use
mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
// mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
return mySqlContainer;
}

Expand All @@ -118,7 +118,7 @@ private static MySqlContainer createMySqlContainer() {
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
"mkdir -p /tmp/seatunnel/Jdbc/lib && cd /tmp/seatunnel/Jdbc/lib && curl -O "
+ MYSQL_DRIVER_JAR);
Assertions.assertEquals(0, extraCommands.getExitCode());
};
Expand Down Expand Up @@ -176,31 +176,27 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) {
});

// insert update delete
// upsertDeleteSourceTable();
//
// await().atMost(240000, TimeUnit.MILLISECONDS)
// .untilAsserted(
// () -> {
// Assertions.assertIterableEquals(
// readMongodbData().stream()
// .peek(e -> e.remove("_id"))
// .map(Document::entrySet)
// .map(Set::stream)
// .map(
// entryStream ->
// entryStream
//
// .map(Map.Entry::getValue)
// .collect(
//
// Collectors.toCollection(
//
// ArrayList
//
// ::new)))
// .collect(Collectors.toList()),
// querySql());
// });
upsertDeleteSourceTable();

await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
readMongodbData().stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(
Collectors.toCollection(
ArrayList
::new)))
.collect(Collectors.toList()),
querySql());
});
}

private Connection getJdbcConnection() throws SQLException {
Expand Down

0 comments on commit e29c90f

Please sign in to comment.