From 06f45048da226c875f99d8ab797042d62feb1355 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 3 Nov 2023 17:03:03 -0400 Subject: [PATCH] fix(sources): always emit HttpBytesReceived after decompression --- src/sources/prometheus/remote_write.rs | 15 +++------------ src/sources/splunk_hec/mod.rs | 16 ++++++++-------- src/sources/util/http/prelude.rs | 11 +++++------ 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 523df670ec5b8..135dab1d98d65 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -17,7 +17,7 @@ use crate::{ serde::bool_or_struct, sources::{ self, - util::{decode, http::HttpMethod, ErrorMessage, HttpSource, HttpSourceAuthConfig}, + util::{http::HttpMethod, ErrorMessage, HttpSource, HttpSourceAuthConfig}, }, tls::TlsEnableableConfig, }; @@ -124,20 +124,11 @@ impl RemoteWriteSource { impl HttpSource for RemoteWriteSource { fn build_events( &self, - mut body: Bytes, - header_map: &HeaderMap, + body: Bytes, + _header_map: &HeaderMap, _query_parameters: &HashMap, _full_path: &str, ) -> Result, ErrorMessage> { - // If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us - // se we need to. - if header_map - .get("Content-Encoding") - .map(|header| header.as_ref()) - != Some(&b"snappy"[..]) - { - body = decode(&Some("snappy".to_string()), body)?; - } let events = self.decode_body(body)?; Ok(events) } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 745e419c888ea..592c79e1e3ef6 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -351,11 +351,6 @@ impl SplunkSource { let mut out = out.clone(); let idx_ack = idx_ack.clone(); let events_received = events_received.clone(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path: path.as_str(), - protocol, - }); async move { if idx_ack.is_some() && channel.is_none() { @@ -363,14 +358,19 @@ impl SplunkSource { } let mut data = Vec::new(); - let body = if gzip { + let (byte_size, body) = if gzip { MultiGzDecoder::new(body.reader()) .read_to_end(&mut data) .map_err(|_| Rejection::from(ApiError::BadRequest))?; - String::from_utf8_lossy(data.as_slice()) + (data.len(), String::from_utf8_lossy(data.as_slice())) } else { - String::from_utf8_lossy(body.as_ref()) + (body.len(), String::from_utf8_lossy(body.as_ref())) }; + emit!(HttpBytesReceived { + byte_size, + http_path: path.as_str(), + protocol, + }); let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(idx_ack.is_some()); diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 1523427f392a4..1d9cadc812950 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -130,16 +130,15 @@ pub trait HttpSource: Clone + Send + Sync + 'static { debug!(message = "Handling HTTP request.", headers = ?headers); let http_path = path.as_str(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path, - protocol, - }); - let events = auth .is_valid(&auth_header) .and_then(|()| decode(&encoding_header, body)) .and_then(|body| { + emit!(HttpBytesReceived { + byte_size: body.len(), + http_path, + protocol, + }); self.build_events(body, &headers, &query_parameters, path.as_str()) }) .map(|mut events| {