-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add table changes constructor #505
Add table changes constructor #505
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #505 +/- ##
==========================================
+ Coverage 80.24% 80.31% +0.06%
==========================================
Files 61 62 +1
Lines 13402 13541 +139
Branches 13402 13541 +139
==========================================
+ Hits 10755 10876 +121
- Misses 2093 2106 +13
- Partials 554 559 +5 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
pub fn table_changes( | ||
&self, | ||
engine: &dyn Engine, | ||
start_version: Version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not impl Into<Version>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hadn't considered that. Tho this change seems to cause issues:
154 | let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
| ------------- ^ the trait `From<i32>` is not implemented for `u64`, which is required by `{integer}: Into<u64>
Integers are treated as i32 by default, and i32 can't be converted into u64. By using just start_version: Version
, seems that the compiler treats it as a u64 from the get go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, what situation would produce an Into<Version>
that is not already Version
, that we need the fancy arg passing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe my question should actually have been, "why not Option<Version>
for end_version
"? When do we need the into there? I just erred on the side of flexibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh using end_version: impl Into<Option<Version>>
lets us pass in either Version
or Option<Version>
to the function. This is a trick I learned from @scovich
So the following are legal:
table.table_changes(engine.as_ref(), 3, 4);
table.table_changes(engine.as_ref(), Some(3), 4);
table.table_changes(engine.as_ref(), None, 4);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh cool, that's nice. Sorry for the long aside :)
.get(ENABLE_CDF_FLAG) | ||
.is_some_and(|val| val == "true") | ||
}; | ||
if !is_cdf_enabled(&start_snapshot) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to check at every point along the way, not just start and end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, we could leave that till later. I was hoping to do some checking at this stage since we can catch that error earlier. Should I just remove this check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No extra cost since we needed to get both snapshots anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add comments here for clarity? (e.g. "we check start/end to fail early if needed but at each point yielding CDF batches we do schema compat check" or something like that)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments addressing enable cdf flag, schema, and protocol.
/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`, | ||
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the | ||
/// `end_version`. | ||
pub fn table_changes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to be able to specify:
- A schema? Does CDF always return the full table schema?
- A predicate? Can't you get a CDF with a predicate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is usually done in the Scan Builder. The plan is to specify the schema and predicate when building a TableChangesScan
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and i suppose there are no optimizations to be done here with that information? If yes we likely would want to propagate that information here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is the main 'entrypoint' API for users to interact with reading CDF can we add some docs on semantics and ideally examples too? important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do i scan this thing?
(i'm probably forgetting other bits of semantics so don't take the above as exhaustive)
@nicklan @zachschuermann I wanted your opinion on the
|
Also I wanted to flag this: I have the field I'm leaning towards just storing column mapping mode and partition columns directly in |
AFAIK, streaming in delta-spark does a complicated dance to process metadata changes in a commit separately from the data changes of that commit, and an incompatible metadata change causes the stream to restart. We should probably use that code as inspiration so we don't reinvent the wheel? (but it's also somewhat messy code due to its organic development over years, so we should probably not copy blindly) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool, mostly fine, just one small thing
pub fn table_changes( | ||
&self, | ||
engine: &dyn Engine, | ||
start_version: Version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe my question should actually have been, "why not Option<Version>
for end_version
"? When do we need the into there? I just erred on the side of flexibility
.get(ENABLE_CDF_FLAG) | ||
.is_some_and(|val| val == "true") | ||
}; | ||
if !is_cdf_enabled(&start_snapshot) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should
pub log_segment: LogSegment, | ||
table_root: Url, | ||
end_snapshot: Snapshot, | ||
start_version: Version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just checking, we won't have to re-find the start snapshot when we actually go to return results right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we don't. Henceforth all we are interested in are the commits in the LogSegment
, and the schema we got from the end_snapshot.
kernel/src/table_changes/mod.rs
Outdated
} | ||
if start_snapshot.schema() != end_snapshot.schema() { | ||
return Err(Error::generic( | ||
"Failed to build TableChanges: Start and end version schemas are different.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's put the schemas in the output to help with debugging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to the print logs. How's it look?l
let's do the minimal thing for now and only include the data we need immediately |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(copying from slack thread so we don't lose it)
You mentioned
Note that the behaviour to fail early in table changes constructor aligns with spark's behaviour. Only the CDF range is returned in spark's error. No specific commit version that causes the failure is provided.
can we add that to PR description and/or docs?
generally PR looks good, awesome work! mostly nits and will come back for final review after those are addressed
/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`, | ||
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the | ||
/// `end_version`. | ||
pub fn table_changes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and i suppose there are no optimizations to be done here with that information? If yes we likely would want to propagate that information here?
/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`, | ||
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the | ||
/// `end_version`. | ||
pub fn table_changes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is the main 'entrypoint' API for users to interact with reading CDF can we add some docs on semantics and ideally examples too? important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do i scan this thing?
(i'm probably forgetting other bits of semantics so don't take the above as exhaustive)
self.start_version | ||
} | ||
/// The end version of the `TableChanges`. If no end_version was specified in | ||
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`. | |
/// [`TableChanges::try_new`], this returns the newest version as of the call to [`try_new`]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This causes issues with cargo doc, and I think it's a lot of visual clutter to repeat the full [`TableChanges::try_new`]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
StructField::new("_change_type", DataType::STRING, false), | ||
StructField::new("_commit_version", DataType::LONG, false), | ||
StructField::new("_commit_timestamp", DataType::TIMESTAMP, false), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: delta-spark does some hand-wringing about table schemas that already provide columns with these names. I think the solution was to block enabling CDF on such tables, and to block creating columns with those names on tables that are already CDF-enabled? (both are writer issues, not reader, but it's prob worth tracking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in #524. Thx!
let start_snapshot = | ||
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; | ||
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: Relating to the optimization we've discussed a few times -- the end snapshot should be able to use the first snapshot as a starting point for listing (and P&M), if the two versions aren't too far apart?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I would like that to be the case eventually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a note to #489
.metadata() | ||
.configuration | ||
.get(ENABLE_CDF_FLAG) | ||
.is_some_and(|val| val == "true") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this will simplify once #453 merges?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep!
} else if !is_cdf_enabled(&end_snapshot) { | ||
return Err(Error::table_changes_disabled(end_snapshot.version())); | ||
} | ||
if start_snapshot.schema() != end_snapshot.schema() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: Technically users can stuff whatever metadata they want into the schema fields; should we track an item to ignore unknown metadata entries when comparing schemas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah we could do that. I think schema checking needs to be changed a lot anyway to support at least adding columns and changing nullability, so there's more work to do in that department.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added link to issue to track schema compatibility checks. #523
.schema() | ||
.fields() | ||
.cloned() | ||
.chain(CDF_FIELDS.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CDF fields are generated columns right? (not read directly from parquet)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, correct. I was planning on making them a special case only in CDF code. If you feel that we can legitimately treat these as generated column, we could add a new column type ColumnType::GeneratedColumn
.
a6f13c4
to
d3eb161
Compare
let start_snapshot = | ||
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; | ||
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a note to #489
pub fn table_changes( | ||
&self, | ||
engine: &dyn Engine, | ||
start_version: Version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh cool, that's nice. Sorry for the long aside :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM ship ship ship
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md 2. Run `cargo t --all-features --all-targets` to get started testing, and run `cargo fmt`. 3. Ensure you have added or run the appropriate tests for your PR. 4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 5. Be sure to keep the PR description updated to reflect all changes. --> ## What changes are proposed in this pull request? <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> This removes files that were accidentally added in prior PRs that were un-reviewed in #505 and #506. <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. --> Co-authored-by: Zach Schuermann <[email protected]>
What changes are proposed in this pull request?
This PR introduces the TableChanges struct which represents a Change Data Feed scan. TableChanges is constructed from a Table, and performs 2 protocol and metadata scans.
I also add the logic for converting the end version's schema into the cdf schema.
Note that the behaviour to fail early in table changes constructor aligns with spark's behaviour. Only the CDF range is returned in spark's error. No specific commit version that causes the failure is provided.
How was this change tested?
TableChanges::try_new
checks the start and end versiontable_changes.schema()
method returns the CDF schema.