Skip to content

Commit

Permalink
WIP: influxdb line protcol decoder
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Jan 17, 2024
1 parent cebe628 commit a85c402
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 0 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ chrono = { version = "0.4", default-features = false }
csv-core = { version = "0.1.10", default-features = false }
derivative = { version = "2", default-features = false }
dyn-clone = { version = "1", default-features = false }
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
once_cell = { version = "1.19", default-features = false }
Expand Down
167 changes: 167 additions & 0 deletions lib/codecs/src/decoding/format/influxdb_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use bytes::Bytes;
use chrono::NaiveDateTime;
use derivative::Derivative;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use smallvec::SmallVec;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
use vector_core::{config::DataType, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;

use super::Deserializer;

/// Config used to build a `InfluxdbLineDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct InfluxdbLineDeserializerConfig {}

impl InfluxdbLineDeserializerConfig {
/// new constructs a new InfluxdbLineDeserializerConfig
pub fn new() -> Self {
Self {}
}

/// build constructs a new InfluxdbLineDeserializer
pub fn build(&self) -> InfluxdbLineDeserializer {
InfluxdbLineDeserializer {}
}

/// The output type produced by the deserializer.
pub fn output_type(&self) -> DataType {
DataType::Metric
}

/// The schema produced by the deserializer.
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
schema::Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[log_namespace],
)
}
}

/// Deserializer for the influxdb line protocol
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct InfluxdbLineDeserializer {}

impl InfluxdbLineDeserializer {
/// new constructs a new InfluxdbLineDeserializer
pub fn new() -> InfluxdbLineDeserializer {
InfluxdbLineDeserializer {}
}
}

impl Deserializer for InfluxdbLineDeserializer {
fn parse(
&self,
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
// TODO: config option for lossy
let payload = String::from_utf8_lossy(&bytes);
let parsed_lines = influxdb_line_protocol::parse_lines(&payload);

Ok(parsed_lines
.flat_map(|line| {
let ParsedLine {
series,
field_set,
timestamp,
} = line.unwrap();

field_set
.iter()
// TODO: discard fieldvalues we dont want
.map(move |f| {
let measurement = series.measurement.clone();
let tags = series.tag_set.as_ref().clone();
let val = match f.1 {
FieldValue::I64(v) => v as f64,
FieldValue::U64(v) => v as f64,
FieldValue::F64(v) => v,
FieldValue::Boolean(v) => {
if v {
1.0
} else {
0.0
}
}
_ => panic!("foo"),
};
Event::Metric(
Metric::new(
format!("{0}_{1}", measurement, f.0),
MetricKind::Absolute,
MetricValue::Gauge { value: val },
)
.with_tags(tags.and_then(|ts| {
Some(MetricTags::from_iter(
ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
))
}))
.with_timestamp(timestamp.and_then(|t| {
NaiveDateTime::from_timestamp_micros(t).map(|t| t.and_utc())
})),
)
})
.collect::<Vec<_>>()
})
.collect())
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use chrono::NaiveDateTime;
use vector_core::{
config::LogNamespace,
event::{Metric, MetricKind, MetricTags, MetricValue},
};

use crate::decoding::format::{Deserializer, InfluxdbLineDeserializer};

#[test]
fn deserialize() {
let deser = InfluxdbLineDeserializer::new();
let buffer = Bytes::from(
"cpu,host=A,region=west usage_system=64i,usage_user=10i 1590488773254420000",
);
let events = deser.parse(buffer, LogNamespace::default()).unwrap();
assert_eq!(events.len(), 2);

assert_eq!(
events[0].as_metric(),
&Metric::new(
"cpu_usage_system",
MetricKind::Absolute,
MetricValue::Gauge { value: 64. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(
NaiveDateTime::from_timestamp_micros(1590488773254420000).map(|t| t.and_utc())
)
);
assert_eq!(
events[1].as_metric(),
&Metric::new(
"cpu_usage_user",
MetricKind::Absolute,
MetricValue::Gauge { value: 10. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(
NaiveDateTime::from_timestamp_micros(1590488773254420000).map(|t| t.and_utc())
)
);
}
}
2 changes: 2 additions & 0 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod avro;
mod bytes;
mod gelf;
mod influxdb_line;
mod json;
mod native;
mod native_json;
Expand All @@ -17,6 +18,7 @@ use ::bytes::Bytes;
pub use avro::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions};
pub use influxdb_line::{InfluxdbLineDeserializer, InfluxdbLineDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
Expand Down

0 comments on commit a85c402

Please sign in to comment.