From 3ec8b8583b6086bc8323afd5bbf2cf60e941533c Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Fri, 12 Jul 2019 18:43:44 -0700 Subject: [PATCH 1/9] Functionality to allow connector to assume an AWS IAM role when invoking a lambda --- .travis.yml | 1 + LICENSE | 201 ++++++++++++++++++ README.md | 9 +- pom.xml | 25 ++- .../AWSAssumeRoleCredentialsProvider.java | 88 ++++++++ .../kafka/connect/lambda/AwsLambdaUtil.java | 81 +++++-- .../kafka/connect/lambda/Configuration.java | 36 +++- .../connect/lambda/LambdaSinkConnector.java | 1 + .../lambda/LambdaSinkConnectorConfig.java | 135 ++++++++---- .../kafka/connect/lambda/LambdaSinkTask.java | 40 ++-- .../connect/{lambda => utils}/About.java | 2 +- .../connect/{lambda => utils}/Facility.java | 2 +- .../connect/{lambda => utils}/Guard.java | 2 +- .../connect/{lambda => utils}/JsonUtil.java | 2 +- .../AWSAssumeRoleCredentialsProviderTest.java | 71 +++++++ .../connect/lambda/AwsLambdaUtilTest.java | 9 +- .../connect/lambda/LambdaSinkTaskTest.java | 6 +- 17 files changed, 622 insertions(+), 89 deletions(-) create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java rename src/main/java/com/nordstrom/kafka/connect/{lambda => utils}/About.java (82%) rename src/main/java/com/nordstrom/kafka/connect/{lambda => utils}/Facility.java (94%) rename src/main/java/com/nordstrom/kafka/connect/{lambda => utils}/Guard.java (93%) rename src/main/java/com/nordstrom/kafka/connect/{lambda => utils}/JsonUtil.java (94%) create mode 100644 src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..dff5f3a --- /dev/null +++ b/.travis.yml @@ -0,0 +1 @@ +language: java diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index dff04b2..8b20cae 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ The AWS Lambda connector plugin provides the ability to use AWS Lambda functions as a sink (out of a Kafka topic into a Lambda function). ## Supported Kafka and AWS versions -The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.490` +The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.587` # Building You can build the connector with Maven using the standard lifecycle goals: @@ -18,6 +18,13 @@ A sink connector reads from a Kafka topic and sends events to an AWS Lambda func A sink connector configuration has two required fields: * `aws.lambda.function.arn`: The AWS ARN of the Lambda function to send events to. * `topics`: The Kafka topic to be read from. + +### AWS Assume Role Support options + The connector can assume a cross-account role: + * `lambda.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED Class providing cross-account role assumption. + * `lambda.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. + * `lambda.credentials.provider.session.name`: REQUIRED Session name + * `lambda.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role. ### Sample Configuration ```json diff --git a/pom.xml b/pom.xml index abc71b2..a5ce0ee 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,11 @@ 3.0.0 2.1.0 - 1.11.490 + 1.11.587 + 4.12 + 2.28.2 + 19.0 + 1.18.4 @@ -39,23 +43,34 @@ com.amazonaws aws-java-sdk-lambda - ${aws.sdk.version} + ${aws-java-sdk.version} + + + com.amazonaws + aws-java-sdk-sts + ${aws-java-sdk.version} junit junit - 4.12 + ${junit.version} test org.mockito mockito-core - 2.25.1 + ${mockito-core.version} com.google.guava guava - 19.0 + ${google.guava.version} + + + org.projectlombok + lombok + ${lombok.version} + provided diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java new file mode 100644 index 0000000..e076b08 --- /dev/null +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -0,0 +1,88 @@ +package com.nordstrom.kafka.connect.auth; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import lombok.Getter; +import org.apache.kafka.common.Configurable; + +import java.util.Map; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +@Getter +public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable { + //NB: uncomment slf4j imports and field declaration to enable logging. +// private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class); + + public static final String EXTERNAL_ID_CONFIG = "external.id"; + public static final String ROLE_ARN_CONFIG = "role.arn"; + public static final String SESSION_NAME_CONFIG = "session.name"; + + private String externalId; + private String roleArn; + private String sessionName; + + @Override + public void configure(Map map) { + externalId = getOptionalField(map, EXTERNAL_ID_CONFIG); + roleArn = getRequiredField(map, ROLE_ARN_CONFIG); + sessionName = getRequiredField(map, SESSION_NAME_CONFIG); + } + + @Override + public AWSCredentials getCredentials() { + AWSSecurityTokenServiceClientBuilder clientBuilder = AWSSecurityTokenServiceClientBuilder.standard(); + AWSCredentialsProvider provider = new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName) + .withStsClient(clientBuilder.defaultClient()) + .withExternalId(externalId) + .build(); + + return provider.getCredentials(); + } + + @Override + public void refresh() { + //Nothing to do really, since we are assuming a role. + } + + String getOptionalField(final Map map, final String fieldName) { + final Object field = map.get(fieldName); + if (isNotNull(field)) { + return field.toString(); + } + return null; + } + + String getRequiredField(final Map map, final String fieldName) { + final Object field = map.get(fieldName); + verifyNotNull(field, fieldName); + final String fieldValue = field.toString(); + verifyNotNullOrEmpty(fieldValue, fieldName); + + return fieldValue; + } + + private boolean isNotNull(final Object field) { + return null != field; + } + + private boolean isNotNullOrEmpty(final String field) { + return null != field && !field.isEmpty(); + } + + private void verifyNotNull(final Object field, final String fieldName) { + if (!isNotNull(field)) { + throw new IllegalArgumentException(String.format("The field '%1s' should not be null", fieldName)); + } + } + + private void verifyNotNullOrEmpty(final String field, final String fieldName) { + if (!isNotNullOrEmpty(field)) { + throw new IllegalArgumentException(String.format("The field '%1s' should not be null or empty", fieldName)); + } + } + +} diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java index 25b45fd..d718ff3 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java @@ -1,19 +1,23 @@ package com.nordstrom.kafka.connect.lambda; import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.lambda.AWSLambdaAsync; import com.amazonaws.services.lambda.AWSLambdaAsyncClientBuilder; import com.amazonaws.services.lambda.model.InvocationType; import com.amazonaws.services.lambda.model.InvokeRequest; import com.amazonaws.services.lambda.model.InvokeResult; import com.amazonaws.services.lambda.model.RequestTooLargeException; +import com.nordstrom.kafka.connect.utils.Guard; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -32,37 +36,41 @@ public class AwsLambdaUtil { private final AWSLambdaAsync lambdaClient; private final InvocationFailure failureMode; - public AwsLambdaUtil(final Configuration config) { - Guard.verifyNotNull(config, "config"); + public AwsLambdaUtil(final Configuration optConfigs, final Map bareAssumeRoleConfigs) { + LOGGER.warn("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); + Guard.verifyNotNull(optConfigs, "optConfigs"); + final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard(); // Will check if there's proxy configuration in the environment; if // there's any will construct the client with it. - if (config.getHttpProxyHost().isPresent()) { + if (optConfigs.getHttpProxyHost().isPresent()) { final ClientConfiguration clientConfiguration = new ClientConfiguration() - .withProxyHost(config.getHttpProxyHost().get()); - if (config.getHttpProxyPort().isPresent()) { - clientConfiguration.setProxyPort(config.getHttpProxyPort().get()); + .withProxyHost(optConfigs.getHttpProxyHost().get()); + if (optConfigs.getHttpProxyPort().isPresent()) { + clientConfiguration.setProxyPort(optConfigs.getHttpProxyPort().get()); } builder.setClientConfiguration(clientConfiguration); LOGGER.info("Setting proxy configuration for AWS Lambda Async client host: {} port {}", - config.getHttpProxyHost().get(), config.getHttpProxyPort().orElse(-1)); + optConfigs.getHttpProxyHost().get(), optConfigs.getHttpProxyPort().orElse(-1)); } - // If there's a credentials profile configuration in the environment will - // use it. - if (config.getCredentialsProfile().isPresent()) { - builder.setCredentials(new ProfileCredentialsProvider(config.getCredentialsProfile().get())); - LOGGER.info("Using aws credentials profile {} for AWS Lambda client", - config.getCredentialsProfile().get()); + if (optConfigs.getAwsRegion().isPresent()) { + builder.setRegion(optConfigs.getAwsRegion().get()); + LOGGER.info("Using aws region: {}", optConfigs.getAwsRegion().toString()); } - if (config.getAwsRegion().isPresent()) { - builder.setRegion(config.getAwsRegion().get()); - LOGGER.info("Using aws region: {}", config.getAwsRegion().toString()); - } + failureMode = optConfigs.getFailureMode().orElse(InvocationFailure.STOP); - failureMode = config.getFailureMode().orElse(InvocationFailure.STOP); + AWSCredentialsProvider provider = null; + try { + provider = getCredentialsProvider(bareAssumeRoleConfigs); + } catch (Exception e) { + LOGGER.error("Problem initializing provider", e); + } + if (provider != null) { + builder.setCredentials(provider); + } this.lambdaClient = builder.build(); LOGGER.info("AWS Lambda client initialized"); @@ -147,6 +155,41 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final return new InvocationResponse(413, e.getLocalizedMessage(), e.getLocalizedMessage(), start, Instant.now()); } + @SuppressWarnings("unchecked") + public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) { + LOGGER.warn(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs); + + try { + Object providerField = roleConfigs.get("class"); + String providerClass = LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(); + if (providerField != null) { + providerClass = providerField.toString(); + } + LOGGER.warn(".get-credentials-provider:field={}, class={}", providerField, providerClass); + AWSCredentialsProvider provider = ((Class) + getClass(providerClass)).newInstance(); + + if (provider instanceof Configurable) { + ((Configurable) provider).configure(roleConfigs); + } + + LOGGER.warn(".get-credentials-provider:provider={}", provider); + return provider; + } catch (IllegalAccessException | InstantiationException e) { + throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); + } + } + + public Class getClass(String className) { + LOGGER.warn(".get-class:class={}",className); + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + LOGGER.error("Provider class not found: {}", e); + } + return null; + } + private class LambdaInvocationException extends RuntimeException { public LambdaInvocationException(final Throwable e) { super(e); diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java b/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java index ee743b2..e2d995e 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/Configuration.java @@ -1,5 +1,7 @@ package com.nordstrom.kafka.connect.lambda; +import com.nordstrom.kafka.connect.utils.Facility; + import java.util.Optional; import static com.nordstrom.kafka.connect.lambda.InvocationFailure.DROP; @@ -12,10 +14,16 @@ public class Configuration { private final Optional httpProxyPort; private final Optional awsRegion; private final Optional failureMode; + private final Optional roleArn; + private final Optional sessionName; + private final Optional externalId; + public Configuration(final String credentialsProfile, final String httpProxyHost, final Integer httpProxyPort, final String awsRegion, - final InvocationFailure failureMode) { + final InvocationFailure failureMode, + final String roleArn, final String sessionName, + final String externalId) { /* * String awsCredentialsProfile = * System.getenv(CREDENTIALS_PROFILE_CONFIG_ENV); String awsProxyHost = @@ -33,23 +41,40 @@ public Configuration(final String credentialsProfile, final String httpProxyHost this.awsRegion = Facility.isNotNullNorEmpty(awsRegion) ? Optional.of(awsRegion) : Optional.empty(); this.failureMode = Facility.isNotNull(failureMode) ? Optional.of(failureMode): Optional.of(DROP); + this.roleArn = + Facility.isNotNullNorEmpty(roleArn) ? Optional.of(roleArn) : Optional.empty(); + this.sessionName = + Facility.isNotNullNorEmpty(sessionName) ? Optional.of(sessionName) : Optional.empty(); + this.externalId = + Facility.isNotNullNorEmpty(externalId) ? Optional.of(externalId) : Optional.empty(); + } public Configuration(final Optional awsCredentialsProfile, final Optional httpProxyHost, final Optional httpProxyPort, final Optional awsRegion, - final Optional failureMode) { + final Optional failureMode, + final Optional roleArn, + final Optional sessionName, + final Optional externalId) { this.credentialsProfile = awsCredentialsProfile; this.httpProxyHost = httpProxyHost; this.httpProxyPort = httpProxyPort; this.awsRegion = awsRegion; this.failureMode = failureMode; + this.roleArn = roleArn; + this.sessionName = sessionName; + this.externalId = externalId; } public static Configuration empty() { - return new Configuration(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); + return new Configuration( + Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); } public Optional getCredentialsProfile() { @@ -68,4 +93,9 @@ public Optional getHttpProxyPort() { public Optional getFailureMode() { return this.failureMode; } + public Optional getRoleArn() { return this.roleArn; } + + public Optional getSessionName() { return this.sessionName; } + + public Optional getExternalId() { return this.externalId; } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java index 0ccf310..a342849 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnector.java @@ -6,6 +6,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.nordstrom.kafka.connect.utils.About; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java index 70faa21..872a4f8 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java @@ -1,13 +1,5 @@ package com.nordstrom.kafka.connect.lambda; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.text.MessageFormat; import java.time.Duration; import java.util.Collection; @@ -17,6 +9,16 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.nordstrom.kafka.connect.utils.Guard; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class LambdaSinkConnectorConfig extends AbstractConfig { private static final long AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000L; @@ -30,10 +32,13 @@ public class LambdaSinkConnectorConfig extends AbstractConfig { private static final String RETRIABLE_ERROR_CODES_DEFAULT = "500,503,504"; private static final int RETRY_BACKOFF_MILLIS_DEFAULT = 500; private static final int RETRIES_DEFAULT = 5; + private static final int MEGABYTE_SIZE = 1024 * 1024; + private static final String ROLE_ARN_DEFAULT = ""; + private static final String SESSION_NAME_DEFAULT = ""; + private static final String EXTERNAL_ID_DEFAULT = ""; private static final ConfigDef configDefinition = LambdaSinkConnectorConfig.config(); - private static final int MEGABYTE_SIZE = 1024 * 1024; private static final Logger LOGGER = LoggerFactory.getLogger(LambdaSinkConnectorConfig.class); private final Map properties; @@ -52,8 +57,12 @@ public class LambdaSinkConnectorConfig extends AbstractConfig { private final int maxBatchSizeBytes = (6 * MEGABYTE_SIZE) - 1; private final String awsRegion; private final InvocationFailure failureMode; + private final Object credentialsProviderClass; + private final String roleArn; + private final String sessionName; + private final String externalId; - public LambdaSinkConnectorConfig(final Map properties) { + LambdaSinkConnectorConfig(final Map properties) { this(configDefinition, properties); } @@ -68,26 +77,20 @@ public LambdaSinkConnectorConfig(final Map properties) { .mapToObj(String::valueOf) .collect(Collectors.joining())); + this.httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue()); this.httpProxyPort = this.getInt(ConfigurationKeys.HTTP_PROXY_PORT.getValue()); - this.httpProxyHost = this.getString(ConfigurationKeys.HTTP_PROXY_HOST.getValue()); - this.awsCredentialsProfile = this - .getString(ConfigurationKeys.AWS_CREDENTIALS_PROFILE.getValue()); + this.awsCredentialsProfile = this.getString(ConfigurationKeys.AWS_CREDENTIALS_PROFILE.getValue()); this.awsFunctionArn = this.getString(ConfigurationKeys.AWS_LAMBDA_FUNCTION_ARN.getValue()); - this.invocationTimeout = Duration.ofMillis( - this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue()) - ); + this.invocationTimeout = Duration.ofMillis(this.getLong(ConfigurationKeys.AWS_LAMBDA_INVOCATION_TIMEOUT_MS.getValue())); - this.invocationMode = InvocationMode.valueOf( - this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue()) - ); + this.invocationMode = InvocationMode.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_MODE.getValue())); this.isBatchingEnabled = this.getBoolean(ConfigurationKeys.AWS_LAMBDA_BATCH_ENABLED.getValue()); this.retries = this.getInt(ConfigurationKeys.RETRIES_MAX.getValue()); - final List retriableErrorCodesString = this - .getList(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue()); + final List retriableErrorCodesString = this.getList(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue()); try { this.retriableErrorCodes = retriableErrorCodesString .stream() @@ -104,10 +107,12 @@ public LambdaSinkConnectorConfig(final Map properties) { this.awsRegion = this.getString(ConfigurationKeys.AWS_REGION.getValue()); - this.failureMode = InvocationFailure.valueOf( - this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue()) - ); + this.failureMode = InvocationFailure.valueOf(this.getString(ConfigurationKeys.AWS_LAMBDA_INVOCATION_FAILURE_MODE.getValue())); + this.credentialsProviderClass = this.getClass(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue()); + this.roleArn = this.getString(ConfigurationKeys.ROLE_ARN_CONFIG.getValue()); + this.sessionName = this.getString(ConfigurationKeys.SESSION_NAME_CONFIG.getValue()); + this.externalId = this.getString(ConfigurationKeys.EXTERNAL_ID_CONFIG.getValue()); } public Map getProperties() { @@ -174,6 +179,18 @@ public String getAwsRegion() { return this.awsRegion; } + public String getRoleArn() { + return this.roleArn; + } + + public String getSessionName() { + return sessionName; + } + + public String getExternalId() { + return externalId; + } + static ConfigDef getConfigDefinition() { return configDefinition; } @@ -221,13 +238,32 @@ public static ConfigDef config() { .define(ConfigurationKeys.RETRIABLE_ERROR_CODES.getValue(), Type.LIST, RETRIABLE_ERROR_CODES_DEFAULT, Importance.MEDIUM, - ConfigurationKeys.RETRIABLE_ERROR_CODES.getDocumentation()); + ConfigurationKeys.RETRIABLE_ERROR_CODES.getDocumentation()) + + .define(ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getValue(), Type.CLASS, + ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_DEFAULT.getValue(), + new CredentialsProviderValidator(), + Importance.LOW, + ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG.getDocumentation(), + "LAMBDA", + 0, + ConfigDef.Width.LONG, + "AWS Credentials Provider Class") + + .define(ConfigurationKeys.ROLE_ARN_CONFIG.getValue(), Type.STRING, ROLE_ARN_DEFAULT, + Importance.LOW, ConfigurationKeys.ROLE_ARN_CONFIG.getDocumentation()) + + .define(ConfigurationKeys.SESSION_NAME_CONFIG.getValue(), Type.STRING, SESSION_NAME_DEFAULT, + Importance.LOW, ConfigurationKeys.SESSION_NAME_CONFIG.getDocumentation()) + + .define(ConfigurationKeys.EXTERNAL_ID_CONFIG.getValue(), Type.STRING, EXTERNAL_ID_DEFAULT, + Importance.LOW, ConfigurationKeys.EXTERNAL_ID_CONFIG.getDocumentation()); } - public enum ConfigurationKeys { + enum ConfigurationKeys { NAME_CONFIG("name", "Connector Name"), - TASK_ID("task.id", "Connector Task id"), - AWS_LAMBDA_FUNCTION_ARN("aws.lambda.function.arn", "Full ARN for the function to be called."), + TASK_ID("task.id", "Connector Task Id"), + AWS_LAMBDA_FUNCTION_ARN("aws.lambda.function.arn", "Full ARN of the function to be called"), AWS_LAMBDA_INVOCATION_TIMEOUT_MS("aws.lambda.invocation.timeout.ms", "Time to wait for a lambda invocation, if the response times out, the connector will move forward. Default in ms: " + AWS_LAMBDA_INVOCATION_TIMEOUT_MS_DEFAULT), @@ -241,13 +277,11 @@ public enum ConfigurationKeys { .collect(Collectors.joining(","))), AWS_LAMBDA_BATCH_ENABLED("aws.lambda.batch.enabled", - "Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " - + AWS_LAMBDA_BATCH_ENABLED_DEFAULT - ), + "Boolean that determines if the messages will be batched together before sending them to aws lambda. By default is " + AWS_LAMBDA_BATCH_ENABLED_DEFAULT), AWS_REGION("aws.region", "AWS region to instantiate the Lambda client Default: " + AWS_REGION_DEFAULT), AWS_CREDENTIALS_PROFILE("aws.credentials.profile", - " AWS credentials profile to use for the Lambda client, by default is empty and will use the DefaultAWSCredentialsProviderChain "), + " AWS credentials profile to use for the Lambda client, by default is empty and will use the DefaultAWSCredentialsProviderChain"), HTTP_PROXY_HOST("http.proxy.host", "Http proxy port to be configured for the Lambda client, by default is empty"), @@ -259,8 +293,15 @@ public enum ConfigurationKeys { + RETRIABLE_ERROR_CODES_DEFAULT), RETRY_BACKOFF_MILLIS("retry.backoff.millis", "The amount of time to wait between retry attempts, by default is " - + RETRY_BACKOFF_MILLIS_DEFAULT); + + RETRY_BACKOFF_MILLIS_DEFAULT), + // AWS assume role support options + CREDENTIALS_PROVIDER_CLASS_CONFIG("lambda.credentials.provider.class", "REQUIRED Class providing cross-account role assumption"), + CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if lambda.credentials.provider.class is not passed in"), + CREDENTIALS_PROVIDER_CONFIG_PREFIX("lambda.credentials.provider.", "NB: trailing '.'"), + ROLE_ARN_CONFIG("lambda.credentials.provider.role.arn", " REQUIRED AWS Role ARN providing the access"), + SESSION_NAME_CONFIG("lambda.credentials.provider.session.name", "REQUIRED Session name"), + EXTERNAL_ID_CONFIG("lambda.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"); private final String value; private final String documentation; @@ -268,22 +309,42 @@ public enum ConfigurationKeys { ConfigurationKeys(final String configurationKeyValue, final String documentation) { Guard.verifyNotNullOrEmpty(configurationKeyValue, "configurationKeyValue"); - // Empty or null documentation is OK. + // Empty or null documentation is ok. this.value = configurationKeyValue; this.documentation = documentation; } - public String getDocumentation() { + String getValue() { + return this.value; + } + + String getDocumentation() { return this.documentation; } - public String getValue() { + @Override + public String toString() { return this.value; } + } + + private static class CredentialsProviderValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object provider) { + if (provider instanceof Class + && AWSCredentialsProvider.class.isAssignableFrom((Class) provider)) { + return; + } + throw new ConfigException( + name, + provider, + "Class must extend: " + AWSCredentialsProvider.class + ); + } @Override public String toString() { - return this.value; + return "Any class implementing: " + AWSCredentialsProvider.class; } } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index f156047..6baa37b 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -1,5 +1,7 @@ package com.nordstrom.kafka.connect.lambda; +import com.nordstrom.kafka.connect.utils.About; +import com.nordstrom.kafka.connect.utils.JsonUtil; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidConfigurationException; @@ -10,7 +12,11 @@ import org.slf4j.LoggerFactory; import java.text.MessageFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -31,21 +37,26 @@ public String version() { } @Override - public void start(Map props) { + public void start(final Map props) { this.properties = props; this.configuration = new LambdaSinkTaskConfiguration(this.properties); + LOGGER.info("starting connector {} task {} with properties {}", this.configuration.getConnectorName(), this.configuration.getTaskId(), props); - this.lambdaClient = new AwsLambdaUtil( - new Configuration( + + Configuration optConfigs = new Configuration( this.configuration.getAwsCredentialsProfile(), this.configuration.getHttpProxyHost(), this.configuration.getHttpProxyPort(), this.configuration.getAwsRegion(), - this.configuration.getFailureMode()) - ); + this.configuration.getFailureMode(), + this.configuration.getRoleArn(), + this.configuration.getSessionName(), + this.configuration.getExternalId()); + this.lambdaClient = new AwsLambdaUtil(optConfigs, configuration.originalsWithPrefix(LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CONFIG_PREFIX.getValue())); + LOGGER.info("Context for connector {} task {}, Assignments[{}], ", this.configuration.getConnectorName(), this.configuration.getTaskId(), @@ -58,7 +69,7 @@ public void start(Map props) { } @Override - public void put(Collection records) { + public void put(final Collection records) { if (records == null || records.isEmpty()) { LOGGER.debug("No records to process. connector=\"{}\" task=\"{}\"", this.configuration.getConnectorName(), @@ -78,7 +89,8 @@ public void put(Collection records) { this.rinse(); this.context.requestCommit(); } - } else { + } + else { for (final SinkRecord record : records) { this.invoke(this.getPayload(record)); } @@ -98,7 +110,7 @@ public void setLambdaClient(AwsLambdaUtil lambdaClient) { private void rinse() { final List records = new ArrayList<>(this.batchRecords); - if (!records.isEmpty()) { + if (! records.isEmpty()) { this.splitBatch(records, this.configuration.getMaxBatchSizeBytes()) .forEach(recordsToFlush -> { @@ -271,7 +283,7 @@ private void handleResponse( // NOT retrying -> data loss final String message = MessageFormat - .format("NonRetriable Error with last call {} {} {} ", + .format("Non-retriable Error with last call {} {} {} ", response.getStatusCode(), response.getErrorString(), response.getResponseString() @@ -292,21 +304,19 @@ class LambdaSinkTaskConfiguration extends LambdaSinkConnectorConfig { private final String taskId; - public LambdaSinkTaskConfiguration(final Map properties) { + LambdaSinkTaskConfiguration(final Map properties) { super(LambdaSinkConnectorConfig.getConfigDefinition(), properties); this.taskId = "0";//this.getString(ConfigurationKeys.TASK_ID.getValue()); } - public String getTaskId() { + String getTaskId() { return this.taskId; } } - private class OutOfRetriesException extends RuntimeException { - public OutOfRetriesException( - final String message) { + OutOfRetriesException(final String message) { super(message); } } diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/About.java b/src/main/java/com/nordstrom/kafka/connect/utils/About.java similarity index 82% rename from src/main/java/com/nordstrom/kafka/connect/lambda/About.java rename to src/main/java/com/nordstrom/kafka/connect/utils/About.java index 0f2e391..2044e5c 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/About.java +++ b/src/main/java/com/nordstrom/kafka/connect/utils/About.java @@ -1,4 +1,4 @@ -package com.nordstrom.kafka.connect.lambda; +package com.nordstrom.kafka.connect.utils; /** * This class is used by the templating-maven-plugin to generate the diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/Facility.java b/src/main/java/com/nordstrom/kafka/connect/utils/Facility.java similarity index 94% rename from src/main/java/com/nordstrom/kafka/connect/lambda/Facility.java rename to src/main/java/com/nordstrom/kafka/connect/utils/Facility.java index d3952cc..9b373dd 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/Facility.java +++ b/src/main/java/com/nordstrom/kafka/connect/utils/Facility.java @@ -1,4 +1,4 @@ -package com.nordstrom.kafka.connect.lambda; +package com.nordstrom.kafka.connect.utils; /** * General utility test methods. diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/Guard.java b/src/main/java/com/nordstrom/kafka/connect/utils/Guard.java similarity index 93% rename from src/main/java/com/nordstrom/kafka/connect/lambda/Guard.java rename to src/main/java/com/nordstrom/kafka/connect/utils/Guard.java index 3488971..ff057ee 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/Guard.java +++ b/src/main/java/com/nordstrom/kafka/connect/utils/Guard.java @@ -1,4 +1,4 @@ -package com.nordstrom.kafka.connect.lambda; +package com.nordstrom.kafka.connect.utils; /** * Guard methods throw an exception should the test fail. diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/JsonUtil.java b/src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java similarity index 94% rename from src/main/java/com/nordstrom/kafka/connect/lambda/JsonUtil.java rename to src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java index 4e7e85f..2eefa03 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/JsonUtil.java +++ b/src/main/java/com/nordstrom/kafka/connect/utils/JsonUtil.java @@ -1,4 +1,4 @@ -package com.nordstrom.kafka.connect.lambda; +package com.nordstrom.kafka.connect.utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; diff --git a/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java b/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java new file mode 100644 index 0000000..e3c5853 --- /dev/null +++ b/src/test/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProviderTest.java @@ -0,0 +1,71 @@ +package com.nordstrom.kafka.connect.auth; + +import com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class AWSAssumeRoleCredentialsProviderTest { + + private static final String NON_EXISTENT_FIELD_NAME = "non.existent.field.name"; + + private ImmutableMap testConfigs; + + @Before + public void setupTestConfigs() { + testConfigs = + new ImmutableMap.Builder() + .put(AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG, "test-external-id") + .put(AWSAssumeRoleCredentialsProvider.ROLE_ARN_CONFIG, "arn:aws:iam::123456789012:role/test-role") + .put(AWSAssumeRoleCredentialsProvider.SESSION_NAME_CONFIG, "test-session-name") + .build(); + } + + @Test + public void testConfigureInitializesProviderFields() { + AWSAssumeRoleCredentialsProvider testProvider = new AWSAssumeRoleCredentialsProvider(); + + testProvider.configure(testConfigs); + + assertEquals("test-external-id", testProvider.getExternalId()); + assertEquals("arn:aws:iam::123456789012:role/test-role", testProvider.getRoleArn()); + assertEquals("test-session-name", testProvider.getSessionName()); + } + + @Test + public void testGetOptionalFieldReturnsNullGivenNonExistentConfigName() { + AWSAssumeRoleCredentialsProvider testProvider = new AWSAssumeRoleCredentialsProvider(); + + testProvider.configure(testConfigs); + + assertNull(testProvider.getOptionalField(testConfigs, NON_EXISTENT_FIELD_NAME)); + } + + @Test + public void testGetOptionalFieldReturnsValueGivenValidConfigName() { + AWSAssumeRoleCredentialsProvider testProvider = new AWSAssumeRoleCredentialsProvider(); + + testProvider.configure(testConfigs); + + assertEquals("test-external-id", testProvider.getOptionalField(testConfigs, AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetRequiredFieldThrowsExceptionGivenNonExistentConfigName() { + AWSAssumeRoleCredentialsProvider testProvider = new AWSAssumeRoleCredentialsProvider(); + + testProvider.configure(testConfigs); + + assertNull(testProvider.getRequiredField(testConfigs, NON_EXISTENT_FIELD_NAME)); + } + + @Test + public void testGetRequiredFieldReturnsValueGivenValidConfigName() { + AWSAssumeRoleCredentialsProvider testProvider = new AWSAssumeRoleCredentialsProvider(); + + testProvider.configure(testConfigs); + + assertEquals("test-external-id", testProvider.getRequiredField(testConfigs, AWSAssumeRoleCredentialsProvider.EXTERNAL_ID_CONFIG)); + } +} diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java index 7fb7246..22c0e75 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtilTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import java.time.Instant; +import java.util.HashMap; import static org.junit.Assert.*; @@ -13,8 +14,8 @@ public class AwsLambdaUtilTest { @Test(expected = RequestTooLargeException.class) public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeStopThrowsException() { - final Configuration testConfiguration = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.STOP); - final AwsLambdaUtil testUtil = new AwsLambdaUtil(testConfiguration); + final Configuration testOptConfigs = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.STOP, "test-arn", "test-session", "test-external-id"); + final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>()); testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!")); } @@ -25,8 +26,8 @@ public void testCheckPayloadSizeForInvocationTypeWithInvocationFailureModeDropCo AwsLambdaUtil.InvocationResponse testResp = null; RequestTooLargeException ex = null; - final Configuration testConfiguration = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.DROP); - final AwsLambdaUtil testUtil = new AwsLambdaUtil(testConfiguration); + final Configuration testOptConfigs = new Configuration("test-profile", "testhost", 123, "test-region", InvocationFailure.DROP, "test-arn", "test-session", "test-external-id"); + final AwsLambdaUtil testUtil = new AwsLambdaUtil(testOptConfigs, new HashMap<>()); try { testResp = testUtil.checkPayloadSizeForInvocationType("testpayload".getBytes(), InvocationType.RequestResponse, Instant.now(), new RequestTooLargeException("Request payload is too large!")); diff --git a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java index 792d576..31919a6 100644 --- a/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java +++ b/src/test/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTaskTest.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; import static org.junit.Assert.*; @@ -91,7 +92,10 @@ public void testPutWhenBatchingIsNotEnabled() { task.configuration.getHttpProxyHost(), task.configuration.getHttpProxyPort(), task.configuration.getAwsRegion(), - task.configuration.getFailureMode())).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); + task.configuration.getFailureMode(), + task.configuration.getRoleArn(), + task.configuration.getSessionName(), + task.configuration.getExternalId()), new HashMap<>()).new InvocationResponse(200, "test log", "", Instant.now(), Instant.now())); Schema testSchema = SchemaBuilder.struct().name("com.nordstrom.kafka.connect.lambda.foo").field("bar", STRING_SCHEMA).build(); From cb6683fe097276cb092d18401fe91b3a399776ba Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Mon, 15 Jul 2019 12:20:27 -0700 Subject: [PATCH 2/9] Adjust logging messages from warn to info in several places --- .../nordstrom/kafka/connect/lambda/AwsLambdaUtil.java | 10 +++++----- .../nordstrom/kafka/connect/lambda/LambdaSinkTask.java | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java index d718ff3..20bb8aa 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java @@ -37,7 +37,7 @@ public class AwsLambdaUtil { private final InvocationFailure failureMode; public AwsLambdaUtil(final Configuration optConfigs, final Map bareAssumeRoleConfigs) { - LOGGER.warn("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); + LOGGER.info("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); Guard.verifyNotNull(optConfigs, "optConfigs"); final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard(); @@ -157,7 +157,7 @@ InvocationResponse checkPayloadSizeForInvocationType(final byte[] payload, final @SuppressWarnings("unchecked") public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) { - LOGGER.warn(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs); + LOGGER.info(".get-credentials-provider:assumeRoleConfigs={}", roleConfigs); try { Object providerField = roleConfigs.get("class"); @@ -165,7 +165,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) if (providerField != null) { providerClass = providerField.toString(); } - LOGGER.warn(".get-credentials-provider:field={}, class={}", providerField, providerClass); + LOGGER.info(".get-credentials-provider:field={}, class={}", providerField, providerClass); AWSCredentialsProvider provider = ((Class) getClass(providerClass)).newInstance(); @@ -173,7 +173,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) ((Configurable) provider).configure(roleConfigs); } - LOGGER.warn(".get-credentials-provider:provider={}", provider); + LOGGER.info(".get-credentials-provider:provider={}", provider); return provider; } catch (IllegalAccessException | InstantiationException e) { throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); @@ -181,7 +181,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) } public Class getClass(String className) { - LOGGER.warn(".get-class:class={}",className); + LOGGER.info(".get-class:class={}",className); try { return Class.forName(className); } catch (ClassNotFoundException e) { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 6baa37b..0a4e789 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -71,7 +71,7 @@ public void start(final Map props) { @Override public void put(final Collection records) { if (records == null || records.isEmpty()) { - LOGGER.debug("No records to process. connector=\"{}\" task=\"{}\"", + LOGGER.info("No records to process. connector=\"{}\" task=\"{}\"", this.configuration.getConnectorName(), this.configuration.getTaskId()); return; @@ -81,7 +81,7 @@ public void put(final Collection records) { this.batchRecords.addAll(records); final int batchLength = this.getPayload(this.batchRecords).getBytes().length; if (batchLength >= this.configuration.getMaxBatchSizeBytes()) { - LOGGER.warn("Batch size reached {} bytes within {} records. connector=\"{}\" task=\"{}\"", + LOGGER.info("Batch size reached {} bytes within {} records. connector=\"{}\" task=\"{}\"", batchLength, this.batchRecords.size(), this.configuration.getConnectorName(), From 1f02ff8e0048c82ac1bbae77af79ecafee5138fb Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Mon, 15 Jul 2019 12:28:44 -0700 Subject: [PATCH 3/9] Removed LICENSE file. Will add it back in a separate pull/merge request. --- LICENSE | 201 -------------------------------------------------------- 1 file changed, 201 deletions(-) delete mode 100644 LICENSE diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 261eeb9..0000000 --- a/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. From 3d09dda54cee16428d545e6a32b076532bbb54a7 Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Mon, 15 Jul 2019 14:38:16 -0700 Subject: [PATCH 4/9] Remove lombok dependency and add explicit getters for externalId, roleArn, and sessionName instance variables. --- pom.xml | 6 ------ .../auth/AWSAssumeRoleCredentialsProvider.java | 13 +++++++++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index a5ce0ee..479a4ac 100644 --- a/pom.xml +++ b/pom.xml @@ -66,12 +66,6 @@ guava ${google.guava.version} - - org.projectlombok - lombok - ${lombok.version} - provided - diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java index e076b08..fe4ca4c 100644 --- a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -4,7 +4,6 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; -import lombok.Getter; import org.apache.kafka.common.Configurable; import java.util.Map; @@ -12,7 +11,6 @@ //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; -@Getter public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable { //NB: uncomment slf4j imports and field declaration to enable logging. // private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class); @@ -85,4 +83,15 @@ private void verifyNotNullOrEmpty(final String field, final String fieldName) { } } + String getExternalId() { + return this.externalId; + } + + String getRoleArn() { + return this.roleArn; + } + + String getSessionName() { + return this.sessionName; + } } From 62cd2707c0b8a6928234e724d360f8b40f6e908e Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Mon, 15 Jul 2019 15:10:10 -0700 Subject: [PATCH 5/9] Slightly updated README.md documentation. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8b20cae..d4441a7 100644 --- a/README.md +++ b/README.md @@ -20,8 +20,8 @@ A sink connector configuration has two required fields: * `topics`: The Kafka topic to be read from. ### AWS Assume Role Support options - The connector can assume a cross-account role: - * `lambda.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED Class providing cross-account role assumption. + The connector can assume a role: + * `lambda.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class. * `lambda.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. * `lambda.credentials.provider.session.name`: REQUIRED Session name * `lambda.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role. From 234e1ec6b839d0280bce827731681b342a30bfbc Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Mon, 15 Jul 2019 15:13:42 -0700 Subject: [PATCH 6/9] Updated README.md once more to add comment(s) about policies the role should include. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d4441a7..5fa0ec2 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ A sink connector configuration has two required fields: * `topics`: The Kafka topic to be read from. ### AWS Assume Role Support options - The connector can assume a role: + The connector can assume an IAM Role. The role must include a policy that allows lambda:InvokeFunction and lambda:InvokeAsync actions: * `lambda.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class. * `lambda.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. * `lambda.credentials.provider.session.name`: REQUIRED Session name From 2d18713a6bf8890514de1b56a87ad0632f9d8ca0 Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Tue, 16 Jul 2019 14:23:14 -0700 Subject: [PATCH 7/9] Replace lambda.* prefix on credentials configs with aws.* prefix for consistency purposes --- README.md | 10 +++++----- pom.xml | 2 +- .../connect/lambda/LambdaSinkConnectorConfig.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 5fa0ec2..7d5eb47 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ The AWS Lambda connector plugin provides the ability to use AWS Lambda functions as a sink (out of a Kafka topic into a Lambda function). ## Supported Kafka and AWS versions -The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.587` +The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` and `aws-java-sdk-lambda:1.11.592` # Building You can build the connector with Maven using the standard lifecycle goals: @@ -21,10 +21,10 @@ A sink connector configuration has two required fields: ### AWS Assume Role Support options The connector can assume an IAM Role. The role must include a policy that allows lambda:InvokeFunction and lambda:InvokeAsync actions: - * `lambda.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class. - * `lambda.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. - * `lambda.credentials.provider.session.name`: REQUIRED Session name - * `lambda.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role. + * `aws.credentials.provider.class=com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`: REQUIRED The credentials provider class. + * `aws.credentials.provider.role.arn`: REQUIRED AWS Role ARN providing the access. + * `aws.credentials.provider.session.name`: REQUIRED Session name + * `aws.credentials.provider.external.id`: OPTIONAL (but recommended) External identifier used by the `kafka-connect-lambda` when assuming the role. ### Sample Configuration ```json diff --git a/pom.xml b/pom.xml index 479a4ac..ef04053 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 3.0.0 2.1.0 - 1.11.587 + 1.11.592 4.12 2.28.2 19.0 diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java index 872a4f8..c22269c 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java @@ -298,7 +298,7 @@ enum ConfigurationKeys { // AWS assume role support options CREDENTIALS_PROVIDER_CLASS_CONFIG("lambda.credentials.provider.class", "REQUIRED Class providing cross-account role assumption"), CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if lambda.credentials.provider.class is not passed in"), - CREDENTIALS_PROVIDER_CONFIG_PREFIX("lambda.credentials.provider.", "NB: trailing '.'"), + CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.credentials.provider.", "NB: trailing '.'"), ROLE_ARN_CONFIG("lambda.credentials.provider.role.arn", " REQUIRED AWS Role ARN providing the access"), SESSION_NAME_CONFIG("lambda.credentials.provider.session.name", "REQUIRED Session name"), EXTERNAL_ID_CONFIG("lambda.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"); From ad43ed71b97c44d01cf1fb7127a62fda2a56bfa0 Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Tue, 16 Jul 2019 16:24:32 -0700 Subject: [PATCH 8/9] Tweak logging levels for several trace lines; remove NB notation in comments. --- pom.xml | 2 +- .../connect/auth/AWSAssumeRoleCredentialsProvider.java | 2 +- .../com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java | 8 ++++---- .../kafka/connect/lambda/LambdaSinkConnectorConfig.java | 2 +- .../nordstrom/kafka/connect/lambda/LambdaSinkTask.java | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index ef04053..6afddb0 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ - diff --git a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java index fe4ca4c..2162453 100644 --- a/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java +++ b/src/main/java/com/nordstrom/kafka/connect/auth/AWSAssumeRoleCredentialsProvider.java @@ -12,7 +12,7 @@ //import org.slf4j.LoggerFactory; public class AWSAssumeRoleCredentialsProvider implements AWSCredentialsProvider, Configurable { - //NB: uncomment slf4j imports and field declaration to enable logging. +// Uncomment slf4j imports and field declaration to enable logging. // private static final Logger log = LoggerFactory.getLogger(AWSAssumeRoleCredentialsProvider.class); public static final String EXTERNAL_ID_CONFIG = "external.id"; diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java index 20bb8aa..4dac70b 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/AwsLambdaUtil.java @@ -37,7 +37,7 @@ public class AwsLambdaUtil { private final InvocationFailure failureMode; public AwsLambdaUtil(final Configuration optConfigs, final Map bareAssumeRoleConfigs) { - LOGGER.info("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); + LOGGER.debug("AwsLambdaUtil.ctor:bareAssumeRoleConfigs={}", bareAssumeRoleConfigs); Guard.verifyNotNull(optConfigs, "optConfigs"); final AWSLambdaAsyncClientBuilder builder = AWSLambdaAsyncClientBuilder.standard(); @@ -165,7 +165,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) if (providerField != null) { providerClass = providerField.toString(); } - LOGGER.info(".get-credentials-provider:field={}, class={}", providerField, providerClass); + LOGGER.debug(".get-credentials-provider:field={}, class={}", providerField, providerClass); AWSCredentialsProvider provider = ((Class) getClass(providerClass)).newInstance(); @@ -173,7 +173,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) ((Configurable) provider).configure(roleConfigs); } - LOGGER.info(".get-credentials-provider:provider={}", provider); + LOGGER.debug(".get-credentials-provider:provider={}", provider); return provider; } catch (IllegalAccessException | InstantiationException e) { throw new ConnectException("Invalid class for: " + LambdaSinkConnectorConfig.ConfigurationKeys.CREDENTIALS_PROVIDER_CLASS_CONFIG, e); @@ -181,7 +181,7 @@ public AWSCredentialsProvider getCredentialsProvider(Map roleConfigs) } public Class getClass(String className) { - LOGGER.info(".get-class:class={}",className); + LOGGER.debug(".get-class:class={}",className); try { return Class.forName(className); } catch (ClassNotFoundException e) { diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java index c22269c..a06da94 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkConnectorConfig.java @@ -298,7 +298,7 @@ enum ConfigurationKeys { // AWS assume role support options CREDENTIALS_PROVIDER_CLASS_CONFIG("lambda.credentials.provider.class", "REQUIRED Class providing cross-account role assumption"), CREDENTIALS_PROVIDER_CLASS_DEFAULT("com.amazonaws.auth.DefaultAWSCredentialsProviderChain", "Default provider chain if lambda.credentials.provider.class is not passed in"), - CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.credentials.provider.", "NB: trailing '.'"), + CREDENTIALS_PROVIDER_CONFIG_PREFIX("aws.credentials.provider.", "Note trailing '.'"), ROLE_ARN_CONFIG("lambda.credentials.provider.role.arn", " REQUIRED AWS Role ARN providing the access"), SESSION_NAME_CONFIG("lambda.credentials.provider.session.name", "REQUIRED Session name"), EXTERNAL_ID_CONFIG("lambda.credentials.provider.external.id", "OPTIONAL (but recommended) External identifier used by the kafka-connect-lambda when assuming the role"); diff --git a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java index 0a4e789..6baa37b 100644 --- a/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java +++ b/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java @@ -71,7 +71,7 @@ public void start(final Map props) { @Override public void put(final Collection records) { if (records == null || records.isEmpty()) { - LOGGER.info("No records to process. connector=\"{}\" task=\"{}\"", + LOGGER.debug("No records to process. connector=\"{}\" task=\"{}\"", this.configuration.getConnectorName(), this.configuration.getTaskId()); return; @@ -81,7 +81,7 @@ public void put(final Collection records) { this.batchRecords.addAll(records); final int batchLength = this.getPayload(this.batchRecords).getBytes().length; if (batchLength >= this.configuration.getMaxBatchSizeBytes()) { - LOGGER.info("Batch size reached {} bytes within {} records. connector=\"{}\" task=\"{}\"", + LOGGER.warn("Batch size reached {} bytes within {} records. connector=\"{}\" task=\"{}\"", batchLength, this.batchRecords.size(), this.configuration.getConnectorName(), From f3bd1758368773210f19741a3fe0cd054878cdaf Mon Sep 17 00:00:00 2001 From: Sean Williams Date: Wed, 17 Jul 2019 10:59:08 -0700 Subject: [PATCH 9/9] Set version number to 1.0.0 for initial release --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6afddb0..eb5ae5b 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.nordstrom.kafka.connect.lambda kafka-connect-lambda - 1.0-SNAPSHOT + 1.0.0 jar kafka-connect-lambda