Skip to content

Commit

Permalink
feat(sample transform): add sample_rate_key config option (#21283)
Browse files Browse the repository at this point in the history
* feat(sample transform): add sample_rate_key config option

* docs: Add changelog entry for sample_rate_key change

* Add sample_rate_key description

* docs: add cue docs

* docs: correct sample docs whitespace
  • Loading branch information
dekelpilli authored Oct 31, 2024
1 parent f2f38bc commit 08bf15a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 20 deletions.
3 changes: 3 additions & 0 deletions changelog.d/configurable-sample-rate-key.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `sample` transform now has a `sample_rate_key` configuration option, which defaults to `sample_rate`, that allows configuring which key is used to attach the sample rate to sampled events. If set to an empty string, the sample rate will not be attached to sampled events.

authors: dekelpilli
14 changes: 13 additions & 1 deletion src/transforms/sample/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vrl::owned_value_path;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use vrl::value::Kind;

use crate::{
Expand Down Expand Up @@ -45,6 +45,11 @@ pub struct SampleConfig {
#[configurable(metadata(docs::examples = "message"))]
pub key_field: Option<String>,

/// The event key in which the sample rate is stored. If set to an empty string, the sample rate will not be added to the event.
#[configurable(metadata(docs::examples = "sample_rate"))]
#[serde(default = "default_sample_rate_key")]
pub sample_rate_key: OptionalValuePath,

/// The value to group events into separate buckets to be sampled independently.
///
/// If left unspecified, or if the event doesn't have `group_by`, then the event is not
Expand All @@ -66,6 +71,7 @@ impl GenerateConfig for SampleConfig {
key_field: None,
group_by: None,
exclude: None::<AnyCondition>,
sample_rate_key: default_sample_rate_key(),
})
.unwrap()
}
Expand All @@ -84,6 +90,7 @@ impl TransformConfig for SampleConfig {
.as_ref()
.map(|condition| condition.build(&context.enrichment_tables))
.transpose()?,
default_sample_rate_key(),
)))
}

Expand Down Expand Up @@ -118,6 +125,10 @@ impl TransformConfig for SampleConfig {
}
}

pub fn default_sample_rate_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("sample_rate"))
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -141,6 +152,7 @@ mod tests {
key_field: None,
group_by: None,
exclude: None,
sample_rate_key: default_sample_rate_key(),
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
Expand Down
80 changes: 61 additions & 19 deletions src/transforms/sample/transform.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use vector_lib::config::LegacyKey;
use vrl::event_path;

use crate::{
conditions::Condition,
Expand All @@ -10,6 +9,8 @@ use crate::{
template::Template,
transforms::{FunctionTransform, OutputBuffer},
};
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::lookup::OwnedTargetPath;

#[derive(Clone)]
pub struct Sample {
Expand All @@ -18,6 +19,7 @@ pub struct Sample {
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
sample_rate_key: OptionalValuePath,
counter: HashMap<Option<String>, u64>,
}

Expand All @@ -31,13 +33,15 @@ impl Sample {
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
sample_rate_key: OptionalValuePath,
) -> Self {
Self {
name,
rate,
key_field,
group_by,
exclude,
sample_rate_key,
counter: HashMap::new(),
}
}
Expand Down Expand Up @@ -113,21 +117,23 @@ impl FunctionTransform for Sample {
self.counter.insert(group_by_key.clone(), increment);

if num % self.rate == 0 {
match event {
Event::Log(ref mut event) => {
event.namespace().insert_source_metadata(
self.name.as_str(),
event,
Some(LegacyKey::Overwrite(vrl::path!("sample_rate"))),
vrl::path!("sample_rate"),
self.rate.to_string(),
);
}
Event::Trace(ref mut event) => {
event.insert(event_path!("sample_rate"), self.rate.to_string());
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
if let Some(path) = &self.sample_rate_key.path {
match event {
Event::Log(ref mut event) => {
event.namespace().insert_source_metadata(
self.name.as_str(),
event,
Some(LegacyKey::Overwrite(path)),
path,
self.rate.to_string(),
);
}
Event::Trace(ref mut event) => {
event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
}
output.push(event);
} else {
emit!(SampleEventDiscarded);
Expand All @@ -138,14 +144,15 @@ impl FunctionTransform for Sample {
#[cfg(test)]
mod tests {
use super::*;
use vrl::owned_value_path;

use crate::{
conditions::{Condition, ConditionalConfig, VrlConfig},
config::log_schema,
event::{Event, LogEvent, TraceEvent},
test_util::random_lines,
transforms::sample::config::default_sample_rate_key,
transforms::test::transform_one,
transforms::OutputBuffer,
};
use approx::assert_relative_eq;

Expand Down Expand Up @@ -174,6 +181,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -197,6 +205,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -223,6 +232,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);

let first_run = events
Expand Down Expand Up @@ -259,6 +269,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"important",
)),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand Down Expand Up @@ -286,6 +297,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -310,6 +322,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains("other_field", "foo")),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -333,6 +346,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let passing = events
.into_iter()
Expand All @@ -348,13 +362,31 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(owned_value_path!("custom_sample_rate")),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert_eq!(passing.as_log()["custom_sample_rate"], "25".into());
assert!(passing.as_log().get("sample_rate").is_none());

let events = random_events(10000);
let mut sampler = Sample::new(
"sample".to_string(),
50,
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(owned_value_path!("")),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert_eq!(passing.as_log()["sample_rate"], "25".into());
assert!(passing.as_log().get("sample_rate").is_none());

// If the event passed the regex check, don't include the sampling rate
let mut sampler = Sample::new(
Expand All @@ -363,6 +395,7 @@ mod tests {
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let event = Event::Log(LogEvent::from("nananana"));
let passing = transform_one(&mut sampler, event).unwrap();
Expand All @@ -374,7 +407,16 @@ mod tests {
fn handles_trace_event() {
let event: TraceEvent = LogEvent::from("trace").into();
let trace = Event::Trace(event);
let mut sampler = Sample::new("sample".to_string(), 2, None, None, None);

let mut sampler = Sample::new(
"sample".to_string(),
2,
None,
None,
None,
default_sample_rate_key(),
);

let iterations = 0..2;
let total_passed = iterations
.filter_map(|_| transform_one(&mut sampler, trace.clone()))
Expand Down
8 changes: 8 additions & 0 deletions website/cue/reference/components/transforms/base/sample.cue
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,12 @@ base: components: transforms: sample: configuration: {
1500,
]
}
sample_rate_key: {
description: "The event key in which the sample rate is stored. If set to an empty string, the sample rate will not be added to the event."
required: false
type: string: {
default: "sample_rate"
examples: ["sample_rate"]
}
}
}

0 comments on commit 08bf15a

Please sign in to comment.