Spark: Fix row lineage inheritance for distributed planning #13061
Conversation
{
  "testhadoop",
  SparkCatalog.class.getName(),
  ImmutableMap.of("type", "hadoop"),
  FileFormat.PARQUET,
  false,
  WRITE_DISTRIBUTION_MODE_HASH,
  true,
  null,
  DISTRIBUTED,
  3
},
I'll check how much time this adds to the suite, but at least in the interim I feel it's worth having, since this is the configuration that caught the issue.
What we can probably do once the vectorized reader change is in is remove the Parquet + local test above, since the vectorized reader already covers local. Then we'll still have coverage of both local and distributed planning without multiple Parquet local cases like we have right now.
Technically you could also override parameters() in TestRowLevelOperationsWithLineage so that the test matrix is only increased for those tests and not across all tests.
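As a rough sketch of that suggestion (hedged: the base class name, annotation package, and exact parameter list are assumptions modeled on the array in the diff above, not the real test code):

import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_HASH;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;

// Sketch: declare a class-local parameters() so the DISTRIBUTED planning row is added only
// for the lineage tests instead of widening the matrix for every row-level operation test.
public class TestRowLevelOperationsWithLineage extends SparkRowLevelOperationsTestBase {

  @Parameters(name = "catalog = {0}, format = {3}, planningMode = {8}")
  public static Object[][] parameters() {
    return new Object[][] {
      // existing local-planning case
      {"testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop"),
       FileFormat.PARQUET, false, WRITE_DISTRIBUTION_MODE_HASH, true, null, LOCAL, 3},
      // additional distributed-planning case, mirroring the diff above
      {"testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop"),
       FileFormat.PARQUET, false, WRITE_DISTRIBUTION_MODE_HASH, true, null, DISTRIBUTED, 3},
    };
  }
}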
@@ -36,6 +36,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
   private Long addedSnapshotId = null;
   private Integer content = null;
   private Long sequenceNumber = null;
+  private Long firstRowId = null;
Ok, this may not be quite right; I forgot about delete manifests...
Why would this not work for delete manifests?
It should (it'd just be null for delete manifests). I confused myself while debugging some failing tests, but those tests are unrelated to whether it's a delete manifest or not. Check out my comment below on why I removed the getter in my latest update.
Force-pushed from 561a43e to ebbb9a1.
@@ -46,6 +47,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
   bean.setAddedSnapshotId(manifest.snapshotId());
   bean.setContent(manifest.content().id());
   bean.setSequenceNumber(manifest.sequenceNumber());
+  bean.setFirstRowId(manifest.firstRowId());
Do we need a proper getter for it?
On my latest push I removed the getter because the Spark actions that read the paths in the manifest files as a DataFrame (e.g. remove orphan files / expire snapshots) also use ManifestFileBean, and when reading the manifest DataFrame they were failing to find firstRowId (the existence of the getter means every record read by these actions needs to have this field, and if it doesn't, analysis fails).
The getter isn't needed for the distributed planning case, since ManifestFileBean implements ManifestFile and the firstRowId() API is what gets used. It's also not required for the Spark actions, which just need the minimal file info.
But it's a bit odd that this one particular field won't have a getter; let me think about whether there's a cleaner way. Having two different manifest file bean structures, where one is even more minimal, seems a bit messy, at least just for this case.
At the very least, if we go with this approach I should add an inline comment explaining why there's no getter for the field.
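To make that trade-off concrete, here is a hedged sketch (modeled on the diff above, not the verbatim patch) of a bean that keeps the setter and the ManifestFile override but deliberately omits a JavaBean-style getter, so Spark's bean encoder never requires a firstRowId column while distributed planning still reads the value through the ManifestFile API:

// Sketch only; the existing members of ManifestFileBean are elided.
public class ManifestFileBean implements ManifestFile, Serializable {
  // ... existing fields plus their JavaBean getters/setters and ManifestFile overrides ...
  private Long firstRowId = null;

  public static ManifestFileBean fromManifest(ManifestFile manifest) {
    ManifestFileBean bean = new ManifestFileBean();
    // ... copies of the existing fields ...
    bean.setFirstRowId(manifest.firstRowId()); // may be null, e.g. for delete manifests
    return bean;
  }

  // Setter kept so fromManifest() can populate the field. Intentionally no getFirstRowId():
  // a JavaBean getter would make Spark's bean encoder expect a firstRowId column whenever
  // actions (remove orphan files, expire snapshots, ...) read manifest rows as a DataFrame.
  public void setFirstRowId(Long firstRowId) {
    this.firstRowId = firstRowId;
  }

  // ManifestFile API used during distributed planning, so the inherited value still flows through.
  @Override
  public Long firstRowId() {
    return firstRowId;
  }
}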
For Spark distributed planning we use ManifestFileBean, an implementation of ManifestFile which is serializable and encodes the minimal set of manifest fields required during distributed planning. This was missing firstRowId, and as a result null values would be propagated for the inherited firstRowId. This change fixes the issue by simply adding the firstRowId field to the bean, so it is set correctly and therefore inherited correctly during Spark distributed planning.
I discovered this when going through the DML row lineage tests: I noticed we weren't exercising a distributed planning case, and after enabling one, I debugged the failure. I added another test parameter set for distributed planning.
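For anyone reproducing this outside the test matrix, a minimal hedged sketch of switching a table to distributed planning (the catalog/table name is a placeholder, and the property names are the standard Iceberg planning-mode table properties as I understand them; the test harness wires this through its parameter list instead):

import org.apache.spark.sql.SparkSession;

// Minimal sketch: flip a table's scan planning to distributed so reads go through the
// ManifestFileBean path that this PR fixes. The table name here is hypothetical.
public class EnableDistributedPlanning {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("planning-mode").getOrCreate();
    spark.sql(
        "ALTER TABLE testhadoop.db.target SET TBLPROPERTIES ("
            + "'read.data-planning-mode' = 'distributed', "
            + "'read.delete-planning-mode' = 'distributed')");
    spark.stop();
  }
}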