diff --git a/Cargo.lock b/Cargo.lock index 7b5c131b0518c..9aa37849dea22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2091,6 +2091,7 @@ dependencies = [ "dyn-clone", "futures 0.3.30", "indoc", + "influxdb-line-protocol", "memchr", "once_cell", "ordered-float 4.2.1", @@ -4554,6 +4555,19 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc150e5ce2330295b8616ce0e3f53250e53af31759a9dbedad1621ba29151847" +[[package]] +name = "influxdb-line-protocol" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fa7ee6be451ea0b1912b962c91c8380835e97cf1584a77e18264e908448dcb" +dependencies = [ + "bytes 1.6.0", + "log", + "nom", + "smallvec", + "snafu 0.7.5", +] + [[package]] name = "inotify" version = "0.9.6" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index a444f2013f9db..9205521dc53c6 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -281,6 +281,7 @@ indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Author indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors indoc,https://github.com/dtolnay/indoc,MIT OR Apache-2.0,David Tolnay infer,https://github.com/bojand/infer,MIT,Bojan +influxdb-line-protocol,https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line_protocol,MIT OR Apache-2.0,InfluxDB IOx Project Developers inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun , Félix Saparelli , Cristian Kubis , Frank Denis " inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun inout,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers diff --git a/changelog.d/19637_influxdb_line_protocol_decoder.enhancement.md b/changelog.d/19637_influxdb_line_protocol_decoder.enhancement.md new file mode 100644 index 0000000000000..f11e9681895f9 --- /dev/null +++ b/changelog.d/19637_influxdb_line_protocol_decoder.enhancement.md @@ -0,0 +1,3 @@ +Introduced support for decoding InfluxDB line protocol messages, allowing these messages to be deserialized into the Vector metric format. + +authors: MichaHoffmann sebinsunny diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index f51e2f58ef823..e21795f531eb1 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -16,6 +16,7 @@ chrono.workspace = true csv-core = { version = "0.1.10", default-features = false } derivative = { version = "2", default-features = false } dyn-clone = { version = "1", default-features = false } +influxdb-line-protocol = { version = "2", default-features = false } lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] } memchr = { version = "2", default-features = false } once_cell = { version = "1.19", default-features = false } diff --git a/lib/codecs/src/decoding/format/influxdb.rs b/lib/codecs/src/decoding/format/influxdb.rs new file mode 100644 index 0000000000000..aab5aba9b87a5 --- /dev/null +++ b/lib/codecs/src/decoding/format/influxdb.rs @@ -0,0 +1,211 @@ +use std::borrow::Cow; + +use bytes::Bytes; +use chrono::DateTime; +use derivative::Derivative; +use influxdb_line_protocol::{FieldValue, ParsedLine}; +use smallvec::SmallVec; +use vector_config::configurable_component; +use vector_core::config::LogNamespace; +use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue}; +use vector_core::{config::DataType, schema}; +use vrl::value::kind::Collection; +use vrl::value::Kind; + +use crate::decoding::format::default_lossy; + +use super::Deserializer; + +/// Config used to build a `InfluxdbDeserializer`. +/// - [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/): +#[configurable_component] +#[derive(Debug, Clone, Default)] +pub struct InfluxdbDeserializerConfig { + /// Influxdb-specific decoding options. + #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] + pub influxdb: InfluxdbDeserializerOptions, +} + +impl InfluxdbDeserializerConfig { + /// new constructs a new InfluxdbDeserializerConfig + pub fn new(options: InfluxdbDeserializerOptions) -> Self { + Self { influxdb: options } + } + + /// build constructs a new InfluxdbDeserializer + pub fn build(&self) -> InfluxdbDeserializer { + Into::::into(self) + } + + /// The output type produced by the deserializer. + pub fn output_type(&self) -> DataType { + DataType::Metric + } + + /// The schema produced by the deserializer. + pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition { + schema::Definition::new_with_default_metadata( + Kind::object(Collection::empty()), + [log_namespace], + ) + } +} + +/// Influxdb-specific decoding options. +#[configurable_component] +#[derive(Debug, Clone, PartialEq, Eq, Derivative)] +#[derivative(Default)] +pub struct InfluxdbDeserializerOptions { + /// Determines whether or not to replace invalid UTF-8 sequences instead of failing. + /// + /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + /// + /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + #[serde( + default = "default_lossy", + skip_serializing_if = "vector_core::serde::is_default" + )] + #[derivative(Default(value = "default_lossy()"))] + pub lossy: bool, +} + +/// Deserializer for the influxdb line protocol +#[derive(Debug, Clone, Derivative)] +#[derivative(Default)] +pub struct InfluxdbDeserializer { + #[derivative(Default(value = "default_lossy()"))] + lossy: bool, +} + +impl InfluxdbDeserializer { + /// new constructs a new InfluxdbDeserializer + pub fn new(lossy: bool) -> Self { + Self { lossy } + } +} + +impl Deserializer for InfluxdbDeserializer { + fn parse( + &self, + bytes: Bytes, + _log_namespace: LogNamespace, + ) -> vector_common::Result> { + let line: Cow = match self.lossy { + true => String::from_utf8_lossy(&bytes), + false => Cow::from(std::str::from_utf8(&bytes)?), + }; + let parsed_line = influxdb_line_protocol::parse_lines(&line); + + let res = parsed_line + .collect::, _>>()? + .iter() + .flat_map(|line| { + let ParsedLine { + series, + field_set, + timestamp, + } = line; + + field_set + .iter() + .filter_map(|f| { + let measurement = series.measurement.clone(); + let tags = series.tag_set.as_ref(); + let val = match f.1 { + FieldValue::I64(v) => v as f64, + FieldValue::U64(v) => v as f64, + FieldValue::F64(v) => v, + FieldValue::Boolean(v) => { + if v { + 1.0 + } else { + 0.0 + } + } + FieldValue::String(_) => return None, // String values cannot be modelled in our schema + }; + Some(Event::Metric( + Metric::new( + format!("{0}_{1}", measurement, f.0), + MetricKind::Absolute, + MetricValue::Gauge { value: val }, + ) + .with_tags(tags.map(|ts| { + MetricTags::from_iter( + ts.iter().map(|t| (t.0.to_string(), t.1.to_string())), + ) + })) + .with_timestamp(timestamp.and_then(DateTime::from_timestamp_micros)), + )) + }) + .collect::>() + }) + .collect(); + + Ok(res) + } +} + +impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer { + fn from(config: &InfluxdbDeserializerConfig) -> Self { + Self { + lossy: config.influxdb.lossy, + } + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use chrono::DateTime; + use vector_core::{ + config::LogNamespace, + event::{Metric, MetricKind, MetricTags, MetricValue}, + }; + + use crate::decoding::format::{Deserializer, InfluxdbDeserializer}; + + #[test] + fn deserialize_success() { + let deser = InfluxdbDeserializer::new(true); + let buffer = Bytes::from( + "cpu,host=A,region=west usage_system=64i,usage_user=10i 1590488773254420000", + ); + let events = deser.parse(buffer, LogNamespace::default()).unwrap(); + assert_eq!(events.len(), 2); + + assert_eq!( + events[0].as_metric(), + &Metric::new( + "cpu_usage_system", + MetricKind::Absolute, + MetricValue::Gauge { value: 64. }, + ) + .with_tags(Some(MetricTags::from_iter([ + ("host".to_string(), "A".to_string()), + ("region".to_string(), "west".to_string()), + ]))) + .with_timestamp(DateTime::from_timestamp_micros(1590488773254420000)) + ); + assert_eq!( + events[1].as_metric(), + &Metric::new( + "cpu_usage_user", + MetricKind::Absolute, + MetricValue::Gauge { value: 10. }, + ) + .with_tags(Some(MetricTags::from_iter([ + ("host".to_string(), "A".to_string()), + ("region".to_string(), "west".to_string()), + ]))) + .with_timestamp(DateTime::from_timestamp_micros(1590488773254420000)) + ); + } + + #[test] + fn deserialize_error() { + let deser = InfluxdbDeserializer::new(true); + let buffer = Bytes::from("some invalid string"); + assert!(deser.parse(buffer, LogNamespace::default()).is_err()); + } +} diff --git a/lib/codecs/src/decoding/format/mod.rs b/lib/codecs/src/decoding/format/mod.rs index c46b87b3ca3f4..9e2dee7de1ce2 100644 --- a/lib/codecs/src/decoding/format/mod.rs +++ b/lib/codecs/src/decoding/format/mod.rs @@ -6,6 +6,7 @@ mod avro; mod bytes; mod gelf; +mod influxdb; mod json; mod native; mod native_json; @@ -18,6 +19,7 @@ use ::bytes::Bytes; pub use avro::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions}; use dyn_clone::DynClone; pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions}; +pub use influxdb::{InfluxdbDeserializer, InfluxdbDeserializerConfig}; pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions}; pub use native::{NativeDeserializer, NativeDeserializerConfig}; pub use native_json::{ diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index bfaeeb2ff126f..0f418be040b46 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -10,8 +10,9 @@ use bytes::{Bytes, BytesMut}; pub use error::StreamDecodingError; pub use format::{ BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer, - GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig, - JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer, + GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer, + InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions, + NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions, }; @@ -252,6 +253,11 @@ pub enum DeserializerConfig { /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go Gelf(GelfDeserializerConfig), + /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + /// + /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + Influxdb(InfluxdbDeserializerConfig), + /// Decodes the raw bytes as as an [Apache Avro][apache_avro] message. /// /// [apache_avro]: https://avro.apache.org/ @@ -303,6 +309,12 @@ impl From for DeserializerConfig { } } +impl From for DeserializerConfig { + fn from(config: InfluxdbDeserializerConfig) -> Self { + Self::Influxdb(config) + } +} + impl DeserializerConfig { /// Build the `Deserializer` from this configuration. pub fn build(&self) -> vector_common::Result { @@ -323,6 +335,7 @@ impl DeserializerConfig { } DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())), DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())), + DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())), DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)), } } @@ -334,6 +347,7 @@ impl DeserializerConfig { DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()), DeserializerConfig::Bytes | DeserializerConfig::Json(_) + | DeserializerConfig::Influxdb(_) | DeserializerConfig::NativeJson(_) => { FramingConfig::NewlineDelimited(Default::default()) } @@ -363,6 +377,7 @@ impl DeserializerConfig { DeserializerConfig::NativeJson(config) => config.output_type(), DeserializerConfig::Gelf(config) => config.output_type(), DeserializerConfig::Vrl(config) => config.output_type(), + DeserializerConfig::Influxdb(config) => config.output_type(), } } @@ -381,6 +396,7 @@ impl DeserializerConfig { DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace), DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace), DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace), + DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace), DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace), } } @@ -413,6 +429,7 @@ impl DeserializerConfig { | DeserializerConfig::NativeJson(_) | DeserializerConfig::Bytes | DeserializerConfig::Gelf(_) + | DeserializerConfig::Influxdb(_) | DeserializerConfig::Vrl(_), _, ) => "text/plain", @@ -444,6 +461,8 @@ pub enum Deserializer { Boxed(BoxedDeserializer), /// Uses a `GelfDeserializer` for deserialization. Gelf(GelfDeserializer), + /// Uses a `InfluxdbDeserializer` for deserialization. + Influxdb(InfluxdbDeserializer), /// Uses a `VrlDeserializer` for deserialization. Vrl(VrlDeserializer), } @@ -465,6 +484,7 @@ impl format::Deserializer for Deserializer { Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace), + Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace), } } diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index feb90716a899d..d184932633507 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -169,6 +169,8 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson, DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf, DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() }, + // TODO: Influxdb has no serializer yet + DeserializerConfig::Influxdb { .. } => todo!(), DeserializerConfig::Vrl { .. } => unimplemented!(), }; diff --git a/website/cue/reference/components/sources/base/amqp.cue b/website/cue/reference/components/sources/base/amqp.cue index 667b607d88044..eeba20ddf3b49 100644 --- a/website/cue/reference/components/sources/base/amqp.cue +++ b/website/cue/reference/components/sources/base/amqp.cue @@ -108,6 +108,11 @@ base: components: sources: amqp: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -167,6 +172,22 @@ base: components: sources: amqp: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue index 12418c29fc811..2cba53e9f847e 100644 --- a/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sources/base/aws_kinesis_firehose.cue @@ -111,6 +111,11 @@ base: components: sources: aws_kinesis_firehose: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -170,6 +175,22 @@ base: components: sources: aws_kinesis_firehose: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/aws_s3.cue b/website/cue/reference/components/sources/base/aws_s3.cue index 8827d8b883771..4538062a02679 100644 --- a/website/cue/reference/components/sources/base/aws_s3.cue +++ b/website/cue/reference/components/sources/base/aws_s3.cue @@ -206,6 +206,11 @@ base: components: sources: aws_s3: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -265,6 +270,22 @@ base: components: sources: aws_s3: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/aws_sqs.cue b/website/cue/reference/components/sources/base/aws_sqs.cue index 7573e19e67068..0d4377a71eab8 100644 --- a/website/cue/reference/components/sources/base/aws_sqs.cue +++ b/website/cue/reference/components/sources/base/aws_sqs.cue @@ -201,6 +201,11 @@ base: components: sources: aws_sqs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -260,6 +265,22 @@ base: components: sources: aws_sqs: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/datadog_agent.cue b/website/cue/reference/components/sources/base/datadog_agent.cue index ba8c42768b138..352d34b09912a 100644 --- a/website/cue/reference/components/sources/base/datadog_agent.cue +++ b/website/cue/reference/components/sources/base/datadog_agent.cue @@ -93,6 +93,11 @@ base: components: sources: datadog_agent: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -152,6 +157,22 @@ base: components: sources: datadog_agent: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/demo_logs.cue b/website/cue/reference/components/sources/base/demo_logs.cue index 3d2a892210d8c..b24b6bb96197d 100644 --- a/website/cue/reference/components/sources/base/demo_logs.cue +++ b/website/cue/reference/components/sources/base/demo_logs.cue @@ -72,6 +72,11 @@ base: components: sources: demo_logs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -131,6 +136,22 @@ base: components: sources: demo_logs: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/exec.cue b/website/cue/reference/components/sources/base/exec.cue index 0369f085bb761..ee5ac7f67b7e2 100644 --- a/website/cue/reference/components/sources/base/exec.cue +++ b/website/cue/reference/components/sources/base/exec.cue @@ -73,6 +73,11 @@ base: components: sources: exec: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -132,6 +137,22 @@ base: components: sources: exec: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/file_descriptor.cue b/website/cue/reference/components/sources/base/file_descriptor.cue index 63387d882f290..bdb44c7a93239 100644 --- a/website/cue/reference/components/sources/base/file_descriptor.cue +++ b/website/cue/reference/components/sources/base/file_descriptor.cue @@ -63,6 +63,11 @@ base: components: sources: file_descriptor: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -122,6 +127,22 @@ base: components: sources: file_descriptor: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/gcp_pubsub.cue b/website/cue/reference/components/sources/base/gcp_pubsub.cue index cd655c4c3dd0b..4cd21c96a17df 100644 --- a/website/cue/reference/components/sources/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sources/base/gcp_pubsub.cue @@ -139,6 +139,11 @@ base: components: sources: gcp_pubsub: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -198,6 +203,22 @@ base: components: sources: gcp_pubsub: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/heroku_logs.cue b/website/cue/reference/components/sources/base/heroku_logs.cue index 33f6be7194f6a..060b03da1ca64 100644 --- a/website/cue/reference/components/sources/base/heroku_logs.cue +++ b/website/cue/reference/components/sources/base/heroku_logs.cue @@ -105,6 +105,11 @@ base: components: sources: heroku_logs: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -164,6 +169,22 @@ base: components: sources: heroku_logs: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/http.cue b/website/cue/reference/components/sources/base/http.cue index 73ae0efbe4ac7..6ae781d019c90 100644 --- a/website/cue/reference/components/sources/base/http.cue +++ b/website/cue/reference/components/sources/base/http.cue @@ -107,6 +107,11 @@ base: components: sources: http: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -165,6 +170,22 @@ base: components: sources: http: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/http_client.cue b/website/cue/reference/components/sources/base/http_client.cue index 38485fb373cd2..c2ca5a39190d1 100644 --- a/website/cue/reference/components/sources/base/http_client.cue +++ b/website/cue/reference/components/sources/base/http_client.cue @@ -105,6 +105,11 @@ base: components: sources: http_client: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -164,6 +169,22 @@ base: components: sources: http_client: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/http_server.cue b/website/cue/reference/components/sources/base/http_server.cue index 3837266a94199..ebb5095f87d01 100644 --- a/website/cue/reference/components/sources/base/http_server.cue +++ b/website/cue/reference/components/sources/base/http_server.cue @@ -107,6 +107,11 @@ base: components: sources: http_server: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -165,6 +170,22 @@ base: components: sources: http_server: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/kafka.cue b/website/cue/reference/components/sources/base/kafka.cue index f431399ba5c37..d5a7f8dbd8f77 100644 --- a/website/cue/reference/components/sources/base/kafka.cue +++ b/website/cue/reference/components/sources/base/kafka.cue @@ -117,6 +117,11 @@ base: components: sources: kafka: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -176,6 +181,22 @@ base: components: sources: kafka: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index 36cf215f3f55a..e7bb596422861 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -160,6 +160,11 @@ base: components: sources: nats: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -219,6 +224,22 @@ base: components: sources: nats: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/pulsar.cue b/website/cue/reference/components/sources/base/pulsar.cue index f12f205944955..7f4d00bba16a9 100644 --- a/website/cue/reference/components/sources/base/pulsar.cue +++ b/website/cue/reference/components/sources/base/pulsar.cue @@ -166,6 +166,11 @@ base: components: sources: pulsar: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -225,6 +230,22 @@ base: components: sources: pulsar: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/redis.cue b/website/cue/reference/components/sources/base/redis.cue index 039ba3c0fe1b1..791776717814f 100644 --- a/website/cue/reference/components/sources/base/redis.cue +++ b/website/cue/reference/components/sources/base/redis.cue @@ -78,6 +78,11 @@ base: components: sources: redis: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -137,6 +142,22 @@ base: components: sources: redis: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/socket.cue b/website/cue/reference/components/sources/base/socket.cue index 55cbd60be49d7..2dc27097e3373 100644 --- a/website/cue/reference/components/sources/base/socket.cue +++ b/website/cue/reference/components/sources/base/socket.cue @@ -80,6 +80,11 @@ base: components: sources: socket: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -139,6 +144,22 @@ base: components: sources: socket: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\"" diff --git a/website/cue/reference/components/sources/base/stdin.cue b/website/cue/reference/components/sources/base/stdin.cue index 7e5fdefe6c44e..6a2fb7056289e 100644 --- a/website/cue/reference/components/sources/base/stdin.cue +++ b/website/cue/reference/components/sources/base/stdin.cue @@ -63,6 +63,11 @@ base: components: sources: stdin: configuration: { [gelf]: https://docs.graylog.org/docs/gelf [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go """ + influxdb: """ + Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message. + + [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol + """ json: """ Decodes the raw bytes as [JSON][json]. @@ -122,6 +127,22 @@ base: components: sources: stdin: configuration: { type: bool: default: true } } + influxdb: { + description: "Influxdb-specific decoding options." + relevant_when: "codec = \"influxdb\"" + required: false + type: object: options: lossy: { + description: """ + Determines whether or not to replace invalid UTF-8 sequences instead of failing. + + When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD]. + + [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character + """ + required: false + type: bool: default: true + } + } json: { description: "JSON-specific decoding options." relevant_when: "codec = \"json\""