Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CC-22913 : Add support for AWS IAM Assume Role Credential provider for CCloud #762

Open
wants to merge 6 commits into
base: 10.5.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.provider.integration.aws.AssumeRoleConfig;
import io.confluent.provider.integration.aws.ChainedAssumeRoleCredentialsProvider;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -118,11 +120,134 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1
);

/**
* Authentication related configs
*/
public static final String AUTH_METHOD_CONFIG = "authentication.method";
public static final ConfigDef.Type AUTH_METHOD_TYPE = ConfigDef.Type.STRING;
public static final String AUTH_METHOD_ACCESS_KEYS = "Access Keys";
public static final String AUTH_METHOD_IAM_ROLE = "IAM Roles";
public static final String AUTH_METHOD_DEFAULT = AUTH_METHOD_ACCESS_KEYS;
public static final ConfigDef.Validator AUTH_METHOD_VALIDATOR =
ConfigDef.ValidString.in(AUTH_METHOD_ACCESS_KEYS, AUTH_METHOD_IAM_ROLE);
public static final ConfigDef.Recommender AUTH_METHOD_RECOMMENDER = new ConfigDef.Recommender() {
@Override
public List<Object> validValues(String s, Map<String, Object> map) {
return Arrays.asList(AUTH_METHOD_ACCESS_KEYS, AUTH_METHOD_IAM_ROLE);
}

@Override
public boolean visible(String s, Map<String, Object> map) {
return true;
}
};
public static final ConfigDef.Importance AUTH_METHOD_IMPORTANCE = ConfigDef.Importance.HIGH;
public static final String AUTH_METHOD_DOC = "Select how you want to authenticate with AWS.";
public static final String AUTH_METHOD_DISPLAY_NAME = "Authentication method";
public static final String AWS_CREDENTIALS_GROUP = "AWS credentials";

/**
* Configs for Authentication Method : Access Keys
*/
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final ConfigDef.Type AWS_ACCESS_KEY_ID_TYPE = Type.STRING;
public static final String AWS_ACCESS_KEY_ID_DEFAULT = "";
public static final ConfigDef.Importance AWS_ACCESS_KEY_ID_IMPORTANCE = ConfigDef.Importance.HIGH;
public static final String AWS_ACCESS_KEY_ID_DOC =
"The AWS access key ID used to authenticate personal AWS credentials such as IAM "
+ "credentials. Use only if you do not wish to authenticate by using a credentials "
+ "provider class via ``"
+ CREDENTIALS_PROVIDER_CLASS_CONFIG
+ "``";
public static final String AWS_ACCESS_KEY_ID_DISPLAY_NAME = "AWS Access Key ID";

public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
public static final ConfigDef.Type AWS_SECRET_ACCESS_KEY_TYPE = Type.PASSWORD;
public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null);
public static final ConfigDef.Importance AWS_SECRET_ACCESS_KEY_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String AWS_SECRET_ACCESS_KEY_DOC =
"The secret access key used to authenticate personal AWS credentials such as IAM "
+ "credentials. Use only if you do not wish to authenticate by using a credentials "
+ "provider class via ``"
+ CREDENTIALS_PROVIDER_CLASS_CONFIG
+ "``";
public static final String AWS_SECRET_ACCESS_KEY_DISPLAY_NAME = "AWS Secret Access Key";

/**
* Configs for Authentication Method : IAM Roles
*/
public static final String PROVIDER_INTEGRATION_ID_CONFIG = "provider.integration.id";
public static final ConfigDef.Type PROVIDER_INTEGRATION_ID_TYPE = Type.STRING;
public static final String PROVIDER_INTEGRATION_ID_DEFAULT = "";
public static final ConfigDef.Importance PROVIDER_INTEGRATION_ID_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String PROVIDER_INTEGRATION_ID_DOC =
"Select an existing integration that has access to your resource. "
+ "In case you need to integrate a new IAM role, use provider integration.";
public static final String PROVIDER_INTEGRATION_ID_DISPLAY_NAME = "Provider Integration";

public static final String AWS_IAM_ROLE_ARN_CONFIG = "customer.aws.iam.role.arn";
public static final ConfigDef.Type AWS_IAM_ROLE_ARN_TYPE = Type.STRING;
public static final String AWS_IAM_ROLE_ARN_DEFAULT = "";
public static final String AWS_IAM_ROLE_ARN_DOC =
"The Amazon Resource Name (ARN) that identifies the AWS IAM role that Confluent "
+ "Cloud assumes to access the resources in customer's AWS account.";
public static final ConfigDef.Importance AWS_IAM_ROLE_ARN_IMPORTANCE = ConfigDef.Importance.HIGH;
public static final String AWS_IAM_ROLE_ARN_DISPLAY_NAME = "IAM Role ARN";

