Skip to content

Commit

Permalink
Merge pull request #54 from ClaudioContino-TomTom/upgrade-aws-sdk
Browse files Browse the repository at this point in the history
Upgrade AWS SDK to version 2
  • Loading branch information
dylanmei authored Feb 1, 2025
2 parents eb415e6 + b6be26d commit 234e18e
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 116 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ bin/
.idea/
*.iml

# VS Code
.vscode/

# maven-shade-plugin
dependency-reduced-pom.xml

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ A Kafka Connect sink plugin to invoke AWS Lambda functions.
|1.1.1|2.2.0|1.11.592|
|1.2.0|2.3.0|1.11.651|
|1.3.0|2.8.1|1.11.1034|
|1.4.0|3.8.1|2.29.47|

Due to a compatibility issue with [Apache httpcomponents](http://hc.apache.org/), connector versions 1.1.1 and earlier may not work with Kafka Connect versions greater than 2.2

Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.3.1
image: confluentinc/cp-zookeeper:7.8.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
logging: { driver: none }

broker:
image: confluentinc/cp-kafka:5.3.1
image: confluentinc/cp-kafka:7.8.0
ports:
- 9092:9092
environment:
Expand All @@ -31,7 +31,7 @@ services:
logging: { driver: none }

schema-registry:
image: confluentinc/cp-schema-registry:5.3.1
image: confluentinc/cp-schema-registry:7.8.0
hostname: schema-registry
ports:
- 8080:8080
Expand All @@ -48,7 +48,7 @@ services:

# NB: run connect locally in stand-alone mode to debug
connect:
image: confluentinc/cp-kafka-connect:5.3.1
image: confluentinc/cp-kafka-connect:7.8.0
ports:
- 8083:8083
environment:
Expand Down
36 changes: 24 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.nordstrom.kafka.connect.lambda</groupId>
<artifactId>kafka-connect-lambda</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>

<name>kafka-connect-lambda</name>
<description>A Kafka Connect Connector for kafka-connect-lambda</description>
Expand All @@ -17,12 +17,12 @@

<slf4j.version>1.7.25</slf4j.version>

<kafka-connect.version>2.8.1</kafka-connect.version>
<jackson.version>2.13.4.2</jackson.version>
<aws-java-sdk.version>1.11.1034</aws-java-sdk.version>
<kafka-connect.version>3.8.1</kafka-connect.version>
<jackson.version>2.18.2</jackson.version>
<aws-java-sdk.version>2.29.47</aws-java-sdk.version>
<junit.version>4.13.2</junit.version>
<mockito-core.version>2.28.2</mockito-core.version>
<google.guava.version>32.0.0-jre</google.guava.version>
<mockito-core.version>5.15.2</mockito-core.version>
<google.guava.version>33.4.0-jre</google.guava.version>
</properties>

<dependencyManagement>
Expand All @@ -32,8 +32,8 @@
specify their versions.
-->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${aws-java-sdk.version}</version>
<type>pom</type>
<scope>import</scope>
Expand Down Expand Up @@ -77,12 +77,24 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>lambda</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package com.nordstrom.kafka.connect.auth;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import org.apache.kafka.common.Configurable;

import java.util.Map;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable {
public class AWSAssumeRoleCredentialsProvider implements AwsCredentialsProvider, Configurable {
// Uncomment slf4j imports and field declaration to enable logging.
// private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class);

Expand All @@ -23,6 +23,10 @@ public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider,
private String roleArn;
private String sessionName;

public static AWSAssumeRoleCredentialsProvider create() {
return new AWSAssumeRoleCredentialsProvider();
}

@Override
public void configure(Map<String, ?> map) {
externalId = getOptionalField(map, EXTERNAL_ID_CONFIG);
Expand All @@ -31,19 +35,16 @@ public void configure(Map<String, ?> map) {
}

@Override
public AWSCredentials getCredentials() {
AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard();
AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
.withStsClient(clientBuilder.defaultClient())
.withExternalId(externalId)
.build();

return provider.getCredentials();
}

@Override
public void refresh() {
//Nothing to do really, since we are assuming a role.
public AwsCredentials resolveCredentials() {
AwsCredentialsProvider provider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(StsClient.create())
.refreshRequest(r -> r
.externalId(externalId)
.roleArn(roleArn)
.roleSessionName(sessionName))
.build();

return provider.resolveCredentials();
}

String getOptionalField(final Map<String, ?> map, final String fieldName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package com.nordstrom.kafka.connect.lambda;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.RequestTooLargeException;

import com.nordstrom.kafka.connect.utils.Facility;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.LambdaAsyncClientBuilder;
import software.amazon.awssdk.services.lambda.model.InvocationType;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
import software.amazon.awssdk.services.lambda.model.RequestTooLargeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
Expand All @@ -34,33 +33,34 @@ public class InvocationClient {
private static final int maxSyncPayloadSizeBytes = (6 * MEGABYTE_SIZE);
private static final int maxAsyncPayloadSizeBytes = (256 * KILOBYTE_SIZE);

private final AWSLambdaAsync innerClient;
private final LambdaAsyncClient innerClient;
private final String functionArn;
private InvocationFailure failureMode;
private InvocationMode invocationMode;
private Duration invocationTimeout;

private InvocationClient(String functionArn, AWSLambdaAsync innerClient) {
private InvocationClient(String functionArn, LambdaAsyncClient innerClient) {
this.functionArn = functionArn;
this.innerClient = innerClient;
}

public InvocationResponse invoke(final byte[] payload) {
final InvocationType type = invocationMode == InvocationMode.ASYNC
? InvocationType.Event : InvocationType.RequestResponse;
? InvocationType.EVENT : InvocationType.REQUEST_RESPONSE;

final InvokeRequest request = new InvokeRequest()
.withInvocationType(type)
.withFunctionName(functionArn)
.withPayload(ByteBuffer.wrap(payload));
final InvokeRequest request = InvokeRequest.builder()
.invocationType(type)
.functionName(functionArn)
.payload(SdkBytes.fromByteArray(payload))
.build();

final Future<InvokeResult> futureResult = innerClient.invokeAsync(request);
final Future<InvokeResponse> futureResult = innerClient.invoke(request);

final Instant start = Instant.now();
try {
final InvokeResult result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS);
return new InvocationResponse(result.getStatusCode(), result.getLogResult(),
result.getFunctionError(), start, Instant.now());
final InvokeResponse result = futureResult.get(invocationTimeout.toMillis(), TimeUnit.MILLISECONDS);
return new InvocationResponse(result.statusCode(), result.logResult(),
result.functionError(), start, Instant.now());
} catch (RequestTooLargeException e) {
return checkPayloadSizeForInvocationType(payload, type, start, e);
} catch (final InterruptedException | ExecutionException e) {
Expand All @@ -84,13 +84,13 @@ public InvocationResponse invoke(final byte[] payload) {
InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final InvocationType event, final Instant start, final RequestTooLargeException e) {
switch (event) {

case Event:
case EVENT:
if (payload.length > maxAsyncPayloadSizeBytes) {
LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for asynchronous Lambda call", payload.length, maxAsyncPayloadSizeBytes);
}
break;

case RequestResponse:
case REQUEST_RESPONSE:
if (payload.length > maxSyncPayloadSizeBytes) {
LOGGER.error("{} bytes payload exceeded {} bytes invocation limit for synchronous Lambda call", payload.length, maxSyncPayloadSizeBytes);
}
Expand Down Expand Up @@ -119,11 +119,14 @@ public static class Builder {
private InvocationMode invocationMode = DEFAULT_INVOCATION_MODE;
private InvocationFailure failureMode = DEFAULT_FAILURE_MODE;
private Duration invocationTimeout = Duration.ofMillis(DEFAULT_INVOCATION_TIMEOUT_MS);
private String region;
private SdkAsyncHttpClient httpClient;
private AwsCredentialsProvider credentialsProvider;

private final AWSLambdaAsyncClientBuilder innerBuilder;
private final LambdaAsyncClientBuilder innerBuilder;

public Builder() {
this.innerBuilder = AWSLambdaAsyncClientBuilder.standard();
this.innerBuilder = LambdaAsyncClient.builder();
}

public InvocationClient build() {
Expand Down Expand Up @@ -174,29 +177,32 @@ public Builder setInvocationTimeout(final Duration timeout) {
}

public String getRegion() {
return this.innerBuilder.getRegion();
return this.region;
}

public Builder setRegion(final String awsRegion) {
this.innerBuilder.setRegion(awsRegion);
this.innerBuilder.region(Region.of(awsRegion));
this.region = awsRegion;
return this;
}

public ClientConfiguration getClientConfiguration() {
return this.innerBuilder.getClientConfiguration();
public SdkAsyncHttpClient getHttpClient() {
return this.httpClient;
}

public Builder withClientConfiguration(final ClientConfiguration clientConfiguration) {
this.innerBuilder.withClientConfiguration(clientConfiguration);
public Builder withHttpClient(final SdkAsyncHttpClient httpClient) {
this.innerBuilder.httpClient(httpClient);
this.httpClient = httpClient;
return this;
}

public AWSCredentialsProvider getCredentialsProvider() {
return this.innerBuilder.getCredentials();
public AwsCredentialsProvider getCredentialsProvider() {
return this.credentialsProvider;
}

public Builder withCredentialsProvider(final AWSCredentialsProvider credentialsProvider) {
this.innerBuilder.withCredentials(credentialsProvider);
public Builder withCredentialsProvider(final AwsCredentialsProvider credentialsProvider) {
this.innerBuilder.credentialsProvider(credentialsProvider);
this.credentialsProvider = credentialsProvider;
return this;
}
}
Expand Down
Loading

0 comments on commit 234e18e

Please sign in to comment.