Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent 6070bc6 commit 3de8199
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 73 deletions.
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED

--add-exports com.azure.cosmos/com.azure.cosmos.implementation.changefeed.common=com.azure.cosmos.kafka.connect
--add-exports com.azure.cosmos.implementation.feedranges=com.azure.cosmos.kafka.connect

</javaModulesSurefireArgLine>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
Expand All @@ -15,6 +17,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;

/**
* A Sink connector that publishes topic messages to CosmosDB.
Expand Down Expand Up @@ -60,4 +66,22 @@ public ConfigDef config() {
public String version() {
return KafkaCosmosConstants.CURRENT_VERSION;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
//there are errors based on the config def
if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
return config;
}

Map<String, ConfigValue> configValues =
config
.configValues()
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
Expand All @@ -38,8 +40,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConfig.validateThroughputControlConfig;

/***
* The CosmosDb source connector.
*/
Expand Down Expand Up @@ -348,6 +353,24 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties
return effectiveContainersTopicMap;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
//there are errors based on the config def
if (config.configValues().stream().anyMatch(cv -> !cv.errorMessages().isEmpty())) {
return config;
}

Map<String, ConfigValue> configValues =
config
.configValues()
.stream()
.collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateThroughputControlConfig(connectorConfigs, configValues);
return config;
}

@Override
public void close() {
this.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public class KafkaCosmosThroughputControlHelper {
public class CosmosThroughputControlHelper {
public static CosmosAsyncContainer tryEnableThroughputControl(
CosmosAsyncContainer container,
CosmosAsyncClient throughputControlCosmosClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,4 @@ public class KafkaCosmosConstants {
public static final String CURRENT_VERSION = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("version");
public static final String CURRENT_NAME = CoreUtils.getProperties(PROPERTIES_FILE_NAME).get("name");
public static final String USER_AGENT_SUFFIX = String.format("KafkaConnect/%s/%s", CURRENT_NAME, CURRENT_VERSION);

public static class StatusCodes {
public static final int NOTFOUND = 404;
public static final int REQUEST_TIMEOUT = 408;
public static final int GONE = 410;
public static final int CONFLICT = 409;
public static final int PRECONDITION_FAILED = 412;
public static final int SERVICE_UNAVAILABLE = 503;
public static final int INTERNAL_SERVER_ERROR = 500;
}

public static class SubStatusCodes {
public static final int READ_SESSION_NOT_AVAILABLE = 1002;
public static final int PARTITION_KEY_RANGE_GONE = 1002;
public static final int COMPLETING_SPLIT_OR_MERGE = 1007;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
package com.azure.cosmos.kafka.connect.implementation;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

public class KafkaCosmosExceptionsHelper {
public static boolean isTransientFailure(int statusCode, int substatusCode) {
return statusCode == KafkaCosmosConstants.StatusCodes.GONE
|| statusCode == KafkaCosmosConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == KafkaCosmosConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == KafkaCosmosConstants.StatusCodes.NOTFOUND && substatusCode == KafkaCosmosConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);
return statusCode == HttpConstants.StatusCodes.GONE
|| statusCode == HttpConstants.StatusCodes.SERVICE_UNAVAILABLE
|| statusCode == HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR
|| statusCode == HttpConstants.StatusCodes.REQUEST_TIMEOUT
|| (statusCode == HttpConstants.StatusCodes.NOTFOUND && substatusCode == HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE);

}

Expand All @@ -36,9 +37,9 @@ public static boolean isFeedRangeGoneException(Throwable throwable) {
}

public static boolean isFeedRangeGoneException(int statusCode, int substatusCode) {
return statusCode == KafkaCosmosConstants.StatusCodes.GONE
&& (substatusCode == KafkaCosmosConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == KafkaCosmosConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
return statusCode == HttpConstants.StatusCodes.GONE
&& (substatusCode == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE
|| substatusCode == HttpConstants.SubStatusCodes.COMPLETING_SPLIT_OR_MERGE);
}

public static ConnectException convertToConnectException(Throwable throwable, String message) {
Expand All @@ -51,31 +52,31 @@ public static ConnectException convertToConnectException(Throwable throwable, St

public static boolean isResourceExistsException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.CONFLICT;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.CONFLICT;
}

return false;
}

public static boolean isNotFoundException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.NOTFOUND;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.NOTFOUND;
}

return false;
}

public static boolean isPreconditionFailedException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.PRECONDITION_FAILED;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.PRECONDITION_FAILED;
}

return false;
}

public static boolean isTimeoutException(Throwable throwable) {
if (throwable instanceof CosmosException) {
return ((CosmosException) throwable).getStatusCode() == KafkaCosmosConstants.StatusCodes.REQUEST_TIMEOUT;
return ((CosmosException) throwable).getStatusCode() == HttpConstants.StatusCodes.REQUEST_TIMEOUT;
}

return false;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,7 +96,7 @@ record -> this.sinkTaskConfig
.getDatabase(this.sinkTaskConfig.getContainersConfig().getDatabaseName())
.getContainer(containerName);

KafkaCosmosThroughputControlHelper
CosmosThroughputControlHelper
.tryEnableThroughputControl(
container,
this.throughputControlClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemRequestOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
Expand Down Expand Up @@ -134,7 +134,7 @@ private CosmosBulkExecutionOptions getBulkExecutionOperations() {
.setMaxConcurrentCosmosPartitions(bulkExecutionOptions, this.writeConfig.getBulkMaxConcurrentCosmosPartitions());
}

KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(bulkExecutionOptions, this.throughputControlConfig);

return bulkExecutionOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosSchedulers;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -71,7 +71,7 @@ private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
return container.upsertItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
},
(throwable) -> false, // no exceptions should be ignored
Expand All @@ -83,7 +83,7 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
executeWithRetry(
(operation) -> {
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig);
return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then();
},
(throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable),
Expand All @@ -96,7 +96,7 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO
(operation) -> {
CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions();
itemRequestOptions.setIfMatchETag(etag);
KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);

return ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
Expand Down Expand Up @@ -129,7 +129,7 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO
}
}

KafkaCosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);
CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig);

return ImplementationBridgeHelpers
.CosmosAsyncContainerHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
Expand Down Expand Up @@ -167,7 +167,7 @@ private Pair<List<SourceRecord>, Boolean> executeFeedRangeTask(FeedRangeTaskUnit
this.cosmosClient
.getDatabase(feedRangeTaskUnit.getDatabaseName())
.getContainer(feedRangeTaskUnit.getContainerName());
KafkaCosmosThroughputControlHelper.tryEnableThroughputControl(
CosmosThroughputControlHelper.tryEnableThroughputControl(
container,
this.throughputControlCosmosClient,
this.taskConfig.getThroughputControlConfig());
Expand All @@ -178,7 +178,7 @@ private Pair<List<SourceRecord>, Boolean> executeFeedRangeTask(FeedRangeTaskUnit

// split/merge will be handled in source task
ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(changeFeedRequestOptions);
KafkaCosmosThroughputControlHelper
CosmosThroughputControlHelper
.tryPopulateThroughputControlGroupName(
changeFeedRequestOptions,
this.taskConfig.getThroughputControlConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,12 @@ static CosmosChangeFeedRequestOptions createForProcessingFromContinuation(
Range<String> normalizedRange =
FeedRangeInternal.normalizeRange(((FeedRangeEpkImpl) targetRange).getRange());

<<<<<<< HEAD

final ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuation);
=======
final ChangeFeedState changeFeedState = ChangeFeedState.fromString(continuation);

if (StringUtils.isEmpty(continuationLsn)) {
continuationLsn = changeFeedState.getContinuation().getCurrentContinuationToken().getToken();
}

>>>>>>> main
ChangeFeedState targetChangeFeedState =
new ChangeFeedStateV1(
changeFeedState.getContainerRid(),
Expand Down
7 changes: 0 additions & 7 deletions sdk/cosmos/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ extends:
- name: AdditionalArgs
value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'

<<<<<<< HEAD
- template: /eng/pipelines/templates/stages/archetype-sdk-tests.yml
=======
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
>>>>>>> main
parameters:
TestName: 'Kafka_Cosmos_Integration'
CloudConfig:
Expand All @@ -98,10 +94,7 @@ extends:
- template: /eng/pipelines/templates/steps/install-reporting-tools.yml
TestGoals: 'clean verify'
TestOptions: '$(ProfileFlag)'
<<<<<<< HEAD
=======
AdditionalVariables:
- name: AdditionalArgs
value: '-DCOSMOS.CLIENT_TELEMETRY_ENDPOINT=$(cosmos-client-telemetry-endpoint) -DCOSMOS.CLIENT_TELEMETRY_COSMOS_ACCOUNT=$(cosmos-client-telemetry-cosmos-account)'
>>>>>>> main

0 comments on commit 3de8199

Please sign in to comment.