diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md index 464ecdeab6e..c4cbad95ef3 100644 --- a/docs/en/connector-v2/sink/MongoDB.md +++ b/docs/en/connector-v2/sink/MongoDB.md @@ -11,7 +11,7 @@ Key Features ------------ -- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) **Tips** @@ -73,6 +73,7 @@ The following table lists the field data type mapping from MongoDB BSON type to | retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | | upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | | primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | +| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | ### Tips diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java index 132263125b8..848a120e270 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -150,4 +150,7 @@ public class MongodbConfig { .withDescription( "The primary keys for upsert/update. Keys are in csv format for properties.") .withFallbackKeys("upsert-key"); + + public static final Option TRANSACTION = + Options.key("transaction").booleanType().defaultValue(false).withDescription("."); } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java index fa2c212c3d0..160aa966a04 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java @@ -20,25 +20,33 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig; import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer; import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo; import com.google.auto.service.AutoService; import java.util.List; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; @AutoService(SeaTunnelSink.class) -public class MongodbSink extends AbstractSimpleSink { +public class MongodbSink + implements SeaTunnelSink< + SeaTunnelRow, DocumentBulk, MongodbCommitInfo, MongodbAggregatedCommitInfo> { private MongodbWriterOptions options; @@ -89,6 +97,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) { builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key())); } + + if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) { + builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key())); + } this.options = builder.build(); } } @@ -109,7 +121,8 @@ public SeaTunnelDataType getConsumedType() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) { + public SinkWriter createWriter( + SinkWriter.Context context) { return new MongodbWriter( new RowDataDocumentSerializer( RowDataToBsonConverters.createConverter(seaTunnelRowType), @@ -118,4 +131,27 @@ public AbstractSinkWriter createWriter(SinkWriter.Context co options, context); } + + @Override + public Optional> getWriterStateSerializer() { + return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty(); + } + + @Override + public Optional> + createAggregatedCommitter() { + return options.transaction + ? Optional.of(new MongodbSinkAggregatedCommitter(options)) + : Optional.empty(); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty(); + } + + @Override + public Optional> getCommitInfoSerializer() { + return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty(); + } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java index 794c3bf04a3..0eb131d44ed 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java @@ -20,16 +20,19 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException; import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider; import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider; import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo; import org.bson.BsonDocument; import com.mongodb.MongoException; import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.WriteModel; import lombok.extern.slf4j.Slf4j; @@ -37,12 +40,14 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED; @Slf4j -public class MongodbWriter extends AbstractSinkWriter { +public class MongodbWriter implements SinkWriter { private MongodbClientProvider collectionProvider; @@ -60,6 +65,8 @@ public class MongodbWriter extends AbstractSinkWriter { private volatile long lastSendTime = 0L; + private boolean transaction; + // TODOļ¼šReserve parameters. private final SinkWriter.Context context; @@ -84,27 +91,66 @@ private void initOptions(MongodbWriterOptions options) { .build(); this.bulkActions = options.getFlushSize(); this.batchIntervalMs = options.getBatchIntervalMs(); + this.transaction = options.transaction; } @Override public void write(SeaTunnelRow o) { if (o.getRowKind() != RowKind.UPDATE_BEFORE) { bulkRequests.add(serializer.serializeToWriteModel(o)); - if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) { + if (!transaction && (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit())) { doBulkWrite(); } } } - @Override - public Optional prepareCommit() { - doBulkWrite(); - return Optional.empty(); + public Optional prepareCommit() { + if (!transaction) { + doBulkWrite(); + return Optional.empty(); + } + + List bsonDocuments = new ArrayList<>(); + AtomicInteger counter = new AtomicInteger(); + + bulkRequests.stream() + .map(this::convertModelToBsonDocument) + .collect( + Collectors.groupingBy( + it -> counter.getAndIncrement() / DocumentBulk.BUFFER_SIZE)) + .values() + .stream() + .map(this::convertBsonDocumentListToDocumentBulk) + .forEach(bsonDocuments::add); + + bulkRequests.clear(); + + return Optional.of(new MongodbCommitInfo(bsonDocuments)); + } + + private BsonDocument convertModelToBsonDocument(WriteModel model) { + if (model instanceof InsertOneModel) { + return ((InsertOneModel) model).getDocument(); + } else if (model instanceof UpdateOneModel) { + return (BsonDocument) ((UpdateOneModel) model).getUpdate(); + } + return null; + } + + private DocumentBulk convertBsonDocumentListToDocumentBulk(List documentList) { + DocumentBulk documentBulk = new DocumentBulk(); + documentList.forEach(documentBulk::add); + return documentBulk; } + @Override + public void abortPrepare() {} + @Override public void close() { - doBulkWrite(); + if (!transaction) { + doBulkWrite(); + } if (collectionProvider != null) { collectionProvider.close(); } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java index be8becd3275..e9b82647756 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java @@ -44,16 +44,19 @@ public class MongodbWriterOptions implements Serializable { protected final long retryInterval; + protected final boolean transaction; + public MongodbWriterOptions( String connectString, String database, String collection, int flushSize, - Long batchIntervalMs, + long batchIntervalMs, boolean upsertEnable, String[] primaryKey, int retryMax, - Long retryInterval) { + long retryInterval, + boolean transaction) { this.connectString = connectString; this.database = database; this.collection = collection; @@ -63,6 +66,7 @@ public MongodbWriterOptions( this.primaryKey = primaryKey; this.retryMax = retryMax; this.retryInterval = retryInterval; + this.transaction = transaction; } public static Builder builder() { @@ -89,6 +93,8 @@ public static class Builder { protected long retryInterval; + protected boolean transaction; + public Builder withConnectString(String connectString) { this.connectString = connectString; return this; @@ -134,6 +140,11 @@ public Builder withRetryInterval(Long retryInterval) { return this; } + public Builder withTransaction(boolean transaction) { + this.transaction = transaction; + return this; + } + public MongodbWriterOptions build() { return new MongodbWriterOptions( connectString, @@ -144,7 +155,8 @@ public MongodbWriterOptions build() { upsertEnable, primaryKey, retryMax, - retryInterval); + retryInterval, + transaction); } } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java new file mode 100644 index 00000000000..42b61edbe1a --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit; + +import org.bson.BsonDocument; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.TransactionBody; +import com.mongodb.client.result.InsertManyResult; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class CommittableTransaction implements TransactionBody, Serializable { + + private static final int BUFFER_INIT_SIZE = 1024; + + protected final MongoCollection collection; + + protected List bufferedDocuments = new ArrayList<>(BUFFER_INIT_SIZE); + + public CommittableTransaction( + MongoCollection collection, List documents) { + this.collection = collection; + this.bufferedDocuments.addAll(documents); + } + + @Override + public Integer execute() { + InsertManyResult result = collection.insertMany(bufferedDocuments); + return result.getInsertedIds().size(); + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java new file mode 100644 index 00000000000..1fa3669e969 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit; + +import org.bson.BsonDocument; +import org.bson.conversions.Bson; + +import com.mongodb.bulk.BulkWriteResult; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.UpdateOptions; + +import java.util.ArrayList; +import java.util.List; + +public class CommittableUpsertTransaction extends CommittableTransaction { + + private final String[] upsertKeys; + private final UpdateOptions updateOptions = new UpdateOptions(); + private final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions(); + + public CommittableUpsertTransaction( + MongoCollection collection, + List documents, + String[] upsertKeys) { + super(collection, documents); + this.upsertKeys = upsertKeys; + updateOptions.upsert(true); + bulkWriteOptions.ordered(true); + } + + @Override + public Integer execute() { + List> upserts = new ArrayList<>(); + for (BsonDocument document : bufferedDocuments) { + List filters = new ArrayList<>(upsertKeys.length); + for (String upsertKey : upsertKeys) { + Object o = document.get("$set").asDocument().get(upsertKey); + Bson eq = Filters.eq(upsertKey, o); + filters.add(eq); + } + Bson filter = Filters.and(filters); + UpdateOneModel updateOneModel = + new UpdateOneModel<>(filter, document, updateOptions); + upserts.add(updateOneModel); + } + + BulkWriteResult bulkWriteResult = collection.bulkWrite(upserts, bulkWriteOptions); + return bulkWriteResult.getUpserts().size() + bulkWriteResult.getInsertedCount(); + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java new file mode 100644 index 00000000000..0ee73a3012d --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit; + +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider; +import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo; + +import org.bson.BsonDocument; + +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; +import com.mongodb.client.ClientSession; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class MongodbSinkAggregatedCommitter + implements SinkAggregatedCommitter { + + private static final long waitingTime = 5_000L; + + private static final long TRANSACTION_TIMEOUT_MS = 60_000L; + + private final boolean enableUpsert; + + private final String[] upsertKeys; + + private final MongodbClientProvider collectionProvider; + + private ClientSession clientSession; + + private MongoClient client; + + public MongodbSinkAggregatedCommitter(MongodbWriterOptions options) { + this.enableUpsert = options.isUpsertEnable(); + this.upsertKeys = options.getPrimaryKey(); + this.collectionProvider = + MongodbCollectionProvider.builder() + .connectionString(options.getConnectString()) + .database(options.getDatabase()) + .collection(options.getCollection()) + .build(); + } + + @Override + public List commit( + List aggregatedCommitInfo) { + return aggregatedCommitInfo.stream() + .map(this::processAggregatedCommitInfo) + .filter( + failedAggregatedCommitInfo -> + !failedAggregatedCommitInfo.getCommitInfos().isEmpty()) + .collect(Collectors.toList()); + } + + private MongodbAggregatedCommitInfo processAggregatedCommitInfo( + MongodbAggregatedCommitInfo aggregatedCommitInfo) { + List failedCommitInfos = + aggregatedCommitInfo.getCommitInfos().stream() + .flatMap( + (Function>>) + this::processCommitInfo) + .filter(failedDocumentBulks -> !failedDocumentBulks.isEmpty()) + .map(MongodbCommitInfo::new) + .collect(Collectors.toList()); + + return new MongodbAggregatedCommitInfo(failedCommitInfos); + } + + private Stream> processCommitInfo(MongodbCommitInfo commitInfo) { + client = collectionProvider.getClient(); + clientSession = client.startSession(); + MongoCollection collection = collectionProvider.getDefaultCollection(); + return Stream.of( + commitInfo.getDocumentBulks().stream() + .filter(bulk -> !bulk.getDocuments().isEmpty()) + .filter( + bulk -> { + try { + CommittableTransaction transaction; + if (enableUpsert) { + transaction = + new CommittableUpsertTransaction( + collection, + bulk.getDocuments(), + upsertKeys); + } else { + transaction = + new CommittableTransaction( + collection, bulk.getDocuments()); + } + + int insertedDocs = + clientSession.withTransaction( + transaction, + TransactionOptions.builder() + .readPreference( + ReadPreference.primary()) + .readConcern(ReadConcern.LOCAL) + .writeConcern(WriteConcern.MAJORITY) + .build()); + log.info( + "Inserted {} documents into collection {}.", + insertedDocs, + collection.getNamespace()); + return false; + } catch (Exception e) { + log.error("Failed to commit with Mongo transaction.", e); + return true; + } + }) + .collect(Collectors.toList())); + } + + @Override + public MongodbAggregatedCommitInfo combine(List commitInfos) { + return new MongodbAggregatedCommitInfo(commitInfos); + } + + @Override + public void abort(List aggregatedCommitInfo) {} + + @SneakyThrows + @Override + public void close() { + long deadline = System.currentTimeMillis() + TRANSACTION_TIMEOUT_MS; + while (clientSession.hasActiveTransaction() && System.currentTimeMillis() < deadline) { + // wait for active transaction to finish or timeout + Thread.sleep(waitingTime); + } + if (clientSession != null) { + clientSession.close(); + } + if (client != null) { + client.close(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java new file mode 100644 index 00000000000..72a3d105383 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state; + +import org.bson.BsonDocument; + +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * DocumentBulk is buffered {@link BsonDocument} in memory, which would be written to MongoDB in a + * single transaction. Due to execution efficiency, each DocumentBulk maybe be limited to a maximum + * size, typically 1,000 documents. But for the transactional mode, the maximum size should not be + * respected because all that data must be written in one transaction. + */ +@ToString +@EqualsAndHashCode +public class DocumentBulk implements Serializable { + + public static final int BUFFER_SIZE = 1024; + + private final List bufferedDocuments; + + public DocumentBulk() { + bufferedDocuments = new ArrayList<>(BUFFER_SIZE); + } + + public void add(BsonDocument document) { + if (bufferedDocuments.size() == BUFFER_SIZE) { + throw new IllegalStateException("DocumentBulk is already full"); + } + bufferedDocuments.add(document); + } + + public int size() { + return bufferedDocuments.size(); + } + + public List getDocuments() { + return bufferedDocuments; + } +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java new file mode 100644 index 00000000000..6b97d616af0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class MongodbAggregatedCommitInfo implements Serializable { + List commitInfos; +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java new file mode 100644 index 00000000000..052cd4c5a87 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class MongodbCommitInfo implements Serializable { + List documentBulks; +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java index 4c85c0d097e..5dbe7cf3479 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java @@ -82,6 +82,11 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table"; + protected static final String MONGODB_TRANSACTION_SINK_TABLE = + "test_source_transaction_sink_table"; + protected static final String MONGODB_TRANSACTION_UPSERT_TABLE = + "test_source_upsert_transaction_table"; + protected GenericContainer mongodbContainer; protected MongoClient client; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index bc6f8840001..fb643455a6e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -174,4 +174,31 @@ public void testCompatibleParameters(TestContainer container) .collect(Collectors.toList())); clearDate(MONGODB_MATCH_RESULT_TABLE); } + + @TestTemplate + public void testTransactionSinkAndUpsert(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult insertResult = + container.executeJob("/transactionIT/fake_source_to_transaction_sink_mongodb.conf"); + Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr()); + + Container.ExecResult assertSinkResult = + container.executeJob( + "/transactionIT/mongodb_source_transaction_sink_to_assert.conf"); + Assertions.assertEquals(0, assertSinkResult.getExitCode(), assertSinkResult.getStderr()); + + Container.ExecResult upsertResult = + container.executeJob( + "/transactionIT/fake_source_to_transaction_upsert_mongodb.conf"); + Assertions.assertEquals(0, upsertResult.getExitCode(), upsertResult.getStderr()); + + Container.ExecResult assertUpsertResult = + container.executeJob( + "/transactionIT/mongodb_source_transaction_upsert_to_assert.conf"); + Assertions.assertEquals( + 0, assertUpsertResult.getExitCode(), assertUpsertResult.getStderr()); + + clearDate(MONGODB_TRANSACTION_SINK_TABLE); + clearDate(MONGODB_TRANSACTION_UPSERT_TABLE); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf new file mode 100644 index 00000000000..67947eb956c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 50 + int.template = [3] + split.num = 5 + split.read-interval = 100 + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017" + database = "test_db" + collection = "test_source_transaction_sink_table" + transaction = true + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf new file mode 100644 index 00000000000..53a98fe28a4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 50 + int.template = [2] + split.num = 5 + split.read-interval = 100 + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017" + database = "test_db" + collection = "test_source_upsert_transaction_table" + transaction = true + upsert-enable = true + primary-key = ["c_int"] + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf new file mode 100644 index 00000000000..f453ff5dfef --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_source_transaction_sink_table" + cursor.no-timeout = true + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Console { + source_table_name = "mongodb_table" + } + Assert { + source_table_name = "mongodb_table" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 50 + }, + { + rule_type = MIN_ROW + rule_value = 50 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf new file mode 100644 index 00000000000..0a5f8e5e1e0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_source_upsert_transaction_table" + cursor.no-timeout = true + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Console { + source_table_name = "mongodb_table" + } + Assert { + source_table_name = "mongodb_table" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java index 9dcc94f8058..dc819c6ad7e 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java @@ -283,7 +283,9 @@ public synchronized void deleteCheckpoint(String jobId, String pipelineId, Strin if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointId.equals(getCheckpointIdByFileName(fileName))) { try { - fs.delete(new Path(fileName), false); + fs.delete( + new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), + false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}", @@ -311,7 +313,9 @@ public void deleteCheckpoint(String jobId, String pipelineId, List check if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointIdList.contains(checkpointIdByFileName)) { try { - fs.delete(new Path(fileName), false); + fs.delete( + new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), + false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}",