diff --git a/changelog.d/21939_log_to_metric_tags_expansion.feature.md b/changelog.d/21939_log_to_metric_tags_expansion.feature.md new file mode 100644 index 0000000000000..d6223975ae7e9 --- /dev/null +++ b/changelog.d/21939_log_to_metric_tags_expansion.feature.md @@ -0,0 +1,3 @@ +The `log_to_metric` transform's tag keys are now template-able, which enables tag expansion. + +authors: titaneric diff --git a/src/common/expansion.rs b/src/common/expansion.rs new file mode 100644 index 0000000000000..11d6940183de7 --- /dev/null +++ b/src/common/expansion.rs @@ -0,0 +1,48 @@ +use regex::Regex; +use std::{collections::HashMap, sync::LazyLock}; + +use crate::event::Value; + +static RE: LazyLock = LazyLock::new(|| Regex::new(r"[^0-9A-Za-z_]").unwrap()); +fn slugify_text(input: &str) -> String { + let result = RE.replace_all(input, "_"); + result.to_lowercase() +} + +/// Expands the given possibly template-able `key_s` and `value_s`, and returns the expanded owned pairs. +/// It also inserts the pairs into either `static_pairs` or `dynamic_pairs`, depending on the template-ability of `key_s`. +pub(crate) fn pair_expansion( + key_s: &str, + value_s: &str, + static_pairs: &mut HashMap, + dynamic_pairs: &mut HashMap, +) -> Result, serde_json::Error> { + let mut expanded_pairs = HashMap::new(); + if let Some(opening_prefix) = key_s.strip_suffix('*') { + let output: serde_json::map::Map = + serde_json::from_str(value_s)?; + + // key_* -> key_one, key_two, key_three + // * -> one, two, three + for (k, v) in output { + let key = slugify_text(&format!("{}{}", opening_prefix, k)); + let val = Value::from(v).to_string_lossy().into_owned(); + if val == "" { + warn!("Encountered \"null\" value for dynamic pair. key: {}", key); + continue; + } + if let Some(prev) = dynamic_pairs.insert(key.clone(), val.clone()) { + warn!( + "Encountered duplicated dynamic pair. 
\ + key: {}, value: {:?}, discarded value: {:?}", + key, val, prev + ); + }; + expanded_pairs.insert(key, val); + } + } else { + static_pairs.insert(key_s.to_string(), value_s.to_string()); + expanded_pairs.insert(key_s.to_string(), value_s.to_string()); + } + Ok(expanded_pairs) +} diff --git a/src/common/mod.rs b/src/common/mod.rs index dad3e63de35e4..6e29cbeebeadf 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,4 +1,4 @@ -//! Modules that are common between sources and sinks. +//! Modules that are common between sources, transforms, and sinks. #[cfg(any( feature = "sources-datadog_agent", feature = "sinks-datadog_events", @@ -17,3 +17,6 @@ pub(crate) mod sqs; #[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))] pub(crate) mod s3; + +#[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))] +pub(crate) mod expansion; diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index f12dbc3b09e4a..37be3a9ee7478 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -1,7 +1,6 @@ -use std::{collections::HashMap, num::NonZeroUsize, sync::LazyLock}; +use std::{collections::HashMap, num::NonZeroUsize}; use bytes::{Bytes, BytesMut}; -use regex::Regex; use snafu::Snafu; use tokio_util::codec::Encoder as _; use vrl::path::parse_target_path; @@ -13,6 +12,7 @@ use super::{ }; use crate::sinks::loki::event::LokiBatchEncoding; use crate::{ + common::expansion::pair_expansion, http::{get_http_scheme_from_uri, HttpClient}, internal_events::{ LokiEventUnlabeledError, LokiOutOfOrderEventDroppedError, LokiOutOfOrderEventRewritten, @@ -179,41 +179,13 @@ impl EventEncoder { let key_s = key.unwrap(); let value_s = value.unwrap(); - - if let Some(opening_prefix) = key_s.strip_suffix('*') { - let output: Result< - serde_json::map::Map, - serde_json::Error, - > = serde_json::from_str(value_s.clone().as_str()); - - if output.is_err() { - warn!( - "Failed to expand dynamic label. 
value: {}, err: {}", - value_s, - output.err().unwrap() - ); - continue; - } - - // key_* -> key_one, key_two, key_three - // * -> one, two, three - for (k, v) in output.unwrap() { - let key = slugify_text(format!("{}{}", opening_prefix, k)); - let val = Value::from(v).to_string_lossy().into_owned(); - if val == "" { - warn!("Encountered \"null\" value for dynamic label. key: {}", key); - continue; - } - if let Some(prev) = dynamic_labels.insert(key.clone(), val.clone()) { - warn!( - "Encountered duplicated dynamic label. \ - key: {}, value: {}, discarded value: {}", - key, val, prev - ); - }; - } - } else { - static_labels.insert(key_s, value_s); + let result = pair_expansion(&key_s, &value_s, &mut static_labels, &mut dynamic_labels); + // we just need to check the error since the result have been inserted in the static_pairs or dynamic_pairs + if let Err(err) = result { + warn!( + "Failed to expand dynamic label. value: {}, err: {}", + value_s, err + ); } } @@ -284,52 +256,25 @@ impl EventEncoder { let key_s = key.unwrap(); let value_s = value.unwrap(); - - if let Some(opening_prefix) = key_s.strip_suffix('*') { - let output: Result< - serde_json::map::Map, - serde_json::Error, - > = serde_json::from_str(value_s.clone().as_str()); - - if output.is_err() { - warn!( - "Failed to expand dynamic structured metadata. value: {}, err: {}", - value_s, - output.err().unwrap() - ); - continue; - } - - // key_* -> key_one, key_two, key_three - // * -> one, two, three - for (k, v) in output.unwrap() { - let key = slugify_text(format!("{}{}", opening_prefix, k)); - let val = Value::from(v).to_string_lossy().into_owned(); - if val == "" { - warn!( - "Encountered \"null\" value for dynamic structured_metadata. key: {}", - key - ); - continue; - } - if let Some(prev) = dynamic_structured_metadata.insert(key.clone(), val.clone()) - { - warn!( - "Encountered duplicated dynamic structured_metadata. 
\ - key: {}, value: {}, discarded value: {}", - key, val, prev - ); - }; - } - } else { - static_structured_metadata.insert(key_s, value_s); + let result = pair_expansion( + &key_s, + &value_s, + &mut static_structured_metadata, + &mut dynamic_structured_metadata, + ); + // we just need to check the error since the result have been inserted in the static_pairs or dynamic_pairs + if let Err(err) = result { + warn!( + "Failed to expand dynamic structured metadata. value: {}, err: {}", + value_s, err + ); } } for (k, v) in static_structured_metadata { if let Some(discarded_v) = dynamic_structured_metadata.insert(k.clone(), v.clone()) { warn!( - "Static label overrides dynamic label. \ + "Static structured_metadata overrides dynamic structured_metadata. \ key: {}, value: {}, discarded value: {}", k, v, discarded_v ); @@ -620,13 +565,6 @@ impl StreamSink for LokiSink { } } -static RE: LazyLock = LazyLock::new(|| Regex::new(r"[^0-9A-Za-z_]").unwrap()); - -fn slugify_text(input: String) -> String { - let result = RE.replace_all(&input, "_"); - result.to_lowercase() -} - #[cfg(test)] mod tests { use std::{collections::HashMap, convert::TryFrom}; diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 1e3d966d2c43a..48cfbecfeb501 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -19,6 +19,7 @@ use vrl::{event_path, path}; use crate::config::schema::Definition; use crate::transforms::log_to_metric::TransformError::PathNotFound; use crate::{ + common::expansion::pair_expansion, config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, TransformOutput, @@ -109,8 +110,11 @@ pub struct MetricConfig { pub namespace: Option