Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (sink: aws_cloudwatch_logs) #11185: Allow specifying a KMS key and tags for newly created AWS CloudWatch log groups. #22274

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ aws-sdk-sqs = { version = "1.3.0", default-features = false, features = ["behavi
aws-sdk-sns = { version = "1.6.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-cloudwatch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-cloudwatchlogs = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-kms = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-elasticsearch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-firehose = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-kinesis = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
Expand Down Expand Up @@ -763,7 +764,7 @@ sinks-metrics = [

sinks-amqp = ["lapin"]
sinks-appsignal = []
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs"]
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"]
sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"]
sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ aws-sdk-cloudwatch,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust
aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-kms,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-secretsmanager,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/11185.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Allows users to specify a KMS key and tags for newly created AWS CloudWatch log groups.

authors: johannesfloriangeiger
2 changes: 1 addition & 1 deletion scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
mock-localstack:
image: docker.io/localstack/localstack:3
environment:
- SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns,logs
- SERVICES=kinesis,s3,cloudwatch,es,firehose,kms,sqs,sns,logs
mock-ecs:
image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest
volumes:
Expand Down
1 change: 1 addition & 0 deletions scripts/integration/aws/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
ECS_ADDRESS: http://mock-ecs
ELASTICSEARCH_ADDRESS: http://mock-localstack:4566
KINESIS_ADDRESS: http://mock-localstack:4566
KMS_ADDRESS: http://mock-localstack:4566
S3_ADDRESS: http://mock-localstack:4566
SQS_ADDRESS: http://mock-localstack:4566
SNS_ADDRESS: http://mock-localstack:4566
Expand Down
21 changes: 21 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use std::collections::HashMap;
use tower::ServiceBuilder;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
Expand Down Expand Up @@ -164,6 +165,24 @@ pub struct CloudwatchLogsSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,

/// The [ARN][arn] (Amazon Resource Name) of the [KMS key][kms_key] to use when encrypting log data.
///
/// [arn]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
/// [kms_key]: https://docs.aws.amazon.com/kms/latest/developerguide/overview.html
#[configurable(derived)]
johannesfloriangeiger marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default)]
pub kms_key: Option<String>,

/// The Key-value pairs to be applied as [tags][tags] to the log group and stream.
///
/// [tags]: https://docs.aws.amazon.com/whitepapers/latest/tagging-best-practices/what-are-tags.html
#[configurable(derived)]
#[serde(default)]
#[configurable(metadata(
docs::additional_props_description = "A tag represented as a key-value pair"
))]
pub tags: Option<HashMap<String, String>>,
}

impl CloudwatchLogsSinkConfig {
Expand Down Expand Up @@ -248,6 +267,8 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
assume_role: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: Default::default(),
tags: Default::default(),
}
}

Expand Down
135 changes: 134 additions & 1 deletion src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::collections::HashMap;
use std::convert::TryFrom;

use aws_config::Region;
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_sdk_kms::Client as KMSClient;
use chrono::Duration;
use futures::{stream, StreamExt};
use similar_asserts::assert_eq;
use vector_lib::codecs::TextSerializerConfig;
use vector_lib::lookup;

use super::*;
use crate::aws::create_client;
use crate::aws::{create_client, ClientBuilder};
use crate::aws::{AwsAuthentication, RegionOrEndpoint};
use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsClientBuilder;
use crate::{
Expand All @@ -29,6 +31,20 @@ fn cloudwatch_address() -> String {
std::env::var("CLOUDWATCH_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
}

fn kms_address() -> String {
std::env::var("KMS_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
}

struct KMSClientBuilder;

impl ClientBuilder for KMSClientBuilder {
type Client = aws_sdk_kms::client::Client;

fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_kms::client::Client::new(config)
}
}

#[tokio::test]
async fn cloudwatch_insert_log_event() {
trace_init();
Expand All @@ -51,6 +67,8 @@ async fn cloudwatch_insert_log_event() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -102,6 +120,8 @@ async fn cloudwatch_insert_log_events_sorted() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -178,6 +198,8 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -255,6 +277,8 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -284,6 +308,90 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
async fn cloudwatch_dynamic_group_and_stream_creation_with_kms_key_and_tags() {
trace_init();

let stream_name = gen_name();
let group_name = gen_name();

let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(group_name.as_str()).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
tls: Default::default(),
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: Some(
create_kms_client_test()
.await
.create_key()
.send()
.await
.unwrap()
.key_metadata()
.unwrap()
.key_id()
.parse()
.unwrap(),
),
tags: Some(HashMap::from_iter(vec![(
"key".to_string(),
"value".to_string(),
)])),
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;

let response = create_client_test()
.await
.get_log_events()
.log_stream_name(stream_name)
.log_group_name(group_name.clone())
.start_time(timestamp.timestamp_millis())
.send()
.await
.unwrap();

let events = response.events.unwrap();

let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines.sort(), input_lines.sort());

let log_group = create_client_test()
.await
.describe_log_groups()
.log_group_name_pattern(group_name.clone())
.limit(1)
.send()
.await
.unwrap()
.log_groups()
.first()
.unwrap()
.clone();

let kms_key = log_group.kms_key_id().unwrap();
assert_eq!(kms_key, config.kms_key.unwrap());
}

#[tokio::test]
async fn cloudwatch_insert_log_event_batched() {
trace_init();
Expand Down Expand Up @@ -311,6 +419,8 @@ async fn cloudwatch_insert_log_event_batched() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -362,6 +472,8 @@ async fn cloudwatch_insert_log_event_partitioned() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -455,6 +567,8 @@ async fn cloudwatch_healthcheck() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let client = config.create_client(&ProxyConfig::default()).await.unwrap();
Expand All @@ -480,6 +594,25 @@ async fn create_client_test() -> CloudwatchLogsClient {
.unwrap()
}

async fn create_kms_client_test() -> KMSClient {
let auth = AwsAuthentication::test_auth();
let region = Some(Region::new("us-east-1"));
let endpoint = Some(kms_address());
let proxy = ProxyConfig::default();

create_client::<KMSClientBuilder>(
&KMSClientBuilder {},
&auth,
region,
endpoint,
&proxy,
None,
None,
)
.await
.unwrap()
}

async fn ensure_group() {
let client = create_client_test().await;
_ = client
Expand Down
Loading
Loading