-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(aws_cloudwatch_logs sink): add configurable log retention #18865
Changes from 2 commits
f649cf9
6409fbf
97d882b
3dd5aff
63eccf9
8853c58
54b9fc8
8371b1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,6 +2,7 @@ use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient; | |||||
use aws_smithy_types::retry::RetryConfig; | ||||||
use codecs::JsonSerializerConfig; | ||||||
use futures::FutureExt; | ||||||
use serde::{de, Deserialize, Deserializer}; | ||||||
use tower::ServiceBuilder; | ||||||
use vector_config::configurable_component; | ||||||
use vector_core::schema; | ||||||
|
@@ -48,6 +49,43 @@ impl ClientBuilder for CloudwatchLogsClientBuilder { | |||||
} | ||||||
} | ||||||
|
||||||
#[configurable_component] | ||||||
#[derive(Clone, Debug, Default)] | ||||||
/// Retention policy configuration for AWS Cloudwatch Log Group | ||||||
pub struct Retention { | ||||||
#[configurable(derived)] | ||||||
pub enabled: bool, | ||||||
|
||||||
#[configurable(derived)] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
#[serde( | ||||||
default, | ||||||
deserialize_with = "retention_days", | ||||||
skip_serializing_if = "crate::serde::skip_serializing_if_default" | ||||||
)] | ||||||
pub days: i32, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Unfortunately, the docs aren't able to work with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||||||
} | ||||||
|
||||||
fn retention_days<'de, D>(deserializer: D) -> Result<i32, D::Error> | ||||||
where | ||||||
D: Deserializer<'de>, | ||||||
{ | ||||||
let days: i32 = Deserialize::deserialize(deserializer)?; | ||||||
const ALLOWED_VALUES: &[i32] = &[ | ||||||
1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, | ||||||
2922, 3288, 3653, | ||||||
]; | ||||||
if ALLOWED_VALUES.contains(&days) { | ||||||
Ok(days) | ||||||
} else { | ||||||
let msg = format!("one of allowed values: {:?}", ALLOWED_VALUES).to_owned(); | ||||||
let expected: &str = &msg[..]; | ||||||
Err(de::Error::invalid_value( | ||||||
de::Unexpected::Signed(days.into()), | ||||||
&expected, | ||||||
)) | ||||||
} | ||||||
} | ||||||
|
||||||
/// Configuration for the `aws_cloudwatch_logs` sink. | ||||||
#[configurable_component(sink( | ||||||
"aws_cloudwatch_logs", | ||||||
|
@@ -96,6 +134,9 @@ pub struct CloudwatchLogsSinkConfig { | |||||
#[serde(default = "crate::serde::default_true")] | ||||||
pub create_missing_stream: bool, | ||||||
|
||||||
#[configurable(derived)] | ||||||
pub retention: Retention, | ||||||
|
||||||
#[configurable(derived)] | ||||||
pub encoding: EncodingConfig, | ||||||
|
||||||
|
@@ -227,6 +268,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { | |||||
region: Default::default(), | ||||||
create_missing_group: true, | ||||||
create_missing_stream: true, | ||||||
retention: Default::default(), | ||||||
compression: Default::default(), | ||||||
batch: Default::default(), | ||||||
request: Default::default(), | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,11 @@ use std::{ | |
use aws_sdk_cloudwatchlogs::error::{ | ||
CreateLogGroupError, CreateLogGroupErrorKind, CreateLogStreamError, CreateLogStreamErrorKind, | ||
DescribeLogStreamsError, DescribeLogStreamsErrorKind, PutLogEventsError, | ||
PutRetentionPolicyError, | ||
}; | ||
use aws_sdk_cloudwatchlogs::operation::PutLogEvents; | ||
|
||
use crate::sinks::aws_cloudwatch_logs::config::Retention; | ||
use aws_sdk_cloudwatchlogs::model::InputLogEvent; | ||
use aws_sdk_cloudwatchlogs::output::{DescribeLogStreamsOutput, PutLogEventsOutput}; | ||
use aws_sdk_cloudwatchlogs::types::SdkError; | ||
|
@@ -27,6 +29,7 @@ pub struct CloudwatchFuture { | |
state: State, | ||
create_missing_group: bool, | ||
create_missing_stream: bool, | ||
retention_enabled: bool, | ||
events: Vec<Vec<InputLogEvent>>, | ||
token_tx: Option<oneshot::Sender<Option<String>>>, | ||
} | ||
|
@@ -41,6 +44,7 @@ struct Client { | |
stream_name: String, | ||
group_name: String, | ||
headers: IndexMap<String, String>, | ||
retention_days: i32, | ||
} | ||
|
||
type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E>>>; | ||
|
@@ -50,6 +54,7 @@ enum State { | |
CreateStream(ClientResult<(), CreateLogStreamError>), | ||
DescribeStream(ClientResult<DescribeLogStreamsOutput, DescribeLogStreamsError>), | ||
Put(ClientResult<PutLogEventsOutput, PutLogEventsError>), | ||
PutRetentionPolicy(ClientResult<(), PutRetentionPolicyError>), | ||
} | ||
|
||
impl CloudwatchFuture { | ||
|
@@ -63,16 +68,19 @@ impl CloudwatchFuture { | |
group_name: String, | ||
create_missing_group: bool, | ||
create_missing_stream: bool, | ||
retention: Retention, | ||
mut events: Vec<Vec<InputLogEvent>>, | ||
token: Option<String>, | ||
token_tx: oneshot::Sender<Option<String>>, | ||
) -> Self { | ||
let retention_days = retention.days; | ||
let client = Client { | ||
client, | ||
smithy_client, | ||
stream_name, | ||
group_name, | ||
headers, | ||
retention_days, | ||
}; | ||
|
||
let state = if let Some(token) = token { | ||
|
@@ -81,13 +89,16 @@ impl CloudwatchFuture { | |
State::DescribeStream(client.describe_stream()) | ||
}; | ||
|
||
let retention_enabled = retention.enabled; | ||
|
||
Self { | ||
client, | ||
events, | ||
state, | ||
token_tx: Some(token_tx), | ||
create_missing_group, | ||
create_missing_stream, | ||
retention_enabled, | ||
} | ||
} | ||
} | ||
|
@@ -115,14 +126,19 @@ impl Future for CloudwatchFuture { | |
} | ||
} | ||
} | ||
return Poll::Ready(Err(CloudwatchError::Describe(err))); | ||
return Poll::Ready(Err(CloudwatchError::DescribeLogStreams(err))); | ||
} | ||
}; | ||
|
||
let stream_name = &self.client.stream_name; | ||
|
||
if let Some(stream) = response | ||
.log_streams | ||
.ok_or(CloudwatchError::NoStreamsFound)? | ||
.into_iter() | ||
.filter(|log_stream| { | ||
log_stream.log_stream_name == Some(stream_name.clone()) | ||
}) | ||
.next() | ||
{ | ||
debug!(message = "Stream found.", stream = ?stream.log_stream_name); | ||
|
@@ -163,6 +179,11 @@ impl Future for CloudwatchFuture { | |
|
||
info!(message = "Group created.", name = %self.client.group_name); | ||
|
||
if self.retention_enabled { | ||
self.state = State::PutRetentionPolicy(self.client.put_retention_policy()); | ||
continue; | ||
} | ||
|
||
// self does not abide by `create_missing_stream` since a group | ||
// never has any streams and thus we need to create one if a group | ||
// is created no matter what. | ||
|
@@ -212,6 +233,19 @@ impl Future for CloudwatchFuture { | |
return Poll::Ready(Ok(())); | ||
} | ||
} | ||
|
||
State::PutRetentionPolicy(fut) => { | ||
match ready!(fut.poll_unpin(cx)) { | ||
Ok(__) => {} | ||
Err(error) => { | ||
return Poll::Ready(Err(CloudwatchError::PutRetentionPolicy(error))) | ||
} | ||
} | ||
|
||
info!(message = "Retention policy updated for stream.", name = %self.client.stream_name); | ||
|
||
self.state = State::CreateStream(self.client.create_log_stream()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs formatting.. Nearly there.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @StephenWakely any other changes needed? |
||
} | ||
} | ||
} | ||
} | ||
|
@@ -268,7 +302,6 @@ impl Client { | |
Box::pin(async move { | ||
client | ||
.describe_log_streams() | ||
.limit(1) | ||
.log_group_name(group_name) | ||
.log_stream_name_prefix(stream_name) | ||
.send() | ||
|
@@ -303,4 +336,19 @@ impl Client { | |
Ok(()) | ||
}) | ||
} | ||
|
||
pub fn put_retention_policy(&self) -> ClientResult<(), PutRetentionPolicyError> { | ||
let client = self.client.clone(); | ||
let group_name = self.group_name.clone(); | ||
let retention_days = self.retention_days.clone(); | ||
Box::pin(async move { | ||
client | ||
.put_retention_policy() | ||
.log_group_name(group_name) | ||
.retention_in_days(retention_days) | ||
.send() | ||
.await?; | ||
Ok(()) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.