Skip to content

Commit

Permalink
[Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored and Jarvis committed Jul 13, 2023
1 parent 279e947 commit 4363fb6
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 3 deletions.
5 changes: 3 additions & 2 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ The following table lists the field data type mapping from MongoDB BSON type to
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to the database fails, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |

**Tips**
### Tips

> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.<br/>
> Data flushing will be triggered if any of these conditions are met.<br/>
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
Expand Down
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun
| fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. |
| max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in minutes. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. |
| flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |

### Tips

> 1.The parameter `match.query` is compatible with the historical old version parameter `matchQuery`, and they are equivalent replacements.<br/>
## How to Create a MongoDB Data Synchronization Job

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class MongodbConfig {
Options.key("match.query")
.stringType()
.noDefaultValue()
.withDescription("Mongodb's query syntax.");
.withDescription("Mongodb's query syntax.")
.withFallbackKeys("matchQuery");

public static final Option<String> PROJECTION =
Options.key("match.projection")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

Expand Down Expand Up @@ -111,6 +112,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
splitStrategyBuilder.setMatchQuery(
BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
}

// Backward compatibility: honor the legacy "matchQuery" key (and any other
// declared fallback keys) when configuring the match query.
List<String> fallbackKeys = MongodbConfig.MATCH_QUERY.getFallbackKeys();
fallbackKeys.forEach(
        key -> {
            if (pluginConfig.hasPath(key)) {
                // BUG FIX: read the value from the fallback key itself, not from
                // MATCH_QUERY.key(). In this branch the canonical "match.query"
                // key may be absent, so getString(MATCH_QUERY.key()) would throw
                // ConfigException.Missing and the legacy value would be ignored.
                splitStrategyBuilder.setMatchQuery(
                        BsonDocument.parse(pluginConfig.getString(key)));
            }
        });

if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) {
splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,38 @@ public void testMongodbSourceSplit(TestContainer container)
.collect(Collectors.toList()));
clearDate(MONGODB_SPLIT_RESULT_TABLE);
}

/**
 * Verifies backward compatibility of historical parameters: the sink-side
 * {@code upsert-key} (legacy alias of {@code primary-key}) and the source-side
 * {@code matchQuery} (legacy alias of {@code match.query}).
 */
@TestTemplate
public void testCompatibleParameters(TestContainer container)
        throws IOException, InterruptedException {
    // `upsert-key` compatible test: seed data, update via a job that still uses
    // the legacy `upsert-key` option, then assert the merged result.
    Container.ExecResult insertResult =
            container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf");
    Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr());

    Container.ExecResult updateResult =
            container.executeJob("/compatibleParametersIT/fake_source_to_update_mongodb.conf");
    Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr());

    Container.ExecResult assertResult =
            container.executeJob("/updateIT/update_mongodb_to_assert.conf");
    Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());

    clearDate(MONGODB_UPDATE_TABLE);

    // `matchQuery` compatible test: the source job filters with the legacy key;
    // the sink table must equal the expected dataset filtered by c_int == 2.
    Container.ExecResult queryResult =
            container.executeJob("/matchIT/mongodb_matchQuery_source_to_assert.conf");
    Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());

    Assertions.assertIterableEquals(
            TEST_MATCH_DATASET.stream()
                    .filter(x -> x.get("c_int").equals(2))
                    // `_id` is server-generated, so drop it before comparing.
                    // Use map, not peek: peek is documented for debugging only
                    // and its action may be elided by the stream implementation.
                    .map(
                            e -> {
                                e.remove("_id");
                                return e;
                            })
                    .collect(Collectors.toList()),
            readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
                    .map(
                            e -> {
                                e.remove("_id");
                                return e;
                            })
                    .collect(Collectors.toList()));
    clearDate(MONGODB_MATCH_RESULT_TABLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"
#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
row.num = 5
int.template = [2]
result_table_name = "mongodb_table"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(33, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(33, 18)"
c_timestamp = timestamp
}
}
}
}
}

sink {
MongoDB {
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
database = "test_db"
collection = "test_update_table"
upsert-enable = true
// compatible parameters
upsert-key = ["c_int"]
source_table_name = "mongodb_table"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(33, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(33, 18)"
c_timestamp = timestamp
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"
#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
MongoDB {
uri = "mongodb://e2e_mongodb:27017/test_db"
database = "test_db"
collection = "test_match_op_db"
result_table_name = "mongodb_table"
// compatible parameters
matchQuery = "{c_int: 2}"
cursor.no-timeout = true
fetch.size = 1000
max.time-min = 100
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
}
}
}
}
}

sink {
Console {
source_table_name = "mongodb_table"
}
MongoDB {
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
database = "test_db"
collection = "test_match_op_result_db"
source_table_name = "mongodb_table"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_int = int
c_bigint = bigint
c_double = double
}
}
}
}
}

0 comments on commit 4363fb6

Please sign in to comment.