diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java index 11ef57ffc5f..25e463c17e5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/dialect/MongodbDialect.java @@ -34,6 +34,7 @@ import com.mongodb.client.MongoClient; import io.debezium.relational.TableId; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; @@ -52,6 +53,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getCurrentClusterTime; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.getLatestResumeToken; +@Slf4j public class MongodbDialect implements DataSourceDialect { private final Map cache = @@ -137,6 +139,11 @@ public ChangeStreamOffset displayCurrentOffset(MongodbSourceConfig sourceConfig) ChangeStreamOffset changeStreamOffset; if (startupResumeToken != null) { changeStreamOffset = new ChangeStreamOffset(startupResumeToken); + log.info( + "startup resume token={},change stream offset={}", + startupResumeToken, + changeStreamOffset); + } else { changeStreamOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient)); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java index 3ddd2ccbb21..5ee8962bc53 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/ResumeToken.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils; -import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException; - import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -29,41 +27,33 @@ import java.nio.ByteOrder; import java.util.Objects; -import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; - public class ResumeToken { private static final int K_TIMESTAMP = 130; - public static @Nonnull BsonTimestamp decodeTimestamp(BsonDocument resumeToken) { - Objects.requireNonNull(resumeToken, "Missing ResumeToken."); - BsonValue bsonValue = resumeToken.get("_data"); - byte[] keyStringBytes = extractKeyStringBytes(bsonValue); - validateKeyType(keyStringBytes); - - ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN); - int t = buffer.getInt(); - int i = buffer.getInt(); - return new BsonTimestamp(t, i); - } - - private static byte[] extractKeyStringBytes(@Nonnull BsonValue bsonValue) { - if (bsonValue.isBinary()) { - return bsonValue.asBinary().getData(); - } else if (bsonValue.isString()) { - return hexToUint8Array(bsonValue.asString().getValue()); + public static BsonTimestamp decodeTimestamp(BsonDocument resumeToken) { + BsonValue bsonValue = + Objects.requireNonNull(resumeToken, "Missing ResumeToken.").get("_data"); + final byte[] keyStringBytes; + // Resume Tokens format: https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens + if (bsonValue.isBinary()) { // BinData + keyStringBytes = bsonValue.asBinary().getData(); + } else if (bsonValue.isString()) { // Hex-encoded string (v0 or v1) + keyStringBytes = hexToUint8Array(bsonValue.asString().getValue()); } else { - throw new MongodbConnectorException( - ILLEGAL_ARGUMENT, "Unknown resume token format: " + bsonValue); + throw new IllegalArgumentException( + "Unknown resume token format: " + resumeToken.toJson()); } - } - private static void validateKeyType(byte[] keyStringBytes) { - int kType = keyStringBytes[0] & 0xff; + ByteBuffer buffer = ByteBuffer.wrap(keyStringBytes).order(ByteOrder.BIG_ENDIAN); + int kType = buffer.get() & 0xff; if (kType != K_TIMESTAMP) { - throw new MongodbConnectorException( - ILLEGAL_ARGUMENT, "Unknown keyType of timestamp: " + kType); + throw new IllegalArgumentException("Unknown keyType of timestamp: " + kType); } + + int t = buffer.getInt(); + int i = buffer.getInt(); + return new BsonTimestamp(t, i); } private static byte[] hexToUint8Array(@Nonnull String str) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java index c33f6d047d4..a311bccc90b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongoDBContainer.java @@ -84,6 +84,7 @@ public MongoDBContainer(Network network, ShardingClusterRole clusterRole) { withExposedPorts(MONGODB_PORT); withCommand(ShardingClusterRole.startupCommand(clusterRole)); waitingFor(clusterRole.waitStrategy); + withEnv("TZ", "Asia/Shanghai"); } public void executeCommand(String command) {