[Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982)

Co-authored-by: chenzy15 <[email protected]>
MonsterChenzhuo and chenzy15 committed Jun 28, 2023
1 parent b65f40c commit 526197a
Showing 10 changed files with 60 additions and 81 deletions.
55 changes: 24 additions & 31 deletions docs/en/connector-v2/sink/MongoDB.md
@@ -2,8 +2,7 @@

> MongoDB Sink Connector
Support Those Engines
---------------------
## Support Those Engines

> Spark<br/>
> Flink<br/>
@@ -19,14 +18,12 @@ Key Features

> 1.If you want to use CDC-written features, it is recommended to enable the upsert-enable configuration.
Description
-----------
## Description

The MongoDB Connector provides the ability to read and write data from and to MongoDB.
This document describes how to set up the MongoDB connector to run data writers against MongoDB.

Supported DataSource Info
-------------------------
## Supported DataSource Info

In order to use the Mongodb connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.
@@ -35,8 +32,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
|------------|--------------------|---------------------------------------------------------------------------------------------------------------|
| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |

Data Type Mapping
-----------------
## Data Type Mapping

The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.

@@ -63,30 +59,28 @@ The following table lists the field data type mapping from MongoDB BSON type to
> 1.When using SeaTunnel to write Date and Timestamp types to MongoDB, both will produce a Date data type in MongoDB, but the precision will be different. The data generated by the SeaTunnel Date type has second-level precision, while the data generated by the SeaTunnel Timestamp type has millisecond-level precision.<br/>
> 2.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).<br/>
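
The 34-digit limit in the second tip matches the precision of the BSON Decimal128 type. The following is a minimal Java sketch, assuming a hypothetical helper class `DecimalPrecisionCheck` that is not part of the connector, of how a value could be checked or rounded before it is written:

```java
import java.math.BigDecimal;
import java.math.MathContext;

// Hypothetical helper for illustration; not part of the SeaTunnel MongoDB connector.
final class DecimalPrecisionCheck {
    // MathContext.DECIMAL128 carries a precision of 34 significant digits.
    static final int MAX_DIGITS = MathContext.DECIMAL128.getPrecision();

    // Returns true when the value has at most 34 significant digits.
    static boolean fits(BigDecimal value) {
        return value.precision() <= MAX_DIGITS;
    }

    // Rounds the value to 34 significant digits (HALF_EVEN, as DECIMAL128 defines).
    static BigDecimal clamp(BigDecimal value) {
        return value.round(MathContext.DECIMAL128);
    }

    public static void main(String[] args) {
        // 16 integer digits + 18 fractional digits = 34 significant digits.
        BigDecimal ok = new BigDecimal("1234567890123456.123456789012345678");
        // 17 integer digits + 18 fractional digits = 35 significant digits.
        BigDecimal tooWide = new BigDecimal("12345678901234567.123456789012345678");
        System.out.println("fits(ok) = " + fits(ok));
        System.out.println("fits(tooWide) = " + fits(tooWide));
        System.out.println("clamped precision = " + clamp(tooWide).precision());
    }
}
```

Rounding silently drops digits, so a job that must not lose precision would more likely reject such values than clamp them.
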
Sink Options
------------

| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|---------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the retry time interval if writing records to database failed, the unit is seconds. |
| retry.max | String | No | 3 | Specifies the max retry times if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
## Sink Options

| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|------------------------------------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping. |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the maximum interval of buffered rows per batch request, the unit is millisecond. |
| retry.max | String | No | 3 | Specifies the max number of retry if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |

**Tips**

> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.<br/>
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
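
The "any of these conditions" rule in the first tip can be sketched as follows. This is an illustration only: `FlushBufferSketch` and its methods are hypothetical names, time is passed in explicitly to keep the example deterministic, and the connector's real writer may check the interval differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the three flush triggers; not the connector's actual writer.
final class FlushBufferSketch {
    private final int maxRows;          // plays the role of buffer-flush.max-rows
    private final long intervalMillis;  // plays the role of buffer-flush.interval
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;
    private int flushCount;

    FlushBufferSketch(int maxRows, long intervalMillis, long nowMillis) {
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Buffers one row, then flushes if the row limit or the time limit has been reached.
    void write(String row, long nowMillis) {
        buffer.add(row);
        if (buffer.size() >= maxRows || nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    // A checkpoint flushes whatever is buffered, regardless of size or elapsed time.
    void onCheckpoint(long nowMillis) {
        flush(nowMillis);
    }

    private void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            flushCount++;   // a real sink would issue a bulk write here
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }

    int flushCount() {
        return flushCount;
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        FlushBufferSketch sink = new FlushBufferSketch(1000, 30_000L, System.currentTimeMillis());
        sink.write("row-1", System.currentTimeMillis());
        sink.onCheckpoint(System.currentTimeMillis());
        System.out.println("flushes: " + sink.flushCount());
    }
}
```
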
How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------
## How to Create a MongoDB Data Synchronization Jobs

The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:

@@ -128,10 +122,9 @@ sink {
}
```

Parameter Interpretation
------------------------
## Parameter Interpretation

**MongoDB Database Connection URI Examples**
### MongoDB Database Connection URI Examples

Unauthenticated single node connection:

@@ -171,7 +164,7 @@ mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb

Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.
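
As a sketch of the encoding step described in the note, the following Java snippet percent-encodes the credentials before concatenating them. The class `MongoUriExample`, its methods, and the sample credentials are made up for illustration; `URLEncoder.encode(String, Charset)` requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the SeaTunnel MongoDB connector.
final class MongoUriExample {
    // Percent-encodes one URI component. URLEncoder targets HTML form encoding and
    // emits '+' for spaces, so that is rewritten to %20 (a literal '+' is already %2B).
    static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Concatenates the encoded credentials with the host list and database name.
    static String buildUri(String user, String password, String hosts, String database) {
        return "mongodb://" + encode(user) + ":" + encode(password) + "@" + hosts + "/" + database;
    }

    public static void main(String[] args) {
        // Sample values only; '@', ':' and '/' in the password would otherwise break URI parsing.
        System.out.println(buildUri("app user", "p@ss:w/rd", "192.168.0.1:27017", "mydb"));
    }
}
```
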

**Buffer Flush**
### Buffer Flush

```bash
sink {
@@ -192,14 +185,14 @@ sink {
}
```

**Why is Not Recommended to Use Transactions for Operation?**
### Why is Not Recommended to Use Transactions for Operation?

Although MongoDB has fully supported multi-document transactions since version 4.2, that does not mean they should be used indiscriminately.
Transactions bring locks, node coordination, additional overhead, and a performance impact.
The guiding principle should therefore be: avoid transactions where possible.
Careful system design can remove most of the need for them.

**Idempotent Writes**
### Idempotent Writes

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.
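
The reason an upsert keyed on the primary key tolerates replays can be shown with an in-memory stand-in for a collection. `UpsertSketch` is a hypothetical class that does not use the MongoDB driver, and it replaces the whole document on each write, which is a simplification of how a real upsert may merge fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a collection; it does not talk to MongoDB.
final class UpsertSketch {
    private final Map<List<Object>, Map<String, Object>> store = new HashMap<>();
    private final List<String> primaryKey;

    UpsertSketch(String... primaryKey) {
        this.primaryKey = Arrays.asList(primaryKey);
    }

    // Inserts the document, or replaces the existing one that has the same key values.
    void upsert(Map<String, Object> doc) {
        List<Object> key = new ArrayList<>();
        for (String field : primaryKey) {
            key.add(doc.get(field));
        }
        store.put(key, new HashMap<>(doc));
    }

    // Looks up a document by its key values, given in primary-key order.
    Map<String, Object> get(Object... keyValues) {
        return store.get(Arrays.asList(keyValues));
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        UpsertSketch collection = new UpsertSketch("id");
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "alice");
        collection.upsert(row);
        collection.upsert(row); // replay after a simulated retry
        System.out.println("documents: " + collection.size());
    }
}
```

Because a replayed write lands on the same key, retrying a batch after a failure leaves the collection in the same state as writing it once.
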

40 changes: 16 additions & 24 deletions docs/en/connector-v2/source/MongoDB.md
@@ -2,15 +2,13 @@

> MongoDB Source Connector
Support Those Engines
---------------------
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
Key Features
------------
## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
@@ -19,14 +17,12 @@ Key Features
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)

Description
-----------
## Description

The MongoDB Connector provides the ability to read and write data from and to MongoDB.
This document describes how to set up the MongoDB connector to run data reads against MongoDB.

Supported DataSource Info
-------------------------
## Supported DataSource Info

In order to use the Mongodb connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.
@@ -35,8 +31,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
|------------|--------------------|---------------------------------------------------------------------------------------------------------------|
| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |

Data Type Mapping
-----------------
## Data Type Mapping

The following table lists the field data type mapping from MongoDB BSON type to SeaTunnel data type.

@@ -68,26 +63,24 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun

> 1.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).<br/>
Source Options
--------------
## Source Options

| Name | Type | Required | Default | Description |
|----------------------|---------|----------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| uri | String | Yes | - | The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping. |
| match.query | String | No | - | In MongoDB, filters are used to filter documents for query operations. |
| match.projection | String | No | - | In MongoDB, Projection is used to control the fields contained in the query results |
| match.projection | String | No | - | In MongoDB, Projection is used to control the fields contained in the query results. |
| partition.split-key | String | No | _id | The key of Mongodb fragmentation. |
| partition.split-size | Long | No | 64 * 1024 * 1024 | The size of Mongodb fragment. |
| cursor.no-timeout | Boolean | No | true | MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed. |
| fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. |
| max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in Minute. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. |
| flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. |

How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------
## How to Create a MongoDB Data Synchronization Jobs

The following example demonstrates how to create a data synchronization job that reads data from MongoDB and prints it on the local client:

@@ -143,10 +136,9 @@ sink {
}
```

Parameter Interpretation
------------------------
## Parameter Interpretation

**MongoDB Database Connection URI Examples**
### MongoDB Database Connection URI Examples

Unauthenticated single node connection:

@@ -186,7 +178,7 @@ mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb

Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.

**MatchQuery Scan**
### MatchQuery Scan

In data synchronization scenarios, apply `match.query` as early as possible to reduce the number of documents that subsequent operators need to process, which improves performance.
Here is a simple example of a SeaTunnel job using `match.query`:
@@ -225,7 +217,7 @@ The following are examples of MatchQuery query statements of various data types:

For the syntax of `match.query`, refer to https://www.mongodb.com/docs/manual/tutorial/query-documents

**Projection Scan**
### Projection Scan

In MongoDB, Projection is used to control which fields are included in the query results. This can be accomplished by specifying which fields need to be returned and which fields do not.
In the find() method, a projection object can be passed as a second argument. The key of the projection object indicates the fields to include or exclude, and a value of 1 indicates inclusion and 0 indicates exclusion.
@@ -256,7 +248,7 @@ source {

```
**Partitioned Scan**
### Partitioned Scan
To speed up reading data in parallel source task instances, seatunnel provides a partitioned scan feature for MongoDB collections. The following partitioning strategies are provided.
Users can control data sharding by setting the partition.split-key for sharding keys and partition.split-size for sharding size.
@@ -280,7 +272,7 @@ source {

```
**Flat Sync String**
### Flat Sync String
By utilizing `flat.sync-string`, only one field attribute value can be set, and the field type must be a String.
This operation will perform a string mapping on a single MongoDB data entry.
@@ -118,16 +118,16 @@ public class MongodbConfig {
public static final Option<Long> BUFFER_FLUSH_INTERVAL =
Options.key("buffer-flush.interval")
.longType()
.defaultValue(30_000L)
.defaultValue(30000L)
.withDescription(
"Specifies the retry time interval if writing records to database failed.");
"Specifies the maximum interval of buffered rows per batch request, the unit is millisecond.");

public static final Option<Integer> RETRY_MAX =
Options.key("retry.max")
.intType()
.defaultValue(3)
.withDescription(
"Specifies the max retry times if writing records to database failed.");
"Specifies the max number of retry if writing records to database failed.");

public static final Option<Long> RETRY_INTERVAL =
Options.key("retry.interval")
@@ -37,10 +37,10 @@ public class DocumentRowDataDeserializer implements DocumentDeserializer<SeaTunn

private final BsonToRowDataConverters bsonConverters;

private final Boolean flatSyncString;
private final boolean flatSyncString;

public DocumentRowDataDeserializer(
String[] fieldNames, SeaTunnelDataType<?> dataTypes, Boolean flatSyncString) {
String[] fieldNames, SeaTunnelDataType<?> dataTypes, boolean flatSyncString) {
if (fieldNames == null || fieldNames.length < 1) {
throw new MongodbConnectorException(ILLEGAL_ARGUMENT, "fieldName is empty");
}