public static final String EXTERNAL_ID_CONFIG = "external.id";
public static final ConfigDef.Type EXTERNAL_ID_TYPE = Type.PASSWORD;
public static final Password EXTERNAL_ID_DEFAULT = new Password(null);
public static final String EXTERNAL_ID_DOC =
"The external ID that Confluent Cloud uses when it assumes the IAM "
+ "role in customer's Amazon Web Services (AWS) account.";
public static final ConfigDef.Importance EXTERNAL_ID_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String EXTERNAL_ID_DISPLAY_NAME = "External ID";

public static final String CONFLUENT_AWS_IAM_ROLE_ARN_CONFIG = "confluent.aws.iam.role.arn";
public static final ConfigDef.Type CONFLUENT_AWS_IAM_ROLE_ARN_TYPE = Type.STRING;
public static final String CONFLUENT_AWS_IAM_ROLE_ARN_DEFAULT = "";
public static final String CONFLUENT_AWS_IAM_ROLE_ARN_DOC =
"The Amazon Resource Name (ARN) that specifies the AWS IAM role that Confluent "
+ "Cloud uses to assume the IAM role within the customer's AWS account.";
public static final ConfigDef.Importance CONFLUENT_AWS_IAM_ROLE_ARN_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String CONFLUENT_AWS_IAM_ROLE_ARN_DISPLAY_NAME = "Confluent AWS IAM Role ARN";

public static final String CONFLUENT_MIDDLEWARE_EXTERNAL_ID_CONFIG = "middleware.external.id";
public static final ConfigDef.Type CONFLUENT_MIDDLEWARE_EXTERNAL_ID_TYPE = Type.PASSWORD;
public static final Password CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DEFAULT = new Password(null);
public static final String CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DOC =
"The external ID that Confluent Cloud uses when it assumes "
+ "the IAM role in confluent middleware account.";
public static final ConfigDef.Importance CONFLUENT_MIDDLEWARE_EXTERNAL_ID_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DISPLAY_NAME =
"Middleware External ID";

public static final String PROVIDER_INTEGRATION_MAX_RETRIES_CONFIG =
"provider.integration.max.retries";
public static final ConfigDef.Type PROVIDER_INTEGRATION_MAX_RETRIES_TYPE = Type.INT;
public static final int PROVIDER_INTEGRATION_MAX_RETRIES_DEFAULT = 10;
public static final String PROVIDER_INTEGRATION_MAX_RETRIES_DOC =
"The max retries for provider integration to establish connection";
public static final ConfigDef.Importance PROVIDER_INTEGRATION_MAX_RETRIES_IMPORTANCE =
ConfigDef.Importance.LOW;
public static final String PROVIDER_INTEGRATION_MAX_RETRIES_DISPLAY_NAME =
"MProvider Integration Max retries";

public static final String AWS_ASSUME_IAM_ROLE_SESSION_NAME_CONFIG =
"aws.iam.assume.role.session.name";
public static final ConfigDef.Type AWS_ASSUME_IAM_ROLE_SESSION_NAME_TYPE = Type.STRING;
public static final String AWS_ASSUME_IAM_ROLE_SESSION_NAME_DEFAULT = "";
public static final String AWS_ASSUME_IAM_ROLE_SESSION_NAME_DOC =
"Specify the name for the assumed role session.";
public static final ConfigDef.Importance AWS_ASSUME_IAM_ROLE_SESSION_NAME_IMPORTANCE =
ConfigDef.Importance.HIGH;
public static final String AWS_ASSUME_IAM_ROLE_SESSION_NAME_DISPLAY_NAME =
"AWS IAM Assume Role Session Name";

public static final String REGION_CONFIG = "s3.region";
public static final String REGION_DEFAULT = Regions.DEFAULT_REGION.getName();
Expand Down Expand Up @@ -375,37 +500,7 @@ public static ConfigDef newConfigDef() {
"AWS Credentials Provider Class"
);

