Skip to content

Commit

Permalink
Merge pull request #7 from Nordstrom/feature/4-connector-can-assume-i…
Browse files Browse the repository at this point in the history
…am-role

Functionality to allow connector to assume an AWS IAM role when invok…
  • Loading branch information
seananthonywilliams authored Jul 17, 2019
2 parents d939427 + f3bd175 commit 088d4c8
Show file tree
Hide file tree
Showing 16 changed files with 426 additions and 91 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
language: java
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
The AWS Lambda connector plugin provides the ability to use AWS Lambda functions as a sink (out of a Kafka topic into a Lambda function).

## Supported Kafka and AWS versions
The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.490`
The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.592`

# Building
You can build the connector with Maven using the standard lifecycle goals:
Expand All @@ -18,6 +18,13 @@ A sink connector reads from a Kafka topic and sends events to an AWS Lambda func
A sink connector configuration has two required fields:
* `aws.lambda.function.arn`: The AWS ARN of the Lambda function to send events to.
* `topics`: The Kafka topic to be read from.

### AWS Assume Role Support options
The connector can assume an IAM Role. The role must include a policy that allows lambda:InvokeFunction and lambda:InvokeAsync actions:
* `aws.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class.
* `aws.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access.
* `aws.credentials.provider.session.name`: REQUIRED Session name
* `aws.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role.

### Sample Configuration
```json
Expand Down
23 changes: 16 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.nordstrom.kafka.connect.lambda</groupId>
<artifactId>kafka-connect-lambda</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>kafka-connect-lambda</name>
Expand All @@ -27,7 +27,11 @@
<maven-project-info-reports-plugin.version>3.0.0</maven-project-info-reports-plugin.version>

<kafka.connect-api.version>2.1.0</kafka.connect-api.version>
<aws.sdk.version>1.11.490</aws.sdk.version>
<aws-java-sdk.version>1.11.592</aws-java-sdk.version>
<junit.version>4.12</junit.version>
<mockito-core.version>2.28.2</mockito-core.version>
<google.guava.version>19.0</google.guava.version>
<lombok.version>1.18.4</lombok.version>
</properties>

<dependencies>
Expand All @@ -39,23 +43,28 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
<version>${aws.sdk.version}</version>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.25.1</version>
<version>${mockito-core.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<version>${google.guava.version}</version>
</dependency>
</dependencies>

Expand All @@ -71,7 +80,7 @@
</resource>
</resources>

<!-- NB: pluginManagement defines configurations for plugins, but DOES
<!-- PluginManagement defines configurations for plugins, but DOES
NOT force execution -->
<pluginManagement>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.nordstrom.kafka.connect.auth;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import org.apache.kafka.common.Configurable;

import java.util.Map;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable {
// Uncomment slf4j imports and field declaration to enable logging.
// private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class);

public static final String EXTERNAL_ID_CONFIG = "external.id";
public static final String ROLE_ARN_CONFIG = "role.arn";
public static final String SESSION_NAME_CONFIG = "session.name";

private String externalId;
private String roleArn;
private String sessionName;

@Override
public void configure(Map<String, ?> map) {
externalId = getOptionalField(map, EXTERNAL_ID_CONFIG);
roleArn = getRequiredField(map, ROLE_ARN_CONFIG);
sessionName = getRequiredField(map, SESSION_NAME_CONFIG);
}

@Override
public AWSCredentials getCredentials() {
AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard();
AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
.withStsClient(clientBuilder.defaultClient())
.withExternalId(externalId)
.build();

return provider.getCredentials();
}

@Override
public void refresh() {
//Nothing to do really, since we are assuming a role.
}

String getOptionalField(final Map<String, ?> map, final String fieldName) {
final Object field = map.get(fieldName);
if (isNotNull(field)) {
return field.toString();
}
return null;
}

String getRequiredField(final Map<String, ?> map, final String fieldName) {
final Object field = map.get(fieldName);
verifyNotNull(field, fieldName);
final String fieldValue = field.toString();
verifyNotNullOrEmpty(fieldValue, fieldName);

return fieldValue;
}

private boolean isNotNull(final Object field) {
return null != field;
}

private boolean isNotNullOrEmpty(final String field) {
return null != field && !field.isEmpty();
}

private void verifyNotNull(final Object field, final String fieldName) {
if (!isNotNull(field)) {
throw new IllegalArgumentException(String.format("The field '%1s' should not be null", fieldName));
}
}

private void verifyNotNullOrEmpty(final String field, final String fieldName) {
if (!isNotNullOrEmpty(field)) {
throw new IllegalArgumentException(String.format("The field '%1s' should not be null or empty", fieldName));
}
}

String getExternalId() {
return this.externalId;
}

String getRoleArn() {
return this.roleArn;
}

String getSessionName() {
return this.sessionName;
}
}
81 changes: 62 additions & 19 deletions src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package com.nordstrom.kafka.connect.lambda;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.lambda.AWSLambdaAsync;
import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder;
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import com.nordstrom.kafka.connect.utils.Guard;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,37 +36,41 @@ public class AwsLambdaUtil {
private final AWSLambdaAsync lambdaClient;
private final InvocationFailure failureMode;

public AwsLambdaUtil(final Configuration config) {
Guard.verifyNotNull(config, "config");
public AwsLambdaUtil(final Configuration optConfigs, final Map<String, ?> bareAssumeRoleConfigs) {
LOGGER.debug("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs);
Guard.verifyNotNull(optConfigs, "optConfigs");

final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard();

// Will check if there's proxy configuration in the environment; if
// there's any will construct the client with it.
if (config.getHttpProxyHost().isPresent()) {
if (optConfigs.getHttpProxyHost().isPresent()) {
final ClientConfiguration clientConfiguration = new ClientConfiguration()
.withProxyHost(config.getHttpProxyHost().get());
if (config.getHttpProxyPort().isPresent()) {
clientConfiguration.setProxyPort(config.getHttpProxyPort().get());
.withProxyHost(optConfigs.getHttpProxyHost().get());
if (optConfigs.getHttpProxyPort().isPresent()) {
clientConfiguration.setProxyPort(optConfigs.getHttpProxyPort().get());
}
builder.setClientConfiguration(clientConfiguration);
LOGGER.info("Setting proxy configuration for AWS Lambda Async client host: {} port {}",
config.getHttpProxyHost().get(), config.getHttpProxyPort().orElse(-1));
optConfigs.getHttpProxyHost().get(), optConfigs.getHttpProxyPort().orElse(-1));
}

// If there's a credentials profile configuration in the environment will
// use it.
if (config.getCredentialsProfile().isPresent()) {
builder.setCredentials(new ProfileCredentialsProvider(config.getCredentialsProfile().get()));
LOGGER.info("Using aws credentials profile {} for AWS Lambda client",
config.getCredentialsProfile().get());
if (optConfigs.getAwsRegion().isPresent()) {
builder.setRegion(optConfigs.getAwsRegion().get());
LOGGER.info("Using aws region: {}", optConfigs.getAwsRegion().toString());
}

if (config.getAwsRegion().isPresent()) {
builder.setRegion(config.getAwsRegion().get());
LOGGER.info("Using aws region: {}", config.getAwsRegion().toString());
}
failureMode = optConfigs.getFailureMode().orElse(InvocationFailure.STOP);

failureMode = config.getFailureMode().orElse(InvocationFailure.STOP);
AWSCredentialsProvider provider = null;
try {
provider = getCredentialsProvider(bareAssumeRoleConfigs);
} catch (Exception e) {
LOGGER.error("Problem initializing provider", e);
}
if (provider != null) {
builder.setCredentials(provider);
}

this.lambdaClient = builder.build();
LOGGER.info("AWS Lambda client initialized");
Expand Down Expand Up @@ -147,6 +155,41 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final
return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now());
}

@SuppressWarnings("unchecked")
public AWSCredentialsProvider getCredentialsProvider(Map<String, ?> roleConfigs) {
LOGGER.info(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs);

try {
Object providerField = roleConfigs.get("class");
String providerClass = LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue();
if (providerField != null) {
providerClass = providerField.toString();
}
LOGGER.debug(".get-credentials-provider:field={}, class={}", providerField, providerClass);
AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
getClass(providerClass)).newInstance();

if (provider instanceof Configurable) {
((Configurable) provider).configure(roleConfigs);
}

LOGGER.debug(".get-credentials-provider:provider={}", provider);
return provider;
} catch (IllegalAccessException | InstantiationException e) {
throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
}
}

public Class<?> getClass(String className) {
LOGGER.debug(".get-class:class={}",className);
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
LOGGER.error("Provider class not found: {}", e);
}
return null;
}

private class LambdaInvocationException extends RuntimeException {
public LambdaInvocationException(final Throwable e) {
super(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.nordstrom.kafka.connect.lambda;

import com.nordstrom.kafka.connect.utils.Facility;

import java.util.Optional;

import static com.nordstrom.kafka.connect.lambda.InvocationFailure.DROP;
Expand All @@ -12,10 +14,16 @@ public class Configuration {
private final Optional<Integer> httpProxyPort;
private final Optional<String> awsRegion;
private final Optional<InvocationFailure> failureMode;
private final Optional<String> roleArn;
private final Optional<String> sessionName;
private final Optional<String> externalId;


public Configuration(final String credentialsProfile, final String httpProxyHost,
final Integer httpProxyPort, final String awsRegion,
final InvocationFailure failureMode) {
final InvocationFailure failureMode,
final String roleArn, final String sessionName,
final String externalId) {
/*
* String awsCredentialsProfile =
* System.getenv(CREDENTIALS_PROFILE_CONFIG_ENV); String awsProxyHost =
Expand All @@ -33,23 +41,40 @@ public Configuration(final String credentialsProfile, final String httpProxyHost
this.awsRegion =
Facility.isNotNullNorEmpty(awsRegion) ? Optional.of(awsRegion) : Optional.empty();
this.failureMode = Facility.isNotNull(failureMode) ? Optional.of(failureMode): Optional.of(DROP);
this.roleArn =
Facility.isNotNullNorEmpty(roleArn) ? Optional.of(roleArn) : Optional.empty();
this.sessionName =
Facility.isNotNullNorEmpty(sessionName) ? Optional.of(sessionName) : Optional.empty();
this.externalId =
Facility.isNotNullNorEmpty(externalId) ? Optional.of(externalId) : Optional.empty();

}

public Configuration(final Optional<String> awsCredentialsProfile,
final Optional<String> httpProxyHost,
final Optional<Integer> httpProxyPort,
final Optional<String> awsRegion,
final Optional<InvocationFailure> failureMode) {
final Optional<InvocationFailure> failureMode,
final Optional<String> roleArn,
final Optional<String> sessionName,
final Optional<String> externalId) {

this.credentialsProfile = awsCredentialsProfile;
this.httpProxyHost = httpProxyHost;
this.httpProxyPort = httpProxyPort;
this.awsRegion = awsRegion;
this.failureMode = failureMode;
this.roleArn = roleArn;
this.sessionName = sessionName;
this.externalId = externalId;
}

public static Configuration empty() {
return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
return new Configuration(
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty());
}

public Optional<String> getCredentialsProfile() {
Expand All @@ -68,4 +93,9 @@ public Optional<Integer> getHttpProxyPort() {

public Optional<InvocationFailure> getFailureMode() { return this.failureMode; }

public Optional<String> getRoleArn() { return this.roleArn; }

public Optional<String> getSessionName() { return this.sessionName; }

public Optional<String> getExternalId() { return this.externalId; }
}
Loading

0 comments on commit 088d4c8

Please sign in to comment.