Skip to content

Commit

Permalink
Using Access key and secret access to create Assume Role Provider.
Browse files Browse the repository at this point in the history
  • Loading branch information
poojakuntalcflt committed Aug 24, 2022
1 parent 488dc53 commit 905831b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,20 @@ public CannedAccessControlList getCannedAcl() {
return CannedAclValidator.ACLS_BY_HEADER_VALUE.get(getString(ACL_CANNED_CONFIG));
}

public String awsAccessKeyId() {
return getString(AWS_ACCESS_KEY_ID_CONFIG);
}

public Password awsSecretKeyId() {
return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

public int getPartSize() {
return getInt(PART_SIZE_CONFIG);
}

@SuppressWarnings("unchecked")
public AWSCredentialsProvider getCredentialsProvider(S3SinkConnectorConfig config) {
public AWSCredentialsProvider getCredentialsProvider() {
try {
AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance();
Expand All @@ -788,15 +796,13 @@ public AWSCredentialsProvider getCredentialsProvider(S3SinkConnectorConfig confi
CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()
));

configs.put(AWS_ACCESS_KEY_ID_CONFIG,
config.getString(AWS_ACCESS_KEY_ID_CONFIG));
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG,
config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value()
);
configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId());
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value());

((Configurable) provider).configure(configs);
} else {
final String accessKeyId = config.getString(AWS_ACCESS_KEY_ID_CONFIG);
final String secretKey = config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value();
final String accessKeyId = awsAccessKeyId();
final String secretKey = awsSecretKeyId().value();
if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) {
BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey);
provider = new AWSStaticCredentialsProvider(basicCredentials);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ protected RetryPolicy newFullJitterRetryPolicy(S3SinkConnectorConfig config) {
protected AWSCredentialsProvider newCredentialsProvider(S3SinkConnectorConfig config) {
log.info("Returning new credentials provider based on the configured "
+ "credentials provider class");
return config.getCredentialsProvider(config);
return config.getCredentialsProvider();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testConfigurableCredentialProvider() {
);
connectorConfig = new S3SinkConnectorConfig(properties);

AWSCredentialsProvider credentialsProvider = connectorConfig.getCredentialsProvider(connectorConfig);
AWSCredentialsProvider credentialsProvider = connectorConfig.getCredentialsProvider();

assertEquals(ACCESS_KEY_VALUE, credentialsProvider.getCredentials().getAWSAccessKeyId());
assertEquals(SECRET_KEY_VALUE, credentialsProvider.getCredentials().getAWSSecretKey());
Expand Down Expand Up @@ -246,7 +246,7 @@ public void testConfigurableAwsAssumeRoleCredentialsProvider() {
connectorConfig = new S3SinkConnectorConfig(properties);

AwsAssumeRoleCredentialsProvider credentialsProvider =
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider(connectorConfig);
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider();
}

@Test
Expand Down Expand Up @@ -279,7 +279,7 @@ public void testConfigurableCredentialProviderMissingConfigs() {
);

connectorConfig = new S3SinkConnectorConfig(properties);
assertThrows("are mandatory configuration properties", ConfigException.class, () -> connectorConfig.getCredentialsProvider(connectorConfig));
assertThrows("are mandatory configuration properties", ConfigException.class, () -> connectorConfig.getCredentialsProvider());
}

@Test
Expand All @@ -304,7 +304,7 @@ public void testConfigurableAwsAssumeRoleCredentialsProviderMissingConfigs() {
connectorConfig = new S3SinkConnectorConfig(properties);

AwsAssumeRoleCredentialsProvider credentialsProvider =
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider(connectorConfig);
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider();

assertThrows("Missing required configuration", ConfigException.class, () -> credentialsProvider.configure(properties));
}
Expand Down

0 comments on commit 905831b

Please sign in to comment.