Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws_cloudwatch_logs sink): add configurable log retention #18865

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_smithy_types::retry::RetryConfig;
use codecs::JsonSerializerConfig;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::schema;
Expand Down Expand Up @@ -48,6 +49,43 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
}
}

#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for AWS Cloudwatch Log Group
pub struct Retention {
#[configurable(derived)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[configurable(derived)]
/// Whether or not to set a retention policy when creating a new Log Group.

pub enabled: bool,

#[configurable(derived)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[configurable(derived)]
/// If retention is enabled, the number of days to retain logs for.

#[serde(
default,
deserialize_with = "retention_days",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub days: i32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub days: i32,
pub days: u32,

Unfortunately, the docs aren't able to work with i32. u32 should be fine here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

}

fn retention_days<'de, D>(deserializer: D) -> Result<i32, D::Error>
where
D: Deserializer<'de>,
{
let days: i32 = Deserialize::deserialize(deserializer)?;
const ALLOWED_VALUES: &[i32] = &[
1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
2922, 3288, 3653,
];
if ALLOWED_VALUES.contains(&days) {
Ok(days)
} else {
let msg = format!("one of allowed values: {:?}", ALLOWED_VALUES).to_owned();
let expected: &str = &msg[..];
Err(de::Error::invalid_value(
de::Unexpected::Signed(days.into()),
&expected,
))
}
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
"aws_cloudwatch_logs",
Expand Down Expand Up @@ -96,6 +134,9 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(default = "crate::serde::default_true")]
pub create_missing_stream: bool,

#[configurable(derived)]
pub retention: Retention,

#[configurable(derived)]
pub encoding: EncodingConfig,

Expand Down Expand Up @@ -227,6 +268,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
region: Default::default(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn cloudwatch_insert_log_event() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -92,6 +93,7 @@ async fn cloudwatch_insert_log_events_sorted() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -167,6 +169,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -243,6 +246,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -298,6 +302,7 @@ async fn cloudwatch_insert_log_event_batched() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch,
request: Default::default(),
Expand Down Expand Up @@ -348,6 +353,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down Expand Up @@ -440,6 +446,7 @@ async fn cloudwatch_healthcheck() {
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
Expand Down
52 changes: 50 additions & 2 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use std::{
use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind,
DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError,
PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::operation::PutLogEvents;

use crate::sinks::aws_cloudwatch_logs::config::Retention;
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput};
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -27,6 +29,7 @@ pub struct CloudwatchFuture {
state: State,
create_missing_group: bool,
create_missing_stream: bool,
retention_enabled: bool,
events: Vec<Vec<InputLogEvent>>,
token_tx: Option<oneshot::Sender<Option<String>>>,
}
Expand All @@ -41,6 +44,7 @@ struct Client {
stream_name: String,
group_name: String,
headers: IndexMap<String, String>,
retention_days: i32,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E>>>;
Expand All @@ -50,6 +54,7 @@ enum State {
CreateStream(ClientResult<(), CreateLogStreamError>),
DescribeStream(ClientResult<DescribeLogStreamsOutput, DescribeLogStreamsError>),
Put(ClientResult<PutLogEventsOutput, PutLogEventsError>),
PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>),
}

impl CloudwatchFuture {
Expand All @@ -63,16 +68,19 @@ impl CloudwatchFuture {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
) -> Self {
let retention_days = retention.days;
let client = Client {
client,
smithy_client,
stream_name,
group_name,
headers,
retention_days,
};

let state = if let Some(token) = token {
Expand All @@ -81,13 +89,16 @@ impl CloudwatchFuture {
State::DescribeStream(client.describe_stream())
};

let retention_enabled = retention.enabled;

Self {
client,
events,
state,
token_tx: Some(token_tx),
create_missing_group,
create_missing_stream,
retention_enabled,
}
}
}
Expand Down Expand Up @@ -115,14 +126,19 @@ impl Future for CloudwatchFuture {
}
}
}
return Poll::Ready(Err(CloudwatchError::Describe(err)));
return Poll::Ready(Err(CloudwatchError::DescribeLogStreams(err)));
}
};

let stream_name = &self.client.stream_name;

if let Some(stream) = response
.log_streams
.ok_or(CloudwatchError::NoStreamsFound)?
.into_iter()
.filter(|log_stream| {
log_stream.log_stream_name == Some(stream_name.clone())
})
.next()
{
debug!(message = "Stream found.", stream = ?stream.log_stream_name);
Expand Down Expand Up @@ -163,6 +179,11 @@ impl Future for CloudwatchFuture {

info!(message = "Group created.", name = %self.client.group_name);

if self.retention_enabled {
self.state = State::PutRetentionPolicy(self.client.put_retention_policy());
continue;
}

// self does not abide by `create_missing_stream` since a group
// never has any streams and thus we need to create one if a group
// is created no matter what.
Expand Down Expand Up @@ -212,6 +233,19 @@ impl Future for CloudwatchFuture {
return Poll::Ready(Ok(()));
}
}

State::PutRetentionPolicy(fut) => {
match ready!(fut.poll_unpin(cx)) {
Ok(__) => {}
Err(error) => {
return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error)))
}
}

