Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(log_to_metric transform): support tags expansion similar to labels expansion for loki sink #21939

Merged
merged 26 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1fc4d4d
Try tag expansion
titaneric Nov 30, 2024
f49b068
Add tags expansion test
titaneric Dec 2, 2024
7a94bd1
Extract dynamic labels into tags result
titaneric Dec 2, 2024
e85cb44
Update tags expansion testcase
titaneric Dec 3, 2024
7c39d98
Refactor render_tag_into
titaneric Dec 3, 2024
788a257
Add multi_value_tags_expansion_yaml test
titaneric Dec 3, 2024
293b15a
Add basic docs for tags
titaneric Dec 3, 2024
ba98fb2
Add change log
titaneric Dec 3, 2024
c4bba1b
Rename changelog file
titaneric Dec 3, 2024
8ced9a8
rename labels to tags and inset the tag value in render_tag_into
titaneric Dec 5, 2024
3884f83
Remove debug msg
titaneric Dec 5, 2024
21a532c
Add colliding tags test
titaneric Dec 5, 2024
9bf3219
Add source reference
titaneric Dec 6, 2024
5957703
fix comment and changelog
titaneric Dec 22, 2024
c23fb31
Extract pair expansion to common mod
titaneric Dec 22, 2024
3a10f42
Adopt pair expansion in loki sink
titaneric Dec 22, 2024
f12e9e2
Generate component docs
titaneric Dec 23, 2024
057456d
Add PairExpansion Error
titaneric Dec 23, 2024
6af30a4
Extend the result with extended pairs
titaneric Dec 23, 2024
c337c04
Update docs
titaneric Dec 23, 2024
fbdebbb
Re-generate docs
titaneric Dec 23, 2024
f08e7ff
Add docs for pair_expansion and change signature to string literal
titaneric Jan 4, 2025
97a8756
Update signature for render_tag_into
titaneric Jan 4, 2025
a4ecc09
Handle the `pair_expansion` error in caller side
titaneric Jan 6, 2025
32ed91d
Better error handling in `pair_expansion`
titaneric Jan 6, 2025
5d4b386
Update example
titaneric Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/21939_log_to_metric_tags_expansion.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `log_to_metric` transformer tag key are now template-able which enables tags expansion.

authors: titaneric
57 changes: 57 additions & 0 deletions src/common/expansion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use regex::Regex;
use std::{collections::HashMap, sync::LazyLock};

use crate::event::Value;

static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^0-9A-Za-z_]").unwrap());
fn slugify_text(input: &str) -> String {
let result = RE.replace_all(&input, "_");

Check failure on line 8 in src/common/expansion.rs

View workflow job for this annotation

GitHub Actions / Checks

this expression creates a reference which is immediately dereferenced by the compiler
result.to_lowercase()
}

/// Expands the given possibly template-able `key_s` and `value_s`, and return the expanded owned pairs
/// it would also insert the pairs into either `static_pairs` or `dynamic_pairs` depending on the template-ability of `key_s`.
/// Refer to loki sink and log_to_metric transform for further information.
pront marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn pair_expansion(
key_s: &str,
value_s: &str,
static_pairs: &mut HashMap<String, String>,
dynamic_pairs: &mut HashMap<String, String>,
) -> Result<HashMap<String, String>, serde_json::Error> {
let mut expanded_pairs = HashMap::new();
if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<serde_json::map::Map<String, serde_json::Value>, serde_json::Error> =
serde_json::from_str(value_s);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✨ nice we got rid of a clone call


if let Err(err) = output {
warn!(
"Failed to expand dynamic pair. value: {}, err: {}",
value_s, err
);
return Err(err);
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(&format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!("Encountered \"null\" value for dynamic pair. key: {}", key);
continue;
}
if let Some(prev) = dynamic_pairs.insert(key.clone(), val.clone()) {
warn!(
"Encountered duplicated dynamic pair. \
key: {}, value: {:?}, discarded value: {:?}",
key, val, prev
);
};
expanded_pairs.insert(key, val);
}
} else {
static_pairs.insert(key_s.to_string(), value_s.to_string());
expanded_pairs.insert(key_s.to_string(), value_s.to_string());
}
Ok(expanded_pairs)
}
5 changes: 4 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Modules that are common between sources and sinks.
//! Modules that are common between sources, transforms, and sinks.
#[cfg(any(
feature = "sources-datadog_agent",
feature = "sinks-datadog_events",
Expand All @@ -17,3 +17,6 @@ pub(crate) mod sqs;

#[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))]
pub(crate) mod s3;

#[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))]
pub(crate) mod expansion;
94 changes: 9 additions & 85 deletions src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::HashMap, num::NonZeroUsize, sync::LazyLock};
use std::{collections::HashMap, num::NonZeroUsize};

use bytes::{Bytes, BytesMut};
use regex::Regex;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vrl::path::parse_target_path;
Expand All @@ -13,6 +12,7 @@ use super::{
};
use crate::sinks::loki::event::LokiBatchEncoding;
use crate::{
common::expansion::pair_expansion,
http::{get_http_scheme_from_uri, HttpClient},
internal_events::{
LokiEventUnlabeledError, LokiOutOfOrderEventDroppedError, LokiOutOfOrderEventRewritten,
Expand Down Expand Up @@ -179,42 +179,7 @@ impl EventEncoder {

let key_s = key.unwrap();
let value_s = value.unwrap();

if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<
serde_json::map::Map<String, serde_json::Value>,
serde_json::Error,
> = serde_json::from_str(value_s.clone().as_str());

if output.is_err() {
warn!(
"Failed to expand dynamic label. value: {}, err: {}",
value_s,
output.err().unwrap()
);
continue;
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!("Encountered \"null\" value for dynamic label. key: {}", key);
continue;
}
if let Some(prev) = dynamic_labels.insert(key.clone(), val.clone()) {
warn!(
"Encountered duplicated dynamic label. \
key: {}, value: {}, discarded value: {}",
key, val, prev
);
};
}
} else {
static_labels.insert(key_s, value_s);
}
let _ = pair_expansion(&key_s, &value_s, &mut static_labels, &mut dynamic_labels);
}

for (k, v) in static_labels {
Expand Down Expand Up @@ -284,46 +249,12 @@ impl EventEncoder {

let key_s = key.unwrap();
let value_s = value.unwrap();

if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<
serde_json::map::Map<String, serde_json::Value>,
serde_json::Error,
> = serde_json::from_str(value_s.clone().as_str());

if output.is_err() {
warn!(
"Failed to expand dynamic structured metadata. value: {}, err: {}",
value_s,
output.err().unwrap()
);
continue;
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!(
"Encountered \"null\" value for dynamic structured_metadata. key: {}",
key
);
continue;
}
if let Some(prev) = dynamic_structured_metadata.insert(key.clone(), val.clone())
{
warn!(
"Encountered duplicated dynamic structured_metadata. \
key: {}, value: {}, discarded value: {}",
key, val, prev
);
};
}
} else {
static_structured_metadata.insert(key_s, value_s);
}
let _ = pair_expansion(
&key_s,
pront marked this conversation as resolved.
Show resolved Hide resolved
&value_s,
&mut static_structured_metadata,
&mut dynamic_structured_metadata,
);
}

for (k, v) in static_structured_metadata {
Expand Down Expand Up @@ -620,13 +551,6 @@ impl StreamSink<Event> for LokiSink {
}
}

static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^0-9A-Za-z_]").unwrap());

fn slugify_text(input: String) -> String {
let result = RE.replace_all(&input, "_");
result.to_lowercase()
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, convert::TryFrom};
Expand Down
Loading
Loading