
Deduplicate and standardize deserialization logic for streams #13412

Merged · 4 commits into apache:main · Nov 16, 2024

Conversation

@alihan-synnada (Contributor)

Which issue does this PR close?

None

Rationale for this change

Part of #13411

This PR implements a common Decoder trait, the BatchDeserializer trait, and the DecoderDeserializer struct as described in the issue, along with CsvDecoder and JsonDecoder, since arrow-csv and arrow-json provide ready-made Decoders.

What changes are included in this PR?

Note: There are about 290 lines of new tests, so the change amounts to about 250 lines of actual code.

  • Add BatchDeserializer as a common interface (a sketch follows this list).
    • digest consumes the input in chunks
    • next attempts to deserialize the digested data and returns a DeserializerOutput, which is either a RecordBatch, RequiresMoreData, or InputExhausted
    • finish signals the end of the input stream
  • Add a Decoder trait
    • Mimics the Decoders of arrow-json and arrow-csv
  • Implement Decoder for CsvDecoder and JsonDecoder by forwarding methods
  • Add DecoderDeserializer and implement BatchDeserializer for formats that have a Decoder implementation
  • Add a deserialize_stream function to deduplicate the deserialization logic
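
For orientation, here is a minimal sketch of the interface described in the list above. The method names digest, next, and finish and the three DeserializerOutput variants come from this PR; the exact signatures, trait bounds, and error type are assumptions for illustration.

```rust
use std::fmt::Debug;

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Possible outputs of a `BatchDeserializer`; the three variants are the
/// ones named in the PR summary above.
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
    /// A fully deserialized batch is ready.
    RecordBatch(RecordBatch),
    /// More input must be digested before a batch can be produced.
    RequiresMoreData,
    /// `finish` was called and all buffered input has been drained.
    InputExhausted,
}

/// Common deserialization interface. Signatures are illustrative; the PR
/// defines the authoritative versions.
pub trait BatchDeserializer<T>: Send + Debug {
    /// Consume one chunk of input, buffering it for later decoding.
    fn digest(&mut self, message: T);

    /// Attempt to deserialize the digested data into the next batch.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;

    /// Signal that no further input will arrive.
    fn finish(&mut self);
}
```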

Are these changes tested?

Yes, the changes are covered by new tests added to the CSV and JSON modules.

Are there any user-facing changes?

No

The github-actions bot added the core (Core DataFusion crate) label on Nov 14, 2024.
@ozankabak (Contributor)

We discussed this with @alihan-synnada and it looks good to me, but it'd be great to get community review. cc @alamb

@ozankabak (Contributor) left a comment


Let's wait a little bit for more eyes on this, but I carefully went through and it seems like a good first step towards removing code duplication on the read side.

@alamb (Contributor) left a comment


Thank you @alihan-synnada and @ozankabak

I think this PR is really well documented and makes a lot of sense to me.

One thing I noticed is that the ticket talks about arrow and avro as well. Do you plan to update them in a follow-on PR?

Finally, the ticket also mentions parquet -- I think it will be hard to update the parquet reader (or any columnar file format) to use the Decoder trait. The parquet reader itself drives what IO to do (aka what byte ranges and when), unlike the more row-oriented formats.

@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
Supported,
}

/// Possible outputs of a [`BatchDeserializer`].
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {

👍 nice

pub(crate) trait Decoder: Send + Debug {
/// See [`arrow::json::reader::Decoder::decode`].
///
/// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode

I double checked and https://docs.rs/arrow-json/53.2.0/arrow_json/reader/struct.Decoder.html seems to describe this interface well.
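
For readers without the full diff open, here is a hedged sketch of the forwarding pattern the PR applies to JsonDecoder (and, analogously, CsvDecoder). Only decode is named in the excerpt above; the flush method and the struct layout are assumptions modeled on arrow-json's public Decoder API.

```rust
use std::fmt;
use std::fmt::Debug;

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Assumed shape of the crate-internal trait shown in the excerpt above;
/// only `decode` appears there, `flush` mirrors arrow-json's API.
pub(crate) trait Decoder: Send + Debug {
    /// See `arrow::json::reader::Decoder::decode`.
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;

    /// See `arrow::json::reader::Decoder::flush`.
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}

/// Hypothetical wrapper forwarding to the ready-made arrow-json decoder.
pub(crate) struct JsonDecoder {
    inner: arrow::json::reader::Decoder,
}

impl Debug for JsonDecoder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Manual impl in case the inner arrow decoder does not implement `Debug`.
        f.debug_struct("JsonDecoder").finish()
    }
}

impl Decoder for JsonDecoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.inner.decode(buf)
    }

    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.inner.flush()
    }
}
```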

///
/// This struct is responsible for converting a stream of bytes, which represent
/// encoded data, into a stream of `RecordBatch` objects, following the specified
/// schema and formatting options.

it might be worth also mentioning here that this handles any buffering of the input that might be required to fulfill the decode interface (which might return RecordBatches before fully consuming the input)

It took me a while to figure out why this was required


Done - thank you for pointing it out
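
To make the buffering point concrete, here is a simplified, hypothetical sketch of how a DecoderDeserializer-style wrapper can queue digested chunks so that bytes the decoder has not yet consumed survive across calls to next. The field names and queue-based approach are assumptions, not the PR's exact implementation, and it relies on the arrow decoders' behavior of stopping mid-chunk only when a batch is ready to flush.

```rust
use std::collections::VecDeque;

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;

// Minimal local stand-ins matching the sketches earlier in this thread.
trait Decoder {
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}

enum DeserializerOutput {
    RecordBatch(RecordBatch),
    RequiresMoreData,
    InputExhausted,
}

/// Buffers digested chunks so that unconsumed bytes survive across calls
/// to `next` (hypothetical field names).
struct DecoderDeserializer<T: Decoder> {
    decoder: T,
    messages: VecDeque<Bytes>,
    finalized: bool,
}

impl<T: Decoder> DecoderDeserializer<T> {
    /// Queue a chunk; decoding is deferred until `next` is called.
    fn digest(&mut self, message: Bytes) {
        self.messages.push_back(message);
    }

    /// Mark the end of the input stream.
    fn finish(&mut self) {
        self.finalized = true;
    }

    /// Decode queued bytes until a batch is ready. If the decoder stops
    /// before consuming a whole chunk (because a batch filled up), the
    /// unconsumed tail stays at the front of the queue for the next call.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
        while let Some(message) = self.messages.front_mut() {
            let consumed = self.decoder.decode(message)?;
            if consumed == message.len() {
                self.messages.pop_front();
            } else {
                // Keep the bytes the decoder did not consume.
                *message = message.slice(consumed..);
            }
            if let Some(batch) = self.decoder.flush()? {
                return Ok(DeserializerOutput::RecordBatch(batch));
            }
        }
        if self.finalized {
            // Drain whatever the decoder still holds after the last chunk.
            match self.decoder.flush()? {
                Some(batch) => Ok(DeserializerOutput::RecordBatch(batch)),
                None => Ok(DeserializerOutput::InputExhausted),
            }
        } else {
            Ok(DeserializerOutput::RequiresMoreData)
        }
    }
}
```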

@@ -651,36 +651,14 @@ impl FileOpener for CsvOpener {
Ok(futures::stream::iter(config.open(decoder)?).boxed())
}
GetResultPayload::Stream(s) => {
- let mut decoder = config.builder().build_decoder();
+ let decoder = config.builder().build_decoder();

That is certainly a lot nicer 😍
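
And for completeness, a hedged sketch of what the shared deserialize_stream driver might look like. The PR summary names the function, but the async-stream-based implementation below is an assumption; it shows how the digest/next/finish protocol turns a byte stream into a RecordBatch stream.

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use async_stream::stream; // assumed helper crate, used here for brevity
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};

// Minimal local stand-ins matching the sketches earlier in this thread.
trait BatchDeserializer<T>: Send {
    fn digest(&mut self, message: T);
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
    fn finish(&mut self);
}

enum DeserializerOutput {
    RecordBatch(RecordBatch),
    RequiresMoreData,
    InputExhausted,
}

/// Hypothetical shape of the shared driver: pull chunks from the input,
/// digest them, and yield each finished batch downstream.
fn deserialize_stream<'a>(
    mut input: impl Stream<Item = Result<Bytes, ArrowError>> + Send + Unpin + 'a,
    mut deserializer: impl BatchDeserializer<Bytes> + 'a,
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
    stream! {
        loop {
            match deserializer.next() {
                // A batch is ready: emit it and keep going.
                Ok(DeserializerOutput::RecordBatch(batch)) => {
                    yield Ok(batch);
                }
                // All input digested and flushed: the stream ends.
                Ok(DeserializerOutput::InputExhausted) => break,
                // Ask upstream for more bytes, or signal end-of-input.
                Ok(DeserializerOutput::RequiresMoreData) => {
                    match input.next().await {
                        Some(Ok(bytes)) => deserializer.digest(bytes),
                        Some(Err(e)) => {
                            yield Err(e);
                            break;
                        }
                        None => deserializer.finish(),
                    }
                }
                Err(e) => {
                    yield Err(e);
                    break;
                }
            }
        }
    }
    .boxed()
}
```

This is why the CsvOpener branch above shrinks to building a decoder and handing it off: the looping, buffering, and end-of-stream handling now live in one place for every format with a Decoder implementation.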

@ozankabak (Contributor)

> One thing I noticed is that #13411 talks about arrow and avro as well. Do you plan to update them in a follow-on PR?

Yes, indeed. Not an immediate priority but we would like to tidy up the read side.

> Finally, the ticket also mentions parquet -- I think it will be hard to update the parquet reader (or any columnar file format) to use the Decoder trait. The parquet reader itself drives what IO to do (aka what byte ranges and when), unlike the more row-oriented formats.

I agree -- Parquet will probably stay separate for the time being.

@ozankabak merged commit 06db9ed into apache:main on Nov 16, 2024 · 25 checks passed