From b6be26d1db816b5806e5c67116f275a80ee92efd Mon Sep 17 00:00:00 2001 From: Claudio Contino Date: Wed, 15 Jan 2025 11:54:01 +0100 Subject: [PATCH] Upgrade AWS SDK to version 2 --- .gitignore | 3 + README.md | 1 + docker-compose.yml | 8 +- pom.xml | 36 ++++++--- .../AWSAssumeRoleCredentialsProvider.java | 37 ++++----- .../connect/lambda/InvocationClient.java | 78 ++++++++++--------- .../lambda/InvocationClientConfig.java | 53 +++++++------ .../lambda/InvocationClientConfigTest.java | 8 +- .../connect/lambda/InvocationClientTest.java | 32 ++++---- .../connect/lambda/LambdaSinkTaskTest.java | 4 - 10 files changed, 144 insertions(+), 116 deletions(-) diff --git a/.gitignore b/.gitignore index dce74df..1f44137 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,9 @@ bin/ .idea/ *.iml +# VS Code +.vscode/ + # maven-shade-plugin dependency-reduced-pom.xml diff --git a/README.md b/README.md index 67cbb8b..6502a6c 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ A Kafka Connect sink plugin to invoke AWS Lambda functions. |1.1.1|2.2.0|1.11.592| |1.2.0|2.3.0|1.11.651| |1.3.0|2.8.1|1.11.1034| +|1.4.0|3.8.1|2.29.47| Due to a compatibility issue with [Apache httpcomponents](http://hc.apache.org/), connector versions 1.1.1 and earlier may not work with Kafka Connect versions greater than 2.2 diff --git a/docker-compose.yml b/docker-compose.yml index cbdf27d..560872a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3" services: zookeeper: - image: confluentinc/cp-zookeeper:5.3.1 + image: confluentinc/cp-zookeeper:7.8.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: @@ -10,7 +10,7 @@ services: logging: { driver: none } broker: - image: confluentinc/cp-kafka:5.3.1 + image: confluentinc/cp-kafka:7.8.0 ports: - 9092:9092 environment: @@ -31,7 +31,7 @@ services: logging: { driver: none } schema-registry: - image: confluentinc/cp-schema-registry:5.3.1 + image: confluentinc/cp-schema-registry:7.8.0 hostname: schema-registry ports: - 8080:8080 @@ -48,7 +48,7 @@ services: # NB: run connect locally in stand-alone mode to debug connect: - image: confluentinc/cp-kafka-connect:5.3.1 + image: confluentinc/cp-kafka-connect:7.8.0 ports: - 8083:8083 environment: diff --git a/pom.xml b/pom.xml index 1e60e2d..0e40cc1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.nordstrom.kafka.connect.lambda kafka-connect-lambda - 1.3.0 + 1.4.0 kafka-connect-lambda A Kafka Connect Connector for kafka-connect-lambda @@ -17,12 +17,12 @@ 1.7.25 - 2.8.1 - 2.13.4.2 - 1.11.1034 + 3.8.1 + 2.18.2 + 2.29.47 4.13.2 - 2.28.2 - 32.0.0-jre + 5.15.2 + 33.4.0-jre @@ -32,8 +32,8 @@ specify their versions. --> - com.amazonaws - aws-java-sdk-bom + software.amazon.awssdk + bom ${aws-java-sdk.version} pom import @@ -77,12 +77,24 @@ ${jackson.version} - com.amazonaws - aws-java-sdk-lambda + software.amazon.awssdk + apache-client + ${aws-java-sdk.version} - com.amazonaws - aws-java-sdk-sts + software.amazon.awssdk + netty-nio-client + ${aws-java-sdk.version} + + + software.amazon.awssdk + lambda + ${aws-java-sdk.version} + + + software.amazon.awssdk + sts + ${aws-java-sdk.version} junit diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java index cd8540b..1a77cdd 100644 --- a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -1,9 +1,9 @@ package com.nordstrom.kafka.connect.auth; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import org.apache.kafka.common.Configurable; import java.util.Map; @@ -11,7 +11,7 @@ //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; -public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable { +public class AWSAssumeRoleCredentialsProvider implements AwsCredentialsProvider, Configurable { // Uncomment slf4j imports and field declaration to enable logging. // private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class); @@ -23,6 +23,10 @@ public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, private String roleArn; private String sessionName; + public static AWSAssumeRoleCredentialsProvider create() { + return new AWSAssumeRoleCredentialsProvider(); + } + @Override public void configure(Map map) { externalId = getOptionalField(map, EXTERNAL_ID_CONFIG); @@ -31,19 +35,16 @@ public void configure(Map map) { } @Override - public AWSCredentials getCredentials() { - AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard(); - AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName) - .withStsClient(clientBuilder.defaultClient()) - .withExternalId(externalId) - .build(); - - return provider.getCredentials(); - } - - @Override - public void refresh() { - //Nothing to do really, since we are assuming a role. + public AwsCredentials resolveCredentials() { + AwsCredentialsProvider provider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(StsClient.create()) + .refreshRequest(r -> r + .externalId(externalId) + .roleArn(roleArn) + .roleSessionName(sessionName)) + .build(); + + return provider.resolveCredentials(); } String getOptionalField(final Map map, final String fieldName) { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java index 14e0944..85cd972 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java @@ -1,20 +1,19 @@ package com.nordstrom.kafka.connect.lambda; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.lambda.AWSLambdaAsync; -import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder; -import com.amazonaws.services.lambda.model.InvocationType; -import com.amazonaws.services.lambda.model.InvokeRequest; -import com.amazonaws.services.lambda.model.InvokeResult; -import com.amazonaws.services.lambda.model.RequestTooLargeException; - -import com.nordstrom.kafka.connect.utils.Facility; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.lambda.LambdaAsyncClient; +import software.amazon.awssdk.services.lambda.LambdaAsyncClientBuilder; +import software.amazon.awssdk.services.lambda.model.InvocationType; +import software.amazon.awssdk.services.lambda.model.InvokeRequest; +import software.amazon.awssdk.services.lambda.model.InvokeResponse; +import software.amazon.awssdk.services.lambda.model.RequestTooLargeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.util.concurrent.ExecutionException; @@ -34,33 +33,34 @@ public class InvocationClient { private static final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE); private static final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE); - private final AWSLambdaAsync innerClient; + private final LambdaAsyncClient innerClient; private final String functionArn; private InvocationFailure failureMode; private InvocationMode invocationMode; private Duration invocationTimeout; - private InvocationClient(String functionArn, AWSLambdaAsync innerClient) { + private InvocationClient(String functionArn, LambdaAsyncClient innerClient) { this.functionArn = functionArn; this.innerClient = innerClient; } public InvocationResponse invoke(final byte[] payload) { final InvocationType type = invocationMode == InvocationMode.ASYNC - ? InvocationType.Event : InvocationType.RequestResponse; + ? InvocationType.EVENT : InvocationType.REQUEST_RESPONSE; - final InvokeRequest request = new InvokeRequest() - .withInvocationType(type) - .withFunctionName(functionArn) - .withPayload(ByteBuffer.wrap(payload)); + final InvokeRequest request = InvokeRequest.builder() + .invocationType(type) + .functionName(functionArn) + .payload(SdkBytes.fromByteArray(payload)) + .build(); - final Future futureResult = innerClient.invokeAsync(request); + final Future futureResult = innerClient.invoke(request); final Instant start = Instant.now(); try { - final InvokeResult result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS); - return new InvocationResponse(result.getStatusCode(), result.getLogResult(), - result.getFunctionError(), start, Instant.now()); + final InvokeResponse result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS); + return new InvocationResponse(result.statusCode(), result.logResult(), + result.functionError(), start, Instant.now()); } catch (RequestTooLargeException e) { return checkPayloadSizeForInvocationType(payload, type, start, e); } catch (final InterruptedException | ExecutionException e) { @@ -84,13 +84,13 @@ public InvocationResponse invoke(final byte[] payload) { InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) { switch (event) { - case Event: + case EVENT: if (payload.length > maxAsyncPayloadSizeBytes) { LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payload.length, maxAsyncPayloadSizeBytes); } break; - case RequestResponse: + case REQUEST_RESPONSE: if (payload.length > maxSyncPayloadSizeBytes) { LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payload.length, maxSyncPayloadSizeBytes); } @@ -119,11 +119,14 @@ public static class Builder { private InvocationMode invocationMode = DEFAULT_INVOCATION_MODE; private InvocationFailure failureMode = DEFAULT_FAILURE_MODE; private Duration invocationTimeout = Duration.ofMillis(DEFAULT_INVOCATION_TIMEOUT_MS); + private String region; + private SdkAsyncHttpClient httpClient; + private AwsCredentialsProvider credentialsProvider; - private final AWSLambdaAsyncClientBuilder innerBuilder; + private final LambdaAsyncClientBuilder innerBuilder; public Builder() { - this.innerBuilder = AWSLambdaAsyncClientBuilder.standard(); + this.innerBuilder = LambdaAsyncClient.builder(); } public InvocationClient build() { @@ -174,29 +177,32 @@ public Builder setInvocationTimeout(final Duration timeout) { } public String getRegion() { - return this.innerBuilder.getRegion(); + return this.region; } public Builder setRegion(final String awsRegion) { - this.innerBuilder.setRegion(awsRegion); + this.innerBuilder.region(Region.of(awsRegion)); + this.region = awsRegion; return this; } - public ClientConfiguration getClientConfiguration() { - return this.innerBuilder.getClientConfiguration(); + public SdkAsyncHttpClient getHttpClient() { + return this.httpClient; } - public Builder withClientConfiguration(final ClientConfiguration clientConfiguration) { - this.innerBuilder.withClientConfiguration(clientConfiguration); + public Builder withHttpClient(final SdkAsyncHttpClient httpClient) { + this.innerBuilder.httpClient(httpClient); + this.httpClient = httpClient; return this; } - public AWSCredentialsProvider getCredentialsProvider() { - return this.innerBuilder.getCredentials(); + public AwsCredentialsProvider getCredentialsProvider() { + return this.credentialsProvider; } - public Builder withCredentialsProvider(final AWSCredentialsProvider credentialsProvider) { - this.innerBuilder.withCredentials(credentialsProvider); + public Builder withCredentialsProvider(final AwsCredentialsProvider credentialsProvider) { + this.innerBuilder.credentialsProvider(credentialsProvider); + this.credentialsProvider = credentialsProvider; return this; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java index e168b5e..8761a9a 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfig.java @@ -4,12 +4,13 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; + import com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider; import java.util.Arrays; @@ -17,6 +18,7 @@ import java.util.Map; import java.time.Duration; import java.lang.reflect.InvocationTargetException; +import java.util.stream.Collectors; public class InvocationClientConfig extends AbstractConfig { static final String CONFIG_GROUP_NAME = "Lambda"; @@ -63,7 +65,9 @@ public class InvocationClientConfig extends AbstractConfig { .setInvocationMode(InvocationMode.valueOf(getString(INVOCATION_MODE_KEY))) .setInvocationTimeout(Duration.ofMillis(getLong(INVOCATION_TIMEOUT_KEY))) .setFailureMode(InvocationFailure.valueOf(getString(FAILURE_MODE_KEY))) - .withClientConfiguration(loadAwsClientConfiguration()) + .withHttpClient(NettyNioAsyncHttpClient.builder() + .proxyConfiguration(loadAwsProxyConfiguration()) + .build()) .withCredentialsProvider(loadAwsCredentialsProvider()); String awsRegion = getString(AWS_REGION_KEY); @@ -77,26 +81,27 @@ public InvocationClient getInvocationClient() { return this.clientBuilder.build(); } - ClientConfiguration loadAwsClientConfiguration() { - ClientConfiguration clientConfiguration = new ClientConfiguration(); + ProxyConfiguration loadAwsProxyConfiguration() { + ProxyConfiguration.Builder proxyConfigurationBuilder = ProxyConfiguration.builder(); String httpProxyHost = this.getString(HTTP_PROXY_HOST_KEY); if (httpProxyHost != null && !httpProxyHost.isEmpty()) { - clientConfiguration.setProxyHost(httpProxyHost); + + proxyConfigurationBuilder.host(httpProxyHost); Integer httpProxyPort = this.getInt(HTTP_PROXY_PORT_KEY); - if (httpProxyPort > 0) - clientConfiguration.setProxyPort(httpProxyPort); + if (httpProxyPort > 0) { + proxyConfigurationBuilder.port(httpProxyPort); + } } - return clientConfiguration; + return proxyConfigurationBuilder.build(); } - @SuppressWarnings("unchecked") - AWSCredentialsProvider loadAwsCredentialsProvider() { + AwsCredentialsProvider loadAwsCredentialsProvider() { try { - AWSCredentialsProvider credentialsProvider = ((Class) - getClass(CREDENTIALS_PROVIDER_CLASS_KEY)).getDeclaredConstructor().newInstance(); + AwsCredentialsProvider credentialsProvider = (AwsCredentialsProvider) getClass(CREDENTIALS_PROVIDER_CLASS_KEY) + .getDeclaredMethod("create").invoke(null); if (credentialsProvider instanceof Configurable) { Map configs = originalsWithPrefix( @@ -107,7 +112,7 @@ AWSCredentialsProvider loadAwsCredentialsProvider() { return credentialsProvider; - } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new ConnectException("Unable to create " + CREDENTIALS_PROVIDER_CLASS_KEY, e); } } @@ -195,7 +200,7 @@ public static ConfigDef configDef(ConfigDef base) { .define(CREDENTIALS_PROVIDER_CLASS_KEY, ConfigDef.Type.CLASS, - DefaultAWSCredentialsProviderChain.class, + DefaultCredentialsProvider.class, new AwsCredentialsProviderValidator(), ConfigDef.Importance.LOW, CREDENTIALS_PROVIDER_CLASS_DOC, @@ -254,13 +259,13 @@ public void ensureValid(String name, Object invocationMode) { InvocationMode.valueOf(((String)invocationMode).trim()); } catch (Exception e) { throw new ConfigException(name, invocationMode, "Value must be one of [" + - Utils.join(InvocationMode.values(), ", ") + "]"); + Arrays.stream(InvocationMode.values()).map(InvocationMode::toString).collect(Collectors.joining(", ")) + "]"); } } @Override public String toString() { - return "[" + Utils.join(InvocationMode.values(), ", ") + "]"; + return "[" + Arrays.stream(InvocationMode.values()).map(InvocationMode::toString).collect(Collectors.joining(", ")) + "]"; } } @@ -283,29 +288,29 @@ public void ensureValid(String name, Object invocationFailure) { InvocationFailure.valueOf(((String)invocationFailure).trim()); } catch (Exception e) { throw new ConfigException(name, invocationFailure, "Value must be one of [" + - Utils.join(InvocationFailure.values(), ", ") + "]"); + Arrays.stream(InvocationFailure.values()).map(InvocationFailure::toString).collect(Collectors.joining(", ")) + "]"); } } @Override public String toString() { - return "[" + Utils.join(InvocationFailure.values(), ", ") + "]"; + return "[" + Arrays.stream(InvocationFailure.values()).map(InvocationFailure::toString).collect(Collectors.joining(", ")) + "]"; } } static class AwsCredentialsProviderValidator implements ConfigDef.Validator { @Override public void ensureValid(String name, Object provider) { - if (provider instanceof Class && AWSCredentialsProvider.class.isAssignableFrom((Class)provider)) { + if (provider instanceof Class && AwsCredentialsProvider.class.isAssignableFrom((Class)provider)) { return; } - throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class); + throw new ConfigException(name, provider, "Class must extend: " + AwsCredentialsProvider.class); } @Override public String toString() { - return "Any class implementing: " + AWSCredentialsProvider.class; + return "Any class implementing: " + AwsCredentialsProvider.class; } } } diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java index 263ff74..47c35a7 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientConfigTest.java @@ -2,7 +2,7 @@ import org.apache.kafka.common.config.ConfigException; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider; import java.time.Duration; @@ -27,8 +27,8 @@ public void minimalConfig() { assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); assertEquals(InvocationFailure.STOP, builder.getFailureMode()); assertEquals(Duration.ofMinutes(5), builder.getInvocationTimeout()); - assertNotNull(builder.getClientConfiguration()); - assertEquals(DefaultAWSCredentialsProviderChain.class, builder.getCredentialsProvider().getClass()); + assertNotNull(builder.getHttpClient()); + assertEquals(DefaultCredentialsProvider.class, builder.getCredentialsProvider().getClass()); } @Test @@ -55,7 +55,7 @@ public void sampleConfig() { assertEquals(123, builder.getInvocationTimeout().toMillis()); assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); assertEquals(InvocationFailure.DROP, builder.getFailureMode()); - assertNotNull(builder.getClientConfiguration()); + assertNotNull(builder.getHttpClient()); assertEquals(AWSAssumeRoleCredentialsProvider.class, builder.getCredentialsProvider().getClass()); AWSAssumeRoleCredentialsProvider credentialsProvider = (AWSAssumeRoleCredentialsProvider)builder.getCredentialsProvider(); diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java index eab020e..5172564 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/InvocationClientTest.java @@ -1,10 +1,11 @@ package com.nordstrom.kafka.connect.lambda; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.lambda.model.InvocationType; -import com.amazonaws.services.lambda.model.RequestTooLargeException; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.lambda.model.InvocationType; +import software.amazon.awssdk.services.lambda.model.RequestTooLargeException; import java.time.Duration; import java.time.Instant; @@ -21,14 +22,15 @@ public void testBuilderDefaults() { assertEquals(InvocationMode.SYNC, builder.getInvocationMode()); assertEquals(InvocationFailure.STOP, builder.getFailureMode()); assertEquals(Duration.ofMinutes(5), builder.getInvocationTimeout()); - assertNull(builder.getClientConfiguration()); + assertNull(builder.getHttpClient()); assertNull(builder.getCredentialsProvider()); } @Test public void testBuilderReflexiveProperties() { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); + SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.create(); + AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.builder() + .build(); InvocationClient.Builder builder = new InvocationClient.Builder() .setFunctionArn("test-function-arn") @@ -36,7 +38,7 @@ public void testBuilderReflexiveProperties() { .setInvocationMode(InvocationMode.ASYNC) .setFailureMode(InvocationFailure.DROP) .setInvocationTimeout(Duration.ofSeconds(123)) - .withClientConfiguration(clientConfiguration) + .withHttpClient(httpClient) .withCredentialsProvider(credentialsProvider); assertEquals("test-function-arn", builder.getFunctionArn()); @@ -44,7 +46,7 @@ public void testBuilderReflexiveProperties() { assertEquals(InvocationMode.ASYNC, builder.getInvocationMode()); assertEquals(InvocationFailure.DROP, builder.getFailureMode()); assertEquals(Duration.ofSeconds(123), builder.getInvocationTimeout()); - assertSame(clientConfiguration, builder.getClientConfiguration()); + assertSame(httpClient, builder.getHttpClient()); assertSame(credentialsProvider, builder.getCredentialsProvider()); } @@ -63,9 +65,10 @@ public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopTh client.checkPayloadSizeForInvocationType( "testpayload".getBytes(), - InvocationType.RequestResponse, + InvocationType.REQUEST_RESPONSE, Instant.now(), - new RequestTooLargeException("Request payload is too large!")); + RequestTooLargeException.builder() + .build()); } @Test @@ -78,9 +81,10 @@ public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropCo try { testResp = client.checkPayloadSizeForInvocationType( "testpayload".getBytes(), - InvocationType.RequestResponse, + InvocationType.REQUEST_RESPONSE, Instant.now(), - new RequestTooLargeException("Request payload is too large!")); + RequestTooLargeException.builder() + .build()); } catch (RequestTooLargeException e) { ex = e; } diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index ceee909..ff6bcd7 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -1,8 +1,5 @@ package com.nordstrom.kafka.connect.lambda; -import com.amazonaws.services.lambda.model.InvocationType; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.google.common.collect.ImmutableMap; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; @@ -17,7 +14,6 @@ import java.util.Collection; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; -import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;