Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31922][Connectors/AWS] Port over Kinesis Client configurations for retry and backoff strategies #165

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.aws.config;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.accessKeyId;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.customCredentialsProviderClass;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.externalId;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.profileName;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.profilePath;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleArn;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleSessionName;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleStsEndpoint;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.secretKey;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.webIdentityTokenFile;

/** Configuration options for AWS service usage. */
@PublicEvolving
public class AWSConfigOptions {
karubian marked this conversation as resolved.
Show resolved Hide resolved
public static final ConfigOption<String> AWS_REGION_OPTION =
ConfigOptions.key(AWSConfigConstants.AWS_REGION)
.stringType()
.noDefaultValue()
.withDescription(
"The AWS region of the service (\"us-east-1\" is used if not set).");

public static final ConfigOption<AWSConfigConstants.CredentialProvider>
AWS_CREDENTIALS_PROVIDER_OPTION =
ConfigOptions.key(AWS_CREDENTIALS_PROVIDER)
.enumType(AWSConfigConstants.CredentialProvider.class)
.defaultValue(AWSConfigConstants.CredentialProvider.BASIC)
.withDescription(
"The credential provider type to use when AWS credentials are required (BASIC is used if not set");

public static final ConfigOption<String> AWS_ACCESS_KEY_ID_OPTION =
ConfigOptions.key(accessKeyId(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The AWS access key ID to use when setting credentials provider type to BASIC.");

public static final ConfigOption<String> AWS_SECRET_ACCESS_KEY_OPTION =
ConfigOptions.key(secretKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The AWS secret key to use when setting credentials provider type to BASIC.");

public static final ConfigOption<String> AWS_PROFILE_PATH_OPTION =
ConfigOptions.key(profilePath(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"Optional configuration for profile path if credential provider type is set to be PROFILE.");

public static final ConfigOption<String> AWS_PROFILE_NAME_OPTION =
ConfigOptions.key(profileName(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"Optional configuration for profile name if credential provider type is set to be PROFILE.");

public static final ConfigOption<String> AWS_ROLE_STS_ENDPOINT_OPTION =
ConfigOptions.key(roleStsEndpoint(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The AWS endpoint for the STS (derived from the AWS region setting if not set) "
+ "to use if credential provider type is set to be ASSUME_ROLE.");

public static final ConfigOption<String> CUSTOM_CREDENTIALS_PROVIDER_CLASS_OPTION =
ConfigOptions.key(customCredentialsProviderClass(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The full path (e.g. org.user_company.auth.CustomAwsCredentialsProvider) to the user provided"
+ "class to use if credential provider type is set to be CUSTOM.");

public static final ConfigOption<String> AWS_ROLE_ARN_OPTION =
ConfigOptions.key(roleArn(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The role ARN to use when credential provider type is set to ASSUME_ROLE or"
+ "WEB_IDENTITY_TOKEN");

public static final ConfigOption<String> AWS_ROLE_SESSION_NAME =
ConfigOptions.key(roleSessionName(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The role session name to use when credential provider type is set to ASSUME_ROLE or"
+ "WEB_IDENTITY_TOKEN");

public static final ConfigOption<String> AWS_ROLE_EXTERNAL_ID_OPTION =
ConfigOptions.key(externalId(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The external ID to use when credential provider type is set to ASSUME_ROLE.");

public static final ConfigOption<String> AWS_WEB_IDENTITY_TOKEN_FILE =
ConfigOptions.key(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The absolute path to the web identity token file that should be used if provider"
+ " type is set to WEB_IDENTITY_TOKEN.");

public static final ConfigOption<String> AWS_ROLE_CREDENTIALS_PROVIDER_OPTION =
ConfigOptions.key(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER))
.stringType()
.noDefaultValue()
.withDescription(
"The credentials provider that provides credentials for assuming the role when"
+ " credential provider type is set to ASSUME_ROLE. Roles can be nested, so"
+ " AWS_ROLE_CREDENTIALS_PROVIDER can again be set to ASSUME_ROLE");

public static final ConfigOption<String> AWS_ENDPOINT_OPTION =
ConfigOptions.key(AWSConfigConstants.AWS_ENDPOINT)
.stringType()
.noDefaultValue()
.withDescription(
"The AWS endpoint for the service (derived from the AWS region setting if not set).");

public static final ConfigOption<String> TRUST_ALL_CERTIFICATES_OPTION =
ConfigOptions.key(AWSConfigConstants.TRUST_ALL_CERTIFICATES)
.stringType()
.noDefaultValue()
.withDescription("Whether to trust all SSL certificates.");

public static final ConfigOption<String> HTTP_PROTOCOL_VERSION_OPTION =
ConfigOptions.key(AWSConfigConstants.HTTP_PROTOCOL_VERSION)
.stringType()
.noDefaultValue()
.withDescription("The HTTP protocol version to use.");

public static final ConfigOption<String> HTTP_CLIENT_MAX_CONCURRENCY_OPTION =
ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY)
.stringType()
.noDefaultValue()
.withDescription("Maximum request concurrency for SdkAsyncHttpClient.");

public static final ConfigOption<String> HTTP_CLIENT_READ_TIMEOUT_MILLIS_OPTION =
ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS)
.stringType()
.noDefaultValue()
.withDescription("Read Request timeout for SdkAsyncHttpClient.");

public static final ConfigOption<Duration> RETRY_STRATEGY_MIN_DELAY_OPTION =
ConfigOptions.key("retry-strategy.delay.min")
.durationType()
.defaultValue(Duration.ofMillis(300))
.withDescription("Base delay for the exponential backoff retry strategy");

public static final ConfigOption<Duration> RETRY_STRATEGY_MAX_DELAY_OPTION =
ConfigOptions.key("retry-strategy.delay.max")
.durationType()
.defaultValue(Duration.ofMillis(1000))
.withDescription("Max delay for the exponential backoff retry strategy");

public static final ConfigOption<Integer> RETRY_STRATEGY_MAX_ATTEMPTS_OPTION =
ConfigOptions.key("retry-strategy.attempts.max")
.intType()
.defaultValue(50)
.withDescription(
"Maximum number of attempts for the exponential backoff retry strategy");
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.config.AWSConfigOptions;
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
Expand All @@ -49,11 +50,18 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.awscore.internal.AwsErrorCode;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.StandardRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -139,7 +147,9 @@ public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext reade
Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
() ->
new PollingKinesisShardSplitReader(
createKinesisStreamProxy(sourceConfig), shardMetricGroupMap);
createKinesisStreamProxy(sourceConfig),
shardMetricGroupMap,
sourceConfig);
KinesisStreamsRecordEmitter<T> recordEmitter =
new KinesisStreamsRecordEmitter<>(deserializationSchema);

Expand Down Expand Up @@ -199,12 +209,25 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig
consumerConfig.addAllToProperties(kinesisClientProperties);
kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region);

final ClientOverrideConfiguration.Builder overrideBuilder =
ClientOverrideConfiguration.builder()
.retryStrategy(
createExpBackoffRetryStrategy(
sourceConfig.get(
AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION),
sourceConfig.get(
AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION),
sourceConfig.get(
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));

AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
KinesisClient kinesisClient =
AWSClientUtil.createAwsSyncClient(
kinesisClientProperties,
httpClient,
KinesisClient.builder(),
overrideBuilder,
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
return new KinesisStreamProxy(kinesisClient, httpClient);
Expand All @@ -225,4 +248,37 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}
});
}

private RetryStrategy createExpBackoffRetryStrategy(
Duration initialDelay, Duration maxDelay, int maxAttempts) {
final BackoffStrategy backoffStrategy =
BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay);

return StandardRetryStrategy.builder()
.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.maxAttempts(maxAttempts)
.retryOnException(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception = (AwsServiceException) throwable;
return (AwsErrorCode.RETRYABLE_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()))
|| (AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode()));
}
return false;
})
.treatAsThrottling(
throwable -> {
if (throwable instanceof AwsServiceException) {
AwsServiceException exception = (AwsServiceException) throwable;
return AwsErrorCode.THROTTLING_ERROR_CODES.contains(
exception.awsErrorDetails().errorCode());
}
return false;
})
.circuitBreakerEnabled(false)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;

import java.time.Duration;

/**
* Builder to construct the {@link KinesisStreamsSource}.
*
Expand All @@ -52,18 +56,25 @@
@Experimental
public class KinesisStreamsSourceBuilder<T> {
private String streamArn;
private Configuration sourceConfig;
private KinesisDeserializationSchema<T> deserializationSchema;
private KinesisShardAssigner kinesisShardAssigner = ShardAssignerFactory.uniformShardAssigner();
private boolean preserveShardOrder = true;
private Duration retryStrategyMinDelay;
private Duration retryStrategyMaxDelay;
private Integer retryStrategyMaxAttempts;
private final Configuration configuration;

public KinesisStreamsSourceBuilder() {
this.configuration = new Configuration();
}

public KinesisStreamsSourceBuilder<T> setStreamArn(String streamArn) {
this.streamArn = streamArn;
return this;
}

public KinesisStreamsSourceBuilder<T> setSourceConfig(Configuration sourceConfig) {
this.sourceConfig = sourceConfig;
this.configuration.addAll(sourceConfig);
return this;
}

Expand All @@ -90,12 +101,47 @@ public KinesisStreamsSourceBuilder<T> setPreserveShardOrder(boolean preserveShar
return this;
}

public KinesisStreamsSourceBuilder<T> setRetryStrategyMinDelay(Duration retryStrategyMinDelay) {
this.retryStrategyMinDelay = retryStrategyMinDelay;
return this;
}

public KinesisStreamsSourceBuilder<T> setRetryStrategyMaxDelay(Duration retryStrategyMaxDelay) {
this.retryStrategyMaxDelay = retryStrategyMaxDelay;
return this;
}

public KinesisStreamsSourceBuilder<T> setRetryStrategyMaxAttempts(
Integer retryStrategyMaxAttempts) {
this.retryStrategyMaxAttempts = retryStrategyMaxAttempts;
return this;
}

public KinesisStreamsSource<T> build() {
setSourceConfigurations();
return new KinesisStreamsSource<>(
streamArn,
sourceConfig,
configuration,
deserializationSchema,
kinesisShardAssigner,
preserveShardOrder);
}

private void setSourceConfigurations() {
overrideIfExists(
KinesisSourceConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION,
this.retryStrategyMinDelay);
overrideIfExists(
KinesisSourceConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION,
this.retryStrategyMaxDelay);
overrideIfExists(
KinesisSourceConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION,
this.retryStrategyMaxAttempts);
}

private <E> void overrideIfExists(ConfigOption<E> configOption, E value) {
if (value != null) {
this.configuration.set(configOption, value);
}
}
}
Loading
Loading