[Bug]: Schema inference is non-deterministic #30276

Open
1 of 16 tasks
iht opened this issue Feb 9, 2024 · 4 comments

Comments

@iht
Contributor

iht commented Feb 9, 2024

What happened?

When a schema is inferred from a POJO, a Java Bean, or an AutoValue class, the order of the fields in the inferred schema can change from one run to the next. This can break pipeline updates on runners that support them (e.g. Cloud Dataflow).

Because the schemas are different (same fields, but in different order), the coders used for those PCollections will be different too, and reading the data from the previous job's checkpoint will fail. The job will process new elements fine, but the old checkpoint elements will throw decode errors.

Below is an example of the same AutoValue class across two different runs, with the fields section omitted for brevity. In the first run, this is the inferred schema:

[...]
Encoding positions:
{sequenceNumber=12, destination=11, messageId=7, receiveTimestamp=5, priority=1, senderTimestamp=6, redelivered=4, timeToLive=3, payload=2, replyTo=8, expiration=10, replicationGroupMessageId=9, properties=0}
[...]

In the second run, the code is unchanged but has been recompiled, and this is the inferred schema:

[...]
Encoding positions:
{sequenceNumber=12, destination=11, messageId=8, receiveTimestamp=6, priority=1, senderTimestamp=7, redelivered=5, timeToLive=3, payload=2, replyTo=9, expiration=10, replicationGroupMessageId=4, properties=0}
[...]

Notice how, for instance, messageId is assigned encoding position 7 in the first schema and 8 in the second.
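To illustrate why a shifted position matters, here is a minimal sketch of positional encoding. It is not Beam's row coder: the class name PositionalMismatch, the two-field record, and the list-based "encoding" are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionalMismatch {
  // Encode a record by writing its values in the writer's field order.
  public static List<String> encode(List<String> fieldOrder, Map<String, String> record) {
    List<String> encoded = new ArrayList<>();
    for (String field : fieldOrder) {
      encoded.add(record.get(field));
    }
    return encoded;
  }

  // Decode by assigning the i-th encoded value to the i-th field in the reader's order.
  public static Map<String, String> decode(List<String> fieldOrder, List<String> encoded) {
    Map<String, String> record = new LinkedHashMap<>();
    for (int i = 0; i < fieldOrder.size(); i++) {
      record.put(fieldOrder.get(i), encoded.get(i));
    }
    return record;
  }

  public static void main(String[] args) {
    Map<String, String> original = Map.of("messageId", "id-123", "replyTo", "queue/replies");

    // Hypothetical field orders inferred by two runs of the same code.
    List<String> firstRun = List.of("messageId", "replyTo");
    List<String> secondRun = List.of("replyTo", "messageId");

    List<String> checkpoint = encode(firstRun, original);

    // Same order on both sides: values land in the right fields.
    System.out.println(decode(firstRun, checkpoint));
    // Different order: messageId now holds the replyTo value and vice versa.
    System.out.println(decode(secondRun, checkpoint));
  }
}
```

With fields of the same type, as here, the values are silently swapped. With fields of different types, a real coder would more likely fail with a decode error, as described above.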

The problem seems to be in the usage of Class.getDeclaredMethods in the reflect utils.

The documentation of getDeclaredMethods (doc for Java 8, doc for Java 17) says: "The elements in the returned array are not sorted and are not in any particular order."
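As a rough sketch of how the reflection result could be made order-independent, the snippet below sorts getter names before using them. The class SortedGetters, the nested Message type, and the getter filter are hypothetical and are not taken from Beam's code.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SortedGetters {
  // Hypothetical data class standing in for a user-defined type.
  public abstract static class Message {
    public abstract String getPayload();
    public abstract String getMessageId();
    public abstract int getPriority();
  }

  // Returns the zero-argument "get*" methods of clazz, sorted by name so the
  // result does not depend on the order returned by getDeclaredMethods().
  public static List<String> sortedGetterNames(Class<?> clazz) {
    return Arrays.stream(clazz.getDeclaredMethods())
        .filter(m -> !m.isSynthetic())
        .filter(m -> m.getParameterCount() == 0)
        .filter(m -> m.getName().startsWith("get"))
        .map(Method::getName)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Prints [getMessageId, getPayload, getPriority] regardless of reflection order.
    System.out.println(sortedGetterNames(Message.class));
  }
}
```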

So in summary, the schema inference is non-deterministic.

As a workaround, you can use the @SchemaFieldNumber annotation to override the order of the fields. However, it is far from obvious that the field order of a data class may differ between runs even when the order of the fields in the code has not changed.

So I would suggest, for instance, ordering the getter methods lexicographically (which is deterministic across runs of the same code) and clearly stating that in the documentation. The main drawback is that adding a field, even a nullable one, would change the positions of existing fields when they are sorted lexicographically. But if this is clearly documented, users can always add @SchemaFieldNumber to the new fields to ensure they are appended at the end of the field positions.
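The trade-off in this proposal can be sketched as follows. This is an illustration only: @FieldNumber is a hypothetical stand-in defined in the snippet, not Beam's @SchemaFieldNumber, and the MessageV1/MessageV2 classes are made up. For simplicity the pinned variant numbers every field, which sidesteps the question of how annotated and unannotated fields would be mixed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

public class PinnedFieldOrder {
  // Hypothetical annotation used only in this sketch.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface FieldNumber {
    int value();
  }

  public abstract static class MessageV1 {
    public abstract String getMessageId();
    public abstract String getPayload();
  }

  // V2 adds getExpiration, which sorts before the existing getters.
  public abstract static class MessageV2 {
    public abstract String getExpiration();
    public abstract String getMessageId();
    public abstract String getPayload();
  }

  // Same fields as V2, but every position is pinned explicitly.
  public abstract static class MessageV2Pinned {
    @FieldNumber(2) public abstract String getExpiration();
    @FieldNumber(0) public abstract String getMessageId();
    @FieldNumber(1) public abstract String getPayload();
  }

  // Deterministic order: getter names sorted lexicographically.
  public static List<String> lexicographicOrder(Class<?> clazz) {
    TreeSet<String> names = new TreeSet<>();
    for (Method m : clazz.getDeclaredMethods()) {
      if (!m.isSynthetic() && m.getParameterCount() == 0 && m.getName().startsWith("get")) {
        names.add(m.getName());
      }
    }
    return new ArrayList<>(names);
  }

  // Explicit order: getters sorted by their pinned number.
  public static List<String> pinnedOrder(Class<?> clazz) {
    TreeMap<Integer, String> byNumber = new TreeMap<>();
    for (Method m : clazz.getDeclaredMethods()) {
      FieldNumber number = m.getAnnotation(FieldNumber.class);
      if (number != null) {
        byNumber.put(number.value(), m.getName());
      }
    }
    return new ArrayList<>(byNumber.values());
  }

  public static void main(String[] args) {
    System.out.println(lexicographicOrder(MessageV1.class));  // [getMessageId, getPayload]
    System.out.println(lexicographicOrder(MessageV2.class));  // [getExpiration, getMessageId, getPayload]
    System.out.println(pinnedOrder(MessageV2Pinned.class));   // [getMessageId, getPayload, getExpiration]
  }
}
```

In the lexicographic case the new field pushes getMessageId from position 0 to position 1, whereas the pinned variant keeps the existing positions and appends the new field at the end.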

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@kennknowles
Member

I think @reuvenlax did some work to deal with this before.

@reuvenlax
Contributor

Sorting doesn't help here as a common use case is to add a new field on update. Dataflow at least should be able to handle schema update, as long as the schema is on a PCollection.

@bzablocki bzablocki mentioned this issue Jun 13, 2024
3 tasks
@scwhittle
Contributor

scwhittle commented Sep 3, 2024

I filed #29245 previously and had a PR in progress similar to #30279, but it also had changes to fix brittle tests relying on the ordering of reflection.

I was investigating a problem with update and identified a bug that might be the root cause of the issue here. The null bitset used by the generated row coder was not respecting the encoding position overrides. See #32388

Separately from that bug, I think sorting to avoid non-deterministic reflection ordering is clearer and should be done, though it may not be as urgent.

@reuvenlax
Contributor

Simply sorting does not solve the problem: we support adding new fields on update, which isn't compatible with sorting. Sorting might be beneficial for non-Dataflow runners that don't currently implement schema update (as long as they are OK with not supporting field addition).


4 participants