info!(message = "Retention policy updated for stream.", name = %self.client.stream_name);

self.state = State::CreateStream(self.client.create_log_stream());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs formatting.. Nearly there..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StephenWakely any other changes needed?

}
}
}
}
Expand Down Expand Up @@ -268,7 +302,6 @@ impl Client {
Box::pin(async move {
client
.describe_log_streams()
.limit(1)
.log_group_name(group_name)
.log_stream_name_prefix(stream_name)
.send()
Expand Down Expand Up @@ -303,4 +336,19 @@ impl Client {
Ok(())
})
}

pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let retention_days = self.retention_days.clone();
Box::pin(async move {
client
.put_retention_policy()
.log_group_name(group_name)
.retention_in_days(retention_days)
.send()
.await?;
Ok(())
})
}
}
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
}
is_retriable_error(err)
}
CloudwatchError::Describe(err) => {
CloudwatchError::DescribeLogStreams(err) => {
if let SdkError::ServiceError(inner) = err {
let err = inner.err();
if let DescribeLogStreamsErrorKind::ServiceUnavailableException(_) = err.kind {
Expand Down
20 changes: 16 additions & 4 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use aws_sdk_cloudwatchlogs::error::{
CreateLogGroupError, CreateLogStreamError, DescribeLogStreamsError, PutLogEventsError,
PutRetentionPolicyError,
};
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
use aws_sdk_cloudwatchlogs::types::SdkError;
Expand All @@ -30,7 +31,7 @@ use vector_core::stream::DriverResponse;

use crate::sinks::{
aws_cloudwatch_logs::{
config::CloudwatchLogsSinkConfig, request, retry::CloudwatchRetryLogic,
config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest, CloudwatchKey,
},
util::{retries::FixedRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings},
Expand Down Expand Up @@ -58,24 +59,30 @@ pub type SmithyClient = std::sync::Arc<
#[derive(Debug)]
pub enum CloudwatchError {
Put(SdkError<PutLogEventsError>),
Describe(SdkError<DescribeLogStreamsError>),
DescribeLogStreams(SdkError<DescribeLogStreamsError>),
CreateStream(SdkError<CreateLogStreamError>),
CreateGroup(SdkError<CreateLogGroupError>),
PutRetentionPolicy(SdkError<PutRetentionPolicyError>),
NoStreamsFound,
}

impl fmt::Display for CloudwatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {}", error),
CloudwatchError::Describe(error) => write!(f, "CloudwatchError::Describe: {}", error),
CloudwatchError::DescribeLogStreams(error) => {
write!(f, "CloudwatchError::DescribeLogStreams: {}", error)
}
CloudwatchError::CreateStream(error) => {
write!(f, "CloudwatchError::CreateStream: {}", error)
}
CloudwatchError::CreateGroup(error) => {
write!(f, "CloudwatchError::CreateGroup: {}", error)
}
CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
CloudwatchError::PutRetentionPolicy(error) => {
write!(f, "CloudwatchError::PutRetentionPolicy: {}", error)
}
}
}
}
Expand All @@ -90,7 +97,7 @@ impl From<SdkError<PutLogEventsError>> for CloudwatchError {

impl From<SdkError<DescribeLogStreamsError>> for CloudwatchError {
fn from(error: SdkError<DescribeLogStreamsError>) -> Self {
CloudwatchError::Describe(error)
CloudwatchError::DescribeLogStreams(error)
}
}

Expand Down Expand Up @@ -216,6 +223,8 @@ impl CloudwatchLogsSvc {
let create_missing_group = config.create_missing_group;
let create_missing_stream = config.create_missing_stream;

let retention = config.retention.clone();

CloudwatchLogsSvc {
headers: config.request.headers,
client,
Expand All @@ -224,6 +233,7 @@ impl CloudwatchLogsSvc {
group_name,
create_missing_group,
create_missing_stream,
retention,
token: None,
token_rx: None,
}
Expand Down Expand Up @@ -305,6 +315,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.group_name.clone(),
self.create_missing_group,
self.create_missing_stream,
self.retention.clone(),
event_batches,
self.token.take(),
tx,
Expand All @@ -323,6 +334,7 @@ pub struct CloudwatchLogsSvc {
group_name: String,
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
token: Option<String>,
token_rx: Option<oneshot::Receiver<Option<String>>>,
}
Expand Down