From 3de81991b36c45503e646755507a696b85d5572a Mon Sep 17 00:00:00 2001
From: annie-mac
Date: Fri, 29 Mar 2024 08:48:08 -0700
Subject: [PATCH] refactor

---
 sdk/cosmos/azure-cosmos-kafka-connect/pom.xml |  5 ++++
 .../kafka/connect/CosmosSinkConnector.java    | 24 ++++++++++++++++++
 .../kafka/connect/CosmosSourceConnector.java  | 23 +++++++++++++++++
 ...ava => CosmosThroughputControlHelper.java} |  2 +-
 .../implementation/KafkaCosmosConstants.java  | 16 ------------
 .../KafkaCosmosExceptionsHelper.java          | 25 ++++++++++---------
 .../sink/CosmosDBWriteException.java          | 20 ---------------
 .../implementation/sink/CosmosSinkTask.java   |  4 +--
 .../sink/KafkaCosmosBulkWriter.java           |  4 +--
 .../sink/KafkaCosmosPointWriter.java          | 10 ++++----
 .../source/CosmosSourceTask.java              |  6 ++---
 .../CosmosChangeFeedRequestOptions.java       |  5 ----
 sdk/cosmos/tests.yml                          |  7 ------
 13 files changed, 78 insertions(+), 73 deletions(-)
 rename sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/{KafkaCosmosThroughputControlHelper.java => CosmosThroughputControlHelper.java} (98%)
 delete mode 100644 sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java

diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
index 6dc24310792e3..12aa0fc119fa9 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -48,12 +48,17 @@ Licensed under the MIT License.
             --add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
             --add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
             --add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
+            --add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
             --add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
             --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
             --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
             --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
             --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
             --add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
+
+            --add-exports com.azure.cosmos/com.azure.cosmos.implementation.changefeed.common=com.azure.cosmos.kafka.connect
+            --add-exports com.azure.cosmos/com.azure.cosmos.implementation.feedranges=com.azure.cosmos.kafka.connect
+
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
index ef38399c74396..dc05ff76f91ec 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSinkConnector.java
@@ -6,7 +6,9 @@
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
 import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
 import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
+import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.slf4j.Logger;
@@ -15,6 +17,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
 
 /**
  * A Sink connector that publishes topic messages to CosmosDB.
  */
@@ -60,4 +66,22 @@ public ConfigDef config() {
     public String version() {
         return KafkaCosmosConstants.CURRENT_VERSION;
     }
+
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        Config config = super.validate(connectorConfigs);
+        // there are errors based on the config def
+        if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
+            return config;
+        }
+
+        Map<String, ConfigValue> configValues =
+            config
+                .configValues()
+                .stream()
+                .collect(Collectors.toMap(ConfigValue::name, Function.identity()));
+
+        validateThroughputControlConfig(connectorConfigs, configValues);
+        return config;
+    }
 }
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
index bbf568d9b9fb7..4752fdcfb07de 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/CosmosSourceConnector.java
@@ -22,7 +22,9 @@
 import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
 import com.azure.cosmos.models.CosmosContainerProperties;
 import com.azure.cosmos.models.FeedRange;
+import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
@@ -38,8 +40,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;
+
 /***
  * The CosmosDb source connector.
  */
@@ -348,6 +353,24 @@ private Map getContainersTopicMap(List
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        Config config = super.validate(connectorConfigs);
+        // there are errors based on the config def
+        if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
+            return config;
+        }
+
+        Map<String, ConfigValue> configValues =
+            config
+                .configValues()
+                .stream()
+                .collect(Collectors.toMap(ConfigValue::name, Function.identity()));
+
+        validateThroughputControlConfig(connectorConfigs, configValues);
+        return config;
+    }
+
     @Override
     public void close() {
         this.stop();
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java
similarity index 98%
rename from sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java
rename to sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java
index f6719b0a58e0c..f30210f103736 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosThroughputControlHelper.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/CosmosThroughputControlHelper.java
@@ -14,7 +14,7 @@
 
 import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
 
-public class KafkaCosmosThroughputControlHelper {
+public class CosmosThroughputControlHelper {
     public static CosmosAsyncContainer tryEnableThroughputControl(
         CosmosAsyncContainer container,
         CosmosAsyncClient throughputControlCosmosClient,
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java
index 50db99a79c634..01a89ef20f532 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosConstants.java
@@ -10,20 +10,4 @@ public class KafkaCosmosConstants {
     public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version");
     public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name");
     public static final String USER_AGENT_SUFFIX = String.format("KafkaConnect/%s/%s", CURRENT_NAME, CURRENT_VERSION);
-
-    public static class StatusCodes {
-        public static final int NOTFOUND = 404;
-        public static final int REQUEST_TIMEOUT = 408;
-        public static final int GONE = 410;
-        public static final int CONFLICT = 409;
-        public static final int PRECONDITION_FAILED = 412;
-        public static final int SERVICE_UNAVAILABLE = 503;
-        public static final int INTERNAL_SERVER_ERROR = 500;
-    }
-
-    public static class SubStatusCodes {
-        public static final int READ_SESSION_NOT_AVAILABLE = 1002;
-        public static final int PARTITION_KEY_RANGE_GONE = 1002;
-        public static final int COMPLETING_SPLIT_OR_MERGE = 1007;
-    }
 }
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java
index a8adfd35a22ab..0d5a8bb8759e3 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/KafkaCosmosExceptionsHelper.java
@@ -4,16 +4,17 @@
 package com.azure.cosmos.kafka.connect.implementation;
 
 import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.implementation.HttpConstants;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 
 public class KafkaCosmosExceptionsHelper {
     public static boolean isTransientFailure(int statusCode, int substatusCode) {
-        return statusCode == KafkaCosmosConstants.StatusCodes.GONE
-            || statusCode == KafkaCosmosConstants.StatusCodes.SERVICE_UNAVAILABLE
-            || statusCode == KafkaCosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR
-            || statusCode == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT
-            || (statusCode == KafkaCosmosConstants.StatusCodes.NOTFOUND && substatusCode == KafkaCosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);
+        return statusCode == HttpConstants.StatusCodes.GONE
+            || statusCode == HttpConstants.StatusCodes.SERVICE_UNAVAILABLE
+            || statusCode == HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR
+            || statusCode == HttpConstants.StatusCodes.REQUEST_TIMEOUT
+            || (statusCode == HttpConstants.StatusCodes.NOTFOUND && substatusCode == HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);
     }
 
@@ -36,9 +37,9 @@ public static boolean isFeedRangeGoneException(Throwable throwable) {
     }
 
     public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) {
-        return statusCode == KafkaCosmosConstants.StatusCodes.GONE
-            && (substatusCode == KafkaCosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
-                || substatusCode == KafkaCosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
+        return statusCode == HttpConstants.StatusCodes.GONE
+            && (substatusCode == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
+                || substatusCode == HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
     }
 
     public static ConnectException convertToConnectException(Throwable throwable, String message) {
@@ -51,7 +52,7 @@ public static ConnectException convertToConnectException(Throwable throwable, St
 
     public static boolean isResourceExistsException(Throwable throwable) {
         if (throwable instanceof CosmosException) {
-            return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.CONFLICT;
+            return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.CONFLICT;
         }
 
         return false;
@@ -59,7 +60,7 @@ public static boolean isNotFoundException(Throwable throwable) {
         if (throwable instanceof CosmosException) {
-            return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.NOTFOUND;
+            return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.NOTFOUND;
         }
 
         return false;
@@ -67,7 +68,7 @@ public static boolean isPreconditionFailedException(Throwable throwable) {
         if (throwable instanceof CosmosException) {
-            return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.PRECONDITION_FAILED;
+            return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.PRECONDITION_FAILED;
         }
 
         return false;
@@ -75,7 +76,7 @@ public static boolean isTimeoutException(Throwable throwable) {
         if (throwable instanceof CosmosException) {
-            return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT;
+            return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.REQUEST_TIMEOUT;
         }
 
         return false;
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java
deleted file mode 100644
index 69f99f35f8baf..0000000000000
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosDBWriteException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.cosmos.kafka.connect.implementation.sink;
-
-import org.apache.kafka.connect.errors.ConnectException;
-
-/**
- * Generic CosmosDb sink write exceptions.
- */
-public class CosmosDBWriteException extends ConnectException {
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-
-    public CosmosDBWriteException(String message) {
-        super(message);
-    }
-}
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
index af7731387cf82..9a3bf9e2565bc 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java
@@ -8,7 +8,7 @@
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
 import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
-import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
+import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -96,7 +96,7 @@ record -> this.sinkTaskConfig
                     .getDatabase(this.sinkTaskConfig.getContainersConfig().getDatabaseName())
                     .getContainer(containerName);
 
-            KafkaCosmosThroughputControlHelper
+            CosmosThroughputControlHelper
                 .tryEnableThroughputControl(
                     container,
                     this.throughputControlClient,
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java
index ea63c8345401c..61b67b44f9c49 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosBulkWriter.java
@@ -12,7 +12,7 @@
 import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers;
-import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
+import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
 import com.azure.cosmos.models.CosmosBulkExecutionOptions;
 import com.azure.cosmos.models.CosmosBulkItemRequestOptions;
 import com.azure.cosmos.models.CosmosBulkItemResponse;
@@ -134,7 +134,7 @@ private CosmosBulkExecutionOptions getBulkExecutionOperations() {
                 .setMaxConcurrentCosmosPartitions(bulkExecutionOptions, this.writeConfig.getBulkMaxConcurrentCosmosPartitions());
         }
 
-        KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);
+        CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);
 
         return bulkExecutionOptions;
     }
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java
index 2bd58242584f3..62a731ce81565 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java
@@ -10,7 +10,7 @@
 import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers;
-import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
+import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
 import com.azure.cosmos.models.CosmosItemRequestOptions;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.slf4j.Logger;
@@ -71,7 +71,7 @@ private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
         executeWithRetry(
             (operation) -> {
                 CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
-                KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
+                CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
                 return container.upsertItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
             },
             (throwable) -> false, // no exceptions should be ignored
@@ -83,7 +83,7 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
         executeWithRetry(
             (operation) -> {
                 CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
-                KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
+                CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
                 return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
             },
             (throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable),
@@ -96,7 +96,7 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO
             (operation) -> {
                 CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
                 itemRequestOptions.setIfMatchETag(etag);
-                KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
+                CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
 
                 return ImplementationBridgeHelpers
                     .CosmosAsyncContainerHelper
@@ -129,7 +129,7 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
                 }
             }
 
-            KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
+            CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
 
             return ImplementationBridgeHelpers
                 .CosmosAsyncContainerHelper
diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java
index e29aa11b9f708..cf4ed6c798c98 100644
--- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java
+++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java
@@ -11,7 +11,7 @@
 import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
 import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
-import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
+import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
 import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
 import com.azure.cosmos.models.FeedRange;
 import com.azure.cosmos.models.FeedResponse;
@@ -167,7 +167,7 @@ private Pair<List<SourceRecord>, Boolean> executeFeedRangeTask(FeedRangeTaskUnit
             this.cosmosClient
                 .getDatabase(feedRangeTaskUnit.getDatabaseName())
                 .getContainer(feedRangeTaskUnit.getContainerName());
-        KafkaCosmosThroughputControlHelper.tryEnableThroughputControl(
+        CosmosThroughputControlHelper.tryEnableThroughputControl(
             container,
             this.throughputControlCosmosClient,
            this.taskConfig.getThroughputControlConfig());
@@ -178,7 +178,7 @@ private Pair<List<SourceRecord>, Boolean> executeFeedRangeTask(FeedRangeTaskUnit
         // split/merge will be handled in source task
         ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(changeFeedRequestOptions);
 
-        KafkaCosmosThroughputControlHelper
+        CosmosThroughputControlHelper
             .tryPopulateThroughputControlGroupName(
                 changeFeedRequestOptions,
                 this.taskConfig.getThroughputControlConfig());
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
index 7ddbf5bbe1004..61e34ed4d0fdf 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
@@ -284,17 +284,12 @@ static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(
         Range<String> normalizedRange = FeedRangeInternal.normalizeRange(((FeedRangeEpkImpl) targetRange).getRange());
 
-<<<<<<< HEAD
-
-        final ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuation);
-=======
         final ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuation);
 
         if (StringUtils.isEmpty(continuationLsn)) {
             continuationLsn = changeFeedState.getContinuation().getCurrentContinuationToken().getToken();
         }
->>>>>>> main
 
         ChangeFeedState targetChangeFeedState = new ChangeFeedStateV1(
             changeFeedState.getContainerRid(),
diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml
index bfa4e836c2328..ffab0caff4a5f 100644
--- a/sdk/cosmos/tests.yml
+++ b/sdk/cosmos/tests.yml
@@ -69,11 +69,7 @@ extends:
           - name: AdditionalArgs
             value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
 
-<<<<<<< HEAD
-  - template: /eng/pipelines/templates/stages/archetype-sdk-tests.yml
-=======
   - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
->>>>>>> main
     parameters:
       TestName: 'Kafka_Cosmos_Integration'
       CloudConfig:
@@ -98,10 +94,7 @@ extends:
         - template: /eng/pipelines/templates/steps/install-reporting-tools.yml
       TestGoals: 'clean verify'
       TestOptions: '$(ProfileFlag)'
-<<<<<<< HEAD
-=======
       AdditionalVariables:
         - name: AdditionalArgs
           value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
->>>>>>> main
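
For context on the `validate` override added to both connectors in this patch: Kafka Connect calls `Connector.validate(Map<String, String>)` before starting a connector, and the override routes the per-field `ConfigValue` results into `KafkaCosmosConfig.validateThroughputControlConfig` for cross-field checks. The sketch below is illustrative only, assuming a typical Connect validation pattern; the real `validateThroughputControlConfig` implementation is not shown in this patch, and the config key names used here are hypothetical placeholders.

// Illustrative sketch only; not the KafkaCosmosConfig.validateThroughputControlConfig
// shipped with this patch. Config key names below are hypothetical.
import java.util.Map;
import org.apache.kafka.common.config.ConfigValue;

public final class ThroughputControlValidationSketch {

    public static void validateThroughputControlConfig(
        Map<String, String> connectorConfigs,
        Map<String, ConfigValue> configValues) {

        boolean enabled = Boolean.parseBoolean(
            connectorConfigs.getOrDefault("azure.cosmos.throughputControl.enabled", "false"));
        if (!enabled) {
            // Nothing to cross-validate when throughput control is disabled.
            return;
        }

        String groupName = connectorConfigs.get("azure.cosmos.throughputControl.group.name");
        if (groupName == null || groupName.trim().isEmpty()) {
            // Attach the error to the offending key so Connect reports it per field;
            // the connectors then return the Config object carrying these messages.
            ConfigValue configValue = configValues.get("azure.cosmos.throughputControl.group.name");
            if (configValue != null) {
                configValue.addErrorMessage(
                    "Throughput control is enabled but no throughput control group name is configured.");
            }
        }
    }
}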