Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sample transform): add sample_rate_key config option #21283

Merged
merged 6 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/configurable-sample-rate-key.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `sample` transform now has a `sample_rate_key` configuration option, which default to `sample_rate`, that allows configuring which key is used to attach the sample rate to sampled events. If set to an empty string, the sample rate will not be attached to sampled events.

authors: dekelpilli
12 changes: 11 additions & 1 deletion src/transforms/sample/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vrl::owned_value_path;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use vrl::value::Kind;

use crate::{
Expand Down Expand Up @@ -44,6 +44,9 @@ pub struct SampleConfig {
/// sampled together, but that overall `1/N` transactions are sampled.
#[configurable(metadata(docs::examples = "message"))]
pub key_field: Option<String>,
#[serde(default = "default_sample_rate_key")]
#[configurable(metadata(docs::examples = "sample_rate"), description = "")]
pub sample_rate_key: OptionalValuePath,

/// The value to group events into separate buckets to be sampled independently.
///
Expand All @@ -66,6 +69,7 @@ impl GenerateConfig for SampleConfig {
key_field: None,
group_by: None,
exclude: None::<AnyCondition>,
sample_rate_key: default_sample_rate_key(),
})
.unwrap()
}
Expand All @@ -84,6 +88,7 @@ impl TransformConfig for SampleConfig {
.as_ref()
.map(|condition| condition.build(&context.enrichment_tables))
.transpose()?,
default_sample_rate_key(),
)))
}

Expand Down Expand Up @@ -118,6 +123,10 @@ impl TransformConfig for SampleConfig {
}
}

pub fn default_sample_rate_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("sample_rate"))
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -141,6 +150,7 @@ mod tests {
key_field: None,
group_by: None,
exclude: None,
sample_rate_key: default_sample_rate_key(),
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
Expand Down
80 changes: 61 additions & 19 deletions src/transforms/sample/transform.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use vector_lib::config::LegacyKey;
use vrl::event_path;

use crate::{
conditions::Condition,
Expand All @@ -10,6 +9,8 @@ use crate::{
template::Template,
transforms::{FunctionTransform, OutputBuffer},
};
use vector_lib::lookup::OwnedTargetPath;
use vector_lib::lookup::lookup_v2::OptionalValuePath;

#[derive(Clone)]
pub struct Sample {
Expand All @@ -18,6 +19,7 @@ pub struct Sample {
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
sample_rate_key: OptionalValuePath,
counter: HashMap<Option<String>, u64>,
}

Expand All @@ -31,13 +33,15 @@ impl Sample {
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
sample_rate_key: OptionalValuePath,
) -> Self {
Self {
name,
rate,
key_field,
group_by,
exclude,
sample_rate_key,
counter: HashMap::new(),
}
}
Expand Down Expand Up @@ -113,21 +117,23 @@ impl FunctionTransform for Sample {
self.counter.insert(group_by_key.clone(), increment);

if num % self.rate == 0 {
match event {
Event::Log(ref mut event) => {
event.namespace().insert_source_metadata(
self.name.as_str(),
event,
Some(LegacyKey::Overwrite(vrl::path!("sample_rate"))),
vrl::path!("sample_rate"),
self.rate.to_string(),
);
}
Event::Trace(ref mut event) => {
event.insert(event_path!("sample_rate"), self.rate.to_string());
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
if let Some(path) = &self.sample_rate_key.path {
match event {
Event::Log(ref mut event) => {
event.namespace().insert_source_metadata(
self.name.as_str(),
event,
Some(LegacyKey::Overwrite(path)),
path,
self.rate.to_string(),
);
}
Event::Trace(ref mut event) => {
event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
}
output.push(event);
} else {
emit!(SampleEventDiscarded);
Expand All @@ -138,14 +144,15 @@ impl FunctionTransform for Sample {
#[cfg(test)]
mod tests {
use super::*;
use vrl::owned_value_path;

use crate::{
conditions::{Condition, ConditionalConfig, VrlConfig},
config::log_schema,
event::{Event, LogEvent, TraceEvent},
test_util::random_lines,
transforms::sample::config::default_sample_rate_key,
transforms::test::transform_one,
transforms::OutputBuffer,
};
use approx::assert_relative_eq;

Expand Down Expand Up @@ -174,6 +181,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -197,6 +205,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -223,6 +232,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);

let first_run = events
Expand Down Expand Up @@ -259,6 +269,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"important",
)),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand Down Expand Up @@ -286,6 +297,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -310,6 +322,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains("other_field", "foo")),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -333,6 +346,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let passing = events
.into_iter()
Expand All @@ -348,13 +362,31 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(owned_value_path!("custom_sample_rate")),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert_eq!(passing.as_log()["custom_sample_rate"], "25".into());
assert!(passing.as_log().get("sample_rate").is_none());

let events = random_events(10000);
let mut sampler = Sample::new(
"sample".to_string(),
50,
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(owned_value_path!("")),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert_eq!(passing.as_log()["sample_rate"], "25".into());
assert!(passing.as_log().get("sample_rate").is_none());

// If the event passed the regex check, don't include the sampling rate
let mut sampler = Sample::new(
Expand All @@ -363,6 +395,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let event = Event::Log(LogEvent::from("nananana"));
let passing = transform_one(&mut sampler, event).unwrap();
Expand All @@ -374,7 +407,16 @@ mod tests {
fn handles_trace_event() {
let event: TraceEvent = LogEvent::from("trace").into();
let trace = Event::Trace(event);
let mut sampler = Sample::new("sample".to_string(), 2, None, None, None);

let mut sampler = Sample::new(
"sample".to_string(),
2,
None,
None,
None,
default_sample_rate_key(),
);

let iterations = 0..2;
let total_passed = iterations
.filter_map(|_| transform_one(&mut sampler, trace.clone()))
Expand Down
Loading