Deduplicate and standardize deserialization logic for streams #13412
Conversation
We discussed this with @alihan-synnada and it looks good to me, but it'd be great to get community review. cc @alamb
Let's wait a little bit for more eyes on this, but I carefully went through and it seems like a good first step towards removing code duplication on the read side.
Thank you @alihan-synnada and @ozankabak
I think this PR is really well documented and makes a lot of sense to me
One thing I noticed is that the ticket talks about arrow and avro as well. Do you plan to update them in a follow-on PR?
Finally, the ticket also mentions parquet -- I think it will be hard to update the parquet reader (or any columnar file format) to use the DecodeTrait. The parquet reader itself drives what IO to do (aka what byte ranges and when), unlike the more row-oriented formats.
@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
    Supported,
}

/// Possible outputs of a [`BatchDeserializer`].
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
👍 nice
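(The hunk above is truncated at the enum's opening brace. Based on the variants named in the PR description, the complete enum plausibly reads as follows; this is a sketch, not the verbatim patch.)

```rust
use arrow::record_batch::RecordBatch;

/// Possible outputs of a [`BatchDeserializer`].
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
    /// A complete batch was deserialized and is ready to use.
    RecordBatch(RecordBatch),
    /// More input must be digested before a batch can be produced.
    RequiresMoreData,
    /// `finish` was called and all buffered input has been drained.
    InputExhausted,
}
```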
pub(crate) trait Decoder: Send + Debug {
    /// See [`arrow::json::reader::Decoder::decode`].
    ///
    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
I double checked and https://docs.rs/arrow-json/53.2.0/arrow_json/reader/struct.Decoder.html seems to describe this interface well.
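(Both arrow decoders expose the same `decode`/`flush` pair, which is what makes the forwarding implementations mechanical. A minimal sketch of the idea, assuming the trait mirrors the arrow signatures; the PR's actual trait may carry additional methods.)

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use std::fmt;

pub(crate) trait Decoder: Send + fmt::Debug {
    /// Consume bytes from `buf`, returning how many were actually read.
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
    /// Produce a `RecordBatch` from the rows buffered so far, if any.
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}

/// Wrapper that forwards to the readily available arrow-json decoder;
/// the CSV wrapper is analogous.
pub(crate) struct JsonDecoder(arrow::json::reader::Decoder);

impl fmt::Debug for JsonDecoder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonDecoder").finish()
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.0.decode(buf) // forward to arrow::json::reader::Decoder::decode
    }
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.0.flush() // forward to arrow::json::reader::Decoder::flush
    }
}
```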
    ///
    /// This struct is responsible for converting a stream of bytes, which represent
    /// encoded data, into a stream of `RecordBatch` objects, following the specified
    /// schema and formatting options.
It might be worth also mentioning here that this handles any buffering of the input that may be required to fulfill the decode interface (which can return RecordBatches before fully consuming the input).
It took me a while to figure out why this was required.
Done - thank you for pointing it out
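(To make the buffering point concrete: the inner decoder may stop mid-chunk once it has enough rows for a batch, so whatever it did not consume has to be retained for the next call. A deliberately simplified sketch of that bookkeeping, building on the trait and enum sketched earlier; field names and logic here are illustrative, not the PR's actual code.)

```rust
use arrow::error::ArrowError;

struct DecoderDeserializer<D: Decoder> {
    decoder: D,
    buffer: Vec<u8>, // bytes digested but not yet consumed by the decoder
    finished: bool,
}

impl<D: Decoder> DecoderDeserializer<D> {
    fn digest(&mut self, chunk: &[u8]) {
        // Digesting only buffers; no decoding happens here.
        self.buffer.extend_from_slice(chunk);
    }

    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
        let consumed = self.decoder.decode(&self.buffer)?;
        // Keep the unconsumed tail: the decoder can return a batch
        // before it has read the whole buffer.
        self.buffer.drain(..consumed);
        match self.decoder.flush()? {
            Some(batch) => Ok(DeserializerOutput::RecordBatch(batch)),
            None if self.finished => Ok(DeserializerOutput::InputExhausted),
            None => Ok(DeserializerOutput::RequiresMoreData),
        }
    }

    fn finish(&mut self) {
        self.finished = true;
    }
}
```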
@@ -651,36 +651,14 @@ impl FileOpener for CsvOpener {
            Ok(futures::stream::iter(config.open(decoder)?).boxed())
        }
        GetResultPayload::Stream(s) => {
-           let mut decoder = config.builder().build_decoder();
+           let decoder = config.builder().build_decoder();
That is certainly a lot nicer 😍
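(For context, the rest of the hunk presumably hands the freshly built decoder to the new shared helper, roughly as below; the exact constructor names and the stream plumbing (decompression, error mapping) are assumptions, not the verbatim diff.)

```rust
GetResultPayload::Stream(s) => {
    let decoder = config.builder().build_decoder();
    // Wrap the arrow CSV decoder and let the shared helper drive the
    // digest/next/finish cycle that every stream-opener used to inline.
    let deserializer = DecoderDeserializer::new(CsvDecoder::new(decoder));
    Ok(deserialize_stream(s, deserializer))
}
```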
Yes, indeed. Not an immediate priority but we would like to tidy up the read side.
I agree -- Parquet will probably stay separate for the time being.
Which issue does this PR close?
None
Rationale for this change
Part of #13411
This PR implements a common `Decoder` trait, the `BatchDeserializer` trait and the `DecoderDeserializer` struct as described in the issue, along with `CsvDecoder` and `JsonDecoder`, as `arrow-csv` and `arrow-json` `Decoder`s are readily available.

What changes are included in this PR?
Note: There are about 290 lines of new tests, so it is about 250 lines of actual code.

- Add `BatchDeserializer` as a common interface:
  - `digest` consumes the input in chunks
  - `next` attempts to deserialize the digested data and returns a `DeserializerOutput`, which is either a `RecordBatch`, `RequiresMoreData`, or `InputExhausted`
  - `finish` signals the end of the input stream
- Add a `Decoder` trait to standardize the underlying `Decoder`s; implement `Decoder` for `CsvDecoder` and `JsonDecoder` by forwarding methods
- Add `DecoderDeserializer` and implement `BatchDeserializer` for formats that have a `Decoder` implementation
- Add a `deserialize_stream` function to deduplicate the deserialization logic (see the usage sketch after this list)
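(A usage sketch of the digest/next/finish protocol described above. The method and type names come from this list, but the exact signatures — e.g. whether `next` returns a `Result`, or whether the trait is generic over its input type — are assumptions.)

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;

fn collect_batches<D>(
    mut deserializer: D,
    chunks: impl IntoIterator<Item = Bytes>,
) -> Result<Vec<RecordBatch>, ArrowError>
where
    D: BatchDeserializer<Bytes>, // hypothetical generic bound
{
    let mut batches = Vec::new();
    for chunk in chunks {
        deserializer.digest(chunk); // feed one chunk of encoded bytes
        // Drain every batch that is already complete; RequiresMoreData
        // tells us to go fetch the next chunk instead.
        while let DeserializerOutput::RecordBatch(batch) = deserializer.next()? {
            batches.push(batch);
        }
    }
    deserializer.finish(); // no more input will arrive
    while let DeserializerOutput::RecordBatch(batch) = deserializer.next()? {
        batches.push(batch); // flush rows that were still buffered
    }
    Ok(batches)
}
```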
Are these changes tested?
Yes, the changes are covered by new tests added to the CSV and JSON modules.
Are there any user-facing changes?
No