
Demonstrate a bug for schema "validation" when writing JSON to a table #131

Draft: wants to merge 1 commit into main
Conversation

@rtyler (Member) commented Jan 11, 2023

I stumbled into this while pilfering code from kafka-delta-ingest for another project and discovered that the code in `write_values` which does `record_batch.schema() != arrow_schema` doesn't do what we think it does.

Basically, if the `Decoder` "works", the schema it returns is just the schema that was passed into it. It has no bearing on whether the JSON actually matches that schema. Don't ask me why.

Using the reader's `infer_json_schema_*` functions can provide a `Schema` that is useful for comparison:

    // Infer a schema from the buffered JSON values rather than trusting the
    // schema that the Decoder echoes back.
    let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned()));
    let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!");
    let decoder = Decoder::new(Arc::new(json_schema), options);
    if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") {
        // Compare the decoded batch's schema against the table's Arrow schema.
        assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!");
    }

What's even more interesting is that after a certain number of fields are removed, the `Decoder` no longer pretends it can decode the JSON. I am baffled as to why.

The current failure from this test is:

---- writer::tests::test_schema_matching stdout ----
thread 'writer::tests::test_schema_matching' panicked at 'Expected the write of our invalid schema rows to fail!
Ok(())', src/writer.rs:1089:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

@rtyler requested review from xianwill and mosyp January 11, 2023 06:00
@rtyler (Member, Author) commented Jan 12, 2023

I discussed this at length today with @xianwill privately. I will attempt to summarize our verbose conversation 😄

Basically this does look like a bug and a half, plus maybe some unclear behavior. The schemas being used have nullable fields for everything. What is interesting is that a nullable "primitive type" can be omitted and the `Decoder` will still fill out the batch, whereas a missing struct/map/list causes the `Decoder` to ignore the record entirely, which can lead to an `EmptyBatchError`. 🐛

To that end, there is in fact no real check occurring at the moment that the schema of the JSON lines up with the schema of the Delta table 🐛. Because of nullable fields, the simple `batch.schema() != arrow_schema_ref` comparison that is present in the code might not be helpful even if it did work correctly (meaning, even if it compared the inferred schema of the JSON to the actualized schema of the Delta table).

From @xianwill :

we should detect a SchemaMismatch in case of missing required fields or incompatible data types. IMO `==` is a bit too strong. For example, it should still be okay for nullable fields to not be present in the source schema. Not sure if inferring the schema from the first JSON in the batch solves it quite right either. What if the second record has a different structure? Also, we need to be able to dead-letter individual messages in case of a SchemaMismatch and continue processing the stream.

The disconnect between the schema in the parquet files and the schema of the Delta table doesn't make it any easier to decide what the appropriate behavior for kafka-delta-ingest should be here. 🙀
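
To make the idea concrete, here is a minimal sketch of the kind of relaxed check described above: absent nullable fields are tolerated, but missing required fields or mismatched data types are not. The names are hypothetical rather than actual kafka-delta-ingest code, and a real check would also need to recurse into struct/map/list children:

    use arrow::datatypes::{DataType, Field, Schema};

    /// Hypothetical helper: every field of the table schema must either appear
    /// in the inferred JSON schema with the same data type, or be nullable and
    /// therefore allowed to be absent.
    fn schemas_compatible(table_schema: &Schema, json_schema: &Schema) -> bool {
        table_schema.fields().iter().all(|table_field| {
            match json_schema.field_with_name(table_field.name()) {
                Ok(json_field) => json_field.data_type() == table_field.data_type(),
                Err(_) => table_field.is_nullable(),
            }
        })
    }

    fn main() {
        let table = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("note", DataType::Utf8, true),
        ]);
        // Omitting the nullable "note" field is fine.
        let json = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        assert!(schemas_compatible(&table, &json));
        // Omitting the required "id" field is not.
        let json = Schema::new(vec![Field::new("note", DataType::Utf8, true)]);
        assert!(!schemas_compatible(&table, &json));
    }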

@rtyler marked this pull request as draft January 12, 2023 00:36
rtyler added a commit that referenced this pull request Jan 8, 2024
This commit introduces some interplay between the IngestProcessor and
DataWriter, the latter of which needs to keep track of whether or not it
has a changed schema.

What should be done with that changed schema must necessarily live in
IngestProcessor since that will perform the Delta transaction commits at
the tail end of batch processing.

There are some potential mismatches between the schema in storage and
what the DataWriter has, so this change tries to run the run loop again
if the current schema and the evolved schema are incompatible.

Closes #131

Sponsored-by: Raft LLC
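
As a reading aid, here is a rough sketch of the control flow that commit message describes; the type and function names are hypothetical stand-ins, not the actual kafka-delta-ingest implementation:

    use arrow::datatypes::Schema;

    /// Hypothetical stand-in: the writer remembers whether the schema it has
    /// been writing with has evolved away from the one it was created with.
    struct DataWriter {
        evolved_schema: Option<Schema>,
    }

    enum BatchOutcome {
        /// Safe to let IngestProcessor commit the Delta transaction.
        Commit,
        /// The evolved schema is incompatible with the schema in storage, so
        /// the run loop should be restarted with a rebuilt writer.
        RestartRunLoop,
    }

    /// The decision has to live with the IngestProcessor, since it performs
    /// the Delta transaction commit at the tail end of batch processing.
    fn finish_batch(writer: &DataWriter, table_schema: &Schema) -> BatchOutcome {
        match &writer.evolved_schema {
            // A real implementation would use a compatibility check here
            // rather than strict equality.
            Some(evolved) if evolved != table_schema => BatchOutcome::RestartRunLoop,
            _ => BatchOutcome::Commit,
        }
    }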
@rtyler added this to the kafka-delta-ingest 0.2 release milestone Jan 9, 2024
rtyler added a commit that referenced this pull request Jan 9, 2024