Skip to content

Commit

Permalink
Merge pull request #552 from confluentinc/ccmsg-1074
Browse files Browse the repository at this point in the history
CCMSG 1074 - Allow S3 sink to use assume role with aws.access.key.id
  • Loading branch information
poojakuntalcflt authored Sep 14, 2022
2 parents e39ac41 + 905831b commit c633f08
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.storage.common.util.StringUtils;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -787,6 +790,14 @@ public CannedAccessControlList getCannedAcl() {
return CannedAclValidator.ACLS_BY_HEADER_VALUE.get(getString(ACL_CANNED_CONFIG));
}

public String awsAccessKeyId() {
return getString(AWS_ACCESS_KEY_ID_CONFIG);
}

public Password awsSecretKeyId() {
return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

public int getPartSize() {
return getInt(PART_SIZE_CONFIG);
}
Expand All @@ -802,7 +813,18 @@ public AWSCredentialsProvider getCredentialsProvider() {
configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(
CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()
));

configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId());
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value());

((Configurable) provider).configure(configs);
} else {
final String accessKeyId = awsAccessKeyId();
final String secretKey = awsSecretKeyId().value();
if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) {
BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey);
provider = new AWSStaticCredentialsProvider(basicCredentials);
}
}

return provider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@
package io.confluent.connect.s3.auth;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import io.confluent.connect.storage.common.util.StringUtils;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;

/**
* AWS credentials provider that uses the AWS Security Token Service to assume a Role and create a
* temporary, short-lived session to use for authentication. This credentials provider does not
Expand Down Expand Up @@ -59,21 +65,39 @@ public class AwsAssumeRoleCredentialsProvider implements AWSCredentialsProvider,
private String roleExternalId;
private String roleSessionName;

private BasicAWSCredentials basicCredentials;

@Override
public void configure(Map<String, ?> configs) {
AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs);
roleArn = config.getString(ROLE_ARN_CONFIG);
roleExternalId = config.getString(ROLE_EXTERNAL_ID_CONFIG);
roleSessionName = config.getString(ROLE_SESSION_NAME_CONFIG);
final String accessKeyId = (String) configs.get(AWS_ACCESS_KEY_ID_CONFIG);
final String secretKey = (String) configs.get(AWS_SECRET_ACCESS_KEY_CONFIG);
if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) {
basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey);
} else {
basicCredentials = null;
}
}

@Override
public AWSCredentials getCredentials() {
return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
.withExternalId(roleExternalId)
.build()
.getCredentials();
if (basicCredentials != null) {
return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build())
.withExternalId(roleExternalId)
.build()
.getCredentials();
} else {
return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
.withExternalId(roleExternalId)
.build()
.getCredentials();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.amazonaws.PredefinedClientConfigurations;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.PredefinedBackoffStrategies;
Expand All @@ -47,8 +45,6 @@
import io.confluent.connect.storage.Storage;
import io.confluent.connect.storage.common.util.StringUtils;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.REGION_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PATH_STYLE_ACCESS_ENABLED_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_PROXY_URL_CONFIG;
Expand Down Expand Up @@ -174,17 +170,8 @@ protected RetryPolicy newFullJitterRetryPolicy(S3SinkConnectorConfig config) {
}

protected AWSCredentialsProvider newCredentialsProvider(S3SinkConnectorConfig config) {
final String accessKeyId = config.getString(AWS_ACCESS_KEY_ID_CONFIG);
final String secretKey = config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value();
if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) {
log.info("Returning new credentials provider using the access key id and "
+ "the secret access key that were directly supplied through the connector's "
+ "configuration");
BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey);
return new AWSStaticCredentialsProvider(basicCredentials);
}
log.info(
"Returning new credentials provider based on the configured credentials provider class");
log.info("Returning new credentials provider based on the configured "
+ "credentials provider class");
return config.getCredentialsProvider();
}

Expand Down

0 comments on commit c633f08

Please sign in to comment.