fix(datadog_logs sink): serialize before batching for more accurate request sizing #19037

Closed · wants to merge 5 commits
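
In short, the diff below moves JSON serialization ahead of batching: each event is encoded to a raw JSON value as soon as it enters the sink, batches are sized by the actual encoded bytes, and the request builder only joins the pre-serialized items into the final array. A minimal sketch of that idea, using simplified types rather than the sink's real ones (requires serde_json's `raw_value` feature; all names here are illustrative):

```rust
// Sketch only: each event is serialized once, up front; batches are sized by
// the real serialized length; the request body is the JSON array of the
// already-serialized items.
use serde_json::value::RawValue;

struct EncodedItem {
    encoded: Box<RawValue>,
}

fn encode_item<T: serde::Serialize>(event: &T) -> serde_json::Result<EncodedItem> {
    Ok(EncodedItem {
        encoded: serde_json::value::to_raw_value(event)?,
    })
}

// Batch-size accounting: actual bytes on the wire, plus one byte per item for
// the separating comma in the final array.
fn item_size(item: &EncodedItem) -> usize {
    item.encoded.get().len() + 1
}

// The final payload is just the array of raw values; the events themselves
// are not re-serialized here.
fn build_payload(batch: &[EncodedItem]) -> serde_json::Result<Vec<u8>> {
    let raw: Vec<&RawValue> = batch.iter().map(|i| i.encoded.as_ref()).collect();
    serde_json::to_vec(&raw)
}
```

In the diff this corresponds to the new `EncodedEvent` wrapper, the `ActualJsonSize` batch-size implementation, and the reworked stream pipeline at the end of the sink.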
3 changes: 0 additions & 3 deletions lib/vector-stream/src/partitioned_batcher.rs
@@ -207,7 +207,6 @@ where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Prt::Key: Eq + Hash + Clone,
Prt::Item: ByteSizeOf,
C: BatchConfig<Prt::Item>,
F: Fn() -> C + Send,
{
@@ -230,7 +229,6 @@ where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Prt::Key: Eq + Hash + Clone,
Prt::Item: ByteSizeOf,
C: BatchConfig<Prt::Item>,
F: Fn() -> C + Send,
{
@@ -251,7 +249,6 @@ where
St: Stream<Item = Prt::Item>,
Prt: Partitioner + Unpin,
Prt::Key: Eq + Hash + Clone,
Prt::Item: ByteSizeOf,
KT: KeyedTimer<Prt::Key>,
C: BatchConfig<Prt::Item, Batch = B>,
F: Fn() -> C + Send,
255 changes: 133 additions & 122 deletions src/sinks/datadog/logs/sink.rs
@@ -2,29 +2,26 @@ use std::{fmt::Debug, io, sync::Arc};

use bytes::Bytes;
use snafu::Snafu;
use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
use vector_lib::lookup::event_path;
use vector_lib::{lookup::event_path, stream::batcher::limiter::ItemBatchSize};

use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
use crate::sinks::{
prelude::*,
util::{encoding::Encoder as _, Compressor},
};
use crate::sinks::{prelude::*, util::Compressor};

#[derive(Default)]
struct EventPartitioner;

impl Partitioner for EventPartitioner {
type Item = Event;
type Item = EncodedEvent;
type Key = Option<Arc<str>>;

fn partition(&self, item: &Self::Item) -> Self::Key {
item.metadata().datadog_api_key()
item.original.metadata().datadog_api_key()
}
}

#[derive(Debug)]
pub struct LogSinkBuilder<S> {
encoding: JsonEncoding,
transformer: Transformer,
service: S,
batch_settings: BatcherSettings,
compression: Option<Compression>,
@@ -41,7 +38,7 @@ impl<S> LogSinkBuilder<S> {
protocol: String,
) -> Self {
Self {
encoding: JsonEncoding::new(transformer),
transformer,
service,
default_api_key,
batch_settings,
@@ -58,7 +55,7 @@ impl<S> LogSinkBuilder<S> {
pub fn build(self) -> LogSink<S> {
LogSink {
default_api_key: self.default_api_key,
encoding: self.encoding,
transformer: self.transformer,
service: self.service,
batch_settings: self.batch_settings,
compression: self.compression.unwrap_or_default(),
@@ -78,7 +75,7 @@ pub struct LogSink<S> {
/// The API service
service: S,
/// The encoding of payloads
encoding: JsonEncoding,
transformer: Transformer,
/// The compression technique to use when building the request body
compression: Compression,
/// Batch settings: timeout, max events, max bytes, etc.
@@ -87,60 +84,27 @@ pub struct LogSink<S> {
protocol: String,
}

/// Customized encoding specific to the Datadog Logs sink, as the logs API only accepts JSON encoded
/// log lines, and requires some specific normalization of certain event fields.
#[derive(Clone, Debug)]
pub struct JsonEncoding {
encoder: (Transformer, Encoder<Framer>),
}
fn map_event(event: &mut Event) {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

impl JsonEncoding {
pub fn new(transformer: Transformer) -> Self {
Self {
encoder: (
transformer,
Encoder::<Framer>::new(
CharacterDelimitedEncoder::new(b',').into(),
JsonSerializerConfig::default().build().into(),
),
),
}
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}
}

impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
fn encode_input(
&self,
mut input: Vec<Event>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
for event in input.iter_mut() {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}

let message_path = log
.timestamp_path()
.expect(
"timestamp is required (make sure the \"timestamp\" semantic meaning is set)",
)
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
);
}
}

self.encoder.encode_input(input, writer)
let message_path = log
.timestamp_path()
.expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
);
}
}
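
As a side note for readers unfamiliar with Vector's event model, the normalization above amounts to three field rewrites before an event reaches the Datadog logs API. A rough stand-alone illustration using plain `serde_json` values, where the input field names `msg`, `host`, and `ts` are placeholders (the real code resolves these paths via the event's semantic meanings):

```rust
use serde_json::{json, Value};

// Illustrative only: rename the message field to `message`, the host field to
// `hostname`, and rewrite the timestamp as epoch milliseconds, mirroring what
// map_event does on a real Vector LogEvent.
fn normalize(mut log: Value) -> Value {
    if let Some(obj) = log.as_object_mut() {
        if let Some(msg) = obj.remove("msg") {
            obj.insert("message".to_string(), msg);
        }
        if let Some(host) = obj.remove("host") {
            obj.insert("hostname".to_string(), host);
        }
        if let Some(Value::String(ts)) = obj.remove("ts") {
            if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&ts) {
                obj.insert("timestamp".to_string(), json!(parsed.timestamp_millis()));
            }
        }
    }
    log
}
```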

@@ -160,42 +124,14 @@ impl From<io::Error> for RequestBuildError {

struct LogRequestBuilder {
default_api_key: Arc<str>,
encoding: JsonEncoding,
compression: Compression,
}

impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for LogRequestBuilder {
type Metadata = (Arc<str>, EventFinalizers);
type Events = Vec<Event>;
type Encoder = JsonEncoding;
type Payload = Bytes;
type Request = LogApiRequest;
type Error = RequestBuildError;

fn compression(&self) -> Compression {
self.compression
}

fn encoder(&self) -> &Self::Encoder {
&self.encoding
}

fn split_input(
&self,
input: (Option<Arc<str>>, Vec<Event>),
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let (api_key, mut events) = input;
let finalizers = events.take_finalizers();
let api_key = api_key.unwrap_or_else(|| Arc::clone(&self.default_api_key));
let builder = RequestMetadataBuilder::from_events(&events);

((api_key, finalizers), builder, events)
}

impl LogRequestBuilder {
fn encode_events(
&self,
events: Self::Events,
) -> Result<EncodeResult<Self::Payload>, Self::Error> {
events: Vec<EncodedEvent>,
) -> Result<EncodeResult<Bytes>, RequestBuildError> {
// We need to first serialize the payload separately so that we can figure out how big it is
// before compression. The Datadog Logs API has a limit on uncompressed data, so we can't
// use the default implementation of this method.
@@ -207,9 +143,16 @@ impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for LogRequestBuilder {
// rejecting anyways, which is meh. This might be a signal that the true "right" fix is to actually switch this
// sink to incremental encoding and simply put up with suboptimal batch sizes if we need to end up splitting due
// to (un)compressed size limitations.
let mut buf = Vec::new();
let mut byte_size = telemetry().create_request_count_byte_size();
let n_events = events.len();
let (uncompressed_size, byte_size) = self.encoder().encode_input(events, &mut buf)?;
let mut payload = Vec::with_capacity(n_events);
for e in events {
byte_size += e.byte_size;
payload.push(e.encoded);
}

let buf = serde_json::to_vec(&payload).expect("serializing to memory");
let uncompressed_size = buf.len();
if uncompressed_size > MAX_PAYLOAD_BYTES {
return Err(RequestBuildError::PayloadTooBig);
}
@@ -229,24 +172,46 @@ impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for LogRequestBuilder {
Ok(EncodeResult::uncompressed(bytes, byte_size))
}
}
}

fn build_request(
&self,
dd_metadata: Self::Metadata,
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
let (api_key, finalizers) = dd_metadata;
let uncompressed_size = payload.uncompressed_byte_size;

LogApiRequest {
api_key,
compression: self.compression,
body: payload.into_payload(),
finalizers,
uncompressed_size,
metadata,
}
struct ActualJsonSize;

impl ItemBatchSize<EncodedEvent> for ActualJsonSize {
fn size(&self, item: &EncodedEvent) -> usize {
item.encoded.get().len() + 1 // one for comma
}
}
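
The `+ 1` above is for the comma that separates an item from the next one in the final JSON array. A quick sanity check on the arithmetic (an illustrative helper, not part of the change):

```rust
// For n items the body is `[item1,item2,...,itemN]`:
// sum of item lengths + (n - 1) commas + 2 brackets.
// Counting each item as `len + 1` pays for n separators where only n - 1 are
// needed but misses the two enclosing brackets, so it comes in exactly one
// byte short of the real body size per batch.
fn expected_body_len(item_lens: &[usize]) -> usize {
    item_lens.iter().sum::<usize>() + item_lens.len().saturating_sub(1) + 2
}
```

The request builder still serializes the full batch and checks the real length against `MAX_PAYLOAD_BYTES` before compressing, so the one byte the per-item accounting misses is caught there anyway.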

struct EncodedEvent {
original: Event,
encoded: Box<serde_json::value::RawValue>,
byte_size: GroupedCountByteSize,
}

impl Finalizable for EncodedEvent {
fn take_finalizers(&mut self) -> EventFinalizers {
self.original.take_finalizers()
}
}

// only for RequestMetadataBuilder::from_events
impl EstimatedJsonEncodedSizeOf for EncodedEvent {
fn estimated_json_encoded_size_of(&self) -> JsonSize {
// we could use the actual json size here, but opting to stay consistent
self.original.estimated_json_encoded_size_of()
}
}

// only for RequestMetadataBuilder::from_events
impl ByteSizeOf for EncodedEvent {
fn allocated_bytes(&self) -> usize {
self.original.allocated_bytes()
}
}

impl GetEventCountTags for EncodedEvent {
fn get_tags(&self) -> TaggedEventsSent {
self.original.get_tags()
}
}

@@ -262,17 +227,63 @@ where

let partitioner = EventPartitioner;
let batch_settings = self.batch_settings;
let transformer = Arc::new(self.transformer);
let builder = Arc::new(LogRequestBuilder {
default_api_key,
compression: self.compression,
});

let input = input
.ready_chunks(1024)
.concurrent_map(default_request_builder_concurrency_limit(), move |events| {
let transformer = Arc::clone(&transformer);
Box::pin(std::future::ready(futures::stream::iter(
events.into_iter().map(move |mut event| {
let original = event.clone();

map_event(&mut event);
transformer.transform(&mut event);

let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&event, event.estimated_json_encoded_size_of());
let encoded = serde_json::value::to_raw_value(&event.as_log())
.expect("serializing to memory");

EncodedEvent {
original,
encoded,
byte_size,
}
}),
)))
})
.flatten();

let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config());
input
.request_builder(
default_request_builder_concurrency_limit(),
LogRequestBuilder {
default_api_key,
encoding: self.encoding,
compression: self.compression,
},
)
.batched_partitioned(partitioner, || {
batch_settings.as_item_size_config(ActualJsonSize)
})
.concurrent_map(default_request_builder_concurrency_limit(), move |input| {
let builder = Arc::clone(&builder);

Box::pin(async move {
let (api_key, mut events) = input;
let finalizers = events.take_finalizers();
let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
let request_metadata_builder = RequestMetadataBuilder::from_events(&events);

let payload = builder.encode_events(events)?;

Ok::<_, RequestBuildError>(LogApiRequest {
api_key,
finalizers,
compression: builder.compression,
metadata: request_metadata_builder.build(&payload),
uncompressed_size: payload.uncompressed_byte_size,
body: payload.into_payload(),
})
})
})
.filter_map(|request| async move {
match request {
Err(error) => {