From d03f11bb022abd1b9ca9dae3b99985d9dd7957b5 Mon Sep 17 00:00:00 2001 From: Scott Miller Date: Fri, 22 Nov 2024 04:44:55 -0500 Subject: [PATCH 1/3] feat(aws_s3 sink): add option to use virtual addressing Signed-off-by: Scott Miller --- ...ption-to-use-virtual-addressing.feature.md | 3 +++ src/aws/mod.rs | 25 ++++++++++++++++--- src/common/s3.rs | 5 ++++ src/common/sqs.rs | 4 +++ src/secrets/aws_secrets_manager.rs | 5 ++++ src/sinks/aws_cloudwatch_logs/config.rs | 5 ++++ .../aws_cloudwatch_logs/integration_tests.rs | 8 +++--- src/sinks/aws_cloudwatch_metrics/mod.rs | 5 ++++ src/sinks/aws_kinesis/firehose/config.rs | 5 ++++ .../aws_kinesis/firehose/integration_tests.rs | 1 + src/sinks/aws_kinesis/streams/config.rs | 5 ++++ .../aws_kinesis/streams/integration_tests.rs | 1 + src/sinks/aws_s3/config.rs | 16 +++++++++++- src/sinks/aws_s3/integration_tests.rs | 3 +++ src/sinks/aws_s_s/sns/config.rs | 5 ++++ src/sinks/aws_s_s/sns/integration_tests.rs | 2 ++ src/sinks/aws_s_s/sqs/config.rs | 1 + src/sinks/aws_s_s/sqs/integration_tests.rs | 1 + src/sinks/s3_common/config.rs | 14 ++++++++--- src/sources/aws_s3/mod.rs | 4 +++ src/sources/aws_sqs/config.rs | 1 + .../components/sinks/base/aws_s3.cue | 9 +++++++ 22 files changed, 117 insertions(+), 11 deletions(-) create mode 100644 changelog.d/21999-add-option-to-use-virtual-addressing.feature.md diff --git a/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md new file mode 100644 index 0000000000000..5149bebb686aa --- /dev/null +++ b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md @@ -0,0 +1,3 @@ +Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior. + +authors: sam6258 diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 775bb4c08a85f..e211a0091877e 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -124,6 +124,9 @@ pub trait ClientBuilder { /// Build the client using the given config settings. fn build(config: &SdkConfig) -> Self::Client; + + /// Build the client using the given config settings and path style addressing. + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client; } fn region_provider( @@ -168,10 +171,19 @@ pub async fn create_client( proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, + force_path_style: impl Into, ) -> crate::Result { - create_client_and_region::(auth, region, endpoint, proxy, tls_options, timeout) - .await - .map(|(client, _)| client) + create_client_and_region::( + auth, + region, + endpoint, + proxy, + tls_options, + timeout, + force_path_style, + ) + .await + .map(|(client, _)| client) } /// Create the SDK client and resolve the region using the provided settings. @@ -182,6 +194,7 @@ pub async fn create_client_and_region( proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, + force_path_style: impl Into, ) -> crate::Result<(T::Client, Region)> { let retry_config = RetryConfig::disabled(); @@ -239,7 +252,11 @@ pub async fn create_client_and_region( let config = config_builder.build(); - Ok((T::build(&config), region)) + if force_path_style.into() { + Ok((T::build_and_force_path_style(&config), region)) + } else { + Ok((T::build(&config), region)) + } } #[derive(Snafu, Debug)] diff --git a/src/common/s3.rs b/src/common/s3.rs index cdb69725b4c66..29c4cbca11cdc 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -8,6 +8,11 @@ impl ClientBuilder for S3ClientBuilder { type Client = aws_sdk_s3::client::Client; fn build(config: &aws_types::SdkConfig) -> Self::Client { + let config = config::Builder::from(config).build(); + aws_sdk_s3::client::Client::from_conf(config) + } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { let config = config::Builder::from(config).force_path_style(true).build(); aws_sdk_s3::client::Client::from_conf(config) } diff --git a/src/common/sqs.rs b/src/common/sqs.rs index f02aa2f7c4021..c02fd4ceafef5 100644 --- a/src/common/sqs.rs +++ b/src/common/sqs.rs @@ -8,4 +8,8 @@ impl ClientBuilder for SqsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sqs::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SqsClientBuilder::build(config) + } } diff --git a/src/secrets/aws_secrets_manager.rs b/src/secrets/aws_secrets_manager.rs index f3c221b42eee0..0a86456a8506d 100644 --- a/src/secrets/aws_secrets_manager.rs +++ b/src/secrets/aws_secrets_manager.rs @@ -17,6 +17,10 @@ impl ClientBuilder for SecretsManagerClientBuilder { let config = config::Builder::from(config).build(); Client::from_conf(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SecretsManagerClientBuilder::build(config) + } } /// Configuration for the `aws_secrets_manager` secrets backend. @@ -63,6 +67,7 @@ impl SecretBackend for AwsSecretsManagerBackend { &ProxyConfig::default(), &self.tls, &None, + false, ) .await?; diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 0dc7f90917620..e83c88cc47dab 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -36,6 +36,10 @@ impl ClientBuilder for CloudwatchLogsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatchlogs::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + CloudwatchLogsClientBuilder::build(config) + } } #[configurable_component] @@ -175,6 +179,7 @@ impl CloudwatchLogsSinkConfig { proxy, &self.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 4e80493adaf88..6fc2ad7faad7e 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -467,9 +467,11 @@ async fn create_client_test() -> CloudwatchLogsClient { let endpoint = Some(cloudwatch_address()); let proxy = ProxyConfig::default(); - create_client::(&auth, region, endpoint, &proxy, &None, &None) - .await - .unwrap() + create_client::( + &auth, region, endpoint, &proxy, &None, &None, false, + ) + .await + .unwrap() } async fn ensure_group() { diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index 371b1e29e79a4..2e369d1728d8b 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -122,6 +122,10 @@ impl ClientBuilder for CloudwatchMetricsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatch::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + CloudwatchMetricsClientBuilder::build(config) + } } #[async_trait::async_trait] @@ -178,6 +182,7 @@ impl CloudWatchMetricsSinkConfig { proxy, &self.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 96b032f4b5738..e796d91dd68ee 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -41,6 +41,10 @@ impl ClientBuilder for KinesisFirehoseClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { Self::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + KinesisFirehoseClientBuilder::build(config) + } } // AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events @@ -108,6 +112,7 @@ impl KinesisFirehoseSinkConfig { proxy, &self.base.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 00f06301eb164..26ba4f3f5d6c2 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -261,6 +261,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index aa1896d0c7333..a95ca18a495d1 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -40,6 +40,10 @@ impl ClientBuilder for KinesisClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { KinesisClient::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + KinesisClientBuilder::build(config) + } } pub const MAX_PAYLOAD_SIZE: usize = 5_000_000; @@ -105,6 +109,7 @@ impl KinesisStreamsSinkConfig { proxy, &self.base.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index 57958858f0122..fe967f3c29e06 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -183,6 +183,7 @@ async fn client() -> aws_sdk_kinesis::Client { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index c1c8067fb8909..97558dc10f0d6 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -139,6 +139,12 @@ pub struct S3SinkConfig { #[configurable(derived)] #[serde(default)] pub timezone: Option, + + /// Specifies which addressing style to use. + /// + /// This controls if the bucket name is in the hostname or part of the URL. + #[serde(default = "crate::serde::default_true")] + pub force_path_style: bool, } pub(super) fn default_key_prefix() -> String { @@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig { auth: AwsAuthentication::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: Default::default(), }) .unwrap() } @@ -251,7 +258,14 @@ impl S3SinkConfig { } pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result { - s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await + s3_common::config::create_service( + &self.region, + &self.auth, + proxy, + &self.tls, + self.force_path_style, + ) + .await } } diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index e86c8956895c6..c099b694681b2 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } }; let prefix = config.key_prefix.clone(); @@ -496,6 +497,7 @@ async fn client() -> S3Client { &proxy, &tls_options, &None, + true, ) .await .unwrap() @@ -522,6 +524,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } } diff --git a/src/sinks/aws_s_s/sns/config.rs b/src/sinks/aws_s_s/sns/config.rs index 3463443d3639c..447c11b5c0fc4 100644 --- a/src/sinks/aws_s_s/sns/config.rs +++ b/src/sinks/aws_s_s/sns/config.rs @@ -54,6 +54,7 @@ impl SnsSinkConfig { proxy, &self.base_config.tls, &None, + false, ) .await } @@ -111,6 +112,10 @@ impl ClientBuilder for SnsClientBuilder { fn build(config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sns::client::Client::new(config) } + + fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { + SnsClientBuilder::build(config) + } } pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> { diff --git a/src/sinks/aws_s_s/sns/integration_tests.rs b/src/sinks/aws_s_s/sns/integration_tests.rs index 30b43d8b9b116..3e46615c9366c 100644 --- a/src/sinks/aws_s_s/sns/integration_tests.rs +++ b/src/sinks/aws_s_s/sns/integration_tests.rs @@ -38,6 +38,7 @@ async fn create_sns_test_client() -> SnsClient { &proxy, &None, &None, + false, ) .await .unwrap() @@ -59,6 +60,7 @@ async fn create_sqs_test_client() -> SqsClient { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/aws_s_s/sqs/config.rs b/src/sinks/aws_s_s/sqs/config.rs index a936a6badc03f..8b8afa84e2f8a 100644 --- a/src/sinks/aws_s_s/sqs/config.rs +++ b/src/sinks/aws_s_s/sqs/config.rs @@ -55,6 +55,7 @@ impl SqsSinkConfig { proxy, &self.base_config.tls, &None, + false, ) .await } diff --git a/src/sinks/aws_s_s/sqs/integration_tests.rs b/src/sinks/aws_s_s/sqs/integration_tests.rs index 3428caa4374fe..8046b2a7cd4f4 100644 --- a/src/sinks/aws_s_s/sqs/integration_tests.rs +++ b/src/sinks/aws_s_s/sqs/integration_tests.rs @@ -36,6 +36,7 @@ async fn create_test_client() -> SqsClient { &proxy, &None, &None, + false, ) .await .unwrap() diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 2f87f3d6754a2..adf52a4c5ce45 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -363,12 +363,20 @@ pub async fn create_service( auth: &AwsAuthentication, proxy: &ProxyConfig, tls_options: &Option, + force_path_style: impl Into, ) -> crate::Result { let endpoint = region.endpoint(); let region = region.region(); - let client = - create_client::(auth, region.clone(), endpoint, proxy, tls_options, &None) - .await?; + let client = create_client::( + auth, + region.clone(), + endpoint, + proxy, + tls_options, + &None, + force_path_style, + ) + .await?; Ok(S3Service::new(client)) } diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 988eb55bfcce4..8a889142efdd9 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -238,6 +238,7 @@ impl AwsS3Config { proxy, &self.tls_options, &None, + true, ) .await?; @@ -254,6 +255,7 @@ impl AwsS3Config { proxy, &sqs.tls_options, &sqs.timeout, + false, ) .await?; @@ -1023,6 +1025,7 @@ mod integration_tests { &proxy_config, &None, &None, + true, ) .await .unwrap() @@ -1042,6 +1045,7 @@ mod integration_tests { &proxy_config, &None, &None, + false, ) .await .unwrap() diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 3d0583a007688..a97260e3b2ed5 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -167,6 +167,7 @@ impl AwsSqsConfig { &cx.proxy, &self.tls, &None, + false, ) .await } diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 48bf7eeba61f1..cf5b5c375062c 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -615,6 +615,15 @@ base: components: sinks: aws_s3: configuration: { required: false type: string: default: "%s" } + force_path_style: { + description: """ + Specifies which addressing style to use. + + This controls if the bucket name is in the hostname or part of the URL. + """ + required: false + type: bool: default: true + } framing: { description: "Framing configuration." required: false From 337b6676e2e9d0ef6766ddfdd3b34d248c812402 Mon Sep 17 00:00:00 2001 From: Scott Miller Date: Thu, 19 Dec 2024 00:41:51 -0500 Subject: [PATCH 2/3] fix(aws_s3 sink): move force_path_style to S3ClientBuilder Signed-off-by: Scott Miller --- src/aws/mod.rs | 43 ++++++++----------- src/common/s3.rs | 18 ++++---- src/common/sqs.rs | 6 +-- src/secrets/aws_secrets_manager.rs | 8 +--- src/sinks/aws_cloudwatch_logs/config.rs | 8 +--- .../aws_cloudwatch_logs/integration_tests.rs | 8 +++- src/sinks/aws_cloudwatch_metrics/mod.rs | 8 +--- src/sinks/aws_kinesis/firehose/config.rs | 8 +--- .../aws_kinesis/firehose/integration_tests.rs | 2 +- src/sinks/aws_kinesis/streams/config.rs | 8 +--- .../aws_kinesis/streams/integration_tests.rs | 2 +- src/sinks/aws_s3/integration_tests.rs | 6 ++- src/sinks/aws_s_s/sns/config.rs | 8 +--- src/sinks/aws_s_s/sns/integration_tests.rs | 4 +- src/sinks/aws_s_s/sqs/config.rs | 2 +- src/sinks/aws_s_s/sqs/integration_tests.rs | 2 +- src/sinks/s3_common/config.rs | 6 ++- src/sources/aws_s3/mod.rs | 14 ++++-- src/sources/aws_sqs/config.rs | 2 +- 19 files changed, 74 insertions(+), 89 deletions(-) diff --git a/src/aws/mod.rs b/src/aws/mod.rs index e211a0091877e..ab2728c4a08bc 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -123,10 +123,7 @@ pub trait ClientBuilder { type Client; /// Build the client using the given config settings. - fn build(config: &SdkConfig) -> Self::Client; - - /// Build the client using the given config settings and path style addressing. - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client; + fn build(&self, config: &SdkConfig) -> Self::Client; } fn region_provider( @@ -164,38 +161,36 @@ async fn resolve_region( } /// Create the SDK client using the provided settings. -pub async fn create_client( +pub async fn create_client( + builder: &T, auth: &AwsAuthentication, region: Option, endpoint: Option, proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, - force_path_style: impl Into, -) -> crate::Result { - create_client_and_region::( - auth, - region, - endpoint, - proxy, - tls_options, - timeout, - force_path_style, - ) - .await - .map(|(client, _)| client) +) -> crate::Result +where + T: ClientBuilder, +{ + create_client_and_region::(builder, auth, region, endpoint, proxy, tls_options, timeout) + .await + .map(|(client, _)| client) } /// Create the SDK client and resolve the region using the provided settings. -pub async fn create_client_and_region( +pub async fn create_client_and_region( + builder: &T, auth: &AwsAuthentication, region: Option, endpoint: Option, proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, - force_path_style: impl Into, -) -> crate::Result<(T::Client, Region)> { +) -> crate::Result<(T::Client, Region)> +where + T: ClientBuilder, +{ let retry_config = RetryConfig::disabled(); // The default credentials chains will look for a region if not given but we'd like to @@ -252,11 +247,7 @@ pub async fn create_client_and_region( let config = config_builder.build(); - if force_path_style.into() { - Ok((T::build_and_force_path_style(&config), region)) - } else { - Ok((T::build(&config), region)) - } + Ok((T::build(builder, &config), region)) } #[derive(Snafu, Debug)] diff --git a/src/common/s3.rs b/src/common/s3.rs index 29c4cbca11cdc..d8e610874f4bd 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -2,18 +2,20 @@ use aws_sdk_s3::config; use crate::aws::ClientBuilder; -pub(crate) struct S3ClientBuilder; +pub(crate) struct S3ClientBuilder { + pub force_path_style: Option, +} impl ClientBuilder for S3ClientBuilder { type Client = aws_sdk_s3::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { - let config = config::Builder::from(config).build(); - aws_sdk_s3::client::Client::from_conf(config) - } + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { + let mut builder = config::Builder::from(config); + + if let Some(true) = self.force_path_style { + builder = builder.force_path_style(true); + } - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - let config = config::Builder::from(config).force_path_style(true).build(); - aws_sdk_s3::client::Client::from_conf(config) + aws_sdk_s3::client::Client::from_conf(builder.build()) } } diff --git a/src/common/sqs.rs b/src/common/sqs.rs index c02fd4ceafef5..8f4ff3ecab69e 100644 --- a/src/common/sqs.rs +++ b/src/common/sqs.rs @@ -5,11 +5,7 @@ pub(crate) struct SqsClientBuilder; impl ClientBuilder for SqsClientBuilder { type Client = aws_sdk_sqs::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sqs::client::Client::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - SqsClientBuilder::build(config) - } } diff --git a/src/secrets/aws_secrets_manager.rs b/src/secrets/aws_secrets_manager.rs index 0a86456a8506d..2f61f31e74a09 100644 --- a/src/secrets/aws_secrets_manager.rs +++ b/src/secrets/aws_secrets_manager.rs @@ -13,14 +13,10 @@ pub(crate) struct SecretsManagerClientBuilder; impl ClientBuilder for SecretsManagerClientBuilder { type Client = Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { let config = config::Builder::from(config).build(); Client::from_conf(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - SecretsManagerClientBuilder::build(config) - } } /// Configuration for the `aws_secrets_manager` secrets backend. @@ -61,13 +57,13 @@ impl SecretBackend for AwsSecretsManagerBackend { _: &mut signal::SignalRx, ) -> crate::Result> { let client = create_client::( + &SecretsManagerClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), &ProxyConfig::default(), &self.tls, &None, - false, ) .await?; diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index e83c88cc47dab..d84076884a37e 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -33,13 +33,9 @@ pub struct CloudwatchLogsClientBuilder; impl ClientBuilder for CloudwatchLogsClientBuilder { type Client = aws_sdk_cloudwatchlogs::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatchlogs::client::Client::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - CloudwatchLogsClientBuilder::build(config) - } } #[configurable_component] @@ -173,13 +169,13 @@ pub struct CloudwatchLogsSinkConfig { impl CloudwatchLogsSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &CloudwatchLogsClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), proxy, &self.tls, &None, - false, ) .await } diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 6fc2ad7faad7e..9eebc9dbbf5cc 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -468,7 +468,13 @@ async fn create_client_test() -> CloudwatchLogsClient { let proxy = ProxyConfig::default(); create_client::( - &auth, region, endpoint, &proxy, &None, &None, false, + &CloudwatchLogsClientBuilder {}, + &auth, + region, + endpoint, + &proxy, + &None, + &None, ) .await .unwrap() diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index 2e369d1728d8b..c7da85dddc3e8 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -119,13 +119,9 @@ struct CloudwatchMetricsClientBuilder; impl ClientBuilder for CloudwatchMetricsClientBuilder { type Client = aws_sdk_cloudwatch::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatch::client::Client::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - CloudwatchMetricsClientBuilder::build(config) - } } #[async_trait::async_trait] @@ -176,13 +172,13 @@ impl CloudWatchMetricsSinkConfig { }; create_client::( + &CloudwatchMetricsClientBuilder {}, &self.auth, region, self.region.endpoint(), proxy, &self.tls, &None, - false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index e796d91dd68ee..a785e333d0bd5 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -38,13 +38,9 @@ pub struct KinesisFirehoseClientBuilder; impl ClientBuilder for KinesisFirehoseClientBuilder { type Client = KinesisClient; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { Self::Client::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - KinesisFirehoseClientBuilder::build(config) - } } // AWS Kinesis Firehose API accepts payloads up to 4MB or 500 events @@ -106,13 +102,13 @@ impl KinesisFirehoseSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &KinesisFirehoseClientBuilder {}, &self.base.auth, self.base.region.region(), self.base.region.endpoint(), proxy, &self.base.tls, &None, - false, ) .await } diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 26ba4f3f5d6c2..69c4b66424bee 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -255,13 +255,13 @@ async fn firehose_client() -> aws_sdk_firehose::Client { let proxy = ProxyConfig::default(); create_client::( + &KinesisFirehoseClientBuilder {}, &auth, region_endpoint.region(), region_endpoint.endpoint(), &proxy, &None, &None, - false, ) .await .unwrap() diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index a95ca18a495d1..8da69d329ad95 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -37,13 +37,9 @@ pub struct KinesisClientBuilder; impl ClientBuilder for KinesisClientBuilder { type Client = KinesisClient; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { KinesisClient::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - KinesisClientBuilder::build(config) - } } pub const MAX_PAYLOAD_SIZE: usize = 5_000_000; @@ -103,13 +99,13 @@ impl KinesisStreamsSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &KinesisClientBuilder {}, &self.base.auth, self.base.region.region(), self.base.region.endpoint(), proxy, &self.base.tls, &None, - false, ) .await } diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index fe967f3c29e06..673930b1c360f 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -177,13 +177,13 @@ async fn client() -> aws_sdk_kinesis::Client { let proxy = ProxyConfig::default(); let region = RegionOrEndpoint::with_both("us-east-1", kinesis_address()); create_client::( + &KinesisClientBuilder {}, &auth, region.region(), region.endpoint(), &proxy, &None, &None, - false, ) .await .unwrap() diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index c099b694681b2..61949bef3d287 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -490,14 +490,18 @@ async fn client() -> S3Client { let region = RegionOrEndpoint::with_both("us-east-1", s3_address()); let proxy = ProxyConfig::default(); let tls_options = None; + let force_path_style_value: bool = true; + create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &auth, region.region(), region.endpoint(), &proxy, &tls_options, &None, - true, ) .await .unwrap() diff --git a/src/sinks/aws_s_s/sns/config.rs b/src/sinks/aws_s_s/sns/config.rs index 447c11b5c0fc4..7dad0dae99059 100644 --- a/src/sinks/aws_s_s/sns/config.rs +++ b/src/sinks/aws_s_s/sns/config.rs @@ -48,13 +48,13 @@ impl GenerateConfig for SnsSinkConfig { impl SnsSinkConfig { pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &SnsClientBuilder {}, &self.base_config.auth, self.region.region(), self.region.endpoint(), proxy, &self.base_config.tls, &None, - false, ) .await } @@ -109,13 +109,9 @@ pub(super) struct SnsClientBuilder; impl ClientBuilder for SnsClientBuilder { type Client = aws_sdk_sns::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sns::client::Client::new(config) } - - fn build_and_force_path_style(config: &aws_types::SdkConfig) -> Self::Client { - SnsClientBuilder::build(config) - } } pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> { diff --git a/src/sinks/aws_s_s/sns/integration_tests.rs b/src/sinks/aws_s_s/sns/integration_tests.rs index 3e46615c9366c..be026eceee813 100644 --- a/src/sinks/aws_s_s/sns/integration_tests.rs +++ b/src/sinks/aws_s_s/sns/integration_tests.rs @@ -32,13 +32,13 @@ async fn create_sns_test_client() -> SnsClient { let endpoint = sns_address(); let proxy = ProxyConfig::default(); create_client::( + &SnsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), &proxy, &None, &None, - false, ) .await .unwrap() @@ -54,13 +54,13 @@ async fn create_sqs_test_client() -> SqsClient { let endpoint = sqs_address(); let proxy = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), &proxy, &None, &None, - false, ) .await .unwrap() diff --git a/src/sinks/aws_s_s/sqs/config.rs b/src/sinks/aws_s_s/sqs/config.rs index 8b8afa84e2f8a..872844622a8fd 100644 --- a/src/sinks/aws_s_s/sqs/config.rs +++ b/src/sinks/aws_s_s/sqs/config.rs @@ -49,13 +49,13 @@ impl GenerateConfig for SqsSinkConfig { impl SqsSinkConfig { pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &SqsClientBuilder {}, &self.base_config.auth, self.region.region(), self.region.endpoint(), proxy, &self.base_config.tls, &None, - false, ) .await } diff --git a/src/sinks/aws_s_s/sqs/integration_tests.rs b/src/sinks/aws_s_s/sqs/integration_tests.rs index 8046b2a7cd4f4..1334beba6e407 100644 --- a/src/sinks/aws_s_s/sqs/integration_tests.rs +++ b/src/sinks/aws_s_s/sqs/integration_tests.rs @@ -30,13 +30,13 @@ async fn create_test_client() -> SqsClient { let endpoint = sqs_address(); let proxy = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), &proxy, &None, &None, - false, ) .await .unwrap() diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index adf52a4c5ce45..42c2f3b054eba 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -367,14 +367,18 @@ pub async fn create_service( ) -> crate::Result { let endpoint = region.endpoint(); let region = region.region(); + let force_path_style_value: bool = force_path_style.into(); + let client = create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, auth, region.clone(), endpoint, proxy, tls_options, &None, - force_path_style, ) .await?; Ok(S3Service::new(client)) diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 8a889142efdd9..369dce2d91843 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -230,15 +230,18 @@ impl AwsS3Config { ) -> crate::Result { let region = self.region.region(); let endpoint = self.region.endpoint(); + let force_path_style_value: bool = true; let s3_client = create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &self.auth, region.clone(), endpoint.clone(), proxy, &self.tls_options, &None, - true, ) .await?; @@ -249,13 +252,13 @@ impl AwsS3Config { match self.sqs { Some(ref sqs) => { let (sqs_client, region) = create_client_and_region::( + &SqsClientBuilder {}, &self.auth, region.clone(), endpoint, proxy, &sqs.tls_options, &sqs.timeout, - false, ) .await?; @@ -1018,14 +1021,17 @@ mod integration_tests { endpoint: Some(s3_address()), }; let proxy_config = ProxyConfig::default(); + let force_path_style_value: bool = true; create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &auth, region_endpoint.region(), region_endpoint.endpoint(), &proxy_config, &None, &None, - true, ) .await .unwrap() @@ -1039,13 +1045,13 @@ mod integration_tests { }; let proxy_config = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, region_endpoint.region(), region_endpoint.endpoint(), &proxy_config, &None, &None, - false, ) .await .unwrap() diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index a97260e3b2ed5..310603f703cb0 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -161,13 +161,13 @@ impl SourceConfig for AwsSqsConfig { impl AwsSqsConfig { async fn build_client(&self, cx: &SourceContext) -> crate::Result { create_client::( + &SqsClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), &cx.proxy, &self.tls, &None, - false, ) .await } From a0ac4a0b30e83193f3eefa5569b354443a1f9977 Mon Sep 17 00:00:00 2001 From: Scott Miller Date: Thu, 19 Dec 2024 17:11:51 -0500 Subject: [PATCH 3/3] fix(aws_s3 sink): simplify S3ClientBuilder logic --- src/common/s3.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/common/s3.rs b/src/common/s3.rs index d8e610874f4bd..7c7a9810b1850 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -10,12 +10,8 @@ impl ClientBuilder for S3ClientBuilder { type Client = aws_sdk_s3::client::Client; fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { - let mut builder = config::Builder::from(config); - - if let Some(true) = self.force_path_style { - builder = builder.force_path_style(true); - } - + let builder = + config::Builder::from(config).force_path_style(self.force_path_style.unwrap_or(true)); aws_sdk_s3::client::Client::from_conf(builder.build()) } }