From c280334a353bf37fa7fbdf420471cd7e786e69aa Mon Sep 17 00:00:00 2001 From: chenqqq11 Date: Thu, 29 Jun 2023 16:31:33 +0800 Subject: [PATCH] [Hotfix][Connector-V2][Mongodb] Compatible with historical parameters --- docs/en/connector-v2/sink/MongoDB.md | 1 + docs/en/connector-v2/source/MongoDB.md | 4 + .../mongodb/config/MongodbConfig.java | 3 +- .../mongodb/source/MongodbSource.java | 12 ++ .../e2e/connector/v2/mongodb/MongodbIT.java | 34 ++++++ .../fake_source_to_update_mongodb.conf | 103 ++++++++++++++++++ .../mongodb_matchQuery_source_to_assert.conf | 93 ++++++++++++++++ 7 files changed, 249 insertions(+), 1 deletion(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md index 40355583cb14..3c772f41ee6b 100644 --- a/docs/en/connector-v2/sink/MongoDB.md +++ b/docs/en/connector-v2/sink/MongoDB.md @@ -73,6 +73,7 @@ The following table lists the field data type mapping from MongoDB BSON type to | retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | | upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | | primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | **Tips** diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md index 14f283afb43e..fd8603bb4f67 100644 --- a/docs/en/connector-v2/source/MongoDB.md +++ b/docs/en/connector-v2/source/MongoDB.md @@ -79,6 +79,10 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun | fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. | | max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in Minute. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. | | flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +tips: +1.The `match.query` parameter is compatible with the old version`mactQuery' writing. ## How to Create a MongoDB Data Synchronization Jobs diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java index 1ba9ad70cc4a..132263125b81 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -56,7 +56,8 @@ public class MongodbConfig { Options.key("match.query") .stringType() .noDefaultValue() - .withDescription("Mongodb's query syntax."); + .withDescription("Mongodb's query syntax.") + .withFallbackKeys("matchQuery"); public static final Option PROJECTION = Options.key("match.projection") diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java index e68796faa9e0..d611a9bd53b3 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java @@ -46,6 +46,7 @@ import com.google.auto.service.AutoService; import java.util.ArrayList; +import java.util.List; import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; @@ -111,6 +112,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException { splitStrategyBuilder.setMatchQuery( BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key()))); } + + List fallbackKeys = MongodbConfig.MATCH_QUERY.getFallbackKeys(); + fallbackKeys.forEach( + key -> { + if (pluginConfig.hasPath(key)) { + splitStrategyBuilder.setMatchQuery( + BsonDocument.parse( + pluginConfig.getString(MongodbConfig.MATCH_QUERY.key()))); + } + }); + if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) { splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key())); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index ce25b2062b68..bc6f8840001c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -140,4 +140,38 @@ public void testMongodbSourceSplit(TestContainer container) .collect(Collectors.toList())); clearDate(MONGODB_SPLIT_RESULT_TABLE); } + + @TestTemplate + public void testCompatibleParameters(TestContainer container) + throws IOException, InterruptedException { + // `upsert-key` compatible test + Container.ExecResult insertResult = + container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf"); + Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr()); + + Container.ExecResult updateResult = + container.executeJob("/compatibleParametersIT/fake_source_to_update_mongodb.conf"); + Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr()); + + Container.ExecResult assertResult = + container.executeJob("/updateIT/update_mongodb_to_assert.conf"); + Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); + + clearDate(MONGODB_UPDATE_TABLE); + + // `matchQuery` compatible test + Container.ExecResult queryResult = + container.executeJob("/matchIT/mongodb_matchQuery_source_to_assert.conf"); + Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr()); + + Assertions.assertIterableEquals( + TEST_MATCH_DATASET.stream() + .filter(x -> x.get("c_int").equals(2)) + .peek(e -> e.remove("_id")) + .collect(Collectors.toList()), + readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList())); + clearDate(MONGODB_MATCH_RESULT_TABLE); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf new file mode 100644 index 000000000000..ef5bf5b88e1c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 5 + int.template = [2] + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_update_table" + upsert-enable = true + // compatible parameters + upsert-key = ["c_int"] + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf new file mode 100644 index 000000000000..5b7e73344ea9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_match_op_db" + result_table_name = "mongodb_table" + // compatible parameters + matchQuery = "{c_int: 2}" + cursor.no-timeout = true + fetch.size = 1000 + max.time-min = 100 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +} + +sink { + Console { + source_table_name = "mongodb_table" + } + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_match_op_result_db" + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +}