customPluginTags;
+ final String identifier = describeCustomPluginRequest.customPluginArn();
+ final KafkaConnectClient kafkaConnectClient = proxyClient.client();
+
+ try {
+ describeCustomPluginResponse =
+ proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin);
+ } catch (final AwsServiceException e) {
+ throw exceptionTranslator.translateToCfnException(e, identifier);
+ }
+
+ try {
+ final ListTagsForResourceResponse listTagsForResourceResponse =
+ TagHelper.listTags(
+ describeCustomPluginRequest.customPluginArn(), kafkaConnectClient, proxyClient);
+ customPluginTags = listTagsForResourceResponse.tags();
+ } catch (final AwsServiceException e) {
+ throw exceptionTranslator.translateToCfnException(e, identifier);
+ }
+
+ logger.log(
+ String.format("%s [%s] has successfully been read.", ResourceModel.TYPE_NAME, identifier));
+
+ ResourceModel readResponse = translator.translateFromReadResponse(describeCustomPluginResponse);
+ readResponse.setTags(TagHelper.convertToList(customPluginTags));
+ return readResponse;
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/TagHelper.java b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/TagHelper.java
new file mode 100644
index 0000000..7cc1244
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/TagHelper.java
@@ -0,0 +1,200 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.ObjectUtils;
+
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceResponse;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+public class TagHelper {
+ /**
+ * Converts a collection of Tag objects to a tag-name : tag-value map.
+ *
+ * Note: Tag objects with null tag values will not be included in the output map.
+ *
+ * @param tags Collection of tags to convert.
+ * @return Map of Tag objects.
+ */
+ public static Map convertToMap(final Collection tags) {
+ if (CollectionUtils.isEmpty(tags)) {
+ return Collections.emptyMap();
+ }
+ return tags.stream()
+ .filter(tag -> tag.getValue() != null)
+ .collect(Collectors.toMap(Tag::getKey, Tag::getValue, (oldValue, newValue) -> newValue));
+ }
+
+ /**
+ * Converts a tag map to a list of Tag objects.
+ *
+ * Note: Like convertToMap, convertToList filters out value-less tag entries.
+ *
+ * @param tagMap Map of tags to convert.
+ * @return List of Tag objects.
+ */
+ public static List convertToList(final Map tagMap) {
+ if (MapUtils.isEmpty(tagMap)) {
+ return Collections.emptyList();
+ }
+ return tagMap.entrySet().stream()
+ .filter(tag -> tag.getValue() != null)
+ .map(tag -> Tag.builder().key(tag.getKey()).value(tag.getValue()).build())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Executes listTagsForResource SDK client call for the specified resource ARN.
+ *
+ * @param arn Resource ARN to list tags for.
+ * @param kafkaConnectClient AWS KafkaConnect client to use.
+ * @param proxyClient Proxy client to use for providing credentials.
+ * @return ListTagsForResourceResponse from the SDK client call.
+ */
+ public static ListTagsForResourceResponse listTags(
+ final String arn,
+ final KafkaConnectClient kafkaConnectClient,
+ final ProxyClient proxyClient) {
+ final ListTagsForResourceRequest listTagsForResourceRequest =
+ ListTagsForResourceRequest.builder().resourceArn(arn).build();
+
+ return proxyClient.injectCredentialsAndInvokeV2(
+ listTagsForResourceRequest, kafkaConnectClient::listTagsForResource);
+ }
+
+ /**
+ * generateTagsForCreate
+ *
+ * Generate tags to put into resource creation request.
+ * This includes user defined tags and system tags as well.
+ */
+ public static Map generateTagsForCreate(
+ final ResourceHandlerRequest handlerRequest) {
+ final Map tagMap = new HashMap<>();
+
+ // merge system tags with desired resource tags if your service supports CloudFormation system tags
+ if (handlerRequest.getSystemTags() != null) {
+ tagMap.putAll(handlerRequest.getSystemTags());
+ }
+
+ // get desired stack level tags from handlerRequest
+ if (handlerRequest.getDesiredResourceTags() != null) {
+ tagMap.putAll(handlerRequest.getDesiredResourceTags());
+ }
+
+ // get resource level tags from resource model based on your tag property name
+ if (handlerRequest.getDesiredResourceState() != null
+ && handlerRequest.getDesiredResourceState().getTags() != null) {
+ tagMap.putAll(convertToMap(handlerRequest.getDesiredResourceState().getTags()));
+ }
+
+ return Collections.unmodifiableMap(tagMap);
+ }
+
+ /**
+ * shouldUpdateTags
+ *
+ * Determines whether user defined tags have been changed during update.
+ */
+ public static boolean shouldUpdateTags(final ResourceHandlerRequest handlerRequest) {
+ final Map previousTags = getPreviouslyAttachedTags(handlerRequest);
+ final Map desiredTags = getNewDesiredTags(handlerRequest);
+ return ObjectUtils.notEqual(previousTags, desiredTags);
+ }
+
+ /**
+ * getPreviouslyAttachedTags
+ *
+ * If stack tags and resource tags are not merged together in Configuration class,
+ * we will get previous attached user defined tags from both handlerRequest.getPreviousResourceTags (stack tags)
+ * and handlerRequest.getPreviousResourceState (resource tags).
+ */
+ public static Map getPreviouslyAttachedTags(
+ final ResourceHandlerRequest handlerRequest) {
+ final Map previousTags = new HashMap<>();
+
+ // get previous system tags if your service supports CloudFormation system tags
+ if (handlerRequest.getPreviousSystemTags() != null) {
+ previousTags.putAll(handlerRequest.getPreviousSystemTags());
+ }
+
+ // get previous stack level tags from handlerRequest
+ if (handlerRequest.getPreviousResourceTags() != null) {
+ previousTags.putAll(handlerRequest.getPreviousResourceTags());
+ }
+
+ // get resource level tags from previous resource state based on your tag property name
+ if (handlerRequest.getPreviousResourceState() != null
+ && handlerRequest.getPreviousResourceState().getTags() != null) {
+ previousTags.putAll(convertToMap(handlerRequest.getPreviousResourceState().getTags()));
+ }
+
+ return previousTags;
+ }
+
+ /**
+ * getNewDesiredTags
+ *
+ * If stack tags and resource tags are not merged together in Configuration class,
+ * we will get new user defined tags from both resource model and previous stack tags.
+ */
+ public static Map getNewDesiredTags(final ResourceHandlerRequest handlerRequest) {
+ final Map desiredTags = new HashMap<>();
+
+ // merge system tags with desired resource tags if your service supports CloudFormation system tags
+ if (handlerRequest.getSystemTags() != null) {
+ desiredTags.putAll(handlerRequest.getSystemTags());
+ }
+
+ // get desired stack level tags from handlerRequest
+ if (handlerRequest.getDesiredResourceTags() != null) {
+ desiredTags.putAll(handlerRequest.getDesiredResourceTags());
+ }
+
+ // get resource level tags from resource model based on your tag property name
+ desiredTags.putAll(convertToMap(handlerRequest.getDesiredResourceState().getTags()));
+ return desiredTags;
+ }
+
+ /**
+ * Generates a map of tags to be added or modified in the resource.
+ *
+ * @param previousTags
+ * @param desiredTags
+ * @return
+ */
+ public static Map generateTagsToAdd(
+ final Map previousTags, final Map desiredTags) {
+ return desiredTags.entrySet().stream()
+ .filter(
+ desiredTag -> !previousTags.containsKey(desiredTag.getKey())
+ || !Objects.equals(
+ previousTags.get(desiredTag.getKey()), desiredTag.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * getTagsToRemove
+ *
+ * Determines the tags the customer desired to remove from the function.
+ */
+ public static Set generateTagsToRemove(final Map previousTags,
+ final Map desiredTags) {
+ final Set desiredTagNames = desiredTags.keySet();
+
+ return previousTags.keySet().stream()
+ .filter(tagName -> !desiredTagNames.contains(tagName))
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/Translator.java b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/Translator.java
new file mode 100644
index 0000000..da628ca
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/Translator.java
@@ -0,0 +1,204 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import software.amazon.awssdk.services.kafkaconnect.model.CreateCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.DeleteCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.S3LocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.TagResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.UntagResourceRequest;
+
+/**
+ * This class is a centralized placeholder for - api request construction - object translation
+ * to/from aws sdk - resource model construction for read/list handlers
+ */
+public class Translator {
+
+ /**
+ * Request to create a resource
+ *
+ * @param model resource model
+ * @return createCustomPluginRequest the kafkaconnect request to create a resource
+ */
+ public CreateCustomPluginRequest translateToCreateRequest(final ResourceModel model,
+ final Map tagsForCreate) {
+ return CreateCustomPluginRequest.builder()
+ .contentType(model.getContentType())
+ .description(model.getDescription())
+ .location(resourceCustomPluginLocationToSdkCustomPluginLocation(model.getLocation()))
+ .name(model.getName())
+ .tags(tagsForCreate)
+ .build();
+ }
+
+ /**
+ * Request to read a resource
+ *
+ * @param model resource model
+ * @return describeCustomPluginRequest the kafkaconnect request to describe a resource
+ */
+ public DescribeCustomPluginRequest translateToReadRequest(final ResourceModel model) {
+ return DescribeCustomPluginRequest.builder()
+ .customPluginArn(model.getCustomPluginArn())
+ .build();
+ }
+
+ /**
+ * Translates resource object from sdk into a resource model
+ *
+ * @param describeCustomPluginResponse the kafkaconnect describe resource response
+ * @return model resource model
+ */
+ public ResourceModel translateFromReadResponse(
+ final DescribeCustomPluginResponse describeCustomPluginResponse) {
+ return ResourceModel.builder()
+ .name(describeCustomPluginResponse.name())
+ .description(describeCustomPluginResponse.description())
+ .customPluginArn(describeCustomPluginResponse.customPluginArn())
+ .fileDescription(
+ sdkCustomPluginFileDescriptionToResourceCustomPluginFileDescription(
+ describeCustomPluginResponse.latestRevision().fileDescription()))
+ .location(
+ sdkCustomPluginLocationDescriptionToResourceCustomPluginLocation(
+ describeCustomPluginResponse.latestRevision().location()))
+ .contentType(describeCustomPluginResponse.latestRevision().contentTypeAsString())
+ .revision(describeCustomPluginResponse.latestRevision().revision())
+ .build();
+ }
+
+ /**
+ * Request to delete a resource.
+ *
+ * @param model resource model
+ * @return awsRequest the aws service request to delete a resource
+ */
+ public DeleteCustomPluginRequest translateToDeleteRequest(final ResourceModel model) {
+ return DeleteCustomPluginRequest.builder().customPluginArn(model.getCustomPluginArn()).build();
+ }
+
+ /**
+ * Request to list resources
+ *
+ * @param nextToken token passed to the aws service list resources request
+ * @return listCustomPluginsRequest the kafkaconnect request to list resources within aws account
+ */
+ ListCustomPluginsRequest translateToListRequest(final String nextToken) {
+ return ListCustomPluginsRequest.builder().nextToken(nextToken).build();
+ }
+
+ /**
+ * Translates custom plugins from sdk into a resource model with primary identifier only. This is
+ * as per contract for list handlers.
+ *
+ * Reference -
+ * https://docs.aws.amazon.com/cloudformation-cli/latest/userguide/resource-type-test-contract.html#resource-type-test-contract-list
+ *
+ * @param listCustomPluginsResponse the kafkaconnect list resources response
+ * @return list of resource models
+ */
+ public List translateFromListResponse(
+ final ListCustomPluginsResponse listCustomPluginsResponse) {
+ return streamOfOrEmpty(listCustomPluginsResponse.customPlugins())
+ .map(
+ customPlugin -> ResourceModel.builder().customPluginArn(customPlugin.customPluginArn()).build())
+ .collect(Collectors.toList());
+ }
+
+ protected static Stream streamOfOrEmpty(final Collection collection) {
+ return Optional.ofNullable(collection).map(Collection::stream).orElseGet(Stream::empty);
+ }
+
+ /**
+ * Request to add tags to a resource.
+ *
+ * @param model resource model
+ * @return awsRequest the aws service request to create a resource
+ */
+ static TagResourceRequest translateToTagRequest(
+ final ResourceModel model, final Map addedTags) {
+ return TagResourceRequest.builder()
+ .resourceArn(model.getCustomPluginArn())
+ .tags(addedTags)
+ .build();
+ }
+
+ /**
+ * Request to remove tags from a resource.
+ *
+ * @param model resource model
+ * @return awsRequest the aws service request to create a resource
+ */
+ static UntagResourceRequest translateToUntagRequest(
+ final ResourceModel model, final Set removedTags) {
+ return UntagResourceRequest.builder()
+ .resourceArn(model.getCustomPluginArn())
+ .tagKeys(removedTags)
+ .build();
+ }
+
+ protected static CustomPluginFileDescription sdkCustomPluginFileDescriptionToResourceCustomPluginFileDescription(
+ final software.amazon.awssdk.services.kafkaconnect.model.CustomPluginFileDescription customPluginFileDescription) {
+
+ return customPluginFileDescription == null
+ ? null
+ : CustomPluginFileDescription.builder()
+ .fileMd5(customPluginFileDescription.fileMd5())
+ .fileSize(customPluginFileDescription.fileSize())
+ .build();
+ }
+
+ protected static CustomPluginLocation sdkCustomPluginLocationDescriptionToResourceCustomPluginLocation(
+ final CustomPluginLocationDescription customPluginLocationDescription) {
+
+ return customPluginLocationDescription == null
+ ? null
+ : CustomPluginLocation.builder()
+ .s3Location(
+ sdkS3LocationDescriptionToResourceS3Location(
+ customPluginLocationDescription.s3Location()))
+ .build();
+ }
+
+ protected static S3Location sdkS3LocationDescriptionToResourceS3Location(
+ final S3LocationDescription s3LocationDescription) {
+
+ return s3LocationDescription == null
+ ? null
+ : S3Location.builder()
+ .bucketArn(s3LocationDescription.bucketArn())
+ .fileKey(s3LocationDescription.fileKey())
+ .objectVersion(s3LocationDescription.objectVersion())
+ .build();
+ }
+
+ protected static software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocation resourceCustomPluginLocationToSdkCustomPluginLocation(
+ final CustomPluginLocation customPluginLocation) {
+ return customPluginLocation == null
+ ? null
+ : software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocation.builder()
+ .s3Location(resourceS3LocationToSdkS3Location(customPluginLocation.getS3Location()))
+ .build();
+ }
+
+ protected static software.amazon.awssdk.services.kafkaconnect.model.S3Location resourceS3LocationToSdkS3Location(
+ final S3Location s3Location) {
+ return s3Location == null
+ ? null
+ : software.amazon.awssdk.services.kafkaconnect.model.S3Location.builder()
+ .bucketArn(s3Location.getBucketArn())
+ .fileKey(s3Location.getFileKey())
+ .objectVersion(s3Location.getObjectVersion())
+ .build();
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/UpdateHandler.java b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/UpdateHandler.java
new file mode 100644
index 0000000..27651e8
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/main/java/software/amazon/kafkaconnect/customplugin/UpdateHandler.java
@@ -0,0 +1,207 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.TagResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.UntagResourceRequest;
+import software.amazon.cloudformation.exceptions.CfnNotFoundException;
+import software.amazon.cloudformation.exceptions.CfnNotUpdatableException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.Logger;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+public class UpdateHandler extends BaseHandlerStd {
+
+ private Logger logger;
+
+ private final Translator translator;
+ private final ExceptionTranslator exceptionTranslator;
+ private final ReadHandler readHandler;
+
+ public UpdateHandler() {
+ this(new ExceptionTranslator(), new Translator(), new ReadHandler());
+ }
+
+ /**
+ * This is constructor is used for unit testing.
+ *
+ * @param exceptionTranslator
+ * @param translator
+ * @param readHandler
+ */
+ UpdateHandler(
+ final ExceptionTranslator exceptionTranslator,
+ final Translator translator,
+ final ReadHandler readHandler) {
+ this.translator = translator;
+ this.exceptionTranslator = exceptionTranslator;
+ this.readHandler = readHandler;
+ }
+
+ protected ProgressEvent handleRequest(
+ final AmazonWebServicesClientProxy proxy,
+ final ResourceHandlerRequest request,
+ final CallbackContext callbackContext,
+ final ProxyClient proxyClient,
+ final Logger logger) {
+ this.logger = logger;
+
+ final ResourceModel desiredModel = request.getDesiredResourceState();
+ final ResourceModel previousModel = request.getPreviousResourceState();
+
+ return ProgressEvent.progress(desiredModel, callbackContext)
+ .then(
+ progress -> proxy
+ .initiate(
+ "AWS-KafkaConnect-CustomPlugin::Update::ValidateResourceExists",
+ proxyClient,
+ desiredModel,
+ callbackContext)
+ .translateToServiceRequest(translator::translateToReadRequest)
+ .makeServiceCall(this::validateResourceExists)
+ .progress())
+ .then(progress -> verifyNonUpdatableFields(desiredModel, previousModel, progress))
+ .then(progress -> updateTags(proxyClient, progress, request))
+ .then(
+ progress -> readHandler.handleRequest(proxy, request, callbackContext, proxyClient, logger));
+ }
+
+ private DescribeCustomPluginResponse validateResourceExists(
+ DescribeCustomPluginRequest describeCustomPluginRequest,
+ ProxyClient proxyClient) {
+ DescribeCustomPluginResponse describeCustomPluginResponse;
+ if (describeCustomPluginRequest.customPluginArn() == null) {
+ throw new CfnNotFoundException(ResourceModel.TYPE_NAME, null);
+ }
+
+ try {
+ final KafkaConnectClient kafkaConnectClient = proxyClient.client();
+ describeCustomPluginResponse =
+ proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin);
+ } catch (final AwsServiceException e) {
+ throw exceptionTranslator.translateToCfnException(
+ e, describeCustomPluginRequest.customPluginArn());
+ }
+
+ logger.log(
+ String.format(
+ "Validated Custom Plugin exists with name: %s", describeCustomPluginResponse.name()));
+ return describeCustomPluginResponse;
+ }
+
+ /**
+ * Checks if the CREATE ONLY fields have been updated and throws an exception if it is the case.
+ *
+ * @param currentModel The current resource model.
+ * @param previousModel The previous resource model.
+ * @param progress
+ * @return
+ */
+ private ProgressEvent verifyNonUpdatableFields(
+ ResourceModel currentModel,
+ ResourceModel previousModel,
+ ProgressEvent progress) {
+ if (previousModel != null) {
+ // Check READ ONLY fields.
+ final boolean isCustomPluginArnEqual =
+ Optional.ofNullable(currentModel.getCustomPluginArn())
+ .equals(Optional.ofNullable(previousModel.getCustomPluginArn()));
+ final boolean isRevisionEqual =
+ Optional.ofNullable(currentModel.getRevision())
+ .equals(Optional.ofNullable(previousModel.getRevision()));
+ final boolean isFileDescriptionEqual =
+ Optional.ofNullable(currentModel.getFileDescription())
+ .equals(Optional.ofNullable(previousModel.getFileDescription()));
+ // Check CREATE ONLY fields.
+ final boolean isNameEqual =
+ Optional.ofNullable(currentModel.getName())
+ .equals(Optional.ofNullable(previousModel.getName()));
+ final boolean isDescriptionEqual =
+ Optional.ofNullable(currentModel.getDescription())
+ .equals(Optional.ofNullable(previousModel.getDescription()));
+ final boolean isContentTypeEqual =
+ Optional.ofNullable(currentModel.getContentType())
+ .equals(Optional.ofNullable(previousModel.getContentType()));
+ final boolean isLocationEqual =
+ Optional.ofNullable(currentModel.getLocation())
+ .equals(Optional.ofNullable(previousModel.getLocation()));
+ if (!(isCustomPluginArnEqual
+ && isRevisionEqual
+ && isFileDescriptionEqual
+ && isNameEqual
+ && isDescriptionEqual
+ && isContentTypeEqual
+ && isLocationEqual)) {
+ throw new CfnNotUpdatableException(
+ ResourceModel.TYPE_NAME, currentModel.getCustomPluginArn());
+ }
+ }
+ logger.log(
+ String.format(
+ "Verified non-updatable fields for CustomPlugin resource with arn: %s",
+ currentModel.getCustomPluginArn()));
+ return progress;
+ }
+
+ /**
+ * Updates the tag for the CustomPlugin. This will remove the tags which are no longer needed and
+ * add new tags.
+ *
+ * @param proxyClient KafkaConnectClient to be used for updating tags
+ * @param progress
+ * @param request
+ * @return
+ */
+ private ProgressEvent updateTags(
+ final ProxyClient proxyClient,
+ final ProgressEvent progress,
+ ResourceHandlerRequest request) {
+ final ResourceModel desiredModel = request.getDesiredResourceState();
+ final String identifier = desiredModel.getCustomPluginArn();
+
+ if (TagHelper.shouldUpdateTags(request)) {
+ final Map previousTags = TagHelper.getPreviouslyAttachedTags(request);
+ final Map desiredTags = TagHelper.getNewDesiredTags(request);
+ final Map addedTags = TagHelper.generateTagsToAdd(previousTags, desiredTags);
+ final Set removedTags = TagHelper.generateTagsToRemove(previousTags, desiredTags);
+ final KafkaConnectClient kafkaConnectClient = proxyClient.client();
+
+ if (!removedTags.isEmpty()) {
+ final UntagResourceRequest untagResourceRequest =
+ Translator.translateToUntagRequest(desiredModel, removedTags);
+ try {
+ proxyClient.injectCredentialsAndInvokeV2(
+ untagResourceRequest, kafkaConnectClient::untagResource);
+ logger.log(
+ String.format(
+ "CustomPlugin removed %d tags from arn: %s", removedTags.size(), identifier));
+ } catch (final AwsServiceException e) {
+ throw exceptionTranslator.translateToCfnException(e, identifier);
+ }
+ }
+
+ if (!addedTags.isEmpty()) {
+ final TagResourceRequest tagResourceRequest =
+ Translator.translateToTagRequest(desiredModel, addedTags);
+ try {
+ proxyClient.injectCredentialsAndInvokeV2(
+ tagResourceRequest, kafkaConnectClient::tagResource);
+ logger.log(
+ String.format("CustomPlugin added %d tags to arn: %s", addedTags.size(), identifier));
+ } catch (final AwsServiceException e) {
+ throw exceptionTranslator.translateToCfnException(e, identifier);
+ }
+ }
+ }
+ return ProgressEvent.progress(desiredModel, progress.getCallbackContext());
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/resources/log4j2.xml b/aws-kafkaconnect-customplugin/src/resources/log4j2.xml
new file mode 100644
index 0000000..5657daf
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/resources/log4j2.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/AbstractTestBase.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/AbstractTestBase.java
new file mode 100644
index 0000000..3ca246e
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/AbstractTestBase.java
@@ -0,0 +1,72 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsResponse;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.pagination.sync.SdkIterable;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.Credentials;
+import software.amazon.cloudformation.proxy.LoggerProxy;
+import software.amazon.cloudformation.proxy.ProxyClient;
+
+public class AbstractTestBase {
+ protected static final Credentials MOCK_CREDENTIALS =
+ new Credentials("accessKey", "secretKey", "token");
+ protected static final LoggerProxy logger = new LoggerProxy();
+
+ protected static final Map TAGS =
+ new HashMap() {
+ {
+ put("TEST_TAG1", "TEST_TAG_VALUE1");
+ put("TEST_TAG2", "TEST_TAG_VALUE2");
+ }
+ };
+
+ static ProxyClient proxyStub(
+ final AmazonWebServicesClientProxy proxy, final KafkaConnectClient kafkaConnectClient) {
+ return new ProxyClient() {
+ @Override
+ public ResponseT injectCredentialsAndInvokeV2(
+ RequestT request, Function requestFunction) {
+ return proxy.injectCredentialsAndInvokeV2(request, requestFunction);
+ }
+
+ @Override
+ public CompletableFuture injectCredentialsAndInvokeV2Async(
+ RequestT request, Function> requestFunction) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public > IterableT injectCredentialsAndInvokeIterableV2(
+ RequestT request, Function requestFunction) {
+ return proxy.injectCredentialsAndInvokeIterableV2(request, requestFunction);
+ }
+
+ @Override
+ public ResponseInputStream injectCredentialsAndInvokeV2InputStream(
+ RequestT requestT, Function> function) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResponseBytes injectCredentialsAndInvokeV2Bytes(
+ RequestT requestT, Function> function) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KafkaConnectClient client() {
+ return kafkaConnectClient;
+ }
+ };
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/CreateHandlerTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/CreateHandlerTest.java
new file mode 100644
index 0000000..51c75e7
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/CreateHandlerTest.java
@@ -0,0 +1,388 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.ConflictException;
+import software.amazon.awssdk.services.kafkaconnect.model.CreateCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.CreateCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginContentType;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocation;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginRevisionSummary;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginState;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.S3Location;
+import software.amazon.awssdk.services.kafkaconnect.model.S3LocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.StateDescription;
+import software.amazon.cloudformation.exceptions.CfnAlreadyExistsException;
+import software.amazon.cloudformation.exceptions.CfnGeneralServiceException;
+import software.amazon.cloudformation.exceptions.CfnResourceConflictException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.OperationStatus;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class CreateHandlerTest extends AbstractTestBase {
+
+ @Mock
+ private KafkaConnectClient kafkaConnectClient;
+
+ @Mock
+ private ExceptionTranslator exceptionTranslator;
+
+ @Mock
+ private Translator translator;
+
+ private ReadHandler readHandler;
+
+ private AmazonWebServicesClientProxy proxy;
+
+ private ProxyClient proxyClient;
+
+ private CreateHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ proxy =
+ new AmazonWebServicesClientProxy(
+ logger, MOCK_CREDENTIALS, () -> Duration.ofSeconds(600).toMillis());
+ proxyClient = proxyStub(proxy, kafkaConnectClient);
+ readHandler = new ReadHandler(exceptionTranslator, translator);
+ handler = new CreateHandler(exceptionTranslator, translator, readHandler);
+ }
+
+ @AfterEach
+ public void tear_down() {
+ verify(kafkaConnectClient, atLeastOnce()).serviceName();
+ verifyNoMoreInteractions(kafkaConnectClient, exceptionTranslator, translator);
+ }
+
+ @Test
+ public void handleRequest_success() {
+ final ResourceModel resourceModel = TestData.getResourceModel();
+ when(translator.translateToCreateRequest(resourceModel, TagHelper.convertToMap(resourceModel.getTags())))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.CREATE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::createCustomPlugin))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL_WITH_ARN))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ final DescribeCustomPluginResponse describeCustomPluginResponse =
+ TestData.FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE;
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(
+ describeCustomPluginResponse.toBuilder().customPluginState(CustomPluginState.CREATING).build())
+ .thenReturn(describeCustomPluginResponse);
+ when(translator.translateFromReadResponse(describeCustomPluginResponse))
+ .thenReturn(TestData.RESOURCE_MODEL_WITH_ARN);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE);
+
+ final ResourceHandlerRequest request =
+ TestData.getResourceHandlerRequest(resourceModel);
+ final ProgressEvent response =
+ handler.handleRequest(proxy, request, new CallbackContext(), proxyClient, logger);
+
+ assertThat(response).isEqualTo(TestData.DESCRIBE_RESPONSE);
+ assertThat(response.getResourceModel().getTags())
+ .isEqualTo(request.getDesiredResourceState().getTags());
+ }
+
+ @Test
+ public void handleRequest_throwsAlreadyExistsException_whenCustomPluginExists() {
+ final ResourceModel resourceModel = TestData.getResourceModel();
+ final ConflictException cException = ConflictException.builder().build();
+ final CfnAlreadyExistsException cfnException = new CfnAlreadyExistsException(cException);
+ when(translator.translateToCreateRequest(resourceModel, TagHelper.convertToMap(resourceModel.getTags())))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.CREATE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::createCustomPlugin))
+ .thenThrow(cException);
+ when(exceptionTranslator.translateToCfnException(cException, TestData.CUSTOM_PLUGIN_NAME))
+ .thenReturn(cfnException);
+ final CfnAlreadyExistsException exception =
+ assertThrows(
+ CfnAlreadyExistsException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.getResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ assertThat(exception).isEqualTo(cfnException);
+ }
+
+ public void handleRequest_afterNDescribeCustomPlugins_success() {
+ final ResourceModel resourceModel = TestData.getResourceModel();
+ when(translator.translateToCreateRequest(resourceModel, TagHelper.convertToMap(resourceModel.getTags())))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.CREATE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::createCustomPlugin))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL_WITH_ARN))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ final DescribeCustomPluginResponse describeRunningCustomPluginResponse =
+ TestData.describeResponseWithState(CustomPluginState.ACTIVE);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.describeResponseWithState(CustomPluginState.CREATING))
+ .thenReturn(TestData.describeResponseWithState(CustomPluginState.CREATING))
+ .thenReturn(describeRunningCustomPluginResponse);
+ when(translator.translateFromReadResponse(describeRunningCustomPluginResponse))
+ .thenReturn(TestData.RESOURCE_MODEL_WITH_ARN);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE);
+
+ final ResourceHandlerRequest request =
+ TestData.getResourceHandlerRequest(resourceModel);
+ final ProgressEvent response =
+ handler.handleRequest(proxy, request, new CallbackContext(), proxyClient, logger);
+
+ assertThat(response).isEqualTo(TestData.DESCRIBE_RESPONSE);
+ assertThat(response.getResourceModel().getTags())
+ .isEqualTo(request.getDesiredResourceState().getTags());
+ }
+
+ @Test
+ public void handleRequest_throwsGeneralServiceException_whenDescribeCustomPluginThrowsException() {
+ final AwsServiceException cException =
+ AwsServiceException.builder().message(TestData.EXCEPTION_MESSAGE).build();
+ when(translator.translateToCreateRequest(TestData.getResourceModel(),
+ TagHelper.convertToMap(TestData.getResourceModel().getTags())))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.CREATE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::createCustomPlugin))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL_WITH_ARN))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenThrow(cException);
+
+ runHandlerAndAssertExceptionThrownWithMessage(
+ CfnGeneralServiceException.class,
+ "Error occurred during operation 'AWS::KafkaConnect::CustomPlugin create request "
+ + "accepted but failed to get state due to: "
+ + TestData.EXCEPTION_MESSAGE
+ + "'.");
+ }
+
+ @Test
+ public void handleRequest_throwsGeneralServiceException_whenCustomPluginsStateFails() {
+ setupMocksToReturnCustomPluginState(CustomPluginState.CREATE_FAILED);
+
+ runHandlerAndAssertExceptionThrownWithMessage(
+ CfnGeneralServiceException.class,
+ "Error occurred during operation 'Couldn't create AWS::KafkaConnect::CustomPlugin "
+ + "due to create failure'.");
+ }
+
+ @Test
+ public void handleRequest_throwsResourceConflictException_whenCustomPluginStartsDeletingDuringCreate() {
+ setupMocksToReturnCustomPluginState(CustomPluginState.DELETING);
+
+ runHandlerAndAssertExceptionThrownWithMessage(
+ CfnResourceConflictException.class,
+ "Resource of type 'AWS::KafkaConnect::CustomPlugin' with identifier '"
+ + TestData.CUSTOM_PLUGIN_ARN
+ + "' has a conflict. Reason: Another process is deleting this AWS::KafkaConnect::CustomPlugin.");
+ }
+
+ @Test
+ public void handleRequest_throwsGeneralServiceException_whenCustomPluginReturnsUnexpectedState() {
+ setupMocksToReturnCustomPluginState(CustomPluginState.UNKNOWN_TO_SDK_VERSION);
+
+ runHandlerAndAssertExceptionThrownWithMessage(
+ CfnGeneralServiceException.class,
+ "Error occurred during operation 'AWS::KafkaConnect::CustomPlugin create request accepted "
+ + "but current state is unknown'.");
+ }
+
+ private void setupMocksToReturnCustomPluginState(final CustomPluginState customPluginState) {
+ when(translator.translateToCreateRequest(TestData.getResourceModel(),
+ TagHelper.convertToMap(TestData.getResourceModel().getTags())))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.CREATE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::createCustomPlugin))
+ .thenReturn(TestData.CREATE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL_WITH_ARN))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.describeResponseWithState(customPluginState));
+ }
+
+ private void runHandlerAndAssertExceptionThrownWithMessage(
+ final Class extends Exception> expectedExceptionClass, final String expectedMessage) {
+
+ final Exception exception =
+ assertThrows(
+ expectedExceptionClass,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.getResourceHandlerRequest(TestData.getResourceModel()),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ assertThat(exception.getMessage()).isEqualTo(expectedMessage);
+ }
+
+ private static class TestData {
+ private static final String CUSTOM_PLUGIN_NAME = "unit-test-custom-plugin";
+ private static final String CUSTOM_PLUGIN_DESCRIPTION =
+ "Unit testing custom plugin description";
+
+ private static final long CUSTOM_PLUGIN_REVISION = 1L;
+ private static final String CUSTOM_PLUGIN_S3_FILE_KEY = "file-key";
+
+ private static final String CUSTOM_PLUGIN_PROPERTIES_FILE_CONTENT = "propertiesFileContent";
+
+ private static final String CUSTOM_PLUGIN_ARN =
+ "arn:aws:kafkaconnect:us-east-1:1111111111:custom-plugin/unit-test-custom-plugin";
+ private static final Instant CUSTOM_PLUGIN_CREATION_TIME =
+ OffsetDateTime.parse("2021-03-04T14:03:40.818Z").toInstant();
+ private static final String CUSTOM_PLUGIN_LOCATION_BUCKET_ARN = "arn:aws:s3:::unit-test-bucket";
+ private static final String CUSTOM_PLUGIN_LOCATION_FILE_KEY = "unit-test-file-key.zip";
+ private static final String CUSTOM_PLUGIN_LOCATION_OBJECT_VERSION = "1";
+ private static final String CUSTOM_PLUGIN_STATE_CODE = "custom-plugin-state-code";
+ private static final String CUSTOM_PLUGIN_STATE_DESCRIPTION = "custom-plugin-state-description";
+ private static final String CUSTOM_PLUGIN_S3_OBJECT_VERSION = "object-version";
+ private static final String CUSTOM_PLUGIN_S3_BUCKET_ARN = "bucket-arn";
+ private static final String CUSTOM_PLUGIN_FILE_MD5 = "abcd1234";
+ private static final long CUSTOM_PLUGIN_FILE_SIZE = 123456L;
+ private static final String EXCEPTION_MESSAGE = "Exception message";
+ private static final DescribeCustomPluginResponse FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE =
+ DescribeCustomPluginResponse.builder()
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .customPluginState(CustomPluginState.ACTIVE)
+ .name(CUSTOM_PLUGIN_NAME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .stateDescription(
+ StateDescription.builder()
+ .code(CUSTOM_PLUGIN_STATE_CODE)
+ .message(CUSTOM_PLUGIN_STATE_DESCRIPTION)
+ .build())
+ .latestRevision(
+ CustomPluginRevisionSummary.builder()
+ .contentType(CustomPluginContentType.ZIP)
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .fileDescription(
+ software.amazon.awssdk.services.kafkaconnect.model.CustomPluginFileDescription.builder()
+ .fileMd5(CUSTOM_PLUGIN_FILE_MD5)
+ .fileSize(CUSTOM_PLUGIN_FILE_SIZE)
+ .build())
+ .location(
+ CustomPluginLocationDescription.builder()
+ .s3Location(
+ S3LocationDescription.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .revision(CUSTOM_PLUGIN_REVISION)
+ .build())
+ .build();
+ private static final CustomPluginLocation CUSTOM_PLUGIN_LOCATION =
+ CustomPluginLocation.builder()
+ .s3Location(
+ S3Location.builder()
+ .bucketArn(CUSTOM_PLUGIN_LOCATION_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_LOCATION_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_LOCATION_OBJECT_VERSION)
+ .build())
+ .build();
+
+ private static final CreateCustomPluginRequest CREATE_CUSTOM_PLUGIN_REQUEST =
+ CreateCustomPluginRequest.builder()
+ .contentType(CUSTOM_PLUGIN_PROPERTIES_FILE_CONTENT)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .location(CUSTOM_PLUGIN_LOCATION)
+ .name(CUSTOM_PLUGIN_NAME)
+ .build();
+
+ private static final CreateCustomPluginResponse CREATE_CUSTOM_PLUGIN_RESPONSE =
+ CreateCustomPluginResponse.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .customPluginState(CustomPluginState.ACTIVE)
+ .name(CUSTOM_PLUGIN_NAME)
+ .revision(CUSTOM_PLUGIN_REVISION)
+ .build();
+
+ private static final DescribeCustomPluginRequest DESCRIBE_CUSTOM_PLUGIN_REQUEST =
+ DescribeCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final ResourceModel RESOURCE_MODEL_WITH_ARN =
+ ResourceModel.builder()
+ .name(CUSTOM_PLUGIN_NAME)
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .tags(TagHelper.convertToList(TAGS))
+ .build();
+
+ private static DescribeCustomPluginResponse describeResponseWithState(
+ final CustomPluginState state) {
+ return DescribeCustomPluginResponse.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .name(CUSTOM_PLUGIN_NAME)
+ .customPluginState(state)
+ .build();
+ }
+
+ private static final ProgressEvent DESCRIBE_RESPONSE =
+ ProgressEvent.builder()
+ .resourceModel(RESOURCE_MODEL_WITH_ARN)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ private static ResourceHandlerRequest getResourceHandlerRequest(
+ final ResourceModel resourceModel) {
+
+ return ResourceHandlerRequest.builder()
+ .desiredResourceState(resourceModel)
+ .desiredResourceTags(TAGS)
+ .build();
+ }
+
+ private static ResourceModel getResourceModel() {
+ return ResourceModel.builder()
+ .name(CUSTOM_PLUGIN_NAME)
+ .tags(TagHelper.convertToList(TAGS))
+ .build();
+ }
+
+ private static final ListTagsForResourceRequest LIST_TAGS_FOR_RESOURCE_REQUEST =
+ ListTagsForResourceRequest.builder().resourceArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final ListTagsForResourceResponse LIST_TAGS_FOR_RESOURCE_RESPONSE =
+ ListTagsForResourceResponse.builder().tags(TAGS).build();
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/DeleteHandlerTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/DeleteHandlerTest.java
new file mode 100644
index 0000000..5d0d2c4
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/DeleteHandlerTest.java
@@ -0,0 +1,332 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.BadRequestException;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginState;
+import software.amazon.awssdk.services.kafkaconnect.model.DeleteCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DeleteCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.NotFoundException;
+import software.amazon.cloudformation.exceptions.CfnInvalidRequestException;
+import software.amazon.cloudformation.exceptions.CfnNotFoundException;
+import software.amazon.cloudformation.exceptions.CfnNotStabilizedException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.OperationStatus;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class DeleteHandlerTest extends AbstractTestBase {
+
+ @Mock
+ private AmazonWebServicesClientProxy proxy;
+
+ @Mock
+ private ProxyClient proxyClient;
+
+ @Mock
+ private KafkaConnectClient kafkaConnectClient;
+
+ @Mock
+ private ExceptionTranslator exceptionTranslator;
+
+ @Mock
+ private Translator translator;
+
+ private DeleteHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ proxy =
+ new AmazonWebServicesClientProxy(
+ logger, MOCK_CREDENTIALS, () -> Duration.ofSeconds(600).toMillis());
+ proxyClient = proxyStub(proxy, kafkaConnectClient);
+ handler = new DeleteHandler(exceptionTranslator, translator);
+ }
+
+ @AfterEach
+ public void tear_down() {
+ verify(kafkaConnectClient, atLeastOnce()).serviceName();
+ verifyNoMoreInteractions(kafkaConnectClient, exceptionTranslator, translator);
+ }
+
+ @Test
+ public void test_handleRequest_success() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(
+ TestData.createDescribeCustomPluginResponse(CustomPluginState.ACTIVE)) // First call to
+ // validate resource
+ .thenReturn(
+ TestData.createDescribeCustomPluginResponse(
+ CustomPluginState.DELETING)) // Second call to
+ // stabalize
+ // resource
+ .thenThrow(NotFoundException.class); // Third call to finalise deletion of the resource
+ when(kafkaConnectClient.deleteCustomPlugin(any(DeleteCustomPluginRequest.class)))
+ .thenReturn(TestData.createDeleteCustomPluginResponse());
+ when(translator.translateToDeleteRequest(resourceModel))
+ .thenReturn(TestData.createDeleteCustomPluginRequest());
+
+ final ProgressEvent response =
+ handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getStatus()).isEqualTo(OperationStatus.SUCCESS);
+ assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
+ assertThat(response.getResourceModel()).isNull();
+ assertThat(response.getResourceModels()).isNull();
+ assertThat(response.getMessage()).isNull();
+ assertThat(response.getErrorCode()).isNull();
+
+ verify(kafkaConnectClient, times(1)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(3))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_dueToBadRequest() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+ final BadRequestException serviceException = BadRequestException.builder().build();
+ final CfnInvalidRequestException cfnException =
+ new CfnInvalidRequestException(serviceException);
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.CUSTOM_PLUGIN_ARN))
+ .thenReturn(cfnException);
+
+ assertThrows(
+ CfnInvalidRequestException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ verify(kafkaConnectClient, times(0)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(1))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_dueToEmptyCustomPluginArn() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ DescribeCustomPluginRequest.builder().customPluginArn(null).build();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+
+ assertThrows(
+ CfnNotFoundException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ verify(kafkaConnectClient, times(0)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(0))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_dueToCustomPluginNotFound() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest = TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+ final NotFoundException serviceException = NotFoundException.builder().build();
+ final CfnNotFoundException cfnException = new CfnNotFoundException(serviceException);
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.CUSTOM_PLUGIN_ARN))
+ .thenReturn(cfnException);
+
+ assertThrows(
+ CfnNotFoundException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ verify(kafkaConnectClient, times(0)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(1))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_alreadyDeleted() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+ final NotFoundException serviceException = NotFoundException.builder().build();
+ final CfnNotFoundException cfnException = new CfnNotFoundException(serviceException);
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.createDescribeCustomPluginResponse(CustomPluginState.ACTIVE));
+ when(kafkaConnectClient.deleteCustomPlugin(any(DeleteCustomPluginRequest.class)))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.CUSTOM_PLUGIN_ARN))
+ .thenReturn(cfnException);
+ when(translator.translateToDeleteRequest(resourceModel))
+ .thenReturn(TestData.createDeleteCustomPluginRequest());
+
+ assertThrows(
+ CfnNotFoundException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ verify(kafkaConnectClient, times(1)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(1))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_stabilizeHandlesServiceExceptions() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.createDescribeCustomPluginResponse(CustomPluginState.ACTIVE))
+ .thenThrow(AwsServiceException.builder().build());
+ when(kafkaConnectClient.deleteCustomPlugin(any(DeleteCustomPluginRequest.class)))
+ .thenReturn(TestData.createDeleteCustomPluginResponse());
+ when(translator.translateToDeleteRequest(resourceModel))
+ .thenReturn(TestData.createDeleteCustomPluginRequest());
+
+ final ProgressEvent response =
+ handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getStatus()).isEqualTo(OperationStatus.FAILED);
+ assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
+ assertThat(response.getResourceModels()).isNull();
+
+ verify(kafkaConnectClient, times(1)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ verify(exceptionTranslator, times(1))
+ .translateToCfnException(any(AwsServiceException.class), eq(TestData.CUSTOM_PLUGIN_ARN));
+ }
+
+ @Test
+ public void test_handleRequest_failure_stabilizeFails_dueToUnexpectedState() {
+ final DescribeCustomPluginRequest describeCustomPluginRequest =
+ TestData.createDescribeCustomPluginRequest();
+ final ResourceModel resourceModel = TestData.createResourceModel();
+
+ when(translator.translateToReadRequest(resourceModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.createDescribeCustomPluginResponse(CustomPluginState.ACTIVE))
+ .thenReturn(
+ TestData.createDescribeCustomPluginResponse(CustomPluginState.UNKNOWN_TO_SDK_VERSION));
+ when(kafkaConnectClient.deleteCustomPlugin(any(DeleteCustomPluginRequest.class)))
+ .thenReturn(TestData.createDeleteCustomPluginResponse());
+ when(translator.translateToDeleteRequest(resourceModel))
+ .thenReturn(TestData.createDeleteCustomPluginRequest());
+
+ assertThrows(
+ CfnNotStabilizedException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.createResourceHandlerRequest(resourceModel),
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ verify(kafkaConnectClient, times(1)).deleteCustomPlugin(any(DeleteCustomPluginRequest.class));
+ verify(kafkaConnectClient, times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ }
+
+ private static class TestData {
+ private static final String CUSTOM_PLUGIN_ARN =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin";
+
+ private static ResourceModel createResourceModel() {
+ return ResourceModel.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+ }
+
+ private static ResourceHandlerRequest createResourceHandlerRequest(
+ ResourceModel resourceModel) {
+ return ResourceHandlerRequest.builder()
+ .desiredResourceState(resourceModel)
+ .build();
+ }
+
+ private static DescribeCustomPluginRequest createDescribeCustomPluginRequest() {
+ return DescribeCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+ }
+
+ private static DescribeCustomPluginResponse createDescribeCustomPluginResponse(
+ CustomPluginState state) {
+ return DescribeCustomPluginResponse.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .customPluginState(state)
+ .build();
+ }
+
+ private static DeleteCustomPluginRequest createDeleteCustomPluginRequest() {
+ return DeleteCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+ }
+
+ private static DeleteCustomPluginResponse createDeleteCustomPluginResponse() {
+ return DeleteCustomPluginResponse.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+ }
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ExceptionTranslatorTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ExceptionTranslatorTest.java
new file mode 100644
index 0000000..8caf830
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ExceptionTranslatorTest.java
@@ -0,0 +1,102 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.model.BadRequestException;
+import software.amazon.awssdk.services.kafkaconnect.model.ConflictException;
+import software.amazon.awssdk.services.kafkaconnect.model.NotFoundException;
+import software.amazon.awssdk.services.kafkaconnect.model.TooManyRequestsException;
+import software.amazon.awssdk.services.kafkaconnect.model.UnauthorizedException;
+import software.amazon.awssdk.services.kafkaconnect.model.InternalServerErrorException;
+
+import software.amazon.cloudformation.exceptions.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(MockitoExtension.class)
+public class ExceptionTranslatorTest {
+ private static final String TEST_IDENTIFIER = "custom-plugin-test-name";
+ private static final String TEST_MESSAGE = "test-message";
+ private final ExceptionTranslator exceptionTranslator = new ExceptionTranslator();
+
+ @Test
+ public void translateToCfnException_NotFoundException_MapsToCfnNotFoundException() {
+ final NotFoundException exception = NotFoundException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnNotFoundException.class,
+ "Resource of type 'AWS::KafkaConnect::CustomPlugin' with identifier 'custom-plugin-test-name' was not found.");
+ }
+
+ @Test
+ public void translateToCfnException_BadRequestException_MapsToCfnInvalidRequestException() {
+ final BadRequestException exception = BadRequestException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnInvalidRequestException.class,
+ "Invalid request provided: " + TEST_MESSAGE);
+ }
+
+ @Test
+ public void translateToCfnException_ConflictException_MapsToCfnAlreadyExistsException() {
+ final ConflictException exception = ConflictException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnAlreadyExistsException.class,
+ "Resource of type 'AWS::KafkaConnect::CustomPlugin' with identifier 'custom-plugin-test-name' "
+ +
+ "already exists.");
+ }
+
+ @Test
+ public void translateToCfnException_InternalServerErrorException_MapsToCfnInternalFailureException() {
+ final InternalServerErrorException exception = InternalServerErrorException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnInternalFailureException.class,
+ "Internal error occurred.");
+ }
+
+ @Test
+ public void translateToCfnException_UnauthorizedException_MapsToCfnAccessDeniedException() {
+ final UnauthorizedException exception = UnauthorizedException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnAccessDeniedException.class,
+ "Access denied for operation 'AWS::KafkaConnect::CustomPlugin'.");
+ }
+
+ @Test
+ public void translateToCfnException_TooManyRequestsException_MapsToCfnServiceLimitExceededException() {
+ final TooManyRequestsException exception = TooManyRequestsException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnServiceLimitExceededException.class, TEST_MESSAGE);
+ }
+
+ @Test
+ public void translateToCfnException_Other_MapsToCfnGeneralServiceException() {
+ final AwsServiceException exception = AwsServiceException.builder()
+ .message(TEST_MESSAGE)
+ .build();
+
+ runTranslateToCfnExceptionAndVerifyOutput(exception, CfnGeneralServiceException.class, TEST_MESSAGE);
+ }
+
+ private void runTranslateToCfnExceptionAndVerifyOutput(final AwsServiceException exception,
+ final Class extends BaseHandlerException> expectedExceptionClass, final String expectedMessage) {
+
+ final BaseHandlerException result = exceptionTranslator.translateToCfnException(exception, TEST_IDENTIFIER);
+
+ assertThat(result.getClass()).isEqualTo(expectedExceptionClass);
+ assertThat(result.getMessage()).isEqualTo(expectedMessage);
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ListHandlerTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ListHandlerTest.java
new file mode 100644
index 0000000..13dd7c7
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ListHandlerTest.java
@@ -0,0 +1,165 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginSummary;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsResponse;
+import software.amazon.cloudformation.exceptions.CfnGeneralServiceException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.OperationStatus;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class ListHandlerTest extends AbstractTestBase {
+ @Mock
+ private KafkaConnectClient kafkaConnectClient;
+
+ @Mock
+ private ExceptionTranslator exceptionTranslator;
+
+ @Mock
+ private Translator translator;
+
+ private AmazonWebServicesClientProxy proxy;
+
+ private ProxyClient proxyClient;
+
+ private ListHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ proxy =
+ new AmazonWebServicesClientProxy(
+ logger, MOCK_CREDENTIALS, () -> Duration.ofSeconds(600).toMillis());
+ proxyClient = proxyStub(proxy, kafkaConnectClient);
+ handler = new ListHandler(exceptionTranslator, translator);
+ }
+
+ @AfterEach
+ public void tear_down() {
+ verifyNoMoreInteractions(kafkaConnectClient, exceptionTranslator, translator);
+ }
+
+ @Test
+ public void handleRequest_success() {
+ when(translator.translateToListRequest(TestData.NEXT_TOKEN_1))
+ .thenReturn(TestData.LIST_CUSTOM_PLUGINS_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_CUSTOM_PLUGINS_REQUEST, kafkaConnectClient::listCustomPlugins))
+ .thenReturn(TestData.LIST_CUSTOM_PLUGINS_RESPONSE);
+ when(translator.translateFromListResponse(TestData.LIST_CUSTOM_PLUGINS_RESPONSE))
+ .thenReturn(TestData.CUSTOM_PLUGIN_MODELS);
+
+ final ProgressEvent response =
+ handler.handleRequest(
+ proxy, TestData.RESOURCE_HANDLER_REQUEST, new CallbackContext(), proxyClient, logger);
+
+ assertThat(response).isEqualTo(TestData.SUCCESS_RESPONSE);
+ }
+
+ @Test
+ public void handleRequest_throwsException_whenListCustomPluginsFails() {
+ final AwsServiceException serviceException = AwsServiceException.builder().build();
+ final CfnGeneralServiceException cfnException =
+ new CfnGeneralServiceException(serviceException);
+ when(translator.translateToListRequest(TestData.NEXT_TOKEN_1))
+ .thenReturn(TestData.LIST_CUSTOM_PLUGINS_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_CUSTOM_PLUGINS_REQUEST, kafkaConnectClient::listCustomPlugins))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.AWS_ACCOUNT_ID))
+ .thenReturn(cfnException);
+
+ final CfnGeneralServiceException exception =
+ assertThrows(
+ CfnGeneralServiceException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.RESOURCE_HANDLER_REQUEST,
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ assertThat(exception).isEqualTo(cfnException);
+ }
+
+ private static class TestData {
+ private static final String NEXT_TOKEN_1 = "next-token-1";
+ private static final String NEXT_TOKEN_2 = "next-token-2";
+ private static final String AWS_ACCOUNT_ID = "1111111111";
+ private static final String CUSTOM_PLUGIN_ARN_1 =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin-1";
+ private static final String CUSTOM_PLUGIN_ARN_2 =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin-2";
+ private static final String CUSTOM_PLUGIN_NAME_1 = "unit-test-custom-plugin-1";
+ private static final String CUSTOM_PLUGIN_NAME_2 = "unit-test-custom-plugin-2";
+
+ private static final ListCustomPluginsRequest LIST_CUSTOM_PLUGINS_REQUEST =
+ ListCustomPluginsRequest.builder().nextToken(NEXT_TOKEN_1).build();
+
+ private static final ResourceModel MODEL = ResourceModel.builder().build();
+
+ private static final ResourceHandlerRequest RESOURCE_HANDLER_REQUEST =
+ ResourceHandlerRequest.builder()
+ .desiredResourceState(MODEL)
+ .nextToken(NEXT_TOKEN_1)
+ .awsAccountId(AWS_ACCOUNT_ID)
+ .build();
+
+ public static final List CUSTOM_PLUGIN_MODELS =
+ Arrays.asList(
+ buildResourceModel(CUSTOM_PLUGIN_NAME_1, CUSTOM_PLUGIN_ARN_1),
+ buildResourceModel(CUSTOM_PLUGIN_NAME_2, CUSTOM_PLUGIN_ARN_2));
+
+ private static final List CUSTOM_PLUGIN_SUMMARIES =
+ Arrays.asList(
+ buildCustomPluginSummary(CUSTOM_PLUGIN_NAME_1, CUSTOM_PLUGIN_ARN_1),
+ buildCustomPluginSummary(CUSTOM_PLUGIN_NAME_2, CUSTOM_PLUGIN_ARN_2));
+
+ private static final ProgressEvent SUCCESS_RESPONSE =
+ ProgressEvent.builder()
+ .resourceModels(CUSTOM_PLUGIN_MODELS)
+ .nextToken(NEXT_TOKEN_2)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ private static final ListCustomPluginsResponse LIST_CUSTOM_PLUGINS_RESPONSE =
+ ListCustomPluginsResponse.builder()
+ .nextToken(TestData.NEXT_TOKEN_2)
+ .customPlugins(TestData.CUSTOM_PLUGIN_SUMMARIES)
+ .build();
+
+ private static final CustomPluginSummary buildCustomPluginSummary(
+ final String customPluginName, final String customPluginArn) {
+ return CustomPluginSummary.builder()
+ .customPluginArn(customPluginArn)
+ .name(customPluginName)
+ .build();
+ }
+
+ private static ResourceModel buildResourceModel(
+ final String customPluginName, final String customPluginArn) {
+ return ResourceModel.builder()
+ .customPluginArn(customPluginArn)
+ .name(customPluginName)
+ .build();
+ }
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ReadHandlerTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ReadHandlerTest.java
new file mode 100644
index 0000000..d57a742
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/ReadHandlerTest.java
@@ -0,0 +1,218 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.HashMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.NotFoundException;
+import software.amazon.cloudformation.exceptions.CfnNotFoundException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.OperationStatus;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class ReadHandlerTest extends AbstractTestBase {
+
+ @Mock
+ private KafkaConnectClient kafkaConnectClient;
+
+ @Mock
+ private ExceptionTranslator exceptionTranslator;
+
+ @Mock
+ private Translator translator;
+
+ private AmazonWebServicesClientProxy proxy;
+
+ private ProxyClient proxyClient;
+
+ private ReadHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ proxy =
+ new AmazonWebServicesClientProxy(
+ logger, MOCK_CREDENTIALS, () -> Duration.ofSeconds(600).toMillis());
+ proxyClient = proxyStub(proxy, kafkaConnectClient);
+ handler = new ReadHandler(exceptionTranslator, translator);
+ }
+
+ @AfterEach
+ public void tear_down() {
+ verify(kafkaConnectClient, atLeastOnce()).serviceName();
+ verifyNoMoreInteractions(kafkaConnectClient, exceptionTranslator, translator);
+ }
+
+ @Test
+ public void handleRequest_returnsCustomPluginWhenResourceModelIsPassedAndNonEmptyTags_success() {
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateFromReadResponse(TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE))
+ .thenReturn(TestData.RESPONSE_RESOURCE_MODEL);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE);
+
+ final ProgressEvent response =
+ handler.handleRequest(
+ proxy, TestData.RESOURCE_HANDLER_REQUEST, new CallbackContext(), proxyClient, logger);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getStatus()).isEqualTo(OperationStatus.SUCCESS);
+ assertThat(response.getCallbackContext()).isNull();
+ assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
+ assertThat(response.getResourceModel().getCustomPluginArn())
+ .isEqualTo(TestData.CUSTOM_PLUGIN_ARN);
+ assertThat(response.getResourceModel().getName()).isEqualTo(TestData.CUSTOM_PLUGIN_NAME);
+ assertThat(response.getResourceModel().getTags()).isEqualTo(TagHelper.convertToList(TAGS));
+ assertThat(response.getResourceModels()).isNull();
+ assertThat(response.getMessage()).isNull();
+ assertThat(response.getErrorCode()).isNull();
+ }
+
+ @Test
+ public void handleRequest_returnsCustomPluginWhenResourceModelIsPassedAndEmptyTags_success() {
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE);
+ when(translator.translateFromReadResponse(TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE))
+ .thenReturn(TestData.RESPONSE_RESOURCE_MODEL_EMPTY_TAGS);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE_EMPTY_TAGS);
+
+ final ProgressEvent response =
+ handler.handleRequest(
+ proxy, TestData.RESOURCE_HANDLER_REQUEST, new CallbackContext(), proxyClient, logger);
+
+ assertThat(response).isNotNull();
+ assertThat(response.getStatus()).isEqualTo(OperationStatus.SUCCESS);
+ assertThat(response.getCallbackContext()).isNull();
+ assertThat(response.getCallbackDelaySeconds()).isEqualTo(0);
+ assertThat(response.getResourceModel().getCustomPluginArn())
+ .isEqualTo(TestData.CUSTOM_PLUGIN_ARN);
+ assertThat(response.getResourceModel().getName()).isEqualTo(TestData.CUSTOM_PLUGIN_NAME);
+ assertThat(response.getResourceModel().getTags()).isNullOrEmpty();
+ assertThat(response.getResourceModels()).isNull();
+ assertThat(response.getMessage()).isNull();
+ assertThat(response.getErrorCode()).isNull();
+ }
+
+ @Test
+ public void handleRequest_throwsCfnNotFoundException_whenDescribeCustomPluginFails() {
+ final NotFoundException serviceException = NotFoundException.builder().build();
+ final CfnNotFoundException cfnException = new CfnNotFoundException(serviceException);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.CUSTOM_PLUGIN_ARN))
+ .thenReturn(cfnException);
+
+ final CfnNotFoundException exception =
+ assertThrows(
+ CfnNotFoundException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.RESOURCE_HANDLER_REQUEST,
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ assertThat(exception).isEqualTo(cfnException);
+ }
+
+ @Test
+ public void handleRequest_throwsCfnNotFoundException_whenListTagsForResourceFails() {
+ final NotFoundException serviceException = NotFoundException.builder().build();
+ final CfnNotFoundException cfnException = new CfnNotFoundException(serviceException);
+ when(translator.translateToReadRequest(TestData.RESOURCE_MODEL))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST, kafkaConnectClient::listTagsForResource))
+ .thenThrow(serviceException);
+ when(exceptionTranslator.translateToCfnException(serviceException, TestData.CUSTOM_PLUGIN_ARN))
+ .thenReturn(cfnException);
+
+ final CfnNotFoundException exception =
+ assertThrows(
+ CfnNotFoundException.class,
+ () -> handler.handleRequest(
+ proxy,
+ TestData.RESOURCE_HANDLER_REQUEST,
+ new CallbackContext(),
+ proxyClient,
+ logger));
+
+ assertThat(exception).isEqualTo(cfnException);
+ }
+
+ private static class TestData {
+ private static final String CUSTOM_PLUGIN_ARN =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin";
+ private static final String CUSTOM_PLUGIN_NAME = "unit-test-custom-plugin";
+
+ private static final ResourceModel RESPONSE_RESOURCE_MODEL =
+ ResourceModel.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .name(CUSTOM_PLUGIN_NAME)
+ .tags(TagHelper.convertToList(TAGS))
+ .build();
+
+ private static final ResourceModel RESPONSE_RESOURCE_MODEL_EMPTY_TAGS =
+ ResourceModel.builder().customPluginArn(CUSTOM_PLUGIN_ARN).name(CUSTOM_PLUGIN_NAME).build();
+
+ private static final ResourceModel RESOURCE_MODEL =
+ ResourceModel.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final ResourceHandlerRequest RESOURCE_HANDLER_REQUEST =
+ ResourceHandlerRequest.builder()
+ .desiredResourceState(RESOURCE_MODEL)
+ .build();
+
+ private static final DescribeCustomPluginRequest DESCRIBE_CUSTOM_PLUGIN_REQUEST =
+ DescribeCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final DescribeCustomPluginResponse DESCRIBE_CUSTOM_PLUGIN_RESPONSE =
+ DescribeCustomPluginResponse.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .name(CUSTOM_PLUGIN_NAME)
+ .build();
+
+ private static final ListTagsForResourceRequest LIST_TAGS_FOR_RESOURCE_REQUEST =
+ ListTagsForResourceRequest.builder().resourceArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final ListTagsForResourceResponse LIST_TAGS_FOR_RESOURCE_RESPONSE =
+ ListTagsForResourceResponse.builder().tags(TAGS).build();
+
+ private static final ListTagsForResourceResponse LIST_TAGS_FOR_RESOURCE_RESPONSE_EMPTY_TAGS =
+ ListTagsForResourceResponse.builder().tags(new HashMap<>()).build();
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/TranslatorTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/TranslatorTest.java
new file mode 100644
index 0000000..ee7cc6e
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/TranslatorTest.java
@@ -0,0 +1,329 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.services.kafkaconnect.model.CreateCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginContentType;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginRevisionSummary;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginState;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginSummary;
+import software.amazon.awssdk.services.kafkaconnect.model.DeleteCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListCustomPluginsResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.S3LocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.StateDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.TagResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.UntagResourceRequest;
+
+@ExtendWith(MockitoExtension.class)
+public class TranslatorTest extends AbstractTestBase {
+
+ private Translator translator = new Translator();
+
+ @Test
+ public void translateToCreateRequest_success() {
+ assertThat(translator.translateToCreateRequest(TestData.CREATE_REQUEST_RESOURCE_MODEL,
+ TagHelper.convertToMap(TestData.CREATE_REQUEST_RESOURCE_MODEL.getTags())))
+ .isEqualTo(TestData.CREATE_CUSTOM_PLUGIN_REQUEST);
+ }
+
+ @Test
+ public void translateToReadRequest_success() {
+ assertThat(translator.translateToReadRequest(TestData.READ_REQUEST_RESOURCE_MODEL))
+ .isEqualTo(TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST);
+ }
+
+ @Test
+ public void translateFromReadResponse_fullCustomPlugin_success() {
+ assertThat(translator.translateFromReadResponse(TestData.FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE))
+ .isEqualTo(TestData.FULL_RESOURCE_DESCRIBE_MODEL);
+ }
+
+ @Test
+ public void translateToListRequest_success() {
+ assertThat(translator.translateToListRequest(TestData.NEXT_TOKEN))
+ .isEqualTo(TestData.LIST_CUSTOM_PLUGINS_REQUEST);
+ }
+
+ @Test
+ public void translateFromListResponse_success() {
+ assertThat(translator.translateFromListResponse(TestData.LIST_CUSTOM_PLUGINS_RESPONSE))
+ .isEqualTo(TestData.LIST_CUSTOM_PLUGIN_MODELS);
+ }
+
+ @Test
+ public void translateToTagResourceRequest_success() {
+ final ResourceModel model = TestData.buildBaseModel(TestData.CUSTOM_PLUGIN_ARN);
+ assertThat(Translator.translateToTagRequest(model, TAGS))
+ .isEqualTo(TestData.TAG_RESOURCE_REQUEST);
+ }
+
+ @Test
+ public void translateToUntagResourceRequest_success() {
+ final Set tagsToUntag = new HashSet<>();
+ tagsToUntag.add(TestData.TAG_KEY_TO_REMOVE);
+ final ResourceModel model = TestData.buildBaseModel(TestData.CUSTOM_PLUGIN_ARN);
+ assertThat(Translator.translateToUntagRequest(model, tagsToUntag))
+ .isEqualTo(TestData.UNTAG_RESOURCE_REQUEST);
+ }
+
+ @Test
+ public void translateToDeleteRequest_success() {
+ assertThat(translator.translateToDeleteRequest(TestData.DELETE_REQUEST_RESOURCE_MODEL))
+ .isEqualTo(TestData.DELETE_CUSTOM_PLUGIN_REQUEST);
+ }
+
+ @Test
+ public void sdkCustomPluginFileDescriptionToResourceCustomPluginFileDescription_success() {
+ assertThat(
+ Translator.sdkCustomPluginFileDescriptionToResourceCustomPluginFileDescription(
+ TestData.FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE.latestRevision().fileDescription()))
+ .isEqualTo(TestData.FULL_RESOURCE_DESCRIBE_MODEL.getFileDescription());
+ assertThat(
+ Translator.sdkCustomPluginFileDescriptionToResourceCustomPluginFileDescription(null))
+ .isEqualTo(null);
+ }
+
+ @Test
+ public void sdkS3LocationDescriptionToResourceS3Location_success() {
+ assertThat(
+ Translator.sdkS3LocationDescriptionToResourceS3Location(
+ TestData.FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE.latestRevision().location().s3Location()))
+ .isEqualTo(TestData.FULL_RESOURCE_DESCRIBE_MODEL.getLocation().getS3Location());
+ assertThat(
+ Translator.sdkS3LocationDescriptionToResourceS3Location(null))
+ .isEqualTo(null);
+ }
+
+ @Test
+ public void sdkCustomPluginLocationDescriptionToResourceCustomPluginLocation_success() {
+ assertThat(
+ Translator.sdkCustomPluginLocationDescriptionToResourceCustomPluginLocation(
+ TestData.FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE.latestRevision().location()))
+ .isEqualTo(TestData.FULL_RESOURCE_DESCRIBE_MODEL.getLocation());
+ assertThat(
+ Translator.sdkCustomPluginLocationDescriptionToResourceCustomPluginLocation(null))
+ .isEqualTo(null);
+ }
+
+ @Test
+ public void resourceS3LocationToSdkS3Location_success() {
+ assertThat(
+ Translator.resourceS3LocationToSdkS3Location(
+ TestData.CREATE_REQUEST_RESOURCE_MODEL.getLocation().getS3Location()))
+ .isEqualTo(TestData.CREATE_CUSTOM_PLUGIN_REQUEST.location().s3Location());
+ assertThat(
+ Translator.resourceS3LocationToSdkS3Location(null))
+ .isEqualTo(null);
+ }
+
+ @Test
+ public void resourceCustomPluginLocationToSdkCustomPluginLocation_success() {
+ assertThat(
+ Translator.resourceCustomPluginLocationToSdkCustomPluginLocation(
+ TestData.CREATE_REQUEST_RESOURCE_MODEL.getLocation()))
+ .isEqualTo(TestData.CREATE_CUSTOM_PLUGIN_REQUEST.location());
+ assertThat(
+ Translator.resourceCustomPluginLocationToSdkCustomPluginLocation(null))
+ .isEqualTo(null);
+ }
+
+ private static class TestData {
+ private static final String CUSTOM_PLUGIN_ARN =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin";
+ private static final String CUSTOM_PLUGIN_NAME = "unit-test-custom-plugin";
+ private static final String CUSTOM_PLUGIN_ARN_2 =
+ "arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/unit-test-custom-plugin-2";
+ private static final String CUSTOM_PLUGIN_NAME_2 = "unit-test-custom-plugin-2";
+ private static final String CUSTOM_PLUGIN_DESCRIPTION =
+ "Unit testing custom plugin description";
+ private static final long CUSTOM_PLUGIN_REVISION = 1L;
+ private static final Instant CUSTOM_PLUGIN_CREATION_TIME =
+ OffsetDateTime.parse("2021-03-04T14:03:40.818Z").toInstant();
+ private static final String CUSTOM_PLUGIN_STATE_CODE = "custom-plugin-state-code";
+ private static final String CUSTOM_PLUGIN_STATE_DESCRIPTION = "custom-plugin-state-description";
+ private static final String CUSTOM_PLUGIN_FILE_MD5 = "abcd1234";
+ private static final long CUSTOM_PLUGIN_FILE_SIZE = 123456L;
+ private static final String CUSTOM_PLUGIN_S3_BUCKET_ARN = "bucket-arn";
+ private static final String CUSTOM_PLUGIN_S3_FILE_KEY = "file-key";
+ private static final String CUSTOM_PLUGIN_S3_OBJECT_VERSION = "object-version";
+ private static final String NEXT_TOKEN = "next-token";
+ private static final String TAG_KEY_TO_REMOVE = "tag-key-to-remove";
+
+ private static final ResourceModel READ_REQUEST_RESOURCE_MODEL =
+ ResourceModel.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final DescribeCustomPluginRequest DESCRIBE_CUSTOM_PLUGIN_REQUEST =
+ DescribeCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final DescribeCustomPluginResponse FULL_DESCRIBE_CUSTOM_PLUGIN_RESPONSE =
+ DescribeCustomPluginResponse.builder()
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .customPluginState(CustomPluginState.ACTIVE)
+ .name(CUSTOM_PLUGIN_NAME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .stateDescription(
+ StateDescription.builder()
+ .code(CUSTOM_PLUGIN_STATE_CODE)
+ .message(CUSTOM_PLUGIN_STATE_DESCRIPTION)
+ .build())
+ .latestRevision(
+ CustomPluginRevisionSummary.builder()
+ .contentType(CustomPluginContentType.ZIP)
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .fileDescription(
+ software.amazon.awssdk.services.kafkaconnect.model.CustomPluginFileDescription.builder()
+ .fileMd5(CUSTOM_PLUGIN_FILE_MD5)
+ .fileSize(CUSTOM_PLUGIN_FILE_SIZE)
+ .build())
+ .location(
+ CustomPluginLocationDescription.builder()
+ .s3Location(
+ S3LocationDescription.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .revision(CUSTOM_PLUGIN_REVISION)
+ .build())
+ .build();
+
+ private static final ResourceModel FULL_RESOURCE_DESCRIBE_MODEL =
+ ResourceModel.builder()
+ .customPluginArn(CUSTOM_PLUGIN_ARN)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .name(CUSTOM_PLUGIN_NAME)
+ .fileDescription(
+ CustomPluginFileDescription.builder()
+ .fileMd5(CUSTOM_PLUGIN_FILE_MD5)
+ .fileSize(CUSTOM_PLUGIN_FILE_SIZE)
+ .build())
+ .location(
+ CustomPluginLocation.builder()
+ .s3Location(
+ S3Location.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .contentType(CustomPluginContentType.ZIP.toString())
+ .revision(CUSTOM_PLUGIN_REVISION)
+ .build();
+
+ private static final ResourceModel buildBaseModel(final String customPluginArn) {
+ return ResourceModel.builder().customPluginArn(customPluginArn).build();
+ }
+
+ private static final CustomPluginSummary buildCustomPluginSummary(
+ final String customPluginName, final String customPluginArn) {
+ return CustomPluginSummary.builder()
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .customPluginArn(customPluginArn)
+ .customPluginState(CustomPluginState.ACTIVE)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .name(customPluginName)
+ .latestRevision(
+ CustomPluginRevisionSummary.builder()
+ .contentType(CustomPluginContentType.ZIP)
+ .creationTime(CUSTOM_PLUGIN_CREATION_TIME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .location(
+ CustomPluginLocationDescription.builder()
+ .s3Location(
+ S3LocationDescription.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .fileDescription(
+ software.amazon.awssdk.services.kafkaconnect.model.CustomPluginFileDescription
+ .builder()
+ .fileMd5(CUSTOM_PLUGIN_FILE_MD5)
+ .fileSize(CUSTOM_PLUGIN_FILE_SIZE)
+ .build())
+ .revision(CUSTOM_PLUGIN_REVISION)
+ .build())
+ .build();
+ }
+
+ private static final List LIST_CUSTOM_PLUGIN_MODELS =
+ asList(buildBaseModel(CUSTOM_PLUGIN_ARN), buildBaseModel(CUSTOM_PLUGIN_ARN_2));
+
+ private static final ListCustomPluginsRequest LIST_CUSTOM_PLUGINS_REQUEST =
+ ListCustomPluginsRequest.builder().nextToken(NEXT_TOKEN).build();
+
+ private static final ListCustomPluginsResponse LIST_CUSTOM_PLUGINS_RESPONSE =
+ ListCustomPluginsResponse.builder()
+ .customPlugins(
+ asList(
+ buildCustomPluginSummary(CUSTOM_PLUGIN_NAME, CUSTOM_PLUGIN_ARN),
+ buildCustomPluginSummary(CUSTOM_PLUGIN_NAME_2, CUSTOM_PLUGIN_ARN_2)))
+ .build();
+
+ private static final UntagResourceRequest UNTAG_RESOURCE_REQUEST =
+ UntagResourceRequest.builder()
+ .resourceArn(CUSTOM_PLUGIN_ARN)
+ .tagKeys(TAG_KEY_TO_REMOVE)
+ .build();
+
+ private static final TagResourceRequest TAG_RESOURCE_REQUEST =
+ TagResourceRequest.builder().resourceArn(CUSTOM_PLUGIN_ARN).tags(TAGS).build();
+
+ private static final ResourceModel DELETE_REQUEST_RESOURCE_MODEL =
+ ResourceModel.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final DeleteCustomPluginRequest DELETE_CUSTOM_PLUGIN_REQUEST =
+ DeleteCustomPluginRequest.builder().customPluginArn(CUSTOM_PLUGIN_ARN).build();
+
+ private static final ResourceModel CREATE_REQUEST_RESOURCE_MODEL =
+ ResourceModel.builder()
+ .name(CUSTOM_PLUGIN_NAME)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .contentType(CustomPluginContentType.ZIP.toString())
+ .location(
+ CustomPluginLocation.builder()
+ .s3Location(
+ S3Location.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .tags(TagHelper.convertToList(TAGS))
+ .build();
+
+ private static final CreateCustomPluginRequest CREATE_CUSTOM_PLUGIN_REQUEST =
+ CreateCustomPluginRequest.builder()
+ .contentType(CustomPluginContentType.ZIP)
+ .description(CUSTOM_PLUGIN_DESCRIPTION)
+ .location(
+ software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocation.builder()
+ .s3Location(
+ software.amazon.awssdk.services.kafkaconnect.model.S3Location.builder()
+ .bucketArn(CUSTOM_PLUGIN_S3_BUCKET_ARN)
+ .fileKey(CUSTOM_PLUGIN_S3_FILE_KEY)
+ .objectVersion(CUSTOM_PLUGIN_S3_OBJECT_VERSION)
+ .build())
+ .build())
+ .name(CUSTOM_PLUGIN_NAME)
+ .tags(TAGS)
+ .build();
+ }
+}
diff --git a/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/UpdateHandlerTest.java b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/UpdateHandlerTest.java
new file mode 100644
index 0000000..e328218
--- /dev/null
+++ b/aws-kafkaconnect-customplugin/src/test/java/software/amazon/kafkaconnect/customplugin/UpdateHandlerTest.java
@@ -0,0 +1,594 @@
+package software.amazon.kafkaconnect.customplugin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.kafkaconnect.KafkaConnectClient;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginLocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.CustomPluginRevisionSummary;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.DescribeCustomPluginResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.ListTagsForResourceResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.S3LocationDescription;
+import software.amazon.awssdk.services.kafkaconnect.model.TagResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.TagResourceResponse;
+import software.amazon.awssdk.services.kafkaconnect.model.UntagResourceRequest;
+import software.amazon.awssdk.services.kafkaconnect.model.UntagResourceResponse;
+import software.amazon.cloudformation.exceptions.CfnGeneralServiceException;
+import software.amazon.cloudformation.exceptions.CfnNotFoundException;
+import software.amazon.cloudformation.exceptions.CfnNotUpdatableException;
+import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy;
+import software.amazon.cloudformation.proxy.OperationStatus;
+import software.amazon.cloudformation.proxy.ProgressEvent;
+import software.amazon.cloudformation.proxy.ProxyClient;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
+import software.amazon.cloudformation.proxy.ResourceHandlerRequest.ResourceHandlerRequestBuilder;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdateHandlerTest extends AbstractTestBase {
+ @Mock
+ private AmazonWebServicesClientProxy proxy;
+
+ @Mock
+ private ProxyClient proxyClient;
+
+ @Mock
+ KafkaConnectClient kafkaConnectClient;
+
+ @Mock
+ private ExceptionTranslator exceptionTranslator;
+
+ @Mock
+ private Translator translator;
+
+ private ReadHandler readHandler;
+
+ private UpdateHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ proxy = new AmazonWebServicesClientProxy(
+ logger, MOCK_CREDENTIALS, () -> Duration.ofSeconds(600).toMillis());
+ proxyClient = proxyStub(proxy, kafkaConnectClient);
+ readHandler = new ReadHandler(exceptionTranslator, translator);
+ handler = new UpdateHandler(exceptionTranslator, translator, readHandler);
+ }
+
+ @AfterEach
+ public void tear_down() {
+ verify(kafkaConnectClient, atLeastOnce()).serviceName();
+ verifyNoMoreInteractions(kafkaConnectClient, exceptionTranslator, translator);
+ }
+
+ @Test
+ public void test_handleRequest_updateTags_success_onAddingAndRemovingTags() {
+ final ResourceModel previousModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_2.toBuilder().build();
+ final DescribeCustomPluginRequest describeCustomPluginRequest = TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST_1;
+ final DescribeCustomPluginResponse describeCustomPluginResponse = TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE_1
+ .toBuilder().build();
+ when(translator.translateToReadRequest(desiredModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(describeCustomPluginResponse);
+ when(proxyClient.client().untagResource(any(UntagResourceRequest.class)))
+ .thenReturn(UntagResourceResponse.builder().build());
+ when(proxyClient.client().tagResource(any(TagResourceRequest.class)))
+ .thenReturn(TagResourceResponse.builder().build());
+ when(translator.translateFromReadResponse(describeCustomPluginResponse))
+ .thenReturn(desiredModel);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST_1, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE_2);
+
+ final ResourceHandlerRequest request =
+ TestData.createResourceHandlerRequest(desiredModel, previousModel);
+ final ProgressEvent response = handler.handleRequest(proxy, request,
+ new CallbackContext(), proxyClient, logger);
+
+ final ProgressEvent expected = ProgressEvent
+ .builder()
+ .resourceModel(desiredModel)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ assertThat(response).isEqualTo(expected);
+
+ verify(proxyClient.client(), times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ verify(proxyClient.client(), times(1))
+ .listTagsForResource(any(ListTagsForResourceRequest.class));
+ verify(proxyClient.client(), times(1))
+ .untagResource(any(UntagResourceRequest.class));
+ verify(proxyClient.client(), times(1))
+ .tagResource(any(TagResourceRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_updateTags_success_onAddingTags() {
+ final ResourceModel previousModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_3.toBuilder().build();
+ final DescribeCustomPluginRequest describeCustomPluginRequest = TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST_1;
+ final DescribeCustomPluginResponse describeCustomPluginResponse = TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE_1
+ .toBuilder().build();
+ when(translator.translateToReadRequest(desiredModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(describeCustomPluginResponse);
+ when(proxyClient.client().tagResource(any(TagResourceRequest.class)))
+ .thenReturn(TagResourceResponse.builder().build());
+ when(translator.translateFromReadResponse(describeCustomPluginResponse))
+ .thenReturn(desiredModel);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST_1, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE_3);
+
+ final ResourceHandlerRequest request =
+ TestData.createResourceHandlerRequest(desiredModel, previousModel);
+ final ProgressEvent response = handler.handleRequest(proxy, request,
+ new CallbackContext(), proxyClient, logger);
+
+ final ProgressEvent expected = ProgressEvent
+ .builder()
+ .resourceModel(desiredModel)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ assertThat(response).isEqualTo(expected);
+
+ verify(proxyClient.client(), times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ verify(proxyClient.client(), times(1))
+ .listTagsForResource(any(ListTagsForResourceRequest.class));
+ verify(proxyClient.client(), times(0))
+ .untagResource(any(UntagResourceRequest.class));
+ verify(proxyClient.client(), times(1))
+ .tagResource(any(TagResourceRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_updateTags_success_onRemovingTags() {
+ final ResourceModel previousModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_4.toBuilder().build();
+ final DescribeCustomPluginRequest describeCustomPluginRequest = TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST_1;
+ final DescribeCustomPluginResponse describeCustomPluginResponse = TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE_1
+ .toBuilder().build();
+ when(translator.translateToReadRequest(desiredModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(describeCustomPluginResponse);
+ when(proxyClient.client().untagResource(any(UntagResourceRequest.class)))
+ .thenReturn(UntagResourceResponse.builder().build());
+ when(translator.translateFromReadResponse(describeCustomPluginResponse))
+ .thenReturn(desiredModel);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST_1, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE_4);
+
+ final ResourceHandlerRequest request =
+ TestData.createResourceHandlerRequest(desiredModel, previousModel);
+ final ProgressEvent response = handler.handleRequest(proxy, request,
+ new CallbackContext(), proxyClient, logger);
+
+ final ProgressEvent expected = ProgressEvent
+ .builder()
+ .resourceModel(desiredModel)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ assertThat(response).isEqualTo(expected);
+
+ verify(proxyClient.client(), times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ verify(proxyClient.client(), times(1))
+ .listTagsForResource(any(ListTagsForResourceRequest.class));
+ verify(proxyClient.client(), times(1))
+ .untagResource(any(UntagResourceRequest.class));
+ verify(proxyClient.client(), times(0))
+ .tagResource(any(TagResourceRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_updateTags_success_onNoChangeToTags() {
+ final ResourceModel previousModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final DescribeCustomPluginRequest describeCustomPluginRequest = TestData.DESCRIBE_CUSTOM_PLUGIN_REQUEST_1;
+ final DescribeCustomPluginResponse describeCustomPluginResponse = TestData.DESCRIBE_CUSTOM_PLUGIN_RESPONSE_1
+ .toBuilder().build();
+ when(translator.translateToReadRequest(desiredModel)).thenReturn(describeCustomPluginRequest);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ describeCustomPluginRequest, kafkaConnectClient::describeCustomPlugin))
+ .thenReturn(describeCustomPluginResponse);
+ when(translator.translateFromReadResponse(describeCustomPluginResponse))
+ .thenReturn(desiredModel);
+ when(proxyClient.injectCredentialsAndInvokeV2(
+ TestData.LIST_TAGS_FOR_RESOURCE_REQUEST_1, kafkaConnectClient::listTagsForResource))
+ .thenReturn(TestData.LIST_TAGS_FOR_RESOURCE_RESPONSE_2);
+
+ final ResourceHandlerRequest request =
+ TestData.createResourceHandlerRequest(desiredModel, previousModel);
+ final ProgressEvent response = handler.handleRequest(proxy, request,
+ new CallbackContext(), proxyClient, logger);
+
+ final ProgressEvent expected = ProgressEvent
+ .builder()
+ .resourceModel(desiredModel)
+ .status(OperationStatus.SUCCESS)
+ .build();
+
+ assertThat(response).isEqualTo(expected);
+
+ verify(proxyClient.client(), times(2))
+ .describeCustomPlugin(any(DescribeCustomPluginRequest.class));
+ verify(proxyClient.client(), times(1))
+ .listTagsForResource(any(ListTagsForResourceRequest.class));
+ verify(proxyClient.client(), times(0))
+ .untagResource(any(UntagResourceRequest.class));
+ verify(proxyClient.client(), times(0))
+ .tagResource(any(TagResourceRequest.class));
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onNameChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder().name(TestData.CUSTOM_PLUGIN_NAME_2)
+ .build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onDescriptionChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1
+ .toBuilder()
+ .description(TestData.CUSTOM_PLUGIN_DESCRIPTION_2)
+ .build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onContentTypeChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1
+ .toBuilder()
+ .contentType(TestData.CUSTOM_PLUGIN_CONTENT_TYPE_2)
+ .build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onLocationChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder()
+ .location(TestData.CUSTOM_PLUGIN_LOCATION_2).build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onCustomPluginArnChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder()
+ .customPluginArn(TestData.CUSTOM_PLUGIN_ARN_2).build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onRevisionChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder()
+ .revision(TestData.CUSTOM_PLUGIN_REVISION_2).build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsException_onFileDescriptionChange() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1
+ .toBuilder()
+ .fileDescription(TestData.CUSTOM_PLUGIN_FILE_DESCRIPTION_2)
+ .build();
+ helper_test_handleRequest_failure_throwsException_onNonUpdatableFieldChange(desiredModel);
+ }
+
+ @Test
+ public void test_handleRequest_failure_throwsCfnException_ifCustomPluginArnIsNull() {
+ final ResourceModel desiredModel = TestData.RESOURCE_MODEL_1.toBuilder().build();
+ final ResourceHandlerRequest request = ResourceHandlerRequest.