diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index 880bcf374..97a9541e7 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -17,11 +17,14 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; +import io.confluent.connect.storage.common.util.StringUtils; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -787,6 +790,14 @@ public CannedAccessControlList getCannedAcl() { return CannedAclValidator.ACLS_BY_HEADER_VALUE.get(getString(ACL_CANNED_CONFIG)); } + public String awsAccessKeyId() { + return getString(AWS_ACCESS_KEY_ID_CONFIG); + } + + public Password awsSecretKeyId() { + return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); + } + public int getPartSize() { return getInt(PART_SIZE_CONFIG); } @@ -802,7 +813,18 @@ public AWSCredentialsProvider getCredentialsProvider() { configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); + + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); + } else { + final String accessKeyId = awsAccessKeyId(); + final String secretKey = awsSecretKeyId().value(); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + provider = new AWSStaticCredentialsProvider(basicCredentials); + } } return provider; diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 9547098c7..2dc6b23da 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -17,15 +17,21 @@ package io.confluent.connect.s3.auth; import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import io.confluent.connect.storage.common.util.StringUtils; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import java.util.Map; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; + /** * AWS credentials provider that uses the AWS Security Token Service to assume a Role and create a * temporary, short-lived session to use for authentication. This credentials provider does not @@ -59,21 +65,39 @@ public class AwsAssumeRoleCredentialsProvider implements AWSCredentialsProvider, private String roleExternalId; private String roleSessionName; + private BasicAWSCredentials basicCredentials; + @Override public void configure(Map configs) { AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); roleArn = config.getString(ROLE_ARN_CONFIG); roleExternalId = config.getString(ROLE_EXTERNAL_ID_CONFIG); roleSessionName = config.getString(ROLE_SESSION_NAME_CONFIG); + final String accessKeyId = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG); + final String secretKey = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + } else { + basicCredentials = null; + } } @Override public AWSCredentials getCredentials() { - return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build() - .getCredentials(); + if (basicCredentials != null) { + return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build()) + .withExternalId(roleExternalId) + .build() + .getCredentials(); + } else { + return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .withExternalId(roleExternalId) + .build() + .getCredentials(); + } } @Override diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java index 206741324..6e01c4559 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3Storage.java @@ -19,8 +19,6 @@ import com.amazonaws.PredefinedClientConfigurations; import com.amazonaws.SdkClientException; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; import com.amazonaws.retry.PredefinedBackoffStrategies; @@ -47,8 +45,6 @@ import io.confluent.connect.storage.Storage; import io.confluent.connect.storage.common.util.StringUtils; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.REGION_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PATH_STYLE_ACCESS_ENABLED_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PROXY_URL_CONFIG; @@ -174,17 +170,8 @@ protected RetryPolicy newFullJitterRetryPolicy(S3SinkConnectorConfig config) { } protected AWSCredentialsProvider newCredentialsProvider(S3SinkConnectorConfig config) { - final String accessKeyId = config.getString(AWS_ACCESS_KEY_ID_CONFIG); - final String secretKey = config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value(); - if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { - log.info("Returning new credentials provider using the access key id and " - + "the secret access key that were directly supplied through the connector's " - + "configuration"); - BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - return new AWSStaticCredentialsProvider(basicCredentials); - } - log.info( - "Returning new credentials provider based on the configured credentials provider class"); + log.info("Returning new credentials provider based on the configured " + + "credentials provider class"); return config.getCredentialsProvider(); }