-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Added Domain Metadata support to Delta Kernel #3835
base: master
Are you sure you want to change the base?
[Kernel] Added Domain Metadata support to Delta Kernel #3835
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, this is looking pretty good
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadataUtils.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Show resolved
Hide resolved
…Errors.java Co-authored-by: Johan Lasperas <[email protected]>
Co-authored-by: Johan Lasperas <[email protected]>
Co-authored-by: Johan Lasperas <[email protected]>
…uring conflict resolution
2da16e9
to
7d7032e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Thanks for this PR!
Left a few comments :p Mostly style. A few important ones about logic/functionality.
I did not require the tests, will do so after changes are made.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
public static final String FEATURE_NAME = "domainMetadata"; | ||
|
||
/** The minimum writer version required to support domain metadata. */ | ||
public static final int MIN_WRITER_VERSION_REQUIRED = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @vkorukanti -- I feel like we need to come up with a cleaner template for Kernel Delta Table Features for them to all include common information, like the min reader writer version required, the feature name, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah we should move this. For now at least to TableFeatures.java
before we have a proper TableFeature class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn’t find an elegant existing approach for this, so I just added the following to TableFeatures
. Does this look good to you?
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";
public static final int DOMAIN_METADATA_MIN_WRITER_VERSION_REQUIRED = 7;
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
...l/kernel-api/src/main/java/io/delta/kernel/internal/util/ValidateDomainMetadataIterator.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
...l/kernel-api/src/main/java/io/delta/kernel/internal/util/ValidateDomainMetadataIterator.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
/** | ||
* Retrieves a map of domainName to {@link DomainMetadata} from the log files. | ||
* | ||
* <p>Now loading domain metadata requires an additional round of log replay so this is done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should do this during the P & M log replay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to our discussion, we will address this in a separate PR.
@@ -221,7 +223,8 @@ private TransactionCommitResult doCommit( | |||
} | |||
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); | |||
|
|||
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) { | |||
try (CloseableIterator<Row> stageDataIter = | |||
new ValidateDomainMetadataIterator(protocol, dataActions.iterator(), FULL_SCHEMA)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So this is just a thin wrapper over the actions-to-be-committed and validates the DomainMetadata on write.
@vkorukanti -- would we ever want to validate other things on write?
@qiyuandong-db -- I wonder if we should not have a ValidateDomainMetadataIterator
but rather a ValidationIterator
. This validation iterator lets you pass in different validationFunctions that take a row and decide to or not to throw an error. This would be a more extensible solution. It would let us avoid iterater wrappers on top of iterator wrappers on top of iterator wrappers ...
For example, I'd be fine removing the validation from this PR and coding that ^ up in a followup PR ... let's see what @vkorukanti thinks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ve updated this part to separate the domain metadata actions from dataActions
. Now, they’re stored in a List<domainMetadata>
and we can apply validation functions directly. So the ValidateDomainMetadataIterator
is no longer needed here.
I think we can implement a general ValidationIterator
in a separate PR if the need comes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requested some minor changes. Had one pretty big question about the validator iterator. The tests look great! Will stamp after this 🫡
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Thanks for adding the domain metadata support. It brings Kernel closer to the full protocol support.
One change I think we should discuss is: we are passing the domain metadata as an action to the commit. Instead the domain metadata should be given when constructing the TransactionBuilder (this needs a API design). Internally Kernel will take care of converting this info into a domain metadata action. Transaction.commit
we expect only the data actions as these. These actions too are generated by Kernel when the connectors gives a set of files. The goal is to avoid connectors constructing actions which exposes protocol details to connectors.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
...l/kernel-api/src/main/java/io/delta/kernel/internal/util/ValidateDomainMetadataIterator.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the review. Getting close.
connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala
Outdated
Show resolved
Hide resolved
@@ -83,6 +85,10 @@ public Protocol getProtocol() { | |||
return protocol; | |||
} | |||
|
|||
public Map<String, DomainMetadata> getDomainMetadataMap() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add docs (see getLatestTransactionVersion for an example)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is used just in tests? Do we see a need for it when writing to the table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a doc for this.
Currently, it is only used in tests. I imagine we will need this when writing to the table in the future.
For example, in Row Tracking, we’ll need to get its domain metadata from the snapshot to retrieve the previous HighWatermark
. We need this to assign fresh Row IDs to any AddFile
actions within dataActions
that don’t yet have row IDs before committing.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
* | ||
* @param domainMetadatas List of domain metadata to be added. | ||
*/ | ||
public void addDomainMetadata(List<DomainMetadata> domainMetadatas) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far this is just used in tests. Is it the common practise to add multiple domain metadata as part of the txn?
Also we could move the duplicate check here itself than validating just before the commit.
One more thing should we automatically upgrade the protocol version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also -- this should be in the TransactionBuilderImpl class instead. is there a specific reason you want to build
a Transaction and then mutate and append domain metadatas to it afterwards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a txn can contains multiple domain metadata - it seems to be the case in Delta-Spark.
One reason I do the duplication check just before committing is that domain metadata might be mutated in other code paths, such as during conflict resolution. Validating them right before the commit helps ensure that we capture all changes.
Is there a specific reason you want to build a Transaction and then mutate and append domain metadatas to it afterwards?
I'm not sure about other domain metadata usages. For Row Tracking, its domain metadata depends on all AddFile
actions in the dataActions
passed into Transaction.commit()
. We can only append this domain metadata after building the transaction and seeing all AddFile
actions. Also, conflict resolution can mutate existing domain metadatas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like in practice we might not use an API like this? The domain metadata might just be generated as a step during commit
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like I'm having a bit of a hard time mentally modeling this without an actual feature using domain metadatas. Are we adding support for row tracking or anything else after this?
If these are all private APIs we can revisit whether they make sense when adding support for a feature that uses them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is used only for tests, can you please add Visible for testing
to the javadoc.
Also -- @allisonport-db and @vkorukanti what do you think about adding a @VisibleForTesting
annotation that we can add to methods to make tthis even clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to documenting it's for testing (at least for now as long as no feature is using it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Annotation SGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is only for testing now I think there's also some validity to moving it to TransactionBuilderImpl
as @scottsand-db suggested to avoid mutating a transaction. But since it's just for testing maybe it's not that important ¯_(ツ)_/¯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think avoiding mutating a transaction makes sense. I've moved it to TransactionBuilderImpl
and mentioned Visible for testing
in its javadoc.
public static final String FEATURE_NAME = "domainMetadata"; | ||
|
||
/** The minimum writer version required to support domain metadata. */ | ||
public static final int MIN_WRITER_VERSION_REQUIRED = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah we should move this. For now at least to TableFeatures.java
before we have a proper TableFeature class.
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
* DomainMetadata} objects. | ||
* @throws RuntimeException if an I/O error occurs while closing the iterator. | ||
*/ | ||
private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if this is the right way. Do we need to load all the DMs? Or we need to load just the specific DM for a domain name? For example in the txnVersion (just above) our usecase turned out be just loading one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m kind of concerned about efficiency if we load the DM for only a specific domain each time. If we need DMs for multiple domains within a single transaction, would it require a log replay round for each domain?
We will have a separate PR to optimize how we load domain metadata by loading it together with the protocol and metadata, instead of in a separate round. Perhaps we can defer this to that PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious about the pros/cons of combining this with the protocol and metadata but we can discuss on that PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will have a separate PR to optimize how we load domain metadata by loading it together with the protocol and metadata
@qiyuandong-db it seems there's some uncertainty as to whether this is the best approach. Can we have a brief sync on this? Or can we put together a very brief 1 decision doc to discuss the pros and cons? I think we should do that before you spend the time putting up a PR -- what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that sounds good and we should do that! I’ll reach out to set up a sync before we start on this.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
* @param protocol the protocol to check | ||
* @return true if the "domainMetadata" feature is supported, false otherwise | ||
*/ | ||
public static boolean isDomainMetadataSupported(Protocol protocol) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vkorukanti -- do you think these sorts of "is-table-feature-supported" checks should go here or in TableFeatures.java? I'd prefer the latter, curious what you think. Thanks!
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
* | ||
* @param domainMetadatas List of domain metadata to be added. | ||
*/ | ||
public void addDomainMetadata(List<DomainMetadata> domainMetadatas) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also -- this should be in the TransactionBuilderImpl class instead. is there a specific reason you want to build
a Transaction and then mutate and append domain metadatas to it afterwards?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java
Outdated
Show resolved
Hide resolved
* DomainMetadata} objects. | ||
* @throws RuntimeException if an I/O error occurs while closing the iterator. | ||
*/ | ||
private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious about the pros/cons of combining this with the protocol and metadata but we can discuss on that PR
* @param domainMetadataActionVector A {@link ColumnVector} containing the domain metadata rows | ||
* @return A map where the keys are domain names and the values are {@link DomainMetadata} objects | ||
*/ | ||
public static Map<String, DomainMetadata> extractDomainMetadataMap( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems really similar to what we do in log replay. If we remove the duplicate check here can it be combined? If not can we at least clarify specifically when we use this method (during conflict resolution)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could combine them if we decide to remove the duplicate check (during conflict resolution).
But we might need to adjust the method signature and usage. Currently, the method returns a map for each domainMetadata ColumnVector
, but log replay requires building a single map after consuming all domainMetadata ColumnVector
s.
We could create the map outside the method, and change the signature to something like
public static void fillDomainMetadataMap(ColumnVector domainMetadataActionVector, Map<String, DomainMetadata> mapToBeFilled)
This would allow it to accommodate both use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I was thinking something like that. And during log replay we combine the maps. That plan sounds good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the duplicate check and adjust the signature to
public static void populateDomainMetadataMap(
ColumnVector domainMetadataActionVector, Map<String, DomainMetadata> domainMetadataMap)
And now it is used for both domain metadata conflict checking and log replay. Thanks for the suggestion!
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
Outdated
Show resolved
Hide resolved
* | ||
* @param domainMetadatas List of domain metadata to be added. | ||
*/ | ||
public void addDomainMetadata(List<DomainMetadata> domainMetadatas) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is used only for tests, can you please add Visible for testing
to the javadoc.
Also -- @allisonport-db and @vkorukanti what do you think about adding a @VisibleForTesting
annotation that we can add to methods to make tthis even clearer?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
…actDomainMetadataMap to fillDomainMetadataMap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have much more to add on top of the extensive reviews others have provided.
I think we should aim to merge this PR once the current comments are addressed and continue some discussions (around e.g. parsing JSON configuration or API design to create metadata) on the next PR introducing row tracking domain metadata, where having a concrete example is going to make decisions easier.
All APIs are internal and will remain that way for the foreseeable future - we don't have any plans even in Delta spark to let user create their own domain metadata action - so we can easily adjust as we move forward
Which Delta project/connector is this regarding?
Description
This PR adds support for Domain Metadata to Delta Kernel as described in the Delta Protocal.
In particular, it enables Delta Kernel to handle domain metadata across transaction commit, checkpointing, log replay, and conflict resolution.
How was this patch tested?
Added tests covering all operations involving DomainMetadata in
DomainMetadataSuite
Does this PR introduce any user-facing changes?
No.