configDef.define(
AWS_ACCESS_KEY_ID_CONFIG,
Type.STRING,
AWS_ACCESS_KEY_ID_DEFAULT,
Importance.HIGH,
"The AWS access key ID used to authenticate personal AWS credentials such as IAM "
+ "credentials. Use only if you do not wish to authenticate by using a credentials "
+ "provider class via ``"
+ CREDENTIALS_PROVIDER_CLASS_CONFIG
+ "``",
group,
++orderInGroup,
Width.LONG,
"AWS Access Key ID"
);

configDef.define(
AWS_SECRET_ACCESS_KEY_CONFIG,
Type.PASSWORD,
AWS_SECRET_ACCESS_KEY_DEFAULT,
Importance.HIGH,
"The secret access key used to authenticate personal AWS credentials such as IAM "
+ "credentials. Use only if you do not wish to authenticate by using a credentials "
+ "provider class via ``"
+ CREDENTIALS_PROVIDER_CLASS_CONFIG
+ "``",
group,
++orderInGroup,
Width.LONG,
"AWS Secret Access Key"
);
addAuthenticationConfigs(configDef);

List<String> validSsea = new ArrayList<>(SSEAlgorithm.values().length + 1);
validSsea.add("");
Expand Down Expand Up @@ -801,6 +896,116 @@ public static ConfigDef newConfigDef() {
return configDef;
}

private static void addAuthenticationConfigs(ConfigDef configDef) {
int orderInGroup = 1;
configDef.define(
AUTH_METHOD_CONFIG,
AUTH_METHOD_TYPE,
AUTH_METHOD_DEFAULT,
AUTH_METHOD_VALIDATOR,
AUTH_METHOD_IMPORTANCE,
AUTH_METHOD_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
AUTH_METHOD_DISPLAY_NAME,
AUTH_METHOD_RECOMMENDER)
.define(
AWS_ACCESS_KEY_ID_CONFIG,
AWS_ACCESS_KEY_ID_TYPE,
AWS_ACCESS_KEY_ID_DEFAULT,
AWS_ACCESS_KEY_ID_IMPORTANCE,
AWS_ACCESS_KEY_ID_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
AWS_ACCESS_KEY_ID_DISPLAY_NAME)
.define(
AWS_SECRET_ACCESS_KEY_CONFIG,
AWS_SECRET_ACCESS_KEY_TYPE,
AWS_SECRET_ACCESS_KEY_DEFAULT,
AWS_SECRET_ACCESS_KEY_IMPORTANCE,
AWS_SECRET_ACCESS_KEY_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
AWS_SECRET_ACCESS_KEY_DISPLAY_NAME)
.define(
PROVIDER_INTEGRATION_ID_CONFIG,
PROVIDER_INTEGRATION_ID_TYPE,
PROVIDER_INTEGRATION_ID_DEFAULT,
PROVIDER_INTEGRATION_ID_IMPORTANCE,
PROVIDER_INTEGRATION_ID_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
PROVIDER_INTEGRATION_ID_DISPLAY_NAME)
.define(
AWS_IAM_ROLE_ARN_CONFIG,
AWS_IAM_ROLE_ARN_TYPE,
AWS_IAM_ROLE_ARN_DEFAULT,
AWS_IAM_ROLE_ARN_IMPORTANCE,
AWS_IAM_ROLE_ARN_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
AWS_IAM_ROLE_ARN_DISPLAY_NAME)
.define(
EXTERNAL_ID_CONFIG,
EXTERNAL_ID_TYPE,
EXTERNAL_ID_DEFAULT,
EXTERNAL_ID_IMPORTANCE,
EXTERNAL_ID_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup++,
Width.LONG,
EXTERNAL_ID_DISPLAY_NAME)
.define(
CONFLUENT_AWS_IAM_ROLE_ARN_CONFIG,
CONFLUENT_AWS_IAM_ROLE_ARN_TYPE,
CONFLUENT_AWS_IAM_ROLE_ARN_DEFAULT,
CONFLUENT_AWS_IAM_ROLE_ARN_IMPORTANCE,
CONFLUENT_AWS_IAM_ROLE_ARN_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup,
Width.LONG,
CONFLUENT_AWS_IAM_ROLE_ARN_DISPLAY_NAME
)
.define(
AWS_ASSUME_IAM_ROLE_SESSION_NAME_CONFIG,
AWS_ASSUME_IAM_ROLE_SESSION_NAME_TYPE,
AWS_ASSUME_IAM_ROLE_SESSION_NAME_DEFAULT,
AWS_ASSUME_IAM_ROLE_SESSION_NAME_IMPORTANCE,
AWS_ASSUME_IAM_ROLE_SESSION_NAME_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup,
Width.LONG,
AWS_ASSUME_IAM_ROLE_SESSION_NAME_DISPLAY_NAME
)
.define(
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_CONFIG,
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_TYPE,
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DEFAULT,
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_IMPORTANCE,
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup,
Width.LONG,
CONFLUENT_MIDDLEWARE_EXTERNAL_ID_DISPLAY_NAME
)
.define(
PROVIDER_INTEGRATION_MAX_RETRIES_CONFIG,
PROVIDER_INTEGRATION_MAX_RETRIES_TYPE,
PROVIDER_INTEGRATION_MAX_RETRIES_DEFAULT,
PROVIDER_INTEGRATION_MAX_RETRIES_IMPORTANCE,
PROVIDER_INTEGRATION_MAX_RETRIES_DOC,
AWS_CREDENTIALS_GROUP,
orderInGroup,
Width.LONG,
PROVIDER_INTEGRATION_MAX_RETRIES_DISPLAY_NAME
);
}

