Skip to content

Commit

Permalink
simd-doc: allow for continued parsing after errors
Browse files Browse the repository at this point in the history
Update Parser to support continued parsing after an error in parsing a
JSON document. Use a polling-based design, where a caller can add
one or more chunk()'s to a Parser and then repeatedly poll it to drain
all parsed or transcoded documents and errors.

Also update error tracking to provide fine-grain localization of exactly
where, in the input stream, the erroring document is located.

Enhance fuzz tests to mix in errors alongside valid documents, and
verify that parsing and transcoding give exactly the same outcomes.

Resolves #1584
  • Loading branch information
jgraettinger committed Aug 29, 2024
1 parent 7557ed6 commit a95a6f8
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 271 deletions.
8 changes: 4 additions & 4 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ impl Read {
ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
};
let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
anyhow::bail!(gazette::Error::Parsing(
self.offset,
std::io::Error::other("document does not have a valid UUID"),
));
anyhow::bail!(
"document at offset {} does not have a valid UUID",
self.offset
);
};
let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?;

Expand Down
22 changes: 17 additions & 5 deletions crates/gazette/src/journal/read_json_lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ impl futures::Stream for ReadJsonLines {
return Poll::Ready(Some(Ok(ReadJsonLine::Doc { root, next_offset })));
}

match me.parser.transcode_many(Default::default()) {
Ok(out) if !out.is_empty() => {
*me.parsed = out.into_iter();
continue;
}
Err((err, location)) => {
return Poll::Ready(Some(Err(Error::Parsing { location, err })))
}
Ok(_out) => {} // Requires more chunks.
}

// Poll the inner stream for the next item
match me.inner.poll_next_unpin(cx) {
Poll::Ready(Some(response)) => {
Expand All @@ -90,11 +101,12 @@ impl futures::Stream for ReadJsonLines {
return Poll::Ready(Some(Ok(ReadJsonLine::Meta(response))));
}

*me.parsed = me
.parser
.transcode_chunk(&response.content, response.offset, Default::default())
.map_err(|err| Error::Parsing(response.offset, err))?
.into_iter();
me.parser
.chunk(&response.content, response.offset)
.map_err(|err| Error::Parsing {
location: response.offset..response.offset,
err,
})?;
}
std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
std::task::Poll::Pending => return std::task::Poll::Pending,
Expand Down
8 changes: 6 additions & 2 deletions crates/gazette/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ pub enum Error {
BrokerStatus(broker::Status),
#[error("unexpected consumer status: {0:?}")]
ConsumerStatus(consumer::Status),
#[error("failed to parse document near journal offset {0}")]
Parsing(i64, #[source] std::io::Error),
#[error("failed to parse document at journal offset range {location:?}")]
Parsing {
location: std::ops::Range<i64>,
#[source]
err: std::io::Error,
},
#[error("{0}")]
Protocol(&'static str),
#[error(transparent)]
Expand Down
Loading

0 comments on commit a95a6f8

Please sign in to comment.