diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index fa031699d9..b1f2dedec7 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -95,10 +95,10 @@ impl Read {
             ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
         };
         let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
-            anyhow::bail!(gazette::Error::Parsing(
-                self.offset,
-                std::io::Error::other("document does not have a valid UUID"),
-            ));
+            anyhow::bail!(
+                "document at offset {} does not have a valid UUID",
+                self.offset
+            );
         };
         let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?;
diff --git a/crates/gazette/src/journal/read_json_lines.rs b/crates/gazette/src/journal/read_json_lines.rs
index 8f4302e3d6..84f63fd9c7 100644
--- a/crates/gazette/src/journal/read_json_lines.rs
+++ b/crates/gazette/src/journal/read_json_lines.rs
@@ -77,6 +77,17 @@ impl futures::Stream for ReadJsonLines {
                 return Poll::Ready(Some(Ok(ReadJsonLine::Doc { root, next_offset })));
             }

+            match me.parser.transcode_many(Default::default()) {
+                Ok(out) if !out.is_empty() => {
+                    *me.parsed = out.into_iter();
+                    continue;
+                }
+                Err((err, location)) => {
+                    return Poll::Ready(Some(Err(Error::Parsing { location, err })))
+                }
+                Ok(_out) => {} // Requires more chunks.
+            }
+
             // Poll the inner stream for the next item
             match me.inner.poll_next_unpin(cx) {
                 Poll::Ready(Some(response)) => {
@@ -90,11 +101,12 @@ impl futures::Stream for ReadJsonLines {
                         return Poll::Ready(Some(Ok(ReadJsonLine::Meta(response))));
                     }

-                    *me.parsed = me
-                        .parser
-                        .transcode_chunk(&response.content, response.offset, Default::default())
-                        .map_err(|err| Error::Parsing(response.offset, err))?
-                        .into_iter();
+                    me.parser
+                        .chunk(&response.content, response.offset)
+                        .map_err(|err| Error::Parsing {
+                            location: response.offset..response.offset,
+                            err,
+                        })?;
                 }
                 std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
                 std::task::Poll::Pending => return std::task::Poll::Pending,
diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs
index a1acae4958..ca8e98d08e 100644
--- a/crates/gazette/src/lib.rs
+++ b/crates/gazette/src/lib.rs
@@ -27,8 +27,12 @@ pub enum Error {
     BrokerStatus(broker::Status),
     #[error("unexpected consumer status: {0:?}")]
     ConsumerStatus(consumer::Status),
-    #[error("failed to parse document near journal offset {0}")]
-    Parsing(i64, #[source] std::io::Error),
+    #[error("failed to parse document at journal offset range {location:?}")]
+    Parsing {
+        location: std::ops::Range<i64>,
+        #[source]
+        err: std::io::Error,
+    },
     #[error("{0}")]
     Protocol(&'static str),
     #[error(transparent)]
diff --git a/crates/simd-doc/src/lib.rs b/crates/simd-doc/src/lib.rs
index c7da7af8eb..b4446ab84a 100644
--- a/crates/simd-doc/src/lib.rs
+++ b/crates/simd-doc/src/lib.rs
@@ -20,22 +20,32 @@ mod tests;
 /// For large documents (greater than one megabyte) it falls back to serde_json
 /// for parsing.
 pub struct Parser {
-    buf: Vec<u8>,
     ffi: cxx::UniquePtr<ffi::Parser>,
+    // Complete, newline-separated documents which are ready to parse.
+    // This buffer always ends with a newline or is empty.
+    whole: Vec<u8>,
+    // Partial document for which we're awaiting a newline.
+    // This buffer never contains any newlines.
+    partial: Vec<u8>,
+    // Offset of the first byte of `whole` or `partial` within the external stream.
     offset: i64,
+    // Interior buffer used to hold parsed HeapNodes.
+    // It's allocated but always empty between calls (drained upon parse_many() return).
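+    //
+    // An illustrative picture of the buffering invariant (offsets are bytes
+    // of the external stream; names are the fields above):
+    //
+    //   stream: [ consumed .. | whole ............ | partial ...... ]
+    //                         ^ offset             ^ offset + whole.len()
+    //
+    // `whole` always ends at a document boundary, while `partial` holds the
+    // trailing bytes seen since the last newline.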
     parsed: Vec<(doc::HeapNode<'static>, i64)>,
 }

 impl Parser {
+    /// Return a new, empty Parser.
     pub fn new() -> Self {
         Self {
-            buf: Vec::new(),
             // We must choose what the maximum capacity (and document size) of the
             // parser will be. This value shouldn't be too large, or it negatively
             // impacts parser performance. According to the simdjson docs, 1MB is
             // something of a sweet spot. Inputs larger than this capacity will
             // trigger the fallback handler.
             ffi: ffi::new_parser(1_000_000),
+            whole: Vec::new(),
+            partial: Vec::new(),
             offset: 0,
             parsed: Vec::new(),
         }
@@ -44,10 +54,10 @@ impl Parser {
     /// Parse a JSON document, which may have arbitrary whitespace,
     /// from `input` and return its doc::HeapNode representation.
     ///
-    /// parse_one() cannot be called after a call to parse_chunk()
-    /// or transcode_chunk() which retained a partial line remainder.
-    /// Generally, a Parser should be used for working with single
-    /// documents or working chunks of documents, but not both.
+    /// parse_one() cannot be called unless the Parser is completely empty,
+    /// with no internal remainder from prior calls to chunk(), parse_many(),
+    /// and transcode_many(). Generally, a Parser should be used for working
+    /// with single documents or working chunks of documents, but not both.
     pub fn parse_one<'s, 'a>(
         &'s mut self,
         input: &[u8],
@@ -57,154 +67,178 @@ impl Parser {
         let alloc: &'static doc::Allocator = unsafe { std::mem::transmute(alloc) };

         assert!(
-            self.buf.is_empty(),
-            "internal buffer is non-empty (incorrect mixed use of parse_one() with parse() or transcode())"
+            self.whole.is_empty(),
+            "internal buffer is non-empty (incorrect mixed use of parse_one() with chunk())"
         );
-        self.buf.extend_from_slice(input);
+        let mut buf = std::mem::take(&mut self.whole);
+        buf.extend_from_slice(input);

-        if let Err(err) = parse_simd(
-            &mut self.buf,
-            self.offset,
-            alloc,
-            &mut self.parsed,
-            &mut self.ffi,
-        ) {
+        if let Err(err) = parse_simd(&mut buf, 0, alloc, &mut self.parsed, &mut self.ffi) {
             self.parsed.clear(); // Clear a partial simd parsing.
-            tracing::debug!(%err, "simdjson JSON parsing failed; using fallback");
-            () = parse_fallback(&mut self.buf, self.offset, alloc, &mut self.parsed)?;
-        }
+            tracing::debug!(%err, "simdjson JSON parse-one failed; using fallback");

-        if self.parsed.len() != 1 {
-            let len = self.parsed.len();
-            self.parsed.clear();
+            let mut de = serde_json::Deserializer::from_slice(&buf);
+            let node = doc::HeapNode::from_serde(&mut de, &alloc)?;
+            self.parsed.push((node, 0));
+        }
+        let mut parsed = self.parsed.drain(..);
+        if parsed.len() != 1 {
             return Err(std::io::Error::new(
                 std::io::ErrorKind::InvalidData,
-                format!("expected one document, but parsed {len}"),
+                format!("expected one document, but parsed {}", parsed.len()),
             ));
         }
-        self.buf.clear();
+        // Re-use allocated capacity.
+        buf.clear();
+        self.whole = buf;

-        Ok(self.parsed.pop().unwrap().0)
+        Ok(parsed.next().unwrap().0)
     }

-    /// Parse newline-delimited JSON documents of `chunk` into equivalent
-    /// doc::HeapNode representations. `offset` is the offset of the first
-    /// `chunk` byte within the context of its source stream.
+    /// Supply Parser with the next chunk of newline-delimited JSON document content.
     ///
-    /// `chunk` may end with a partial document, in which case the partial
-    /// document is held back and is expected to be continued by the `chunk`
-    /// of a following call to `parse_chunk`.
+    /// `chunk_offset` is the offset of the first `chunk` byte within the
+    /// context of its external source stream.
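+    ///
+    /// For example (a sketch; offsets are illustrative), a caller streaming
+    /// the bytes `{"a":1}\n{"b":2}\n` in two pieces might invoke:
+    ///
+    /// ```ignore
+    /// parser.chunk(b"{\"a\":1}\n{\"b\"", 0)?; // One whole document plus a partial one.
+    /// parser.chunk(b":2}\n", 12)?;          // Must continue contiguously at 0 + 12.
+    /// ```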
     ///
-    /// `parse_chunk` returns the begin offset of the document sequence,
-    /// and an iterator of a parsed document and the input offset of its
-    /// *following* document. The caller can use the returned begin offset
-    /// and iterator offsets to compute the [begin, end) offset extents
-    /// of each parsed document.
-    pub fn parse_chunk<'s, 'a>(
-        &'s mut self,
-        chunk: &[u8],
-        offset: i64,
-        alloc: &'a doc::Allocator,
-    ) -> Result<(i64, std::vec::Drain<'s, (doc::HeapNode<'a>, i64)>), std::io::Error> {
-        // Safety: we'll transmute back to lifetime 'a prior to return.
-        let alloc: &'static doc::Allocator = unsafe { std::mem::transmute(alloc) };
+    /// `chunk` may end with a partial document, or only contain part of a
+    /// single document, in which case the partial document is expected to
+    /// be continued by a following call to chunk().
+    pub fn chunk(&mut self, chunk: &[u8], chunk_offset: i64) -> Result<(), std::io::Error> {
+        let enqueued = self.whole.len() + self.partial.len();
+
+        if enqueued == 0 {
+            self.offset = chunk_offset; // We're empty. Allow the offset to jump.
+        } else if chunk_offset != self.offset + enqueued as i64 {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!(
+                    "parser has {enqueued} bytes of document prefix starting at offset {}, but got {}-byte chunk at unexpected input offset {chunk_offset}",
+                    self.offset, chunk.len(),
+                ),
+            ));
+        }

-        let Some(last_newline) = self.prepare_chunk(chunk, offset)? else {
-            return Ok((self.offset, self.parsed.drain(..))); // Nothing to parse yet. drain(..) is empty.
+        let Some(last_newline) = memchr::memrchr(b'\n', &chunk) else {
+            // If `chunk` doesn't contain a newline, it cannot complete a document.
+            self.partial.extend_from_slice(chunk);
+            return Ok(());
         };

-        if let Err(err) = parse_simd(
-            &mut self.buf,
-            self.offset,
-            alloc,
-            &mut self.parsed,
-            &mut self.ffi,
-        ) {
-            self.parsed.clear(); // Clear a partial simd parsing.
-            tracing::debug!(%err, "simdjson JSON parsing failed; using fallback");
-            () = parse_fallback(&mut self.buf, self.offset, alloc, &mut self.parsed)?;
-        }
-        let begin = self.offset;
-        self.offset += self.buf.len() as i64;
-        self.buf.clear();
-        self.buf.extend_from_slice(&chunk[last_newline + 1..]);
+        if self.whole.is_empty() {
+            std::mem::swap(&mut self.whole, &mut self.partial);
+            self.whole.extend_from_slice(&chunk[..last_newline + 1]);
+            self.partial.extend_from_slice(&chunk[last_newline + 1..]);
+        } else {
+            self.whole.extend_from_slice(&self.partial);
+            self.whole.extend_from_slice(&chunk[..last_newline + 1]);

-        Ok((begin, self.parsed.drain(..)))
+            self.partial.clear();
+            self.partial.extend_from_slice(&chunk[last_newline + 1..]);
+        }
+
+        Ok(())
     }

-    /// Transcode newline-delimited JSON documents of `chunk` into equivalent
-    /// doc::ArchivedNode representations. `offset` is the offset of the first
-    /// `chunk` byte within the context of its source stream, and is mapped into
-    /// enumerated offsets of each transcoded output document.
-    ///
-    /// `chunk` may end with a partial document, in which case the partial
-    /// document is held back and is expected to be continued by the `chunk`
-    /// of a following call to `transcode()`.
+    /// Transcode newline-delimited JSON documents into equivalent
+    /// doc::ArchivedNode representations. `pre_allocated` is a potentially
+    /// pre-allocated buffer which is cleared and used within the returned
+    /// Transcoded instance.
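+    ///
+    /// A typical drain loop (a sketch; `consume` and `handle` stand in for
+    /// caller-provided logic) mirrors the ReadJsonLines integration above:
+    ///
+    /// ```ignore
+    /// loop {
+    ///     match parser.transcode_many(Default::default()) {
+    ///         Ok(out) if out.is_empty() => break, // Awaiting more chunks.
+    ///         Ok(out) => consume(out),
+    ///         Err((err, location)) => handle(err, location),
+    ///     }
+    /// }
+    /// ```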
     ///
-    /// `pre_allocated` is a potentially pre-allocated buffer which is cleared
-    /// and used within the returned Transcoded instance.
-    pub fn transcode_chunk(
+    /// transcode_many() may return fewer documents than are available if an error
+    /// is encountered in the input. Callers should repeatedly poll transcode_many()
+    /// until it returns an empty Ok(Transcoded) in order to consume all
+    /// documents and errors.
+    pub fn transcode_many(
         &mut self,
-        chunk: &[u8],
-        offset: i64,
         pre_allocated: rkyv::AlignedVec,
-    ) -> Result<Transcoded, std::io::Error> {
-        let last_newline = self.prepare_chunk(chunk, offset)?;
-
+    ) -> Result<Transcoded, (std::io::Error, std::ops::Range<i64>)> {
         let mut output = Transcoded {
             v: pre_allocated,
-            offset: self.offset, // Note self.offset is updated by prepare_chunk().
+            offset: self.offset,
         };
         output.v.clear();

-        let Some(last_newline) = last_newline else {
-            return Ok(output); // Nothing to parse yet. `output` is empty.
-        };
-
-        if let Err(err) = transcode_simd(&mut self.buf, &mut output, &mut self.ffi) {
-            output.v.clear(); // Clear a partial simd transcoding.
-            tracing::debug!(%err, "simdjson JSON parsing failed; using fallback");
-            output.v = transcode_fallback(&mut self.buf, std::mem::take(&mut output.v))?;
+        if self.whole.is_empty() {
+            return Ok(output);
         }

-        self.offset += self.buf.len() as i64;
-        self.buf.clear();
-        self.buf.extend_from_slice(&chunk[last_newline + 1..]);
+        let (consumed, maybe_err) =
+            match transcode_simd(&mut self.whole, &mut output, &mut self.ffi) {
+                Err(exception) => {
+                    output.v.clear(); // Clear a partial simd transcoding.
+                    tracing::debug!(%exception, "simdjson JSON transcoding failed; using fallback");
+
+                    let (consumed, v, maybe_err) =
+                        transcode_fallback(&self.whole, self.offset, std::mem::take(&mut output.v));
+                    output.v = v;
+
+                    (consumed, maybe_err)
+                }
+                Ok(()) => (self.whole.len(), None),
+            };
+
+        self.offset += consumed as i64;
+        self.whole.drain(..consumed);

+        if let Some(err) = maybe_err {
+            return Err(err);
+        }
         Ok(output)
     }

-    #[inline]
-    fn prepare_chunk(
-        &mut self,
-        input: &[u8],
-        offset: i64,
-    ) -> Result<Option<usize>, std::io::Error> {
-        if self.buf.is_empty() {
-            self.offset = offset;
-        } else if self.offset + self.buf.len() as i64 != offset {
-            return Err(std::io::Error::new(
-                std::io::ErrorKind::InvalidData,
-                format!(
-                    "parser has {} bytes of document prefix at offset {}, but got unexpected input offset {offset}",
-                    self.buf.len(), self.offset
-                ),
-            ));
+    /// Parse newline-delimited JSON documents into equivalent doc::HeapNode
+    /// representations, backed by `alloc`.
+    ///
+    /// parse_many() returns the begin offset of the document sequence,
+    /// and an iterator of a parsed document and the input offset of its
+    /// *following* document. The caller can use the returned begin offset
+    /// and iterator offsets to compute the [begin, end) offset extents
+    /// of each parsed document.
+    ///
+    /// parse_many() may return fewer documents than are available if an error
+    /// is encountered in the input. Callers should repeatedly poll parse_many()
+    /// until it returns Ok with an empty iterator in order to consume all
+    /// documents and errors.
+    pub fn parse_many<'s, 'a>(
+        &'s mut self,
+        alloc: &'a doc::Allocator,
+    ) -> Result<
+        (i64, std::vec::Drain<'s, (doc::HeapNode<'a>, i64)>),
+        (std::io::Error, std::ops::Range<i64>),
+    > {
+        // Safety: we'll transmute back to lifetime 'a prior to return.
+        let alloc: &'static doc::Allocator = unsafe { std::mem::transmute(alloc) };
+
+        if self.whole.is_empty() {
+            return Ok((self.offset, self.parsed.drain(..))); // Nothing to parse yet. drain(..) is empty.
         };

-        let Some(last_newline) = memchr::memrchr(b'\n', &input) else {
-            // Neither `self.buf` nor `input` contain a newline,
-            // and together reflect only a partial document.
-            self.buf.extend_from_slice(input);
-            return Ok(None);
+        let (consumed, maybe_err) = match parse_simd(
+            &mut self.whole,
+            self.offset,
+            alloc,
+            &mut self.parsed,
+            &mut self.ffi,
+        ) {
+            Err(exception) => {
+                self.parsed.clear(); // Clear a partial simd parsing.
+                tracing::debug!(%exception, "simdjson JSON parsing failed; using fallback");
+
+                parse_fallback(&self.whole, self.offset, alloc, &mut self.parsed)
+            }
+            Ok(()) => (self.whole.len(), None),
         };

-        // Complete a series of whole documents by appending through the final newline.
-        // The remainder, which doesn't contain a newline, is held back for now.
-        self.buf.extend_from_slice(&input[..last_newline + 1]);
+        let begin = self.offset;
+        self.offset += consumed as i64;
+        self.whole.drain(..consumed);

-        Ok(Some(last_newline))
+        if let Some(err) = maybe_err {
+            return Err(err);
+        }
+        Ok((begin, self.parsed.drain(..)))
     }
 }

@@ -247,71 +281,89 @@ fn transcode_simd(
 }

 fn parse_fallback<'a>(
-    input: &[u8],
+    mut input: &[u8],
     offset: i64,
     alloc: &'a doc::Allocator,
     output: &mut Vec<(doc::HeapNode<'a>, i64)>,
-) -> Result<(), serde_json::Error> {
-    let mut r = input;
-
-    while !r.is_empty() {
-        let mut deser = serde_json::Deserializer::from_reader(&mut r);
-        let node = doc::HeapNode::from_serde(&mut deser, &alloc)?;
-
-        if let Some(skip) = r.iter().position(|c| !c.is_ascii_whitespace()) {
-            r = &r[skip..];
-        } else {
-            r = &r[..0]; // Only whitespace remains.
+) -> (usize, Option<(std::io::Error, std::ops::Range<i64>)>) {
+    let mut consumed = 0;
+
+    while !input.is_empty() {
+        let pivot = memchr::memchr(b'\n', &input).expect("input always ends with newline") + 1;
+
+        let mut de = serde_json::Deserializer::from_slice(&input[..pivot]);
+        match doc::HeapNode::from_serde(&mut de, &alloc) {
+            Ok(node) => {
+                input = &input[pivot..];
+                consumed += pivot;
+                output.push((node, offset + consumed as i64));
+            }
+            // Surface an error encountered at the very first document.
+            Err(err) if consumed == 0 => {
+                return (pivot, Some((err.into(), offset..offset + pivot as i64)))
+            }
+            // Otherwise, return early with the documents we did parse.
+            // We'll encounter the error again on our next call and return it then.
+            Err(_err) => break,
         }
-        let next_offset = offset + input.len() as i64 - r.len() as i64;
-
-        output.push((node, next_offset));
     }

-    Ok(())
+    (consumed, None)
 }

 fn transcode_fallback(
-    input: &[u8],
+    mut input: &[u8],
+    offset: i64,
     mut v: rkyv::AlignedVec,
-) -> Result<rkyv::AlignedVec, serde_json::Error> {
+) -> (
+    usize,
+    rkyv::AlignedVec,
+    Option<(std::io::Error, std::ops::Range<i64>)>,
+) {
     let mut alloc = doc::HeapNode::allocator_with_capacity(input.len());
-    let mut r = input;
-
-    while !r.is_empty() {
-        let mut deser = serde_json::Deserializer::from_reader(&mut r);
-        let node = doc::HeapNode::from_serde(&mut deser, &alloc)?;
-
-        if let Some(skip) = r.iter().position(|c| !c.is_ascii_whitespace()) {
-            r = &r[skip..];
-        } else {
-            r = &r[..0]; // Only whitespace remains.
+    let mut consumed = 0;
+
+    while !input.is_empty() {
+        let pivot = memchr::memchr(b'\n', &input).expect("input always ends with newline") + 1;
+
+        let mut de = serde_json::Deserializer::from_slice(&input[..pivot]);
+        match doc::HeapNode::from_serde(&mut de, &alloc) {
+            Ok(node) => {
+                input = &input[pivot..];
+                consumed += pivot;
+
+                // Write the document header (next interior offset and length placeholder).
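+                // Framing sketch (little-endian u32s):
+                //   [next_offset][length][rkyv-serialized ArchivedNode bytes ...]
+                // where `length` is back-patched below once serialization completes.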
+                v.extend_from_slice(&(consumed as u32).to_le_bytes());
+                v.extend_from_slice(&[0; 4]); // Length placeholder.
+                let start_len = v.len();
+
+                // Serialize HeapNode into ArchivedNode by extending our `output.v` buffer.
+                let mut ser = rkyv::ser::serializers::AllocSerializer::<512>::new(
+                    rkyv::ser::serializers::AlignedSerializer::new(v),
+                    Default::default(),
+                    Default::default(),
+                );
+                ser.serialize_value(&node)
+                    .expect("rkyv serialization cannot fail");
+                v = ser.into_serializer().into_inner();
+
+                // Update the document header, now that we know the actual length.
+                let len = ((v.len() - start_len) as u32).to_le_bytes();
+                (&mut v[start_len - 4..start_len]).copy_from_slice(&len);
+
+                alloc.reset();
+            }
+            // Surface an error encountered at the very first document.
+            Err(err) if consumed == 0 => {
+                return (pivot, v, Some((err.into(), offset..offset + pivot as i64)))
+            }
+            // Otherwise, return early with the documents we did parse.
+            // We'll encounter the error again on our next call and return it then.
+            Err(_err) => break,
+        }
-        let next_offset = input.len() as u32 - r.len() as u32;
-
-        // Write the document header (next offset and length placeholder).
-        v.extend_from_slice(&next_offset.to_le_bytes());
-        v.extend_from_slice(&[0; 4]); // Length placeholder.
-        let start_len = v.len();
-
-        // Serialize HeapNode into ArchivedNode by extending our `output.v` buffer.
-        let mut ser = rkyv::ser::serializers::AllocSerializer::<512>::new(
-            rkyv::ser::serializers::AlignedSerializer::new(v),
-            Default::default(),
-            Default::default(),
-        );
-        ser.serialize_value(&node)
-            .expect("rkyv serialization cannot fail");
-        v = ser.into_serializer().into_inner();
-
-        // Update the document header, now that we know the actual length.
-        let len = ((v.len() - start_len) as u32).to_le_bytes();
-        (&mut v[start_len - 4..start_len]).copy_from_slice(&len);
-
-        alloc.reset();
     }

-    Ok(v)
+    (consumed, v, None)
 }

 #[inline]
diff --git a/crates/simd-doc/src/tests/fixtures.rs b/crates/simd-doc/src/tests/fixtures.rs
index f1f1b6c4b0..44016ce024 100644
--- a/crates/simd-doc/src/tests/fixtures.rs
+++ b/crates/simd-doc/src/tests/fixtures.rs
@@ -60,11 +60,11 @@ fn test_simd_and_fallback_results_are_equal() {
     ]);
     let cases: Vec<Value> = serde_json::from_value(cases).unwrap();

-    // Build up an input fixture which has lots of whitespace, but consists of a whole documents.
+    // Build up an input fixture of whole documents.
     let mut input = Vec::new();
     for doc in cases.iter() {
-        serde_json::to_writer_pretty(&mut input, &doc).unwrap();
-        input.push(b'\t');
+        serde_json::to_writer(&mut input, &doc).unwrap();
+        input.push(b'\n');
     }
     let (transcoded, fallback) = transcoded_and_fallback(&mut input);
     assert_eq!(transcoded.offset, fallback.offset);
@@ -104,29 +104,45 @@ fn test_simd_and_fallback_results_are_equal() {

 #[test]
 fn test_basic_parser_apis() {
-    let cases = json!([
-        {
-            "hello": {"big": "worldddddd", "wide": true, "big big key": "smol"},
-            "aaaaaaaaa": 1,
-            "bbbbbbbbb": 2,
-            "unicode": "语言处理 😊",
-        },
-        {
-            "a\ta": { "b\tb": -9007, "z\tz": true},
-            "c\tc": "string!",
-            "d\td": { "e\te": 1234, "zz\tzz": false, "s\ts": "other string!"},
-            "last": false
-        },
-        {"\u{80}111abc": "ࠀ\u{80}222"},
-    ]);
-    let cases: Vec<Value> = serde_json::from_value(cases).unwrap();
-    let mut input = Vec::new();
-    for doc in cases.iter() {
-        serde_json::to_writer(&mut input, &doc).unwrap();
+    // Build up a fixture to parse, which includes an invalid document.
+    let mut input = Vec::new();
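+
+    // The fixture interleaves whole documents with one malformed line
+    // (`{"whoops": !`) so both parse_many() and transcode_many() must surface
+    // an error mid-stream and then resume with the following document.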
+    {
+        input.extend(
+            json!(
+            {
+                "hello": {"big": "worldddddd", "wide": true, "big big key": "smol"},
+                "aaaaaaaaa": 1,
+                "bbbbbbbbb": 2,
+                "unicode": "语言处理 😊",
+            })
+            .to_string()
+            .into_bytes(),
+        );
+        input.push(b'\n');
+
+        input.extend(
+            json!({
+                "a\ta": { "b\tb": -9007, "z\tz": true},
+                "c\tc": "string!",
+                "d\td": { "e\te": 1234, "zz\tzz": false, "s\ts": "other string!"},
+                "last": false
+            })
+            .to_string()
+            .into_bytes(),
+        );
+        input.push(b'\n');
+
+        input.extend(b"{\"whoops\": !\n"); // Invalid JSON.
+
+        input.extend(
+            json!({"\u{80}111abc": "ࠀ\u{80}222"})
+                .to_string()
+                .into_bytes(),
+        );
         input.push(b'\n');
     }

     let (chunk_1, chunk_2) = input.split_at(input.len() / 2);
+    let (chunk_1a, chunk_1b) = chunk_1.split_at(chunk_1.len() / 2);

     let alloc = doc::Allocator::new();
     let mut parser = Parser::new();
@@ -142,39 +158,56 @@ fn test_basic_parser_apis() {
         )),
     ));

-    let (begin, chunk) = parser.parse_chunk(chunk_1, 1000, &alloc).unwrap();
-    snap.push((begin, json!("PARSE_CHUNK_1")));
-
-    for (doc, next_offset) in chunk {
-        snap.push((next_offset, doc.to_debug_json_value()));
-    }
+    let mut poll_parse = |parser: &mut Parser, step: &str| loop {
+        match parser.parse_many(&alloc) {
+            Ok((begin, chunk)) => {
+                if chunk.len() == 0 {
+                    break;
+                }
+                snap.push((begin, json!(step)));
+                for (doc, next_offset) in chunk {
+                    snap.push((next_offset, doc.to_debug_json_value()));
+                }
+            }
+            Err((err, location)) => {
+                snap.push((-1, json!(format!("{step}: {err} @ {location:?}"))));
+            }
+        }
+    };

-    let (begin, chunk) = parser
-        .parse_chunk(chunk_2, 1000 + chunk_1.len() as i64, &alloc)
-        .unwrap();
-    snap.push((begin, json!("PARSE_CHUNK_2")));
+    () = parser.chunk(chunk_1, 1000).unwrap();
+    poll_parse(&mut parser, "PARSE_CHUNK_1");

-    for (doc, next_offset) in chunk {
-        snap.push((next_offset, doc.to_debug_json_value()));
-    }
+    () = parser.chunk(chunk_2, 1000 + chunk_1.len() as i64).unwrap();
+    poll_parse(&mut parser, "PARSE_CHUNK_2");

-    let transcoded = parser
-        .transcode_chunk(chunk_1, 1000, Default::default())
-        .unwrap();
-    snap.push((transcoded.offset, json!("TRANSCODE_CHUNK_1")));
+    let mut poll_transcode = |parser: &mut Parser, step: &str| loop {
+        match parser.transcode_many(Default::default()) {
+            Ok(transcoded) => {
+                if transcoded.is_empty() {
+                    break;
+                }
+                snap.push((transcoded.offset, json!(step)));

-    for (doc, next_offset) in transcoded.into_iter() {
-        snap.push((next_offset, doc.get().to_debug_json_value()));
-    }
+                for (doc, next_offset) in transcoded.into_iter() {
+                    snap.push((next_offset, doc.get().to_debug_json_value()));
+                }
+            }
+            Err((err, location)) => {
+                snap.push((-1, json!(format!("{step}: {err} @ {location:?}"))));
+            }
+        }
+    };

-    let transcoded = parser
-        .transcode_chunk(chunk_2, 1000 + chunk_1.len() as i64, Default::default())
+    // This time, use multiple calls to chunk.
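+    // chunk_1 arrives as two sub-chunks; the second offset must equal
+    // 1000 + chunk_1a.len() to satisfy chunk()'s contiguity check.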
+    () = parser.chunk(chunk_1a, 1000).unwrap();
+    () = parser
+        .chunk(chunk_1b, 1000 + chunk_1a.len() as i64)
         .unwrap();
-    snap.push((transcoded.offset, json!("TRANSCODE_CHUNK_2")));
+    poll_transcode(&mut parser, "TRANSCODE_CHUNK_1");

-    for (doc, next_offset) in transcoded.into_iter() {
-        snap.push((next_offset, doc.get().to_debug_json_value()));
-    }
+    () = parser.chunk(chunk_2, 1000 + chunk_1.len() as i64).unwrap();
+    poll_transcode(&mut parser, "TRANSCODE_CHUNK_2");

     snap.push((0, json!("PARSE_ONE")));
     let input = json!({"one": [2, "three"], "four": {"five": 6}, "done": true});
@@ -194,7 +227,7 @@ fn test_basic_parser_apis() {
     [
         [
             0,
-            "input: 271 chunk_1: 135 chunk_2: 136"
+            "input: 284 chunk_1: 142 chunk_2: 142"
         ],
         [
             1000,
@@ -234,7 +267,15 @@ fn test_basic_parser_apis() {
             }
         ],
         [
-            1271,
+            -1,
+            "PARSE_CHUNK_2: expected value at line 1 column 12 @ 1247..1260"
+        ],
+        [
+            1260,
+            "PARSE_CHUNK_2"
+        ],
+        [
+            1284,
             {
                 "€111abc": "ࠀ€222"
             }
@@ -277,7 +318,15 @@ fn test_basic_parser_apis() {
             }
         ],
         [
-            1271,
+            -1,
+            "TRANSCODE_CHUNK_2: expected value at line 1 column 12 @ 1247..1260"
+        ],
+        [
+            1260,
+            "TRANSCODE_CHUNK_2"
+        ],
+        [
+            1284,
             {
                 "€111abc": "ࠀ€222"
             }
diff --git a/crates/simd-doc/src/tests/fuzz.rs b/crates/simd-doc/src/tests/fuzz.rs
index 16e2c2c3d3..956f518642 100644
--- a/crates/simd-doc/src/tests/fuzz.rs
+++ b/crates/simd-doc/src/tests/fuzz.rs
@@ -1,66 +1,109 @@
 use quickcheck::quickcheck;
 use serde_json::Value;
+use std::hash::Hasher;
+use xxhash_rust::xxh3::Xxh3;

 use super::ArbitraryValue;

 quickcheck! {
     fn transcode_matches_fallback_fuzz(input: Vec<ArbitraryValue>) -> bool {
-        let (simd, fallback) = super::transcoded_and_fallback(&mut build_fixture(input));
+        let (simd, fallback) = super::transcoded_and_fallback(&mut extend_fixture(Vec::new(), input));
         return fallback.v.as_slice() == simd.v.as_slice();
     }

     fn parse_matches_fallback_fuzz(input: Vec<ArbitraryValue>) -> bool {
         let alloc = doc::Allocator::new();
-        let (simd, fallback) = super::parsed_and_fallback(&mut build_fixture(input), &alloc);
+        let (simd, fallback) = super::parsed_and_fallback(&mut extend_fixture(Vec::new(), input), &alloc);

         return simd.iter().zip(fallback.iter()).all(|((l_d, l_o), (r_d, r_o))| l_o == r_o && doc::compare(l_d, r_d).is_eq());
     }

-    fn incremental_parse_splits_fuzz(input: Vec<ArbitraryValue>, s1: u16, s2: u16) -> bool {
-        incremental_parse_splits_case(input, s1, s2)
+    fn parse_and_transcode_with_errors(in1: Vec<ArbitraryValue>, in2: Vec<ArbitraryValue>, in3: Vec<ArbitraryValue>, s1: u16, s2: u16) -> bool {
+        parse_and_transcode_with_errors_case(in1, in2, in3, s1, s2)
     }
 }

-fn incremental_parse_splits_case(input: Vec<ArbitraryValue>, s1: u16, s2: u16) -> bool {
-    if input.is_empty() {
-        return true; // Cannot modulo on len().
-    }
-    let input = build_fixture(input);
+fn parse_and_transcode_with_errors_case(
+    in1: Vec<ArbitraryValue>,
+    in2: Vec<ArbitraryValue>,
+    in3: Vec<ArbitraryValue>,
+    s1: u16,
+    s2: u16,
+) -> bool {
+    let mut input = extend_fixture(Vec::new(), in1);
+    input.extend_from_slice(b"{error one}\n");
+    input = extend_fixture(input, in2);
+    input.extend_from_slice(b"{error: two}\n");
+    input = extend_fixture(input, in3);

     let mut p1 = crate::Parser::new();
     let mut p2 = crate::Parser::new();
+    let mut p3 = crate::Parser::new();

-    use std::hash::Hasher;
-    let mut h1 = xxhash_rust::xxh3::Xxh3::with_seed(0);
+    let mut h1 = Xxh3::with_seed(0);
     let mut h2 = h1.clone();
+    let mut h3 = h1.clone();

-    for (p, s, h) in [(&mut p1, s1, &mut h1), (&mut p2, s2, &mut h2)] {
-        let s = (s as usize) % input.len();
-
-        let out = p
-            .transcode_chunk(&input[..s], 0, Default::default())
-            .unwrap();
+    drive_parse(&mut p1, &input, s1 as usize, &mut h1);
+    drive_transcode(&mut p2, &input, s2 as usize, &mut h2);
+    drive_parse(&mut p3, &input, 0, &mut h3);

-        for (doc, next_offset) in out.iter() {
-            h.write_i64(next_offset);
-            h.update(doc);
-        }
+    return h1.digest() == h2.digest() && h1.digest() == h3.digest();
+}

-        let out = p
-            .transcode_chunk(&input[s..], s as i64, out.into_inner())
-            .unwrap();
+fn drive_transcode(p: &mut crate::Parser, input: &[u8], split: usize, hash: &mut Xxh3) {
+    let split = split % input.len();
+    let mut scratch = Default::default();
+
+    for (chunk, chunk_offset) in [(&input[..split], 0), (&input[split..], split as i64)] {
+        () = p.chunk(chunk, chunk_offset).unwrap();
+
+        scratch = loop {
+            match p.transcode_many(scratch) {
+                Ok(out) if out.is_empty() => break out.into_inner(),
+                Ok(out) => {
+                    for (doc, next_offset) in out.iter() {
+                        let doc = doc::ArchivedNode::from_archive(doc);
+                        let doc = serde_json::to_string(&doc::SerPolicy::noop().on(doc)).unwrap();
+                        hash.write(doc.as_bytes());
+                        hash.write_i64(next_offset);
+                    }
+                    scratch = out.into_inner();
+                }
+                Err((err, location)) => {
+                    hash.write(format!("{err} @ {location:?}").as_bytes());
+                    scratch = Default::default();
+                }
+            }
+        };
+    }
+}

-        for (doc, next_offset) in out.iter() {
-            h.write_i64(next_offset);
-            h.update(doc);
+fn drive_parse(p: &mut crate::Parser, input: &[u8], split: usize, hash: &mut Xxh3) {
+    let split = split % input.len();
+    let mut alloc = doc::Allocator::new();
+
+    for (chunk, chunk_offset) in [(&input[..split], 0), (&input[split..], split as i64)] {
+        () = p.chunk(chunk, chunk_offset).unwrap();
+
+        loop {
+            alloc.reset();
+
+            match p.parse_many(&alloc) {
+                Ok((_begin, drained)) if drained.len() == 0 => break,
+                Ok((_begin, drained)) => {
+                    for (doc, next_offset) in drained {
+                        let doc = serde_json::to_string(&doc::SerPolicy::noop().on(&doc)).unwrap();
+                        hash.write(doc.as_bytes());
+                        hash.write_i64(next_offset);
+                    }
+                }
+                Err((err, location)) => hash.write(format!("{err} @ {location:?}").as_bytes()),
+            }
         }
     }
-
-    return h1.digest() == h2.digest();
 }

-fn build_fixture(it: Vec<ArbitraryValue>) -> Vec<u8> {
-    let mut b = Vec::new();
-
+fn extend_fixture(mut b: Vec<u8>, it: Vec<ArbitraryValue>) -> Vec<u8> {
     for doc in it {
         serde_json::to_writer(
             &mut b,
diff --git a/crates/simd-doc/src/tests/mod.rs b/crates/simd-doc/src/tests/mod.rs
index fb28b3825d..c40bb981b4 100644
--- a/crates/simd-doc/src/tests/mod.rs
+++ b/crates/simd-doc/src/tests/mod.rs
@@ -12,7 +12,9 @@ fn transcoded_and_fallback(input: &mut Vec<u8>) -> (crate::Transcoded, crate::Tr
     };
     () = crate::transcode_simd(input, &mut simd, &mut crate::ffi::new_parser(1_000_000)).unwrap();

-    let fallback = crate::transcode_fallback(&input, Default::default()).unwrap();
+    let (_consumed, fallback, maybe_err) = crate::transcode_fallback(&input, 0, Default::default());
+    assert_eq!(None, maybe_err.map(|(err, _location)| err.to_string()));
+
     let fallback = crate::Transcoded {
         v: fallback,
         offset: 0,
@@ -36,7 +38,8 @@ fn parsed_and_fallback<'a>(
         .unwrap();

     let mut fallback = Vec::new();
-    () = crate::parse_fallback(input, 123_000_000, alloc, &mut fallback).unwrap();
+    let (_consumed, maybe_err) = crate::parse_fallback(input, 123_000_000, alloc, &mut fallback);
+    assert_eq!(None, maybe_err.map(|(err, _location)| err.to_string()));

     (simd, fallback)
 }
diff --git a/crates/simd-doc/src/transcoded.rs b/crates/simd-doc/src/transcoded.rs
index 81aa3b024e..7b7517e9b0 100644
--- a/crates/simd-doc/src/transcoded.rs
+++ b/crates/simd-doc/src/transcoded.rs
@@ -8,6 +8,10 @@ pub struct Transcoded {
 }

 impl Transcoded {
+    pub fn is_empty(&self) -> bool {
+        self.v.is_empty()
+    }
+
     pub fn iter<'s>(&'s self) -> IterOut<'s> {
         IterOut {
             v: self.v.as_slice(),
diff --git a/crates/simd-doc/tests/parser_perf.rs b/crates/simd-doc/tests/parser_perf.rs
index 953d34aef9..79f441879e 100644
--- a/crates/simd-doc/tests/parser_perf.rs
+++ b/crates/simd-doc/tests/parser_perf.rs
@@ -89,11 +89,18 @@ pub fn parse_perf() {

     for _ in 0..TOTAL_ROUNDS {
         for chunk in &chunks {
-            let (_begin_offset, parsed) = parser.parse_chunk(chunk, bytes as i64, &alloc).unwrap();
-
+            () = parser.chunk(chunk, bytes as i64).unwrap();
             bytes += chunk.len();
-            docs += parsed.count();
-            alloc.reset();
+
+            loop {
+                alloc.reset();
+                let (_begin_offset, parsed) = parser.parse_many(&alloc).unwrap();
+                docs += parsed.len();
+
+                if parsed.len() == 0 {
+                    break;
+                }
+            }
         }
     }

@@ -117,14 +124,18 @@ pub fn transcode_perf() {

     for _ in 0..TOTAL_ROUNDS {
         for chunk in &chunks {
-            let output = parser
-                .transcode_chunk(chunk, bytes as i64, scratch)
-                .unwrap();
-
+            () = parser.chunk(chunk, bytes as i64).unwrap();
             bytes += chunk.len();

-            docs += output.iter().count();
-            scratch = output.into_inner();
+            loop {
+                let output = parser.transcode_many(scratch).unwrap();
+                docs += output.iter().count();
+                scratch = output.into_inner();
+
+                if scratch.is_empty() {
+                    break;
+                }
+            }
         }
     }