public S3SinkConnectorConfig(Map<String, String> props) {
this(newConfigDef(), props);
}
Expand Down Expand Up @@ -878,6 +1083,34 @@ public Password awsSecretKeyId() {
return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

public String getAuthenticationMethod() {
return getString(AUTH_METHOD_CONFIG);
}

public String getAwsRoleArn() {
return getString(AWS_IAM_ROLE_ARN_CONFIG);
}

public String getConfluentAwsRoleArn() {
return getString(CONFLUENT_AWS_IAM_ROLE_ARN_CONFIG);
}

public Password getExternalID() {
return getPassword(EXTERNAL_ID_CONFIG);
}

public String getAwsAssumeIamRoleSessionName() {
return getString(AWS_ASSUME_IAM_ROLE_SESSION_NAME_CONFIG);
}

public Password getMiddlewareExternalID() {
return getPassword(CONFLUENT_MIDDLEWARE_EXTERNAL_ID_CONFIG);
}

public int getProviderIntegrationMaxRetries() {
return getInt(PROVIDER_INTEGRATION_MAX_RETRIES_CONFIG);
}

public int getPartSize() {
return getInt(PART_SIZE_CONFIG);
}
Expand All @@ -887,6 +1120,7 @@ public AWSCredentialsProvider getCredentialsProvider() {
try {
AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance();
String authMethod = getAuthenticationMethod();

if (provider instanceof Configurable) {
Map<String, Object> configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX);
Expand All @@ -898,6 +1132,39 @@ public AWSCredentialsProvider getCredentialsProvider() {
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value());

((Configurable) provider).configure(configs);
} else if (AUTH_METHOD_IAM_ROLE.equalsIgnoreCase(authMethod)) {
final String sessionName = getAwsAssumeIamRoleSessionName();
final String middlewareSessionName = "middleware-" + sessionName;
final int maxRetries = getProviderIntegrationMaxRetries();

// middleware AssumeRole config
final String confluentAwsRoleArn = getConfluentAwsRoleArn();
final String middlewareExternalId = getMiddlewareExternalID().value();
AssumeRoleConfig.Builder confluentMiddlewareConfigBuilder =
new AssumeRoleConfig.Builder(confluentAwsRoleArn)
.withSessionName(middlewareSessionName);
if (!StringUtils.isBlank(middlewareExternalId)) {
confluentMiddlewareConfigBuilder =
confluentMiddlewareConfigBuilder.withExternalId(middlewareExternalId);
}
final AssumeRoleConfig confluentMiddlewareConfig = confluentMiddlewareConfigBuilder.build();

// customer AssumeRole config
final String awsRoleArn = getAwsRoleArn();
final String externalId = getExternalID().value();
AssumeRoleConfig.Builder customerConfigBuilder =
new AssumeRoleConfig.Builder(awsRoleArn)
.withSessionName(sessionName);
if (!StringUtils.isBlank(externalId)) {
customerConfigBuilder = customerConfigBuilder.withExternalId(externalId);
}
final AssumeRoleConfig customerConfig = customerConfigBuilder.build();

// build the credential provider
provider = new ChainedAssumeRoleCredentialsProvider
.Builder(confluentMiddlewareConfig, customerConfig)
.withMaxRetries(maxRetries)
.build();
} else {
final String accessKeyId = awsAccessKeyId();
final String secretKey = awsSecretKeyId().value();
Expand Down
Loading