diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md index 40355583cb1..3c772f41ee6 100644 --- a/docs/en/connector-v2/sink/MongoDB.md +++ b/docs/en/connector-v2/sink/MongoDB.md @@ -73,6 +73,7 @@ The following table lists the field data type mapping from MongoDB BSON type to | retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | | upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | | primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | **Tips** diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md index 14f283afb43..fd8603bb4f6 100644 --- a/docs/en/connector-v2/source/MongoDB.md +++ b/docs/en/connector-v2/source/MongoDB.md @@ -79,6 +79,10 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun | fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. | | max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in Minute. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. | | flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +tips: +1.The `match.query` parameter is compatible with the old version`mactQuery' writing. ## How to Create a MongoDB Data Synchronization Jobs diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java index 1ba9ad70cc4..132263125b8 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -56,7 +56,8 @@ public class MongodbConfig { Options.key("match.query") .stringType() .noDefaultValue() - .withDescription("Mongodb's query syntax."); + .withDescription("Mongodb's query syntax.") + .withFallbackKeys("matchQuery"); public static final Option PROJECTION = Options.key("match.projection") diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java index e68796faa9e..d611a9bd53b 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java @@ -46,6 +46,7 @@ import com.google.auto.service.AutoService; import java.util.ArrayList; +import java.util.List; import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; @@ -111,6 +112,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException { splitStrategyBuilder.setMatchQuery( BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key()))); } + + List fallbackKeys = MongodbConfig.MATCH_QUERY.getFallbackKeys(); + fallbackKeys.forEach( + key -> { + if (pluginConfig.hasPath(key)) { + splitStrategyBuilder.setMatchQuery( + BsonDocument.parse( + pluginConfig.getString(MongodbConfig.MATCH_QUERY.key()))); + } + }); + if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) { splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key())); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index ce25b2062b6..bc6f8840001 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -140,4 +140,38 @@ public void testMongodbSourceSplit(TestContainer container) .collect(Collectors.toList())); clearDate(MONGODB_SPLIT_RESULT_TABLE); } + + @TestTemplate + public void testCompatibleParameters(TestContainer container) + throws IOException, InterruptedException { + // `upsert-key` compatible test + Container.ExecResult insertResult = + container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf"); + Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr()); + + Container.ExecResult updateResult = + container.executeJob("/compatibleParametersIT/fake_source_to_update_mongodb.conf"); + Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr()); + + Container.ExecResult assertResult = + container.executeJob("/updateIT/update_mongodb_to_assert.conf"); + Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); + + clearDate(MONGODB_UPDATE_TABLE); + + // `matchQuery` compatible test + Container.ExecResult queryResult = + container.executeJob("/matchIT/mongodb_matchQuery_source_to_assert.conf"); + Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr()); + + Assertions.assertIterableEquals( + TEST_MATCH_DATASET.stream() + .filter(x -> x.get("c_int").equals(2)) + .peek(e -> e.remove("_id")) + .collect(Collectors.toList()), + readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList())); + clearDate(MONGODB_MATCH_RESULT_TABLE); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf new file mode 100644 index 00000000000..ef5bf5b88e1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 5 + int.template = [2] + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_update_table" + upsert-enable = true + // compatible parameters + upsert-key = ["c_int"] + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf new file mode 100644 index 00000000000..5b7e73344ea --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_match_op_db" + result_table_name = "mongodb_table" + // compatible parameters + matchQuery = "{c_int: 2}" + cursor.no-timeout = true + fetch.size = 1000 + max.time-min = 100 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +} + +sink { + Console { + source_table_name = "mongodb_table" + } + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_match_op_result_db" + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +}