diff --git a/Cargo.lock b/Cargo.lock index 2ca453ed5bdbf..5e37025ca02aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -969,6 +969,28 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "aws-sdk-kms" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374513bec90ddc64288c1f1e5cb4b95e6df6e42a47d9e332a713f97f66cf9043" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes 1.9.0", + "http 0.2.9", + "regex", + "tracing 0.1.41", +] + [[package]] name = "aws-sdk-s3" version = "1.4.0" @@ -10992,6 +11014,7 @@ dependencies = [ "aws-sdk-elasticsearch", "aws-sdk-firehose", "aws-sdk-kinesis", + "aws-sdk-kms", "aws-sdk-s3", "aws-sdk-secretsmanager", "aws-sdk-sns", diff --git a/Cargo.toml b/Cargo.toml index 27c4f6eb9e38f..87e354bb34818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -220,6 +220,7 @@ aws-sdk-sqs = { version = "1.3.0", default-features = false, features = ["behavi aws-sdk-sns = { version = "1.6.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } aws-sdk-cloudwatch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } aws-sdk-cloudwatchlogs = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } +aws-sdk-kms = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } aws-sdk-elasticsearch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } aws-sdk-firehose = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } aws-sdk-kinesis = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true } @@ -763,7 +764,7 @@ sinks-metrics = [ sinks-amqp = ["lapin"] sinks-appsignal = [] -sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs"] +sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"] sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"] sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"] sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 08fa4e1b450b2..7c4bd3eb70c8e 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -55,6 +55,7 @@ aws-sdk-cloudwatch,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " +aws-sdk-kms,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-secretsmanager,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " diff --git a/changelog.d/11185.feature.md b/changelog.d/11185.feature.md new file mode 100644 index 0000000000000..79c4d89e5e3e7 --- /dev/null +++ b/changelog.d/11185.feature.md @@ -0,0 +1,3 @@ +Allows users to specify a KMS key and tags for newly created AWS CloudWatch log groups. + +authors: johannesfloriangeiger diff --git a/scripts/integration/aws/compose.yaml b/scripts/integration/aws/compose.yaml index c45cbe7f2e37b..c7337dd4bcf7f 100644 --- a/scripts/integration/aws/compose.yaml +++ b/scripts/integration/aws/compose.yaml @@ -6,7 +6,7 @@ services: mock-localstack: image: docker.io/localstack/localstack:3 environment: - - SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns,logs + - SERVICES=kinesis,s3,cloudwatch,es,firehose,kms,sqs,sns,logs mock-ecs: image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest volumes: diff --git a/scripts/integration/aws/test.yaml b/scripts/integration/aws/test.yaml index 71ff65cae11f8..ba1d901dfc217 100644 --- a/scripts/integration/aws/test.yaml +++ b/scripts/integration/aws/test.yaml @@ -11,6 +11,7 @@ env: ECS_ADDRESS: http://mock-ecs ELASTICSEARCH_ADDRESS: http://mock-localstack:4566 KINESIS_ADDRESS: http://mock-localstack:4566 + KMS_ADDRESS: http://mock-localstack:4566 S3_ADDRESS: http://mock-localstack:4566 SQS_ADDRESS: http://mock-localstack:4566 SNS_ADDRESS: http://mock-localstack:4566 diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index d4a1dd4f39f1f..db7e1d5d4da7a 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -1,6 +1,7 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; use futures::FutureExt; use serde::{de, Deserialize, Deserializer}; +use std::collections::HashMap; use tower::ServiceBuilder; use vector_lib::codecs::JsonSerializerConfig; use vector_lib::configurable::configurable_component; @@ -164,6 +165,24 @@ pub struct CloudwatchLogsSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + /// The [ARN][arn] (Amazon Resource Name) of the [KMS key][kms_key] to use when encrypting log data. + /// + /// [arn]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + /// [kms_key]: https://docs.aws.amazon.com/kms/latest/developerguide/overview.html + #[configurable(derived)] + #[serde(default)] + pub kms_key: Option, + + /// The Key-value pairs to be applied as [tags][tags] to the log group and stream. + /// + /// [tags]: https://docs.aws.amazon.com/whitepapers/latest/tagging-best-practices/what-are-tags.html + #[configurable(derived)] + #[serde(default)] + #[configurable(metadata( + docs::additional_props_description = "A tag represented as a key-value pair" + ))] + pub tags: Option>, } impl CloudwatchLogsSinkConfig { @@ -248,6 +267,8 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { assume_role: Default::default(), auth: Default::default(), acknowledgements: Default::default(), + kms_key: Default::default(), + tags: Default::default(), } } diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index a2c2ff4c8cb54..437408e51f3ea 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -1,7 +1,9 @@ +use std::collections::HashMap; use std::convert::TryFrom; use aws_config::Region; use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; +use aws_sdk_kms::Client as KMSClient; use chrono::Duration; use futures::{stream, StreamExt}; use similar_asserts::assert_eq; @@ -9,7 +11,7 @@ use vector_lib::codecs::TextSerializerConfig; use vector_lib::lookup; use super::*; -use crate::aws::create_client; +use crate::aws::{create_client, ClientBuilder}; use crate::aws::{AwsAuthentication, RegionOrEndpoint}; use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsClientBuilder; use crate::{ @@ -29,6 +31,20 @@ fn cloudwatch_address() -> String { std::env::var("CLOUDWATCH_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into()) } +fn kms_address() -> String { + std::env::var("KMS_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into()) +} + +struct KMSClientBuilder; + +impl ClientBuilder for KMSClientBuilder { + type Client = aws_sdk_kms::client::Client; + + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { + aws_sdk_kms::client::Client::new(config) + } +} + #[tokio::test] async fn cloudwatch_insert_log_event() { trace_init(); @@ -51,6 +67,8 @@ async fn cloudwatch_insert_log_event() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -102,6 +120,8 @@ async fn cloudwatch_insert_log_events_sorted() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -178,6 +198,8 @@ async fn cloudwatch_insert_out_of_range_timestamp() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -255,6 +277,8 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -284,6 +308,90 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { assert_eq!(output_lines.sort(), input_lines.sort()); } +#[tokio::test] +async fn cloudwatch_dynamic_group_and_stream_creation_with_kms_key_and_tags() { + trace_init(); + + let stream_name = gen_name(); + let group_name = gen_name(); + + let config = CloudwatchLogsSinkConfig { + stream_name: Template::try_from(stream_name.as_str()).unwrap(), + group_name: Template::try_from(group_name.as_str()).unwrap(), + region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()), + encoding: TextSerializerConfig::default().into(), + create_missing_group: true, + create_missing_stream: true, + retention: Default::default(), + compression: Default::default(), + batch: Default::default(), + request: Default::default(), + tls: Default::default(), + assume_role: None, + auth: Default::default(), + acknowledgements: Default::default(), + kms_key: Some( + create_kms_client_test() + .await + .create_key() + .send() + .await + .unwrap() + .key_metadata() + .unwrap() + .key_id() + .parse() + .unwrap(), + ), + tags: Some(HashMap::from_iter(vec![( + "key".to_string(), + "value".to_string(), + )])), + }; + + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); + + let timestamp = chrono::Utc::now(); + + let (mut input_lines, events) = random_lines_with_stream(100, 11, None); + run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await; + + let response = create_client_test() + .await + .get_log_events() + .log_stream_name(stream_name) + .log_group_name(group_name.clone()) + .start_time(timestamp.timestamp_millis()) + .send() + .await + .unwrap(); + + let events = response.events.unwrap(); + + let mut output_lines = events + .into_iter() + .map(|e| e.message.unwrap()) + .collect::>(); + + assert_eq!(output_lines.sort(), input_lines.sort()); + + let log_group = create_client_test() + .await + .describe_log_groups() + .log_group_name_pattern(group_name.clone()) + .limit(1) + .send() + .await + .unwrap() + .log_groups() + .first() + .unwrap() + .clone(); + + let kms_key = log_group.kms_key_id().unwrap(); + assert_eq!(kms_key, config.kms_key.unwrap()); +} + #[tokio::test] async fn cloudwatch_insert_log_event_batched() { trace_init(); @@ -311,6 +419,8 @@ async fn cloudwatch_insert_log_event_batched() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -362,6 +472,8 @@ async fn cloudwatch_insert_log_event_partitioned() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let (sink, _) = config.build(SinkContext::default()).await.unwrap(); @@ -455,6 +567,8 @@ async fn cloudwatch_healthcheck() { assume_role: None, auth: Default::default(), acknowledgements: Default::default(), + kms_key: None, + tags: None, }; let client = config.create_client(&ProxyConfig::default()).await.unwrap(); @@ -480,6 +594,25 @@ async fn create_client_test() -> CloudwatchLogsClient { .unwrap() } +async fn create_kms_client_test() -> KMSClient { + let auth = AwsAuthentication::test_auth(); + let region = Some(Region::new("us-east-1")); + let endpoint = Some(kms_address()); + let proxy = ProxyConfig::default(); + + create_client::( + &KMSClientBuilder {}, + &auth, + region, + endpoint, + &proxy, + None, + None, + ) + .await + .unwrap() +} + async fn ensure_group() { let client = create_client_test().await; _ = client diff --git a/src/sinks/aws_cloudwatch_logs/request.rs b/src/sinks/aws_cloudwatch_logs/request.rs index 60d9e88a6bb05..e052c05d38578 100644 --- a/src/sinks/aws_cloudwatch_logs/request.rs +++ b/src/sinks/aws_cloudwatch_logs/request.rs @@ -1,9 +1,3 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - use aws_sdk_cloudwatchlogs::{ operation::{ create_log_group::CreateLogGroupError, @@ -19,6 +13,12 @@ use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkErro use futures::{future::BoxFuture, FutureExt}; use http::{header::HeaderName, HeaderValue}; use indexmap::IndexMap; +use std::collections::HashMap; +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; use tokio::sync::oneshot; use crate::sinks::aws_cloudwatch_logs::config::Retention; @@ -40,6 +40,8 @@ struct Client { group_name: String, headers: IndexMap, retention_days: u32, + kms_key: Option, + tags: Option>, } type ClientResult = BoxFuture<'static, Result>>; @@ -63,6 +65,8 @@ impl CloudwatchFuture { create_missing_group: bool, create_missing_stream: bool, retention: Retention, + kms_key: Option, + tags: Option>, mut events: Vec>, token: Option, token_tx: oneshot::Sender>, @@ -74,6 +78,8 @@ impl CloudwatchFuture { group_name, headers, retention_days, + kms_key, + tags, }; let state = if let Some(token) = token { @@ -288,10 +294,14 @@ impl Client { pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> { let client = self.client.clone(); let group_name = self.group_name.clone(); + let kms_key = self.kms_key.clone(); + let tags = self.tags.clone(); Box::pin(async move { client .create_log_group() .log_group_name(group_name) + .set_kms_key_id(kms_key) + .set_tags(tags) .send() .await?; Ok(()) diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index ab8ea09daf551..f914029dbc35f 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -240,6 +240,9 @@ impl CloudwatchLogsSvc { let retention = config.retention.clone(); + let kms_key = config.kms_key.clone(); + let tags = config.tags.clone(); + CloudwatchLogsSvc { headers, client, @@ -248,6 +251,8 @@ impl CloudwatchLogsSvc { create_missing_group, create_missing_stream, retention, + kms_key, + tags, token: None, token_rx: None, } @@ -322,6 +327,8 @@ impl Service> for CloudwatchLogsSvc { self.create_missing_group, self.create_missing_stream, self.retention.clone(), + self.kms_key.clone(), + self.tags.clone(), event_batches, self.token.take(), tx, @@ -340,6 +347,8 @@ pub struct CloudwatchLogsSvc { create_missing_group: bool, create_missing_stream: bool, retention: Retention, + kms_key: Option, + tags: Option>, token: Option, token_rx: Option>>, } diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index be85c3848eef6..80a44689b243f 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -592,6 +592,16 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { syntax: "template" } } + kms_key: { + description: """ + The [ARN][arn] (Amazon Resource Name) of the [KMS key][kms_key] to use when encrypting log data. + + [arn]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + [kms_key]: https://docs.aws.amazon.com/kms/latest/developerguide/overview.html + """ + required: false + type: string: {} + } region: { description: """ The [AWS region][aws_region] of the target service. @@ -828,6 +838,19 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { syntax: "template" } } + tags: { + description: """ + The Key-value pairs to be applied as [tags][tags] to the log group and stream. + + [tags]: https://docs.aws.amazon.com/whitepapers/latest/tagging-best-practices/what-are-tags.html + """ + required: false + type: object: options: "*": { + description: "A tag represented as a key-value pair" + required: true + type: string: {} + } + } tls: { description: "TLS configuration." required: false