Skip to content

Commit

Permalink
[Improve][Connector-v2][Mongodb]sink support transaction update/writi…
Browse files Browse the repository at this point in the history
…ng (#5034)
  • Loading branch information
MonsterChenzhuo authored Jul 10, 2023
1 parent d3462ec commit b1203c9
Show file tree
Hide file tree
Showing 17 changed files with 987 additions and 16 deletions.
3 changes: 2 additions & 1 deletion docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
Key Features
------------

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

**Tips**
Expand Down Expand Up @@ -73,6 +73,7 @@ The following table lists the field data type mapping from MongoDB BSON type to
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |

### Tips
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,7 @@ public class MongodbConfig {
.withDescription(
"The primary keys for upsert/update. Keys are in csv format for properties.")
.withFallbackKeys("upsert-key");

public static final Option<Boolean> TRANSACTION =
Options.key("transaction").booleanType().defaultValue(false).withDescription(".");
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,33 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;

import com.google.auto.service.AutoService;

import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

@AutoService(SeaTunnelSink.class)
public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
public class MongodbSink
implements SeaTunnelSink<
SeaTunnelRow, DocumentBulk, MongodbCommitInfo, MongodbAggregatedCommitInfo> {

private MongodbWriterOptions options;

Expand Down Expand Up @@ -89,6 +97,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key()));
}

if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) {
builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key()));
}
this.options = builder.build();
}
}
Expand All @@ -109,7 +121,8 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> createWriter(
SinkWriter.Context context) {
return new MongodbWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(seaTunnelRowType),
Expand All @@ -118,4 +131,27 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
options,
context);
}

@Override
public Optional<Serializer<DocumentBulk>> getWriterStateSerializer() {
return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty();
}

@Override
public Optional<SinkAggregatedCommitter<MongodbCommitInfo, MongodbAggregatedCommitInfo>>
createAggregatedCommitter() {
return options.transaction
? Optional.of(new MongodbSinkAggregatedCommitter(options))
: Optional.empty();
}

@Override
public Optional<Serializer<MongodbAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty();
}

@Override
public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
return options.transaction ? Optional.of(new DefaultSerializer<>()) : Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,34 @@
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;

import org.bson.BsonDocument;

import com.mongodb.MongoException;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;

@Slf4j
public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
public class MongodbWriter implements SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> {

private MongodbClientProvider collectionProvider;

Expand All @@ -60,6 +65,8 @@ public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

private volatile long lastSendTime = 0L;

private boolean transaction;

// TODO:Reserve parameters.
private final SinkWriter.Context context;

Expand All @@ -84,27 +91,66 @@ private void initOptions(MongodbWriterOptions options) {
.build();
this.bulkActions = options.getFlushSize();
this.batchIntervalMs = options.getBatchIntervalMs();
this.transaction = options.transaction;
}

@Override
public void write(SeaTunnelRow o) {
if (o.getRowKind() != RowKind.UPDATE_BEFORE) {
bulkRequests.add(serializer.serializeToWriteModel(o));
if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
if (!transaction && (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit())) {
doBulkWrite();
}
}
}

@Override
public Optional<Void> prepareCommit() {
doBulkWrite();
return Optional.empty();
public Optional<MongodbCommitInfo> prepareCommit() {
if (!transaction) {
doBulkWrite();
return Optional.empty();
}

List<DocumentBulk> bsonDocuments = new ArrayList<>();
AtomicInteger counter = new AtomicInteger();

bulkRequests.stream()
.map(this::convertModelToBsonDocument)
.collect(
Collectors.groupingBy(
it -> counter.getAndIncrement() / DocumentBulk.BUFFER_SIZE))
.values()
.stream()
.map(this::convertBsonDocumentListToDocumentBulk)
.forEach(bsonDocuments::add);

bulkRequests.clear();

return Optional.of(new MongodbCommitInfo(bsonDocuments));
}

private BsonDocument convertModelToBsonDocument(WriteModel<BsonDocument> model) {
if (model instanceof InsertOneModel) {
return ((InsertOneModel<BsonDocument>) model).getDocument();
} else if (model instanceof UpdateOneModel) {
return (BsonDocument) ((UpdateOneModel<BsonDocument>) model).getUpdate();
}
return null;
}

private DocumentBulk convertBsonDocumentListToDocumentBulk(List<BsonDocument> documentList) {
DocumentBulk documentBulk = new DocumentBulk();
documentList.forEach(documentBulk::add);
return documentBulk;
}

@Override
public void abortPrepare() {}

@Override
public void close() {
doBulkWrite();
if (!transaction) {
doBulkWrite();
}
if (collectionProvider != null) {
collectionProvider.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@ public class MongodbWriterOptions implements Serializable {

protected final long retryInterval;

protected final boolean transaction;

public MongodbWriterOptions(
String connectString,
String database,
String collection,
int flushSize,
Long batchIntervalMs,
long batchIntervalMs,
boolean upsertEnable,
String[] primaryKey,
int retryMax,
Long retryInterval) {
long retryInterval,
boolean transaction) {
this.connectString = connectString;
this.database = database;
this.collection = collection;
Expand All @@ -63,6 +66,7 @@ public MongodbWriterOptions(
this.primaryKey = primaryKey;
this.retryMax = retryMax;
this.retryInterval = retryInterval;
this.transaction = transaction;
}

public static Builder builder() {
Expand All @@ -89,6 +93,8 @@ public static class Builder {

protected long retryInterval;

protected boolean transaction;

public Builder withConnectString(String connectString) {
this.connectString = connectString;
return this;
Expand Down Expand Up @@ -134,6 +140,11 @@ public Builder withRetryInterval(Long retryInterval) {
return this;
}

public Builder withTransaction(boolean transaction) {
this.transaction = transaction;
return this;
}

public MongodbWriterOptions build() {
return new MongodbWriterOptions(
connectString,
Expand All @@ -144,7 +155,8 @@ public MongodbWriterOptions build() {
upsertEnable,
primaryKey,
retryMax,
retryInterval);
retryInterval,
transaction);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;

import org.bson.BsonDocument;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.TransactionBody;
import com.mongodb.client.result.InsertManyResult;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CommittableTransaction implements TransactionBody<Integer>, Serializable {

private static final int BUFFER_INIT_SIZE = 1024;

protected final MongoCollection<BsonDocument> collection;

protected List<BsonDocument> bufferedDocuments = new ArrayList<>(BUFFER_INIT_SIZE);

public CommittableTransaction(
MongoCollection<BsonDocument> collection, List<BsonDocument> documents) {
this.collection = collection;
this.bufferedDocuments.addAll(documents);
}

@Override
public Integer execute() {
InsertManyResult result = collection.insertMany(bufferedDocuments);
return result.getInsertedIds().size();
}
}
Loading

0 comments on commit b1203c9

Please sign in to comment.