Skip to content

Commit

Permalink
feat(codec,sources): influxdb line protcol decoder
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Jan 17, 2024
1 parent cebe628 commit 7d57649
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 3 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ chrono = { version = "0.4", default-features = false }
csv-core = { version = "0.1.10", default-features = false }
derivative = { version = "2", default-features = false }
dyn-clone = { version = "1", default-features = false }
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
once_cell = { version = "1.19", default-features = false }
Expand Down
217 changes: 217 additions & 0 deletions lib/codecs/src/decoding/format/influxdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
use std::borrow::Cow;

use bytes::Bytes;
use chrono::NaiveDateTime;
use derivative::Derivative;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use smallvec::SmallVec;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
use vector_core::{config::DataType, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;

use crate::decoding::format::default_lossy;

use super::Deserializer;

/// Config used to build a `InfluxdbDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct InfluxdbDeserializerConfig {
/// Influxdb-specific decoding options.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub influxdb: InfluxdbDeserializerOptions,
}

impl InfluxdbDeserializerConfig {
/// new constructs a new InfluxdbDeserializerConfig
pub fn new(options: InfluxdbDeserializerOptions) -> Self {
Self { influxdb: options }
}

/// build constructs a new InfluxdbDeserializer
pub fn build(&self) -> InfluxdbDeserializer {
Into::<InfluxdbDeserializer>::into(self)
}

/// The output type produced by the deserializer.
pub fn output_type(&self) -> DataType {
DataType::Metric
}

/// The schema produced by the deserializer.
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
schema::Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[log_namespace],
)
}
}

/// Influxdb-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::is_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}

/// Deserializer for the influxdb line protocol
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl InfluxdbDeserializer {
/// new constructs a new InfluxdbDeserializer
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}

impl Deserializer for InfluxdbDeserializer {
fn parse(
&self,
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let line: Cow<str> = match self.lossy {
true => String::from_utf8_lossy(&bytes),
false => Cow::from(std::str::from_utf8(&bytes)?),
};
let parsed_line = influxdb_line_protocol::parse_lines(&line);

let res = parsed_line
.collect::<Result<Vec<_>, _>>()?
.iter()
.flat_map(|line| {
let ParsedLine {
series,
field_set,
timestamp,
} = line;

field_set
.iter()
.filter_map(|f| {
let measurement = series.measurement.clone();
let tags = series.tag_set.as_ref().clone();
let val = match f.1 {
FieldValue::I64(v) => v as f64,
FieldValue::U64(v) => v as f64,
FieldValue::F64(v) => v,
FieldValue::Boolean(v) => {
if v {
1.0
} else {
0.0
}
}
// String values cannot be modelled in our schema
FieldValue::String(_) => return None,
};
Some(Event::Metric(
Metric::new(
format!("{0}_{1}", measurement, f.0),
MetricKind::Absolute,
MetricValue::Gauge { value: val },
)
.with_tags(tags.and_then(|ts| {
Some(MetricTags::from_iter(
ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
))
}))
.with_timestamp(timestamp.and_then(|t| {
NaiveDateTime::from_timestamp_micros(t).map(|t| t.and_utc())
})),
))
})
.collect::<Vec<_>>()
})
.collect();

Ok(res)
}
}

impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer {
fn from(config: &InfluxdbDeserializerConfig) -> Self {
Self {
lossy: config.influxdb.lossy,
}
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use chrono::NaiveDateTime;
use vector_core::{
config::LogNamespace,
event::{Metric, MetricKind, MetricTags, MetricValue},
};

use crate::decoding::format::{Deserializer, InfluxdbDeserializer};

#[test]
fn deserialize_success() {
let deser = InfluxdbDeserializer::new(true);
let buffer = Bytes::from(
"cpu,host=A,region=west usage_system=64i,usage_user=10i 1590488773254420000",
);
let events = deser.parse(buffer, LogNamespace::default()).unwrap();
assert_eq!(events.len(), 2);

assert_eq!(
events[0].as_metric(),
&Metric::new(
"cpu_usage_system",
MetricKind::Absolute,
MetricValue::Gauge { value: 64. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(
NaiveDateTime::from_timestamp_micros(1590488773254420000).map(|t| t.and_utc())
)
);
assert_eq!(
events[1].as_metric(),
&Metric::new(
"cpu_usage_user",
MetricKind::Absolute,
MetricValue::Gauge { value: 10. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(
NaiveDateTime::from_timestamp_micros(1590488773254420000).map(|t| t.and_utc())
)
);
}

#[test]
fn deserialize_error() {
let deser = InfluxdbDeserializer::new(true);
let buffer = Bytes::from("some invalid string");
assert!(deser.parse(buffer, LogNamespace::default()).is_err());
}
}
2 changes: 2 additions & 0 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod avro;
mod bytes;
mod gelf;
mod influxdb;
mod json;
mod native;
mod native_json;
Expand All @@ -17,6 +18,7 @@ use ::bytes::Bytes;
pub use avro::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions};
pub use influxdb::{InfluxdbDeserializer, InfluxdbDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
Expand Down
26 changes: 23 additions & 3 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use bytes::{Bytes, BytesMut};
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig,
JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
ProtobufDeserializerConfig, ProtobufDeserializerOptions,
};
Expand Down Expand Up @@ -239,6 +240,11 @@ pub enum DeserializerConfig {
/// [gelf]: https://docs.graylog.org/docs/gelf
Gelf(GelfDeserializerConfig),

/// Decodes the raw bytes as a [Influxdb Line Protocol][influxdb] message.
///
/// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
Influxdb(InfluxdbDeserializerConfig),

/// Decodes the raw bytes as as an [Apache Avro][apache_avro] message.
///
/// [apache_avro]: https://avro.apache.org/
Expand Down Expand Up @@ -285,6 +291,12 @@ impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
}
}

impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
fn from(config: InfluxdbDeserializerConfig) -> Self {
Self::Influxdb(config)
}
}

impl DeserializerConfig {
/// Build the `Deserializer` from this configuration.
pub fn build(&self) -> vector_common::Result<Deserializer> {
Expand All @@ -305,6 +317,7 @@ impl DeserializerConfig {
}
DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
}
}

Expand All @@ -316,6 +329,7 @@ impl DeserializerConfig {
DeserializerConfig::Bytes
| DeserializerConfig::Json(_)
| DeserializerConfig::Gelf(_)
| DeserializerConfig::Influxdb(_)
| DeserializerConfig::NativeJson(_) => {
FramingConfig::NewlineDelimited(Default::default())
}
Expand All @@ -340,6 +354,7 @@ impl DeserializerConfig {
DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
DeserializerConfig::NativeJson(config) => config.output_type(),
DeserializerConfig::Gelf(config) => config.output_type(),
DeserializerConfig::Influxdb(config) => config.output_type(),
}
}

Expand All @@ -358,6 +373,7 @@ impl DeserializerConfig {
DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
}
}

Expand Down Expand Up @@ -388,7 +404,8 @@ impl DeserializerConfig {
DeserializerConfig::Json(_)
| DeserializerConfig::NativeJson(_)
| DeserializerConfig::Bytes
| DeserializerConfig::Gelf(_),
| DeserializerConfig::Gelf(_)
| DeserializerConfig::Influxdb(_),
_,
) => "text/plain",
#[cfg(feature = "syslog")]
Expand Down Expand Up @@ -419,6 +436,8 @@ pub enum Deserializer {
Boxed(BoxedDeserializer),
/// Uses a `GelfDeserializer` for deserialization.
Gelf(GelfDeserializer),
/// Uses a `InfluxdbDeserializer` for deserialization.
Influxdb(InfluxdbDeserializer),
}

impl format::Deserializer for Deserializer {
Expand All @@ -438,6 +457,7 @@ impl format::Deserializer for Deserializer {
Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
}
}
}
2 changes: 2 additions & 0 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S
DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf,
DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
// TODO: Influxdb has no serializer yet
DeserializerConfig::Influxdb { .. } => todo!(),
};

serializer_config
Expand Down

0 comments on commit 7d57649

Please sign in to comment.