diff --git a/lib/vector-stream/src/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs index b865216c1f764..66df4e531c269 100644 --- a/lib/vector-stream/src/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -207,7 +207,6 @@ where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, - Prt::Item: ByteSizeOf, C: BatchConfig, F: Fn() -> C + Send, { @@ -230,7 +229,6 @@ where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, - Prt::Item: ByteSizeOf, C: BatchConfig, F: Fn() -> C + Send, { @@ -251,7 +249,6 @@ where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, - Prt::Item: ByteSizeOf, KT: KeyedTimer, C: BatchConfig, F: Fn() -> C + Send, diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 0e73e8495e7e9..a277681afbcea 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -2,29 +2,26 @@ use std::{fmt::Debug, io, sync::Arc}; use bytes::Bytes; use snafu::Snafu; -use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; -use vector_lib::lookup::event_path; +use vector_lib::{lookup::event_path, stream::batcher::limiter::ItemBatchSize}; use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest}; -use crate::sinks::{ - prelude::*, - util::{encoding::Encoder as _, Compressor}, -}; +use crate::sinks::{prelude::*, util::Compressor}; + #[derive(Default)] struct EventPartitioner; impl Partitioner for EventPartitioner { - type Item = Event; + type Item = EncodedEvent; type Key = Option>; fn partition(&self, item: &Self::Item) -> Self::Key { - item.metadata().datadog_api_key() + item.original.metadata().datadog_api_key() } } #[derive(Debug)] pub struct LogSinkBuilder { - encoding: JsonEncoding, + transformer: Transformer, service: S, batch_settings: BatcherSettings, compression: Option, @@ -41,7 +38,7 @@ impl LogSinkBuilder { protocol: String, ) -> Self { Self { - encoding: JsonEncoding::new(transformer), + transformer, service, default_api_key, batch_settings, @@ -58,7 +55,7 @@ impl LogSinkBuilder { pub fn build(self) -> LogSink { LogSink { default_api_key: self.default_api_key, - encoding: self.encoding, + transformer: self.transformer, service: self.service, batch_settings: self.batch_settings, compression: self.compression.unwrap_or_default(), @@ -78,7 +75,7 @@ pub struct LogSink { /// The API service service: S, /// The encoding of payloads - encoding: JsonEncoding, + transformer: Transformer, /// The compression technique to use when building the request body compression: Compression, /// Batch settings: timeout, max events, max bytes, etc. @@ -87,60 +84,27 @@ pub struct LogSink { protocol: String, } -/// Customized encoding specific to the Datadog Logs sink, as the logs API only accepts JSON encoded -/// log lines, and requires some specific normalization of certain event fields. -#[derive(Clone, Debug)] -pub struct JsonEncoding { - encoder: (Transformer, Encoder), -} +fn map_event(event: &mut Event) { + let log = event.as_mut_log(); + let message_path = log + .message_path() + .expect("message is required (make sure the \"message\" semantic meaning is set)") + .clone(); + log.rename_key(&message_path, event_path!("message")); -impl JsonEncoding { - pub fn new(transformer: Transformer) -> Self { - Self { - encoder: ( - transformer, - Encoder::::new( - CharacterDelimitedEncoder::new(b',').into(), - JsonSerializerConfig::default().build().into(), - ), - ), - } + if let Some(host_path) = log.host_path().cloned().as_ref() { + log.rename_key(host_path, event_path!("hostname")); } -} -impl crate::sinks::util::encoding::Encoder> for JsonEncoding { - fn encode_input( - &self, - mut input: Vec, - writer: &mut dyn io::Write, - ) -> io::Result<(usize, GroupedCountByteSize)> { - for event in input.iter_mut() { - let log = event.as_mut_log(); - let message_path = log - .message_path() - .expect("message is required (make sure the \"message\" semantic meaning is set)") - .clone(); - log.rename_key(&message_path, event_path!("message")); - - if let Some(host_path) = log.host_path().cloned().as_ref() { - log.rename_key(host_path, event_path!("hostname")); - } - - let message_path = log - .timestamp_path() - .expect( - "timestamp is required (make sure the \"timestamp\" semantic meaning is set)", - ) - .clone(); - if let Some(Value::Timestamp(ts)) = log.remove(&message_path) { - log.insert( - event_path!("timestamp"), - Value::Integer(ts.timestamp_millis()), - ); - } - } - - self.encoder.encode_input(input, writer) + let message_path = log + .timestamp_path() + .expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)") + .clone(); + if let Some(Value::Timestamp(ts)) = log.remove(&message_path) { + log.insert( + event_path!("timestamp"), + Value::Integer(ts.timestamp_millis()), + ); } } @@ -160,42 +124,14 @@ impl From for RequestBuildError { struct LogRequestBuilder { default_api_key: Arc, - encoding: JsonEncoding, compression: Compression, } -impl RequestBuilder<(Option>, Vec)> for LogRequestBuilder { - type Metadata = (Arc, EventFinalizers); - type Events = Vec; - type Encoder = JsonEncoding; - type Payload = Bytes; - type Request = LogApiRequest; - type Error = RequestBuildError; - - fn compression(&self) -> Compression { - self.compression - } - - fn encoder(&self) -> &Self::Encoder { - &self.encoding - } - - fn split_input( - &self, - input: (Option>, Vec), - ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { - let (api_key, mut events) = input; - let finalizers = events.take_finalizers(); - let api_key = api_key.unwrap_or_else(|| Arc::clone(&self.default_api_key)); - let builder = RequestMetadataBuilder::from_events(&events); - - ((api_key, finalizers), builder, events) - } - +impl LogRequestBuilder { fn encode_events( &self, - events: Self::Events, - ) -> Result, Self::Error> { + events: Vec, + ) -> Result, RequestBuildError> { // We need to first serialize the payload separately so that we can figure out how big it is // before compression. The Datadog Logs API has a limit on uncompressed data, so we can't // use the default implementation of this method. @@ -207,9 +143,16 @@ impl RequestBuilder<(Option>, Vec)> for LogRequestBuilder { // rejecting anyways, which is meh. This might be a signal that the true "right" fix is to actually switch this // sink to incremental encoding and simply put up with suboptimal batch sizes if we need to end up splitting due // to (un)compressed size limitations. - let mut buf = Vec::new(); + let mut byte_size = telemetry().create_request_count_byte_size(); let n_events = events.len(); - let (uncompressed_size, byte_size) = self.encoder().encode_input(events, &mut buf)?; + let mut payload = Vec::with_capacity(n_events); + for e in events { + byte_size += e.byte_size; + payload.push(e.encoded); + } + + let buf = serde_json::to_vec(&payload).expect("serializing to memory"); + let uncompressed_size = buf.len(); if uncompressed_size > MAX_PAYLOAD_BYTES { return Err(RequestBuildError::PayloadTooBig); } @@ -229,24 +172,46 @@ impl RequestBuilder<(Option>, Vec)> for LogRequestBuilder { Ok(EncodeResult::uncompressed(bytes, byte_size)) } } +} - fn build_request( - &self, - dd_metadata: Self::Metadata, - metadata: RequestMetadata, - payload: EncodeResult, - ) -> Self::Request { - let (api_key, finalizers) = dd_metadata; - let uncompressed_size = payload.uncompressed_byte_size; - - LogApiRequest { - api_key, - compression: self.compression, - body: payload.into_payload(), - finalizers, - uncompressed_size, - metadata, - } +struct ActualJsonSize; + +impl ItemBatchSize for ActualJsonSize { + fn size(&self, item: &EncodedEvent) -> usize { + item.encoded.get().len() + 1 // one for comma + } +} + +struct EncodedEvent { + original: Event, + encoded: Box, + byte_size: GroupedCountByteSize, +} + +impl Finalizable for EncodedEvent { + fn take_finalizers(&mut self) -> EventFinalizers { + self.original.take_finalizers() + } +} + +// only for RequestMetadataBuilder::from_events +impl EstimatedJsonEncodedSizeOf for EncodedEvent { + fn estimated_json_encoded_size_of(&self) -> JsonSize { + // we could use the actual json size here, but opting to stay consistent + self.original.estimated_json_encoded_size_of() + } +} + +// only for RequestMetadataBuilder::from_events +impl ByteSizeOf for EncodedEvent { + fn allocated_bytes(&self) -> usize { + self.original.allocated_bytes() + } +} + +impl GetEventCountTags for EncodedEvent { + fn get_tags(&self) -> TaggedEventsSent { + self.original.get_tags() } } @@ -262,17 +227,63 @@ where let partitioner = EventPartitioner; let batch_settings = self.batch_settings; + let transformer = Arc::new(self.transformer); + let builder = Arc::new(LogRequestBuilder { + default_api_key, + compression: self.compression, + }); + + let input = input + .ready_chunks(1024) + .concurrent_map(default_request_builder_concurrency_limit(), move |events| { + let transformer = Arc::clone(&transformer); + Box::pin(std::future::ready(futures::stream::iter( + events.into_iter().map(move |mut event| { + let original = event.clone(); + + map_event(&mut event); + transformer.transform(&mut event); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + let encoded = serde_json::value::to_raw_value(&event.as_log()) + .expect("serializing to memory"); + + EncodedEvent { + original, + encoded, + byte_size, + } + }), + ))) + }) + .flatten(); - let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config()); input - .request_builder( - default_request_builder_concurrency_limit(), - LogRequestBuilder { - default_api_key, - encoding: self.encoding, - compression: self.compression, - }, - ) + .batched_partitioned(partitioner, || { + batch_settings.as_item_size_config(ActualJsonSize) + }) + .concurrent_map(default_request_builder_concurrency_limit(), move |input| { + let builder = Arc::clone(&builder); + + Box::pin(async move { + let (api_key, mut events) = input; + let finalizers = events.take_finalizers(); + let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key)); + let request_metadata_builder = RequestMetadataBuilder::from_events(&events); + + let payload = builder.encode_events(events)?; + + Ok::<_, RequestBuildError>(LogApiRequest { + api_key, + finalizers, + compression: builder.compression, + metadata: request_metadata_builder.build(&payload), + uncompressed_size: payload.uncompressed_byte_size, + body: payload.into_payload(), + }) + }) + }) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/datadog/logs/tests.rs b/src/sinks/datadog/logs/tests.rs index eedabc7493224..4feda5cce87e1 100644 --- a/src/sinks/datadog/logs/tests.rs +++ b/src/sinks/datadog/logs/tests.rs @@ -533,3 +533,51 @@ async fn error_is_retriable() { // but are not straightforward to instantiate due to the design of // the crates they originate from. } + +#[tokio::test] +async fn does_not_send_too_big_payloads() { + crate::test_util::trace_init(); + + let (mut config, cx) = load_sink::(indoc! {r#" + default_api_key = "atoken" + compression = "none" + "#}) + .unwrap(); + + let addr = next_addr(); + let endpoint = format!("http://{}", addr); + config.dd_common.endpoint = Some(endpoint.clone()); + + let (sink, _) = config.build(cx).await.unwrap(); + + let (mut rx, _trigger, server) = test_server(addr, ApiStatus::OKv2); + tokio::spawn(server); + + // Generate input that will require escaping when serialized to json, and therefore grow in size + // between batching and encoding. This is a very specific example that will fit in a batch of + // <4,250,000 but serialize to >5,000,000, defeating the current 750k safety buffer. + let events = (0..1000).map(|_n| { + let data = serde_json::json!({"a": "b"}); + let nested = serde_json::to_string(&data).unwrap(); + event_with_api_key(&nested.repeat(401), "foo") + }); + + sink.run_events(events).await.unwrap(); + + let mut sizes = Vec::new(); + loop { + tokio::select! { + Some((_parts, body)) = rx.next() => { + sizes.push(body.len()); + } + _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => { + break; + } + } + } + + assert!(!sizes.is_empty()); + for size in sizes { + assert!(size < 5_000_000, "{} not less than max", size); + } +} diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index bdf98a21df1f4..6d3fa2471ccf8 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -19,7 +19,6 @@ use vector_lib::stream::{ use vector_lib::{ event::{Finalizable, Metric}, partition::Partitioner, - ByteSizeOf, }; use super::{ @@ -53,7 +52,6 @@ pub trait SinkBuilderExt: Stream { Self: Stream + Sized, P: Partitioner + Unpin, P::Key: Eq + Hash + Clone, - P::Item: ByteSizeOf, C: BatchConfig, F: Fn() -> C + Send, {