Skip to content

Commit

Permalink
fix(sources): always emit HttpBytesReceived after decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmith3197 committed Nov 3, 2023
1 parent fb63f8e commit 06f4504
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 26 deletions.
15 changes: 3 additions & 12 deletions src/sources/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
serde::bool_or_struct,
sources::{
self,
util::{decode, http::HttpMethod, ErrorMessage, HttpSource, HttpSourceAuthConfig},
util::{http::HttpMethod, ErrorMessage, HttpSource, HttpSourceAuthConfig},
},
tls::TlsEnableableConfig,
};
Expand Down Expand Up @@ -124,20 +124,11 @@ impl RemoteWriteSource {
impl HttpSource for RemoteWriteSource {
fn build_events(
&self,
mut body: Bytes,
header_map: &HeaderMap,
body: Bytes,
_header_map: &HeaderMap,
_query_parameters: &HashMap<String, String>,
_full_path: &str,
) -> Result<Vec<Event>, ErrorMessage> {
// If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us
// se we need to.
if header_map
.get("Content-Encoding")
.map(|header| header.as_ref())
!= Some(&b"snappy"[..])
{
body = decode(&Some("snappy".to_string()), body)?;
}
let events = self.decode_body(body)?;
Ok(events)
}
Expand Down
16 changes: 8 additions & 8 deletions src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,26 @@ impl SplunkSource {
let mut out = out.clone();
let idx_ack = idx_ack.clone();
let events_received = events_received.clone();
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path: path.as_str(),
protocol,
});

async move {
if idx_ack.is_some() && channel.is_none() {
return Err(Rejection::from(ApiError::MissingChannel));
}

let mut data = Vec::new();
let body = if gzip {
let (byte_size, body) = if gzip {
MultiGzDecoder::new(body.reader())
.read_to_end(&mut data)
.map_err(|_| Rejection::from(ApiError::BadRequest))?;
String::from_utf8_lossy(data.as_slice())
(data.len(), String::from_utf8_lossy(data.as_slice()))
} else {
String::from_utf8_lossy(body.as_ref())
(body.len(), String::from_utf8_lossy(body.as_ref()))
};
emit!(HttpBytesReceived {
byte_size,
http_path: path.as_str(),
protocol,
});

let (batch, receiver) =
BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
Expand Down
11 changes: 5 additions & 6 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,15 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
debug!(message = "Handling HTTP request.", headers = ?headers);
let http_path = path.as_str();

emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});

let events = auth
.is_valid(&auth_header)
.and_then(|()| decode(&encoding_header, body))
.and_then(|body| {
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});
self.build_events(body, &headers, &query_parameters, path.as_str())
})
.map(|mut events| {
Expand Down

0 comments on commit 06f4504

Please sign in to comment.