Skip to content

Commit

Permalink
try adding some concurrency back
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <[email protected]>
  • Loading branch information
lukesteensen committed Nov 6, 2023
1 parent 131822f commit dd9a035
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/sinks/datadog/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,28 +227,37 @@ where

let partitioner = EventPartitioner;
let batch_settings = self.batch_settings;
let transformer = self.transformer;
let transformer = Arc::new(self.transformer);
let builder = Arc::new(LogRequestBuilder {
default_api_key,
compression: self.compression,
});

let input = input.map(|mut event| {
let original = event.clone();

map_event(&mut event);
transformer.transform(&mut event);

let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&event, event.estimated_json_encoded_size_of());
let encoded = serde_json::value::to_raw_value(&event.as_log()).expect("serializing to memory");

EncodedEvent {
original,
encoded,
byte_size,
}
});
let input = input
.ready_chunks(1024)
.concurrent_map(default_request_builder_concurrency_limit(), move |events| {
let transformer = Arc::clone(&transformer);
Box::pin(std::future::ready(futures::stream::iter(
events.into_iter().map(move |mut event| {
let original = event.clone();

map_event(&mut event);
transformer.transform(&mut event);

let mut byte_size = telemetry().create_request_count_byte_size();
byte_size.add_event(&event, event.estimated_json_encoded_size_of());
let encoded = serde_json::value::to_raw_value(&event.as_log())
.expect("serializing to memory");

EncodedEvent {
original,
encoded,
byte_size,
}
}),
)))
})
.flatten();

input
.batched_partitioned(partitioner, || {
Expand Down

0 comments on commit dd9a035

Please sign in to comment.