diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index b56f230ca..49149b529 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -130,8 +130,6 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null); - public static final String ASSUME_AWS_ACCESS_KEY_ID_CONFIG = "assume.aws.access.key.id"; - public static final String ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG = "assume.aws.secret.access.key"; public static final String CUSTOMER_ROLE_ARN_CONFIG = "aws.iam.assume.role"; public static final String CUSTOMER_ROLE_ARN_DEFAULT = ""; public static final String CUSTOMER_ROLE_EXTERNAL_ID_CONFIG = "aws.iam.external.id"; @@ -323,38 +321,6 @@ public static void addIamConfigDef(ConfigDef configDef, final String group, int Width.LONG, "AWS Role ARN" ); - - configDef.define( - ASSUME_AWS_ACCESS_KEY_ID_CONFIG, - Type.STRING, - AWS_ACCESS_KEY_ID_DEFAULT, - Importance.HIGH, - "The AWS access key ID used to authenticate personal AWS credentials such as IAM " - + "credentials. Use only if you do not wish to authenticate by using a credentials " - + "provider class via ``" - + CREDENTIALS_PROVIDER_CLASS_CONFIG - + "``", - group, - ++orderInGroup, - Width.LONG, - "Assume - AWS Access Key ID" - ); - - configDef.define( - ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG, - Type.PASSWORD, - AWS_SECRET_ACCESS_KEY_DEFAULT, - Importance.HIGH, - "The secret access key used to authenticate personal AWS credentials such as IAM " - + "credentials. Use only if you do not wish to authenticate by using a credentials " - + "provider class via ``" - + CREDENTIALS_PROVIDER_CLASS_CONFIG - + "``", - group, - ++orderInGroup, - Width.LONG, - "Assume - AWS Secret Access Key" - ); } public static ConfigDef newConfigDef() { @@ -978,14 +944,6 @@ public Password awsSecretKeyId() { return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); } - public String assumeAwsAccessKeyId() { - return getString(ASSUME_AWS_ACCESS_KEY_ID_CONFIG); - } - - public Password assumeAwsSecretKeyId() { - return getPassword(ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG); - } - public String awsCustomerRoleARN() { return getString(CUSTOMER_ROLE_ARN_CONFIG); } @@ -1016,49 +974,26 @@ public AWSCredentialsProvider getCredentialsProvider() { log.info("Authentication method: {}", authMethod); if (provider instanceof Configurable) { - log.info("Instance of configurable"); Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); - authMethod = getAuthenticationMethod(); - log.info("Authentication method: {}", authMethod); - - if (authMethod.equals("IAM Assume Role")) { - log.info("Assume role authentication"); - configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); - configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId()); - configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); - - configs.put(ASSUME_AWS_ACCESS_KEY_ID_CONFIG, assumeAwsAccessKeyId()); - configs.put(ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG, assumeAwsSecretKeyId().value()); - - provider = new AwsIamAssumeRoleChaining(); - ((AwsIamAssumeRoleChaining) provider).configure(configs); - } else { - configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); - configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); - ((Configurable) provider).configure(configs); - } + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); } else { authMethod = getAuthenticationMethod(); - log.info("Authentication method: ", authMethod); - Map configs = new HashMap(); if (authMethod.equals("IAM Assume Role")) { - log.info("Assume role authentication"); + Map configs = new HashMap(); configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId().value()); configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); - configs.put(ASSUME_AWS_ACCESS_KEY_ID_CONFIG, assumeAwsAccessKeyId()); - configs.put(ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG, assumeAwsSecretKeyId().value()); - provider = new AwsIamAssumeRoleChaining(); ((AwsIamAssumeRoleChaining) provider).configure(configs); } else { - log.info("Using accessKeyID and secret"); final String accessKeyId = awsAccessKeyId(); final String secretKey = awsSecretKeyId().value(); if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java index 06b6d95bf..dbd29de4f 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java @@ -18,7 +18,6 @@ import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; @@ -28,9 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.confluent.connect.s3.S3SinkConnectorConfig.ASSUME_AWS_ACCESS_KEY_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; @@ -55,24 +51,6 @@ public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Role ARN to use when starting a session." - ).define( - ASSUME_AWS_ACCESS_KEY_ID_CONFIG, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "The secret access key used to authenticate personal AWS credentials such as IAM " - + "credentials. Use only if you do not wish to authenticate by using a credentials " - + "provider class via ``" - + CREDENTIALS_PROVIDER_CLASS_CONFIG - + "``" - ).define( - ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "The secret access key used to authenticate personal AWS credentials such as IAM " - + "credentials. Use only if you do not wish to authenticate by using a credentials " - + "provider class via ``" - + CREDENTIALS_PROVIDER_CLASS_CONFIG - + "``" ); private String customerRoleArn; @@ -80,8 +58,6 @@ public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { private String middlewareRoleArn; private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; private STSAssumeRoleSessionCredentialsProvider initialProvider; - private String accessSecret = ""; - private String accessKeyId = ""; // Method to initiate role chaining public void configure(Map configs) { @@ -90,8 +66,6 @@ public void configure(Map configs) { customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); - accessKeyId = config.getString(ASSUME_AWS_ACCESS_KEY_ID_CONFIG); - accessSecret = config.getString(ASSUME_AWS_SECRET_ACCESS_KEY_CONFIG); initialProvider = buildProvider( middlewareRoleArn, @@ -121,7 +95,6 @@ private STSAssumeRoleSessionCredentialsProvider buildProvider( STSAssumeRoleSessionCredentialsProvider credentialsProvider; // If an existing credentials provider is provided, use it for creating the STS client if (existingProvider != null) { - log.info("Assuming customer's role"); AWSCredentials basicCredentials = existingProvider.getCredentials(); credentialsProvider = new STSAssumeRoleSessionCredentialsProvider .Builder(roleArn, roleSessionName) @@ -131,26 +104,11 @@ private STSAssumeRoleSessionCredentialsProvider buildProvider( ) .withExternalId(roleExternalId) .build(); - log.info("Assumed customer's role"); } else { - if (accessKeyId.isEmpty() || accessSecret.isEmpty()) { - log.info("default sts client will internally use default credentials chain provider"); - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .build(); - } else { - log.info("Using credentials provided to assume role in middleware account"); - BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, accessSecret); - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder - .standard() - .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() - ) - .build(); - } - log.info("Assumed role in middleware account"); + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .build(); } return credentialsProvider; }