From fcebcd329ec8829e7cedd4cd8bb42bd7cfa5ff24 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 16 Aug 2024 15:24:42 -0700 Subject: [PATCH 1/2] fix(codecs): Ensure that batches using newline delimited framing end in a newline Fixes: #21086 Signed-off-by: Jesse Szwedko --- changelog.d/batch-newline.fix.md | 1 + src/codecs/encoding/encoder.rs | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 changelog.d/batch-newline.fix.md diff --git a/changelog.d/batch-newline.fix.md b/changelog.d/batch-newline.fix.md new file mode 100644 index 0000000000000..a3586dfd1807b --- /dev/null +++ b/changelog.d/batch-newline.fix.md @@ -0,0 +1 @@ +Batches encoded using newline delimited framing now end with a trailing newline. diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index 4a5c3a62869ff..566f68c550783 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -99,6 +99,7 @@ impl Encoder<Framer> { Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), Serializer::Json(_) | Serializer::NativeJson(_), ) => b"]", + (Framer::NewlineDelimited(_), _) => b"\n", _ => &[], } } @@ -322,4 +323,23 @@ mod tests { let sink = framed.into_inner(); assert_eq!(sink, b"(foo)(bar)"); } + + #[tokio::test] + async fn test_encode_batch_newline() { + let encoder = Encoder::<Framer>::new( + Framer::NewlineDelimited(NewlineDelimitedEncoder::default()), + TextSerializerConfig::default().build().into(), + ); + let source = futures::stream::iter(vec![ + Event::Log(LogEvent::from("bar")), + Event::Log(LogEvent::from("baz")), + Event::Log(LogEvent::from("bat")), + ]) + .map(Ok); + let sink: Vec<u8> = Vec::new(); + let mut framed = FramedWrite::new(sink, encoder); + source.forward(&mut framed).await.unwrap(); + let sink = framed.into_inner(); + assert_eq!(sink, b"bar\nbaz\nbat\n"); + } } From a9f867c3efeb9f02fa98d5867c94a762b30fb743 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Mon, 19 Aug 2024 15:00:44 -0700 Subject: 
[PATCH 2/2] Update tests and handle empty batches Signed-off-by: Jesse Szwedko --- src/codecs/encoding/encoder.rs | 7 ++++--- src/sinks/util/encoding.rs | 11 ++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index 566f68c550783..61c0b4cc3de03 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -93,13 +93,14 @@ impl Encoder<Framer> { } /// Get the suffix that encloses a batch of events. - pub const fn batch_suffix(&self) -> &[u8] { - match (&self.framer, &self.serializer) { + pub const fn batch_suffix(&self, empty: bool) -> &[u8] { + match (&self.framer, &self.serializer, empty) { ( Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), Serializer::Json(_) | Serializer::NativeJson(_), + _, ) => b"]", - (Framer::NewlineDelimited(_), _) => b"\n", + (Framer::NewlineDelimited(_), _, false) => b"\n", _ => &[], } } diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 154ca256e0a08..52e34159a38a1 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -31,6 +31,7 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) { let mut encoder = self.1.clone(); let mut bytes_written = 0; let mut n_events_pending = events.len(); + let is_empty = events.is_empty(); let batch_prefix = encoder.batch_prefix(); write_all(writer, n_events_pending, batch_prefix)?; bytes_written += batch_prefix.len(); @@ -62,7 +63,7 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) { n_events_pending -= 1; } - let batch_suffix = encoder.batch_suffix(); + let batch_suffix = encoder.batch_suffix(is_empty); assert!(n_events_pending == 0); write_all(writer, 0, batch_suffix)?; bytes_written += batch_suffix.len(); @@ -289,9 +290,9 @@ mod tests { .sum::<JsonSize>(); let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap(); - assert_eq!(written, 15); + assert_eq!(written, 16); - 
assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#); + assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n"); assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap()); } @@ -326,11 +327,11 @@ mod tests { .sum::<JsonSize>(); let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap(); - assert_eq!(written, 50); + assert_eq!(written, 51); assert_eq!( String::from_utf8(writer).unwrap(), - "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}" + "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n" ); assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap()); }