Skip to content

Commit

Permalink
Adopt pair expansion in loki sink
Browse files Browse the repository at this point in the history
  • Loading branch information
titaneric committed Dec 22, 2024
1 parent c23fb31 commit 27089c3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ pub(crate) mod sqs;
#[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))]
pub(crate) mod s3;

#[cfg(any(feature = "transforms-log_to_metric"))]
#[cfg(any(feature = "transforms-log_to_metric", , feature = "sinks-loki"))]
pub(crate) mod expansion;
94 changes: 9 additions & 85 deletions src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::HashMap, num::NonZeroUsize, sync::LazyLock};
use std::{collections::HashMap, num::NonZeroUsize};

use bytes::{Bytes, BytesMut};
use regex::Regex;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vrl::path::parse_target_path;
Expand All @@ -13,6 +12,7 @@ use super::{
};
use crate::sinks::loki::event::LokiBatchEncoding;
use crate::{
common::expansion::pair_expansion,
http::{get_http_scheme_from_uri, HttpClient},
internal_events::{
LokiEventUnlabeledError, LokiOutOfOrderEventDroppedError, LokiOutOfOrderEventRewritten,
Expand Down Expand Up @@ -179,42 +179,7 @@ impl EventEncoder {

let key_s = key.unwrap();
let value_s = value.unwrap();

if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<
serde_json::map::Map<String, serde_json::Value>,
serde_json::Error,
> = serde_json::from_str(value_s.clone().as_str());

if output.is_err() {
warn!(
"Failed to expand dynamic label. value: {}, err: {}",
value_s,
output.err().unwrap()
);
continue;
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!("Encountered \"null\" value for dynamic label. key: {}", key);
continue;
}
if let Some(prev) = dynamic_labels.insert(key.clone(), val.clone()) {
warn!(
"Encountered duplicated dynamic label. \
key: {}, value: {}, discarded value: {}",
key, val, prev
);
};
}
} else {
static_labels.insert(key_s, value_s);
}
let _ = pair_expansion(key_s, value_s, &mut static_labels, &mut dynamic_labels);
}

for (k, v) in static_labels {
Expand Down Expand Up @@ -284,46 +249,12 @@ impl EventEncoder {

let key_s = key.unwrap();
let value_s = value.unwrap();

if let Some(opening_prefix) = key_s.strip_suffix('*') {
let output: Result<
serde_json::map::Map<String, serde_json::Value>,
serde_json::Error,
> = serde_json::from_str(value_s.clone().as_str());

if output.is_err() {
warn!(
"Failed to expand dynamic structured metadata. value: {}, err: {}",
value_s,
output.err().unwrap()
);
continue;
}

// key_* -> key_one, key_two, key_three
// * -> one, two, three
for (k, v) in output.unwrap() {
let key = slugify_text(format!("{}{}", opening_prefix, k));
let val = Value::from(v).to_string_lossy().into_owned();
if val == "<null>" {
warn!(
"Encountered \"null\" value for dynamic structured_metadata. key: {}",
key
);
continue;
}
if let Some(prev) = dynamic_structured_metadata.insert(key.clone(), val.clone())
{
warn!(
"Encountered duplicated dynamic structured_metadata. \
key: {}, value: {}, discarded value: {}",
key, val, prev
);
};
}
} else {
static_structured_metadata.insert(key_s, value_s);
}
let _ = pair_expansion(
key_s,
value_s,
&mut static_structured_metadata,
&mut dynamic_structured_metadata,
);
}

for (k, v) in static_structured_metadata {
Expand Down Expand Up @@ -620,13 +551,6 @@ impl StreamSink<Event> for LokiSink {
}
}

static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^0-9A-Za-z_]").unwrap());

fn slugify_text(input: String) -> String {
let result = RE.replace_all(&input, "_");
result.to_lowercase()
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, convert::TryFrom};
Expand Down

0 comments on commit 27089c3

Please sign in to comment.