From 51140e482b139aa523f5322819cef1d6a8a6dd79 Mon Sep 17 00:00:00 2001 From: chenqqq11 Date: Thu, 29 Jun 2023 16:31:33 +0800 Subject: [PATCH] [Hotfix][Connector-V2][Mongodb] Compatible with historical parameters --- docs/en/connector-v2/sink/MongoDB.md | 5 +- docs/en/connector-v2/source/MongoDB.md | 5 + .../mongodb/config/MongodbConfig.java | 3 +- .../mongodb/source/MongodbSource.java | 12 ++ .../e2e/connector/v2/mongodb/MongodbIT.java | 34 ++++++ .../fake_source_to_update_mongodb.conf | 103 ++++++++++++++++++ .../mongodb_matchQuery_source_to_assert.conf | 93 ++++++++++++++++ 7 files changed, 252 insertions(+), 3 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md index 40355583cb1..464ecdeab6e 100644 --- a/docs/en/connector-v2/sink/MongoDB.md +++ b/docs/en/connector-v2/sink/MongoDB.md @@ -73,10 +73,11 @@ The following table lists the field data type mapping from MongoDB BSON type to | retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. | | upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. | | primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. | +| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | -**Tips** +### Tips -> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`. 
+> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.
diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md index 14f283afb43..137fb205b8c 100644 --- a/docs/en/connector-v2/source/MongoDB.md +++ b/docs/en/connector-v2/source/MongoDB.md @@ -79,6 +79,11 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun | fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. | | max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in Minute. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. | | flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +### Tips + +> 1.The parameter `match.query` also accepts the legacy parameter name `matchQuery`; the two are equivalent.
## How to Create a MongoDB Data Synchronization Jobs diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java index 1ba9ad70cc4..132263125b8 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -56,7 +56,8 @@ public class MongodbConfig { Options.key("match.query") .stringType() .noDefaultValue() - .withDescription("Mongodb's query syntax."); + .withDescription("Mongodb's query syntax.") + .withFallbackKeys("matchQuery"); public static final Option PROJECTION = Options.key("match.projection") diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java index e68796faa9e..d611a9bd53b 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java @@ -46,6 +46,7 @@ import com.google.auto.service.AutoService; import java.util.ArrayList; +import java.util.List; import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY; @@ -111,6 +112,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException { splitStrategyBuilder.setMatchQuery( BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key()))); } + + List<String> fallbackKeys = 
MongodbConfig.MATCH_QUERY.getFallbackKeys(); + fallbackKeys.forEach( + key -> { + if (pluginConfig.hasPath(key)) { + splitStrategyBuilder.setMatchQuery( + BsonDocument.parse( + pluginConfig.getString(key))); + } + }); + if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) { splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key())); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index ce25b2062b6..bc6f8840001 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -140,4 +140,38 @@ public void testMongodbSourceSplit(TestContainer container) .collect(Collectors.toList())); clearDate(MONGODB_SPLIT_RESULT_TABLE); } + + @TestTemplate + public void testCompatibleParameters(TestContainer container) + throws IOException, InterruptedException { + // `upsert-key` compatible test + Container.ExecResult insertResult = + container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf"); + Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr()); + + Container.ExecResult updateResult = + container.executeJob("/compatibleParametersIT/fake_source_to_update_mongodb.conf"); + Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr()); + + Container.ExecResult assertResult = + container.executeJob("/updateIT/update_mongodb_to_assert.conf"); + Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr()); + + clearDate(MONGODB_UPDATE_TABLE); + + // `matchQuery` 
compatible test + Container.ExecResult queryResult = + container.executeJob("/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf"); + Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr()); + + Assertions.assertIterableEquals( + TEST_MATCH_DATASET.stream() + .filter(x -> x.get("c_int").equals(2)) + .peek(e -> e.remove("_id")) + .collect(Collectors.toList()), + readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList())); + clearDate(MONGODB_MATCH_RESULT_TABLE); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf new file mode 100644 index 00000000000..ef5bf5b88e1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/fake_source_to_update_mongodb.conf @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + row.num = 5 + int.template = [2] + result_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_update_table" + upsert-enable = true + // compatible parameters + upsert-key = ["c_int"] + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(33, 18)" + c_timestamp = timestamp + } + } + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf new file mode 100644 index 00000000000..5b7e73344ea --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/compatibleParametersIT/mongodb_matchQuery_source_to_assert.conf @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_match_op_db" + result_table_name = "mongodb_table" + // compatible parameters + matchQuery = "{c_int: 2}" + cursor.no-timeout = true + fetch.size = 1000 + max.time-min = 100 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +} + +sink { + Console { + source_table_name = "mongodb_table" + } + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_match_op_result_db" + 
source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +}