diff --git a/config/worker.properties b/config/worker.properties index 961fd73..8117cc9 100644 --- a/config/worker.properties +++ b/config/worker.properties @@ -1,6 +1,6 @@ bootstrap.servers=localhost:9092 -plugin.path=target +plugin.path=target/plugin offset.storage.file.filename=/tmp/connect.offsets key.converter=org.apache.kafka.connect.storage.StringConverter diff --git a/docker-compose.yml b/docker-compose.yml index a25d23b..b6f5d81 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -64,16 +64,15 @@ services: - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - + - CONNECT_PLUGIN_PATH=/opt/connectors - KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/log4j.properties - + - AWS_PROFILE - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY volumes: - ~/.aws:/root/.aws - - ./target:/opt/connectors - - /opt/connectors/.shaded-jar + - ./target/plugin:/opt/connectors - ./config/log4j.properties:/etc/log4j.properties depends_on: [broker] diff --git a/pom.xml b/pom.xml index 6f9d16c..c5412bb 100644 --- a/pom.xml +++ b/pom.xml @@ -15,22 +15,11 @@ UTF-8 1.8 - 1.6.0 - 0.8.2 - 3.8.0 - 2.22.0 - 3.0.1 - 3.1.1 - 3.0.1 - 2.22.1 - 3.0.0 - 2.1.0 1.11.592 4.12 2.28.2 19.0 - 1.18.4 @@ -59,11 +48,13 @@ org.mockito mockito-core ${mockito-core.version} + test com.google.guava guava ${google.guava.version} + test @@ -79,138 +70,84 @@ - - - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin.version} - - ${java.version} - ${java.version} - - - - org.apache.maven.plugins - maven-javadoc-plugin - ${maven-javadoc-plugin.version} - - - org.apache.maven.plugins - maven-source-plugin - ${maven-source-plugin.version} - - - attach-sources - verify - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven-shade-plugin.version} - - - package - - shade - - - ${project.build.directory}/.shaded-jar - - - *:* - - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - junit:* - jmock:* - mockito-all:* - - - commons-codec:* - commons-logging:* - org.apache.httpcomponents:* - org.apache.kafka:* - org.slf4j:* - - - - - - - - org.jacoco - jacoco-maven-plugin - ${jacoco-maven-plugin.version} - - - - prepare-agent - - - - report - prepare-package - - report - - - - - - - org.apache.maven.plugins maven-compiler-plugin - - - org.codehaus.mojo - exec-maven-plugin - ${exec-maven-plugin.version} - - - org.apache.maven.plugins - maven-source-plugin + 3.8.0 + + ${java.version} + ${java.version} + org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin.version} + maven-javadoc-plugin + 3.0.1 org.apache.maven.plugins - maven-failsafe-plugin - ${maven-failsafe-plugin.version} + maven-source-plugin + 3.0.1 + + + attach-sources + verify + + jar-no-fork + + + org.apache.maven.plugins - maven-javadoc-plugin - - - org.jacoco - jacoco-maven-plugin + maven-jar-plugin + 3.1.2 + + target/plugin/ + org.apache.maven.plugins maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + *:* + + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + junit:* + jmock:* + mockito-all:* + + + commons-codec:* + commons-logging:* + org.apache.httpcomponents:* + org.apache.kafka:* + org.slf4j:* + + + + + org.codehaus.mojo diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormatter.java b/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormatter.java new file mode 100644 index 0000000..0872fb5 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormatter.java @@ -0,0 +1,9 @@ +package com.nordstrom.kafka.connect.formatters; + +import java.util.Collection; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface PayloadFormatter { + String format(final SinkRecord record) throws PayloadFormattingException; + String formatBatch(final Collection records) throws PayloadFormattingException; +} diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormattingException.java b/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormattingException.java new file mode 100644 index 0000000..c5e8375 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/PayloadFormattingException.java @@ -0,0 +1,7 @@ +package com.nordstrom.kafka.connect.formatters; + +public class PayloadFormattingException extends RuntimeException { + public PayloadFormattingException (final Throwable e) { + super(e); + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayload.java b/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayload.java new file mode 100644 index 0000000..f0542bd --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayload.java @@ -0,0 +1,64 @@ +package com.nordstrom.kafka.connect.formatters; + +import org.apache.kafka.connect.sink.SinkRecord; + +public class PlainPayload { + private String key; + private String keySchemaName; + private String value; + private String valueSchemaName; + private String topic; + private int partition; + private long offset; + private long timestamp; + private String timestampTypeName; + + protected PlainPayload() { + } + + public PlainPayload(final SinkRecord record) { + this.key = record.key() == null ? "" : record.key().toString(); + if (record.keySchema() != null) + this.keySchemaName = record.keySchema().name(); + + this.value = record.value() == null ? "" : record.value().toString(); + if (record.valueSchema() != null) + this.valueSchemaName = record.valueSchema().name(); + + this.topic = record.topic(); + this.partition = record.kafkaPartition(); + this.offset = record.kafkaOffset(); + + if (record.timestamp() != null) + this.timestamp = record.timestamp(); + if (record.timestampType() != null) + this.timestampTypeName = record.timestampType().name; + } + + public String getValue() { return this.value; } + public void setValue(final String value) { this.value = value; } + + public long getOffset() { return this.offset; } + public void setOffset(final long offset) { this.offset = offset; } + + public Long getTimestamp() { return this.timestamp; } + public void setTimestamp(final long timestamp) { this.timestamp = timestamp; } + + public String getTimestampTypeName() { return this.timestampTypeName; } + public void setTimestampTypeName(final String timestampTypeName) { this.timestampTypeName = timestampTypeName; } + + public int getPartition() { return this.partition; } + public void setPartition(final int partition) { this.partition = partition; } + + public String getKey() { return this.key; } + public void setKey(final String key) { this.key = key; } + + public String getKeySchemaName() { return this.keySchemaName; } + public void setKeySchemaName(final String keySchemaName) { this.keySchemaName = keySchemaName; } + + public String getValueSchemaName() { return this.valueSchemaName; } + public void setValueSchemaName(final String valueSchemaName) { this.valueSchemaName = valueSchemaName; } + + public String getTopic() { return this.topic; } + public void setTopic(final String topic) { this.topic = topic; } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatter.java b/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatter.java new file mode 100644 index 0000000..982dc32 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatter.java @@ -0,0 +1,43 @@ +package com.nordstrom.kafka.connect.formatters; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; + +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Collection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PlainPayloadFormatter implements PayloadFormatter { + private static final Logger LOGGER = LoggerFactory.getLogger(PlainPayloadFormatter.class); + private final ObjectWriter recordWriter = new ObjectMapper().writerFor(PlainPayload.class); + private final ObjectWriter recordsWriter = new ObjectMapper().writerFor(PlainPayload[].class); + + public String format(final SinkRecord record) { + PlainPayload payload = new PlainPayload(record); + + try { + return this.recordWriter.writeValueAsString(payload); + } catch (final JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + throw new PayloadFormattingException(e); + } + } + + public String formatBatch(final Collection records) { + final PlainPayload[] payloads = records + .stream() + .map(record -> new PlainPayload(record)) + .toArray(PlainPayload[]::new); + + try { + return this.recordsWriter.writeValueAsString(payloads); + } catch (final JsonProcessingException e) { + LOGGER.error(e.getLocalizedMessage(), e); + throw new PayloadFormattingException(e); + } + } +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java index 4dac70b..36db3b7 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java @@ -9,22 +9,18 @@ import com.amazonaws.services.lambda.model.InvokeResult; import com.amazonaws.services.lambda.model.RequestTooLargeException; import com.nordstrom.kafka.connect.utils.Guard; -import org.apache.kafka.common.Configurable; -import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class AwsLambdaUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(AwsLambdaUtil.class); private static final int MEGABYTE_SIZE = 1024 * 1024; @@ -36,43 +32,20 @@ public class AwsLambdaUtil { private final AWSLambdaAsync lambdaClient; private final InvocationFailure failureMode; - public AwsLambdaUtil(final Configuration optConfigs, final Map bareAssumeRoleConfigs) { - LOGGER.debug("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); - Guard.verifyNotNull(optConfigs, "optConfigs"); - - final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard(); - - // Will check if there's proxy configuration in the environment; if - // there's any will construct the client with it. - if (optConfigs.getHttpProxyHost().isPresent()) { - final ClientConfiguration clientConfiguration = new ClientConfiguration() - .withProxyHost(optConfigs.getHttpProxyHost().get()); - if (optConfigs.getHttpProxyPort().isPresent()) { - clientConfiguration.setProxyPort(optConfigs.getHttpProxyPort().get()); - } - builder.setClientConfiguration(clientConfiguration); - LOGGER.info("Setting proxy configuration for AWS Lambda Async client host: {} port {}", - optConfigs.getHttpProxyHost().get(), optConfigs.getHttpProxyPort().orElse(-1)); - } + public AwsLambdaUtil(ClientConfiguration clientConfiguration, + AWSCredentialsProvider credentialsProvider, + InvocationFailure failureMode) { - if (optConfigs.getAwsRegion().isPresent()) { - builder.setRegion(optConfigs.getAwsRegion().get()); - LOGGER.info("Using aws region: {}", optConfigs.getAwsRegion().toString()); - } + Guard.verifyNotNull(clientConfiguration, "clientConfiguration"); + Guard.verifyNotNull(credentialsProvider, "credentialsProvider"); - failureMode = optConfigs.getFailureMode().orElse(InvocationFailure.STOP); - - AWSCredentialsProvider provider = null; - try { - provider = getCredentialsProvider(bareAssumeRoleConfigs); - } catch (Exception e) { - LOGGER.error("Problem initializing provider", e); - } - if (provider != null) { - builder.setCredentials(provider); - } + final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard() + .withClientConfiguration(clientConfiguration) + .withCredentials(credentialsProvider); + this.failureMode = failureMode; this.lambdaClient = builder.build(); + LOGGER.info("AWS Lambda client initialized"); } @@ -155,41 +128,6 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); } - @SuppressWarnings("unchecked") - public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) { - LOGGER.info(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs); - - try { - Object providerField = roleConfigs.get("class"); - String providerClass = LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(); - if (providerField != null) { - providerClass = providerField.toString(); - } - LOGGER.debug(".get-credentials-provider:field={}, class={}", providerField, providerClass); - AWSCredentialsProvider provider = ((Class) - getClass(providerClass)).newInstance(); - - if (provider instanceof Configurable) { - ((Configurable) provider).configure(roleConfigs); - } - - LOGGER.debug(".get-credentials-provider:provider={}", provider); - return provider; - } catch (IllegalAccessException | InstantiationException e) { - throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); - } - } - - public Class getClass(String className) { - LOGGER.debug(".get-class:class={}",className); - try { - return Class.forName(className); - } catch (ClassNotFoundException e) { - LOGGER.error("Provider class not found: {}", e); - } - return null; - } - private class LambdaInvocationException extends RuntimeException { public LambdaInvocationException(final Throwable e) { super(e); diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java b/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java deleted file mode 100644 index ba261e2..0000000 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.nordstrom.kafka.connect.lambda; - -import com.nordstrom.kafka.connect.utils.Facility; - -import java.util.Optional; - -import static com.nordstrom.kafka.connect.lambda.InvocationFailure.DROP; - -public class Configuration { - - private static final int MAX_HTTP_PORT_NUMBER = 65536; - private final Optional httpProxyHost; - private final Optional httpProxyPort; - private final Optional awsRegion; - private final Optional failureMode; - private final Optional roleArn; - private final Optional sessionName; - private final Optional externalId; - - - public Configuration(final String httpProxyHost, - final Integer httpProxyPort, - final String awsRegion, - final InvocationFailure failureMode, - final String roleArn, - final String sessionName, - final String externalId) { - this.httpProxyHost = - Facility.isNotNullNorEmpty(httpProxyHost) ? Optional.of(httpProxyHost) : Optional.empty(); - this.httpProxyPort = Facility.isNotNullAndInRange(httpProxyPort, 0, MAX_HTTP_PORT_NUMBER) - ? Optional.of(httpProxyPort) : Optional.empty(); - this.awsRegion = - Facility.isNotNullNorEmpty(awsRegion) ? Optional.of(awsRegion) : Optional.empty(); - this.failureMode = Facility.isNotNull(failureMode) ? Optional.of(failureMode): Optional.of(DROP); - this.roleArn = - Facility.isNotNullNorEmpty(roleArn) ? Optional.of(roleArn) : Optional.empty(); - this.sessionName = - Facility.isNotNullNorEmpty(sessionName) ? Optional.of(sessionName) : Optional.empty(); - this.externalId = - Facility.isNotNullNorEmpty(externalId) ? Optional.of(externalId) : Optional.empty(); - - } - - public Configuration(final Optional httpProxyHost, - final Optional httpProxyPort, - final Optional awsRegion, - final Optional failureMode, - final Optional roleArn, - final Optional sessionName, - final Optional externalId) { - - this.httpProxyHost = httpProxyHost; - this.httpProxyPort = httpProxyPort; - this.awsRegion = awsRegion; - this.failureMode = failureMode; - this.roleArn = roleArn; - this.sessionName = sessionName; - this.externalId = externalId; - } - - public static Configuration empty() { - return new Configuration(Optional.empty(), - Optional.empty(), Optional.empty(), - Optional.empty(), Optional.empty(), - Optional.empty(), Optional.empty()); - } - - public Optional getHttpProxyHost() { - return this.httpProxyHost; - } - - public Optional getHttpProxyPort() { - return this.httpProxyPort; - } - - public Optional getAwsRegion() { return this.awsRegion; } - - public Optional getFailureMode() { return this.failureMode; } - - public Optional getRoleArn() { return this.roleArn; } - - public Optional getSessionName() { return this.sessionName; } - - public Optional getExternalId() { return this.externalId; } -} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/KafkaSerializationException.java b/src/main/java/com/nordstrom/kafka/connect/lambda/KafkaSerializationException.java deleted file mode 100644 index 14b4f3d..0000000 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/KafkaSerializationException.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.nordstrom.kafka.connect.lambda; - -public class KafkaSerializationException extends RuntimeException { - - public KafkaSerializationException(final Throwable e) { - super(e); - } -} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java index 9876ceb..ad5bc01 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java @@ -20,32 +20,27 @@ public class LambdaSinkConnector extends SinkConnector { @Override public List> taskConfigs(int maxTasks) { return IntStream.range(0, maxTasks) - .mapToObj(i -> { - final Map taskProperties = new HashMap<>( - this.configuration.getProperties()); - return taskProperties; - }) - .collect(Collectors.toList()); + .mapToObj(i -> { + return new HashMap<>(this.configuration.originalsStrings()); + }) + .collect(Collectors.toList()); } @Override public void start(Map settings) { - LOGGER.info("starting connector {}", - settings.getOrDefault(LambdaSinkConnectorConfig.ConfigurationKeys.NAME_CONFIG.getValue(), "")); - this.configuration = new LambdaSinkConnectorConfig(settings); - LOGGER.info("connector.start:OK"); + LOGGER.info("Starting connector {}", this.configuration.getConnectorName()); } @Override public void stop() { - LOGGER.info("connector.stop:OK"); + LOGGER.info("Stopping connector {}", this.configuration.getConnectorName()); } @Override public ConfigDef config() { - return LambdaSinkConnectorConfig.config(); + return LambdaSinkConnectorConfig.configDef(); } @Override diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java index 9fc6450..c382d9e 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java @@ -1,336 +1,442 @@ package com.nordstrom.kafka.connect.lambda; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.errors.ConnectException; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.ClientConfiguration; + +import com.nordstrom.kafka.connect.formatters.PayloadFormatter; +import com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter; +import com.nordstrom.kafka.connect.utils.Guard; + import java.text.MessageFormat; import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; +import java.util.Arrays; import java.util.stream.Collectors; import java.util.stream.Stream; - -import com.amazonaws.auth.AWSCredentialsProvider; -import com.nordstrom.kafka.connect.utils.Guard; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.lang.reflect.InvocationTargetException; public class LambdaSinkConnectorConfig extends AbstractConfig { + private static final String AWS_REGION_DEFAULT = "us-west-2"; + private static final long AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000L; + private static final String HTTP_PROXY_HOST_DEFAULT = ""; + private static final int HTTP_PROXY_PORT_DEFAULT = -1; + private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true; + private static final String AWS_LAMBDA_INVOCATION_MODE_DEFAULT = InvocationMode.SYNC.name(); + private static final String AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT = InvocationFailure.STOP.name(); + private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504"; + private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500; + private static final int RETRIES_DEFAULT = 5; + private static final String AWS_IAM_ROLE_ARN_DEFAULT = ""; + private static final String AWS_IAM_SESSION_NAME_DEFAULT = ""; + private static final String AWS_IAM_EXTERNAL_ID_DEFAULT = ""; + + private final String connectorName; + private final ClientConfiguration awsClientConfiguration; + private final AWSCredentialsProvider awsCredentialsProvider; + private final PayloadFormatter payloadFormatter; + private final Collection retriableErrorCodes; + + LambdaSinkConnectorConfig(final Map parsedConfig) { + this(configDef(), parsedConfig); + } - private static final long AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000L; - private static final String AWS_REGION_DEFAULT = "us-west-2"; - private static final String HTTP_PROXY_HOST_DEFAULT = ""; - private static final int HTTP_PROXY_PORT_DEFAULT = -1; - private static final boolean AWS_LAMBDA_BATCH_ENABLED_DEFAULT = true; - private static final String AWS_LAMBDA_INVOCATION_MODE_DEFAULT = InvocationMode.SYNC.name(); - private static final String AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT = InvocationFailure.STOP.name(); - private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504"; - private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500; - private static final int RETRIES_DEFAULT = 5; - private static final int MEGABYTE_SIZE = 1024 * 1024; - private static final String ROLE_ARN_DEFAULT = ""; - private static final String SESSION_NAME_DEFAULT = ""; - private static final String EXTERNAL_ID_DEFAULT = ""; - - private static final ConfigDef configDefinition = LambdaSinkConnectorConfig.config(); - - private static final Logger LOGGER = LoggerFactory.getLogger(LambdaSinkConnectorConfig.class); - - private final Map properties; - private final String connectorName; - private final String httpProxyHost; - private final Integer httpProxyPort; - private final String awsFunctionArn; - private final Duration invocationTimeout; - private final InvocationMode invocationMode; - private final boolean isBatchingEnabled; - private final long retryBackoffTimeMillis; - private final int retries; - private final Collection retriableErrorCodes; - private final boolean isWithJsonWrapper = true; - private final int maxBatchSizeBytes = (6 * MEGABYTE_SIZE) - 1; - private final String awsRegion; - private final InvocationFailure failureMode; - private final Object credentialsProviderClass; - private final String roleArn; - private final String sessionName; - private final String externalId; - - LambdaSinkConnectorConfig(final Map properties) { - this(configDefinition, properties); - } - - LambdaSinkConnectorConfig(final ConfigDef configDefinition, final Map props) { - super(configDefinition, props); - this.properties = props; - - this.connectorName = this.properties.getOrDefault( - ConfigurationKeys.NAME_CONFIG.getValue(), + LambdaSinkConnectorConfig(final ConfigDef configDef, final Map parsedConfig) { + super(configDef, parsedConfig); + + this.connectorName = parsedConfig.getOrDefault(ConfigurationKeys.NAME_CONFIG.getValue(), "LambdaSinkConnector-Unnamed-" + ThreadLocalRandom.current() - .ints(4) - .mapToObj(String::valueOf) - .collect(Collectors.joining())); - - this.httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue()); - this.httpProxyPort = this.getInt(ConfigurationKeys.HTTP_PROXY_PORT.getValue()); - - this.awsFunctionArn = this.getString(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue()); - this.invocationTimeout = Duration.ofMillis(this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue())); - - this.invocationMode = InvocationMode.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue())); - - this.isBatchingEnabled = this.getBoolean(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue()); - this.retries = this.getInt(ConfigurationKeys.RETRIES_MAX.getValue()); - - final List retriableErrorCodesString = this.getList(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue()); - try { - this.retriableErrorCodes = retriableErrorCodesString - .stream() - .map(Integer::parseInt) - .collect(Collectors.toList()); - } catch (final NumberFormatException e) { - final String errorMessage = MessageFormat - .format("The list {1} was not able to parse to a list of integers", - retriableErrorCodesString.stream().collect(Collectors.joining(","))); - LOGGER.error(errorMessage); - throw new ConfigException(errorMessage, e); + .ints(4) + .mapToObj(String::valueOf) + .collect(Collectors.joining())); + + this.awsClientConfiguration = loadAwsClientConfiguration(); + this.awsCredentialsProvider = loadAwsCredentialsProvider(); + this.payloadFormatter = loadPayloadFormatter(); + this.retriableErrorCodes = loadRetriableErrorCodes(); } - this.retryBackoffTimeMillis = this.getInt(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue()); - this.awsRegion = this.getString(ConfigurationKeys.AWS_REGION.getValue()); + public String getConnectorName() { + return this.connectorName; + } + + public String getAwsFunctionArn() { + return this.getString(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue()); + } + + public Duration getInvocationTimeout() { + return Duration.ofMillis(this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue())); + } - this.failureMode = InvocationFailure.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue())); + public InvocationMode getInvocationMode() { + return InvocationMode.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue())); + } - this.credentialsProviderClass = this.getClass(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue()); - this.roleArn = this.getString(ConfigurationKeys.ROLE_ARN_CONFIG.getValue()); - this.sessionName = this.getString(ConfigurationKeys.SESSION_NAME_CONFIG.getValue()); - this.externalId = this.getString(ConfigurationKeys.EXTERNAL_ID_CONFIG.getValue()); - } + public InvocationFailure getFailureMode() { + return InvocationFailure.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue())); + } - public Map getProperties() { - return this.properties; - } + public boolean isBatchingEnabled() { + return this.getBoolean(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue()); + } - public String getConnectorName() { - return this.connectorName; - } + public long getRetryBackoffTimeMillis() { + return this.getInt(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue()); + } - public String getAwsFunctionArn() { - return this.awsFunctionArn; - } + public int getRetries() { + return this.getInt(ConfigurationKeys.RETRIES_MAX.getValue()); + } - public Duration getInvocationTimeout() { - return this.invocationTimeout; - } + public Collection getRetriableErrorCodes() { + return this.retriableErrorCodes; + } - public Integer getHttpProxyPort() { - return this.httpProxyPort; - } + Collection loadRetriableErrorCodes() { + final List retriableErrorCodesString = this.getList(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue()); + try { + return retriableErrorCodesString + .stream() + .map(Integer::parseInt) + .collect(Collectors.toList()); + } catch (final NumberFormatException e) { + final String errorMessage = MessageFormat + .format("The list {1} was not able to parse to a list of integers", + retriableErrorCodesString.stream().collect(Collectors.joining(","))); + throw new ConfigException(errorMessage, e); + } + } - public String getHttpProxyHost() { - return this.httpProxyHost; - } + public String getAwsRegion() { + return this.getString(ConfigurationKeys.AWS_REGION.getValue()); + } - public boolean isWithJsonWrapper() { - return this.isWithJsonWrapper; - } + public ClientConfiguration getAwsClientConfiguration() { + return this.awsClientConfiguration; + } - public InvocationMode getInvocationMode() { - return this.invocationMode; - } + ClientConfiguration loadAwsClientConfiguration() { + ClientConfiguration clientConfiguration = new ClientConfiguration(); - public InvocationFailure getFailureMode() { - return this.failureMode; - } + String httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue()); - public boolean isBatchingEnabled() { - return this.isBatchingEnabled; - } + if (!httpProxyHost.isEmpty()) { + clientConfiguration.setProxyHost(httpProxyHost); - public long getRetryBackoffTimeMillis() { - return this.retryBackoffTimeMillis; - } - - public int getRetries() { - return this.retries; - } - - public Collection getRetriableErrorCodes() { - return this.retriableErrorCodes; - } - - public int getMaxBatchSizeBytes() { - return this.maxBatchSizeBytes; - } - - public String getAwsRegion() { - return this.awsRegion; - } - - public String getRoleArn() { - return this.roleArn; - } - - public String getSessionName() { - return sessionName; - } - - public String getExternalId() { - return externalId; - } - - static ConfigDef getConfigDefinition() { - return configDefinition; - } - - public static ConfigDef config() { - return new ConfigDef() - .define(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue(), Type.STRING, Importance.HIGH, - ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue(), Type.LONG, - AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT, Importance.HIGH, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue(), Type.STRING, - AWS_LAMBDA_INVOCATION_MODE_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue(), Type.STRING, - AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getDocumentation()) - - .define(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue(), Type.BOOLEAN, - AWS_LAMBDA_BATCH_ENABLED_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getDocumentation()) - - .define(ConfigurationKeys.AWS_REGION.getValue(), Type.STRING, AWS_REGION_DEFAULT, - Importance.LOW, ConfigurationKeys.AWS_REGION.getDocumentation()) - - .define(ConfigurationKeys.HTTP_PROXY_HOST.getValue(), Type.STRING, HTTP_PROXY_HOST_DEFAULT, - Importance.LOW, ConfigurationKeys.HTTP_PROXY_HOST.getDocumentation()) - - .define(ConfigurationKeys.HTTP_PROXY_PORT.getValue(), Type.INT, HTTP_PROXY_PORT_DEFAULT, - Importance.LOW, ConfigurationKeys.HTTP_PROXY_PORT.getDocumentation()) - - .define(ConfigurationKeys.RETRIES_MAX.getValue(), Type.INT, RETRIES_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRIES_MAX.getDocumentation()) - - .define(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue(), Type.INT, - RETRY_BACKOFF_MILLIS_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRY_BACKOFF_MILLIS.getDocumentation()) - - .define(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue(), Type.LIST, - RETRIABLE_ERROR_CODES_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRIABLE_ERROR_CODES.getDocumentation()) - - .define(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(), Type.CLASS, - ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(), - new CredentialsProviderValidator(), - Importance.LOW, - ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getDocumentation(), - "LAMBDA", - 0, - ConfigDef.Width.LONG, - "AWS Credentials Provider Class") - - .define(ConfigurationKeys.ROLE_ARN_CONFIG.getValue(), Type.STRING, ROLE_ARN_DEFAULT, - Importance.LOW, ConfigurationKeys.ROLE_ARN_CONFIG.getDocumentation()) - - .define(ConfigurationKeys.SESSION_NAME_CONFIG.getValue(), Type.STRING, SESSION_NAME_DEFAULT, - Importance.LOW, ConfigurationKeys.SESSION_NAME_CONFIG.getDocumentation()) - - .define(ConfigurationKeys.EXTERNAL_ID_CONFIG.getValue(), Type.STRING, EXTERNAL_ID_DEFAULT, - Importance.LOW, ConfigurationKeys.EXTERNAL_ID_CONFIG.getDocumentation()); - } - - enum ConfigurationKeys { - NAME_CONFIG("name", "Connector Name"), - TASK_ID("task.id", "Connector Task Id"), - AWS_LAMBDA_FUNCTION_ARN("aws.lambda.function.arn", "Full ARN of the function to be called"), - AWS_LAMBDA_INVOCATION_TIMEOUT_MS("aws.lambda.invocation.timeout.ms", - "Time to wait for a lambda invocation, if the response times out, the connector will move forward. Default in ms: " - + AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT), - AWS_LAMBDA_INVOCATION_MODE("aws.lambda.invocation.mode", - "Determines whether the lambda would be called asynchronously (Event) or Synchronously (Request-Response), possible values are: " - + Stream.of(InvocationMode.values()).map(InvocationMode::toString) - .collect(Collectors.joining(","))), - AWS_LAMBDA_INVOCATION_FAILURE_MODE("aws.lambda.invocation.failure.mode", // TODO Maybe generalize for other types of failures - "Determines whether the lambda should stop or drop and continue on failure (specifically, payload limit exceeded), possible values are: " - + Stream.of(InvocationFailure.values()).map(InvocationFailure::toString) - .collect(Collectors.joining(","))), - - AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled", - "Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " + AWS_LAMBDA_BATCH_ENABLED_DEFAULT), - AWS_REGION("aws.region", - "AWS region to instantiate the Lambda client Default: " + AWS_REGION_DEFAULT), - - HTTP_PROXY_HOST("http.proxy.host", - "Http proxy port to be configured for the Lambda client, by default is empty"), - HTTP_PROXY_PORT("http.proxy.port", - "Http proxy to be configured for the Lambda client, by default is empty"), - RETRIES_MAX("retries.max", "Max number of times to retry a call"), - RETRIABLE_ERROR_CODES("retriable.error.codes" - , "A comma separated list with the error codes to be retried, by default " - + RETRIABLE_ERROR_CODES_DEFAULT), - RETRY_BACKOFF_MILLIS("retry.backoff.millis", - "The amount of time to wait between retry attempts, by default is " - + RETRY_BACKOFF_MILLIS_DEFAULT), - - // AWS assume role support options - CREDENTIALS_PROVIDER_CLASS_CONFIG("aws.lambda.credentials.provider.class", "REQUIRED Class providing cross-account role assumption"), - CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if aws.lambda.credentials.provider.class is not passed in"), - CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.lambda.credentials.provider.", "Note trailing '.'"), - ROLE_ARN_CONFIG("aws.lambda.credentials.provider.role.arn", " REQUIRED AWS Role ARN providing the access"), - SESSION_NAME_CONFIG("aws.lambda.credentials.provider.session.name", "REQUIRED Session name"), - EXTERNAL_ID_CONFIG("aws.lambda.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"); - - private final String value; - private final String documentation; - - ConfigurationKeys(final String configurationKeyValue, final String documentation) { - Guard.verifyNotNullOrEmpty(configurationKeyValue, "configurationKeyValue"); - - // Empty or null documentation is ok. - this.value = configurationKeyValue; - this.documentation = documentation; + Integer httpProxyPort = this.getInt(ConfigurationKeys.HTTP_PROXY_PORT.getValue()); + if (httpProxyPort != HTTP_PROXY_PORT_DEFAULT) + clientConfiguration.setProxyPort(httpProxyPort); + } + + return clientConfiguration; + } + + public AWSCredentialsProvider getAwsCredentialsProvider() { + return this.awsCredentialsProvider; + } + + @SuppressWarnings("unchecked") + AWSCredentialsProvider loadAwsCredentialsProvider() { + String configKey = ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(); + + try { + AWSCredentialsProvider awsCredentialsProvider = ((Class) + getClass(configKey)).getDeclaredConstructor().newInstance(); + + if (awsCredentialsProvider instanceof Configurable) { + Map configs = originalsWithPrefix( + ConfigurationKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue()); + + ((Configurable)awsCredentialsProvider).configure(configs); + } + + return awsCredentialsProvider; + + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { + throw new ConnectException("Unable to create " + configKey, e); + } } - String getValue() { - return this.value; + public String getIamRoleArn() { + return this.getString(ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getValue()); } - String getDocumentation() { - return this.documentation; + public String getIamSessionName() { + return this.getString(ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getValue()); + } + + public String getIamExternalId() { + return this.getString(ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getValue()); } - @Override - public String toString() { - return this.value; + public PayloadFormatter getPayloadFormatter() { + return this.payloadFormatter; } - } - - private static class CredentialsProviderValidator implements ConfigDef.Validator { - @Override - public void ensureValid(String name, Object provider) { - if (provider instanceof Class - && AWSCredentialsProvider.class.isAssignableFrom((Class) provider)) { - return; - } - throw new ConfigException( - name, - provider, - "Class must extend: " + AWSCredentialsProvider.class - ); + + @SuppressWarnings("unchecked") + PayloadFormatter loadPayloadFormatter() { + String configKey = ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getValue(); + + try { + PayloadFormatter payloadFormatter = ((Class) + getClass(configKey)).getDeclaredConstructor().newInstance(); + + return payloadFormatter; + + } catch (IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException e) { + throw new ConnectException("Unable to create " + configKey, e); + } + } + + public static ConfigDef configDef() { + return new ConfigDef() + .define(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue(), Type.STRING, Importance.HIGH, + ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getDocumentation()) + + .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue(), Type.LONG, + AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT, Importance.HIGH, + ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getDocumentation()) + + .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue(), Type.STRING, + AWS_LAMBDA_INVOCATION_MODE_DEFAULT, + new InvocationModeValidator(), + Importance.MEDIUM, + ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getDocumentation(), + "LAMBDA", + 0, + ConfigDef.Width.SHORT, + "Invocation mode", + new InvocationModeRecommender()) + + .define(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue(), Type.STRING, + AWS_LAMBDA_INVOCATION_FAILURE_MODE_DEFAULT, + new InvocationFailureValidator(), + Importance.MEDIUM, + ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getDocumentation(), + "LAMBDA", + 0, + ConfigDef.Width.SHORT, + "Invocation mode", + new InvocationFailureRecommender()) + + .define(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue(), Type.BOOLEAN, + AWS_LAMBDA_BATCH_ENABLED_DEFAULT, Importance.MEDIUM, + ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getDocumentation()) + + .define(ConfigurationKeys.AWS_REGION.getValue(), Type.STRING, AWS_REGION_DEFAULT, + Importance.LOW, ConfigurationKeys.AWS_REGION.getDocumentation()) + + .define(ConfigurationKeys.HTTP_PROXY_HOST.getValue(), Type.STRING, HTTP_PROXY_HOST_DEFAULT, + Importance.LOW, ConfigurationKeys.HTTP_PROXY_HOST.getDocumentation()) + + .define(ConfigurationKeys.HTTP_PROXY_PORT.getValue(), Type.INT, HTTP_PROXY_PORT_DEFAULT, + Importance.LOW, ConfigurationKeys.HTTP_PROXY_PORT.getDocumentation()) + + .define(ConfigurationKeys.RETRIES_MAX.getValue(), Type.INT, RETRIES_DEFAULT, Importance.MEDIUM, + ConfigurationKeys.RETRIES_MAX.getDocumentation()) + + .define(ConfigurationKeys.RETRY_BACKOFF_MILLIS.getValue(), Type.INT, + RETRY_BACKOFF_MILLIS_DEFAULT, Importance.MEDIUM, + ConfigurationKeys.RETRY_BACKOFF_MILLIS.getDocumentation()) + + .define(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue(), Type.LIST, + RETRIABLE_ERROR_CODES_DEFAULT, Importance.MEDIUM, + ConfigurationKeys.RETRIABLE_ERROR_CODES.getDocumentation()) + + .define(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(), Type.CLASS, + ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(), + new AwsCredentialsProviderValidator(), + Importance.LOW, + ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getDocumentation(), + "LAMBDA", + 0, + ConfigDef.Width.LONG, + "AWS credentials provider class") + + .define(ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getValue(), Type.STRING, AWS_IAM_ROLE_ARN_DEFAULT, + Importance.LOW, ConfigurationKeys.AWS_IAM_ROLE_ARN_CONFIG.getDocumentation()) + + .define(ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getValue(), Type.STRING, AWS_IAM_SESSION_NAME_DEFAULT, + Importance.LOW, ConfigurationKeys.AWS_IAM_SESSION_NAME_CONFIG.getDocumentation()) + + .define(ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getValue(), Type.STRING, AWS_IAM_EXTERNAL_ID_DEFAULT, + Importance.LOW, ConfigurationKeys.AWS_IAM_EXTERNAL_ID_CONFIG.getDocumentation()) + + .define(ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getValue(), Type.CLASS, + PlainPayloadFormatter.class, + new PayloadFormatterClassValidator(), + Importance.LOW, + ConfigurationKeys.PAYLOAD_FORMATTER_CLASS_CONFIG.getDocumentation(), + "LAMBDA", + 0, + ConfigDef.Width.LONG, + "Invocation payload formatter class"); } - @Override - public String toString() { - return "Any class implementing: " + AWSCredentialsProvider.class; + enum ConfigurationKeys { + NAME_CONFIG("name", "Connector Name"), + AWS_LAMBDA_FUNCTION_ARN("aws.lambda.function.arn", "Full ARN of the function to be called"), + AWS_LAMBDA_INVOCATION_TIMEOUT_MS("aws.lambda.invocation.timeout.ms", + "Time to wait for a lambda invocation, if the response times out, the connector will move forward. Default in ms: " + + AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT), + AWS_LAMBDA_INVOCATION_MODE("aws.lambda.invocation.mode", + "Determines whether the lambda would be called asynchronously (Event) or Synchronously (Request-Response), possible values are: [" + + Stream.of(InvocationMode.values()).map(InvocationMode::toString) + .collect(Collectors.joining(",")) + "]"), + AWS_LAMBDA_INVOCATION_FAILURE_MODE("aws.lambda.invocation.failure.mode", + "Determines whether the lambda should stop or drop and continue on failure (specifically, payload limit exceeded), possible values are: [" + + Stream.of(InvocationFailure.values()).map(InvocationFailure::toString) + .collect(Collectors.joining(",")) + "]"), + + AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled", + "Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " + AWS_LAMBDA_BATCH_ENABLED_DEFAULT), + AWS_REGION("aws.region", + "AWS region to instantiate the Lambda client Default: " + AWS_REGION_DEFAULT), + + HTTP_PROXY_HOST("http.proxy.host", + "Http proxy port to be configured for the Lambda client, by default is empty"), + HTTP_PROXY_PORT("http.proxy.port", + "Http proxy to be configured for the Lambda client, by default is empty"), + RETRIES_MAX("retries.max", "Max number of times to retry a call"), + RETRIABLE_ERROR_CODES("retriable.error.codes" + , "A comma separated list with the error codes to be retried, by default " + + RETRIABLE_ERROR_CODES_DEFAULT), + RETRY_BACKOFF_MILLIS("retry.backoff.millis", + "The amount of time to wait between retry attempts, by default is " + + RETRY_BACKOFF_MILLIS_DEFAULT), + + CREDENTIALS_PROVIDER_CLASS_CONFIG("aws.credentials.provider.class", "Class providing cross-account role assumption"), + CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if aws.lambda.credentials.provider.class is not passed in"), + CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.credentials.provider.", "Note trailing '.'"), + AWS_IAM_ROLE_ARN_CONFIG("aws.credentials.provider.role.arn", "REQUIRED AWS Role ARN providing the access"), + AWS_IAM_SESSION_NAME_CONFIG("aws.credentials.provider.session.name", "REQUIRED Session name"), + AWS_IAM_EXTERNAL_ID_CONFIG("aws.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"), + + PAYLOAD_FORMATTER_CLASS_CONFIG("payload.formatter.class", "Class formatter for the invocation payload"); + + private final String value; + private final String documentation; + + ConfigurationKeys(final String configurationKeyValue, final String documentation) { + Guard.verifyNotNullOrEmpty(configurationKeyValue, "configurationKeyValue"); + + // Empty or null documentation is ok. + this.value = configurationKeyValue; + this.documentation = documentation; + } + + String getValue() { + return this.value; + } + + String getDocumentation() { + return this.documentation; + } + + @Override + public String toString() { + return this.value; + } + } + + private static class InvocationModeRecommender implements ConfigDef.Recommender { + @Override + public List validValues(String name, Map connectorConfigs) { + return Arrays.asList(InvocationMode.values()); + } + + @Override + public boolean visible(String name, Map connectorConfigs) { + return true; + } + } + + private static class InvocationModeValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object invocationMode) { + try { + InvocationMode.valueOf(((String)invocationMode).trim()); + } catch (Exception e) { + throw new ConfigException(name, invocationMode, "Value must be one of [" + + Utils.join(InvocationMode.values(), ", ") + "]"); + } + } + + @Override + public String toString() { + return "[" + Utils.join(InvocationMode.values(), ", ") + "]"; + } + } + + private static class InvocationFailureRecommender implements ConfigDef.Recommender { + @Override + public List validValues(String name, Map connectorConfigs) { + return Arrays.asList(InvocationFailure.values()); + } + + @Override + public boolean visible(String name, Map connectorConfigs) { + return true; + } + } + + private static class InvocationFailureValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object invocationFailure) { + try { + InvocationFailure.valueOf(((String)invocationFailure).trim()); + } catch (Exception e) { + throw new ConfigException(name, invocationFailure, "Value must be one of [" + + Utils.join(InvocationFailure.values(), ", ") + "]"); + } + } + + @Override + public String toString() { + return "[" + Utils.join(InvocationFailure.values(), ", ") + "]"; + } + } + + private static class AwsCredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + if (provider instanceof Class && AWSCredentialsProvider.class.isAssignableFrom((Class)provider)) { + return; + } + + throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class); + } + + @Override + public String toString() { + return "Any class implementing: " + AWSCredentialsProvider.class; + } + } + + private static class PayloadFormatterClassValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object formatter) { + if (formatter instanceof Class && PayloadFormatter.class.isAssignableFrom((Class)formatter)) { + return; + } + + throw new ConfigException(name, formatter, "Class must extend: " + PayloadFormatter.class); + } + + @Override + public String toString() { + return "Any class implementing: " + PayloadFormatter.class; + } } - } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 84fd964..c7e73ff 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -1,10 +1,13 @@ package com.nordstrom.kafka.connect.lambda; import com.nordstrom.kafka.connect.About; +import com.nordstrom.kafka.connect.formatters.PayloadFormatter; +import com.nordstrom.kafka.connect.formatters.PayloadFormattingException; import com.nordstrom.kafka.connect.utils.JsonUtil; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -26,10 +29,11 @@ public class LambdaSinkTask extends SinkTask { private final Queue batchRecords = new ConcurrentLinkedQueue<>(); private final AtomicInteger retryCount = new AtomicInteger(0); + private final int maxBatchSizeBytes = (6 * 1024 * 1024) - 1; AwsLambdaUtil lambdaClient; - Map properties; - LambdaSinkTaskConfiguration configuration; + PayloadFormatter payloadFormatter; + LambdaSinkConnectorConfig configuration; @Override public String version() { @@ -37,27 +41,19 @@ public String version() { } @Override - public void start(final Map props) { - this.properties = props; - this.configuration = new LambdaSinkTaskConfiguration(this.properties); + public void start(final Map settings) { + this.configuration = new LambdaSinkConnectorConfig(settings); - LOGGER.info("starting connector {} task {}", - this.configuration.getConnectorName(), - this.configuration.getTaskId()); - - Configuration optConfigs = new Configuration( - this.configuration.getHttpProxyHost(), - this.configuration.getHttpProxyPort(), - this.configuration.getAwsRegion(), - this.configuration.getFailureMode(), - this.configuration.getRoleArn(), - this.configuration.getSessionName(), - this.configuration.getExternalId()); - this.lambdaClient = new AwsLambdaUtil(optConfigs, configuration.originalsWithPrefix(LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue())); - - LOGGER.info("Context for connector {} task {}, Assignments[{}], ", + LOGGER.info("Starting lambda connector {} task.", this.configuration.getConnectorName()); + + this.lambdaClient = new AwsLambdaUtil( + this.configuration.getAwsClientConfiguration(), + this.configuration.getAwsCredentialsProvider(), + this.configuration.getFailureMode()); + this.payloadFormatter = this.configuration.getPayloadFormatter(); + + LOGGER.info("Context for connector {} task, Assignments[{}], ", this.configuration.getConnectorName(), - this.configuration.getTaskId(), this.context .assignment() .stream() @@ -69,21 +65,19 @@ public void start(final Map props) { @Override public void put(final Collection records) { if (records == null || records.isEmpty()) { - LOGGER.debug("No records to process. connector=\"{}\" task=\"{}\"", - this.configuration.getConnectorName(), - this.configuration.getTaskId()); + LOGGER.debug("No records to process. connector=\"{}\"", + this.configuration.getConnectorName()); return; } if (this.configuration.isBatchingEnabled()) { this.batchRecords.addAll(records); final int batchLength = this.getPayload(this.batchRecords).getBytes().length; - if (batchLength >= this.configuration.getMaxBatchSizeBytes()) { - LOGGER.warn("Batch size reached {} bytes within {} records. connector=\"{}\" task=\"{}\"", + if (batchLength >= maxBatchSizeBytes) { + LOGGER.warn("Batch size reached {} bytes within {} records. connector=\"{}\"", batchLength, this.batchRecords.size(), - this.configuration.getConnectorName(), - this.configuration.getTaskId()); + this.configuration.getConnectorName()); this.rinse(); this.context.requestCommit(); } @@ -110,7 +104,7 @@ private void rinse() { if (! records.isEmpty()) { - this.splitBatch(records, this.configuration.getMaxBatchSizeBytes()) + this.splitBatch(records, maxBatchSizeBytes) .forEach(recordsToFlush -> { final AwsLambdaUtil.InvocationResponse response = this.invoke(this.getPayload(recordsToFlush)); @@ -125,10 +119,9 @@ private void rinse() { .collect(Collectors.joining(" | ")); if (responsesMsg != null && !responsesMsg.isEmpty()) { final String message = MessageFormat.format( - "Response Summary Batch - arn=\"{0}\", connector=\"{1}\" task=\"{2}\", recordcount=\"{3}\" responseCode=\"{4}\" response=\"{5}\" start=\"{6}\" durationtimemillis=\"{7}\" | {8}", + "Response Summary Batch - arn=\"{0}\", connector=\"{1}\" recordcount=\"{3}\" responseCode=\"{4}\" response=\"{5}\" start=\"{6}\" durationtimemillis=\"{7}\" | {8}", this.configuration.getAwsFunctionArn(), this.configuration.getConnectorName(), - this.configuration.getTaskId(), recordsToFlush.size(), response.getStatusCode(), String.join(" : ", response.getResponseString(), response.getErrorString()), @@ -143,16 +136,14 @@ private void rinse() { }); if (!this.batchRecords.isEmpty()) { LOGGER.error( - "Race Condition Found between sinkConnector.put() and sinkConnector.flush() connector=\"{}\" task=\"{}\"", - this.configuration.getConnectorName(), - this.configuration.getTaskId()); + "Race Condition Found between sinkConnector.put() and sinkConnector.flush() connector=\"{}\"", + this.configuration.getConnectorName()); } } else { - LOGGER.info("No records sent in the flush cycle. connector=\"{}\" task=\"{}\"", - this.configuration.getConnectorName(), - this.configuration.getTaskId()); + LOGGER.info("No records sent in the flush cycle. connector=\"{}\"", + this.configuration.getConnectorName()); } this.context.requestCommit(); } @@ -187,17 +178,19 @@ private Collection> splitBatch(final List rec } private String getPayload(final SinkRecord record) { - return this.configuration.isWithJsonWrapper() ? - new SinkRecordSerializable(record).toJsonString() : - record.value().toString(); + try { + return this.payloadFormatter.format(record); + } catch (final PayloadFormattingException e) { + throw new DataException("Record could not be formatted.", e); + } } private String getPayload(final Collection records) { - final List stringRecords = records - .stream() - .map(this::getPayload) - .collect(Collectors.toList()); - return JsonUtil.jsonify(stringRecords); + try { + return this.payloadFormatter.formatBatch(records); + } catch (final PayloadFormattingException e) { + throw new DataException("Records could not be formatted.", e); + } } private AwsLambdaUtil.InvocationResponse invoke(final String payload) { @@ -293,27 +286,10 @@ private void handleResponse( @Override public void stop() { - LOGGER.info("stopping lambda connector {} task {}.", - this.properties.getOrDefault(LambdaSinkConnectorConfig.ConfigurationKeys.NAME_CONFIG.getValue(), "undefined"), - this.properties.getOrDefault(LambdaSinkConnectorConfig.ConfigurationKeys.TASK_ID.getValue(), "undefined")); - } - - class LambdaSinkTaskConfiguration extends LambdaSinkConnectorConfig { - - private final String taskId; - - LambdaSinkTaskConfiguration(final Map properties) { - super(LambdaSinkConnectorConfig.getConfigDefinition(), properties); - this.taskId = "0";//this.getString(ConfigurationKeys.TASK_ID.getValue()); - } - - String getTaskId() { - return this.taskId; - } + LOGGER.info("Stopping lambda connector {} task.", this.configuration.getConnectorName()); } private class OutOfRetriesException extends RuntimeException { - OutOfRetriesException(final String message) { super(message); } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java b/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java index e2c242e..ba58e9a 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/SinkRecordSerializable.java @@ -3,6 +3,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.nordstrom.kafka.connect.formatters.PayloadFormattingException; + import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +119,7 @@ public String toJsonString() { return this.jsonWriter.writeValueAsString(this); } catch (final JsonProcessingException e) { LOGGER.error(e.getLocalizedMessage(), e); - throw new KafkaSerializationException(e); + throw new PayloadFormattingException(e); } } diff --git a/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java b/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java index e3c5853..d31a836 100644 --- a/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java @@ -14,12 +14,11 @@ public class AWSAssumeRoleCredentialsProviderTest { @Before public void setupTestConfigs() { - testConfigs = - new ImmutableMap.Builder() - .put(AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG, "test-external-id") - .put(AWSAssumeRoleCredentialsProvider.ROLE_ARN_CONFIG, "arn:aws:iam::123456789012:role/test-role") - .put(AWSAssumeRoleCredentialsProvider.SESSION_NAME_CONFIG, "test-session-name") - .build(); + testConfigs = new ImmutableMap.Builder() + .put(AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG, "test-external-id") + .put(AWSAssumeRoleCredentialsProvider.ROLE_ARN_CONFIG, "arn:aws:iam::123456789012:role/test-role") + .put(AWSAssumeRoleCredentialsProvider.SESSION_NAME_CONFIG, "test-session-name") + .build(); } @Test diff --git a/src/test/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatterTest.java b/src/test/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatterTest.java new file mode 100644 index 0000000..31ff155 --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/formatters/PlainPayloadFormatterTest.java @@ -0,0 +1,78 @@ +package com.nordstrom.kafka.connect.formatters; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.data.SchemaBuilder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.nordstrom.kafka.connect.formatters.PlainPayload; + +public class PlainPayloadFormatterTest { + + private Schema keySchema; + private Schema valueSchema; + + @Before + public void setup() { + keySchema = SchemaBuilder.struct() + .name("com.test.HelloKey") + .field("name", Schema.STRING_SCHEMA) + .build(); + valueSchema = SchemaBuilder.struct() + .name("com.test.HelloValue") + .field("name", Schema.STRING_SCHEMA) + .build(); + } + + @Test + public void testFormatSingleRecord() throws IOException { + PlainPayloadFormatter f = new PlainPayloadFormatter(); + SinkRecord record = new SinkRecord("test-topic", 1, keySchema, "test-key", valueSchema, "test-value", 2, 3L, TimestampType.NO_TIMESTAMP_TYPE); + + String result = f.format(record); + + PlainPayload payload = new ObjectMapper() + .readValue(result, PlainPayload.class); + + assertEquals("test-topic", payload.getTopic()); + assertEquals(1, payload.getPartition()); + assertEquals("test-key", payload.getKey()); + assertEquals("com.test.HelloKey", payload.getKeySchemaName()); + assertEquals("test-value", payload.getValue()); + assertEquals("com.test.HelloValue", payload.getValueSchemaName()); + assertEquals(2, payload.getOffset()); + assertEquals(new Long(3), payload.getTimestamp()); + assertEquals("NoTimestampType", payload.getTimestampTypeName()); + } + + @Test + public void testFormatBatchOfRecords() throws IOException { + PlainPayloadFormatter f = new PlainPayloadFormatter(); + + List records = Arrays.asList( + new SinkRecord("test-topic", 1, null, "test-key1", null, "test-value1", 0), + new SinkRecord("test-topic", 1, null, "test-key2", null, "test-value2", 1), + new SinkRecord("test-topic", 1, null, "test-key3", null, "test-value3", 2) + ); + + String result = f.formatBatch(records); + + PlainPayload[] payloads = new ObjectMapper() + .readValue(result, PlainPayload[].class); + + assertEquals(3, payloads.length); + for (int i = 0; i < payloads.length; i++) { + assertEquals(i, payloads[i].getOffset()); + } + } +} diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java index e2afbea..6dfdf26 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java @@ -1,11 +1,12 @@ package com.nordstrom.kafka.connect.lambda; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.lambda.model.InvocationType; import com.amazonaws.services.lambda.model.RequestTooLargeException; import org.junit.Test; import java.time.Instant; -import java.util.HashMap; import static org.junit.Assert.*; @@ -13,24 +14,34 @@ public class AwsLambdaUtilTest { @Test(expected = RequestTooLargeException.class) public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() { - - final Configuration testOptConfigs = new Configuration("testhost", 123, "test-region", InvocationFailure.STOP, "test-arn", "test-session", "test-external-id"); - final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>()); - - testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!")); + AwsLambdaUtil util = new AwsLambdaUtil( + new ClientConfiguration(), + new DefaultAWSCredentialsProviderChain(), + InvocationFailure.STOP); + + util.checkPayloadSizeForInvocationType( + "testpayload".getBytes(), + InvocationType.RequestResponse, + Instant.now(), + new RequestTooLargeException("Request payload is too large!")); } @Test public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropContinues() { - AwsLambdaUtil.InvocationResponse testResp = null; RequestTooLargeException ex = null; - final Configuration testOptConfigs = new Configuration("testhost", 123, "test-region", InvocationFailure.DROP, "test-arn", "test-session", "test-external-id"); - final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>()); + AwsLambdaUtil util = new AwsLambdaUtil( + new ClientConfiguration(), + new DefaultAWSCredentialsProviderChain(), + InvocationFailure.DROP); try { - testResp = testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!")); + testResp = util.checkPayloadSizeForInvocationType( + "testpayload".getBytes(), + InvocationType.RequestResponse, + Instant.now(), + new RequestTooLargeException("Request payload is too large!")); } catch (RequestTooLargeException e) { ex = e; } diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java new file mode 100644 index 0000000..f98d855 --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfigTest.java @@ -0,0 +1,72 @@ +package com.nordstrom.kafka.connect.lambda; + +import org.junit.Test; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter; +import com.nordstrom.kafka.connect.lambda.InvocationMode; +import com.nordstrom.kafka.connect.lambda.InvocationFailure; + +import static org.junit.Assert.*; + +import java.util.HashMap; + +public class LambdaSinkConnectorConfigTest { + @Test + public void minimalConfig() { + LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( + new HashMap() { + { + put("aws.lambda.function.arn", "my-function"); + } + }); + + assertTrue("Expected auto-generated connector name", + config.getConnectorName().contains("LambdaSinkConnector-Unnamed")); + + assertEquals("my-function", config.getAwsFunctionArn()); + + assertNotNull(config.getAwsRegion()); + assertNotNull(config.getAwsClientConfiguration()); + assertNotNull(config.getInvocationTimeout()); + assertNotNull(config.getFailureMode()); + assertNotNull(config.getInvocationMode()); + assertNotNull(config.getRetries()); + assertNotNull(config.getRetriableErrorCodes()); + assertNotNull(config.getIamRoleArn()); + assertNotNull(config.getIamExternalId()); + assertNotNull(config.getIamSessionName()); + + assertEquals(DefaultAWSCredentialsProviderChain.class, config.getAwsCredentialsProvider().getClass()); + assertEquals(PlainPayloadFormatter.class, config.getPayloadFormatter().getClass()); + } + + @Test + public void sampleConfig() { + LambdaSinkConnectorConfig config = new LambdaSinkConnectorConfig( + new HashMap() { + { + put("name", "test-connector"); + put("aws.region", "test-region"); + put("aws.lambda.function.arn", "test-function"); + put("aws.lambda.invocation.timeout.ms", "123"); + put("aws.lambda.invocation.mode", "SYNC"); + put("aws.lambda.invocation.failure.mode", "DROP"); + put("aws.lambda.batch.enabled", "true"); + put("retriable.error.codes", "1,2,3"); + put("retry.backoff.millis", "123"); + put("retries.max", "123"); + } + }); + + assertEquals("test-connector", config.getConnectorName()); + assertEquals("test-region", config.getAwsRegion()); + assertEquals("PT0.123S", config.getInvocationTimeout().toString()); + assertEquals(InvocationMode.SYNC, config.getInvocationMode()); + assertEquals(InvocationFailure.DROP, config.getFailureMode()); + assertEquals(true, config.isBatchingEnabled()); + assertEquals(3, config.getRetriableErrorCodes().size()); + assertEquals(123, config.getRetryBackoffTimeMillis()); + assertEquals(123, config.getRetries()); + } +} diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index d36871c..df58120 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -1,6 +1,8 @@ package com.nordstrom.kafka.connect.lambda; import com.amazonaws.services.lambda.model.InvocationType; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.google.common.collect.ImmutableMap; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; @@ -13,7 +15,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; import static org.junit.Assert.*; @@ -22,43 +23,7 @@ import static org.mockito.Mockito.when; public class LambdaSinkTaskTest { - - @Test - public void testStartProperlyInitializesSinkTaskWithSampleConnectorConfigurations(){ - - ImmutableMap props = - new ImmutableMap.Builder() - .put("connector.class", "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector") - .put("tasks.max", "1") - .put("aws.region", "test-region") - .put("aws.lambda.function.arn", "arn:aws:lambda:us-west-2:123456789123:function:test-lambda") - .put("aws.lambda.invocation.timeout.ms", "300000") - .put("aws.lambda.invocation.mode", "SYNC") - .put("aws.lambda.batch.enabled", "true") - .put("key.converter", "org.apache.kafka.connect.storage.StringConverter") - .put("value.converter", "org.apache.kafka.connect.storage.StringConverter") - .put("topics", "connect-lambda-test") - .build(); - - LambdaSinkTask task = new LambdaSinkTask(); - task.initialize(mock(SinkTaskContext.class)); - - task.start(props); - - assertNotNull(task.configuration); - assertNotNull(task.properties); - assertNotNull(task.lambdaClient); - - assertEquals("0", task.configuration.getTaskId()); - assertEquals("test-region", task.configuration.getAwsRegion()); - assertEquals("arn:aws:lambda:us-west-2:123456789123:function:test-lambda", task.configuration.getAwsFunctionArn()); - assertEquals("PT5M", task.configuration.getInvocationTimeout().toString()); - assertEquals("SYNC", task.configuration.getInvocationMode().toString()); - assertTrue(task.configuration.isBatchingEnabled()); - assertEquals(6291455, task.configuration.getMaxBatchSizeBytes()); - } - - @Ignore("Test is ignored as a demonstration -- needs profile") + @Ignore("Test is ignored as a demonstration -- needs credentials") @Test public void testPutWhenBatchingIsNotEnabled() { @@ -84,14 +49,12 @@ public void testPutWhenBatchingIsNotEnabled() { AwsLambdaUtil mockedLambdaClient = mock(AwsLambdaUtil.class); - when(mockedLambdaClient.invoke(anyString(), anyObject(), anyObject(), eq(InvocationType.RequestResponse))).thenReturn(new AwsLambdaUtil( new Configuration( - task.configuration.getHttpProxyHost(), - task.configuration.getHttpProxyPort(), - task.configuration.getAwsRegion(), - task.configuration.getFailureMode(), - task.configuration.getRoleArn(), - task.configuration.getSessionName(), - task.configuration.getExternalId()), new HashMap<>()).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); + when(mockedLambdaClient.invoke(anyString(), anyObject(), anyObject(), eq(InvocationType.RequestResponse))) + .thenReturn(new AwsLambdaUtil( + new ClientConfiguration(), + new DefaultAWSCredentialsProviderChain(), + InvocationFailure.STOP) + .new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build();