-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[Kernel] Add Domain Metadata support to Delta Kernel #3835
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
94260ef
7612613
2ac3985
807c896
84d4ae6
509bc27
83f3b1e
7d7032e
dadc5a6
0c188c7
7948fdf
6195e7f
fd06f6d
3e30a41
cec85cf
8edcc9a
baca1c5
7e0a172
127de8c
c5f3672
b2d6546
cd7ddeb
68c77d8
5afec36
e358878
4f56a2f
189ec75
a4e3104
b0e4a65
f89199c
a5c5726
a893fa0
73faf8f
9ce9b56
60a29dd
b27ca3e
6b2c35e
5f8c4b1
33ddcbc
bf48c01
0fc12a7
20dfb86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,11 +32,7 @@ | |
import io.delta.kernel.internal.fs.Path; | ||
import io.delta.kernel.internal.replay.ConflictChecker; | ||
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; | ||
import io.delta.kernel.internal.util.Clock; | ||
import io.delta.kernel.internal.util.ColumnMapping; | ||
import io.delta.kernel.internal.util.FileNames; | ||
import io.delta.kernel.internal.util.InCommitTimestampUtils; | ||
import io.delta.kernel.internal.util.VectorUtils; | ||
import io.delta.kernel.internal.util.*; | ||
import io.delta.kernel.types.StructType; | ||
import io.delta.kernel.utils.CloseableIterable; | ||
import io.delta.kernel.utils.CloseableIterator; | ||
|
@@ -73,6 +69,7 @@ public class TransactionImpl implements Transaction { | |
private final Optional<SetTransaction> setTxnOpt; | ||
private final boolean shouldUpdateProtocol; | ||
private final Clock clock; | ||
private final List<DomainMetadata> domainMetadatas = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this field should be called |
||
private Metadata metadata; | ||
private boolean shouldUpdateMetadata; | ||
|
||
|
@@ -120,6 +117,23 @@ public StructType getSchema(Engine engine) { | |
return readSnapshot.getSchema(engine); | ||
} | ||
|
||
public Optional<SetTransaction> getSetTxnOpt() { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return setTxnOpt; | ||
} | ||
|
||
/** | ||
* Internal API to add domain metadata actions for this transaction. Visible for testing. | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @param domainMetadatas List of domain metadata to be added to the transaction. | ||
*/ | ||
public void addDomainMetadatas(List<DomainMetadata> domainMetadatas) { | ||
this.domainMetadatas.addAll(domainMetadatas); | ||
} | ||
|
||
public List<DomainMetadata> getDomainMetadatas() { | ||
return domainMetadatas; | ||
} | ||
|
||
@Override | ||
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions) | ||
throws ConcurrentWriteException { | ||
|
@@ -221,6 +235,12 @@ private TransactionCommitResult doCommit( | |
} | ||
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); | ||
|
||
// Check for duplicate domain metadata and if the protocol supports | ||
DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol); | ||
|
||
domainMetadatas.forEach( | ||
dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow()))); | ||
|
||
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) { | ||
// Create a new CloseableIterator that will return the metadata actions followed by the | ||
// data actions. | ||
|
@@ -265,10 +285,6 @@ public boolean isBlindAppend() { | |
return true; | ||
} | ||
|
||
public Optional<SetTransaction> getSetTxnOpt() { | ||
return setTxnOpt; | ||
} | ||
|
||
/** | ||
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can | ||
* result in an additional file read and that this will only happen if ICT is enabled. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.actions; | ||
|
||
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; | ||
import static io.delta.kernel.internal.util.Preconditions.checkArgument; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import io.delta.kernel.data.ColumnVector; | ||
import io.delta.kernel.data.Row; | ||
import io.delta.kernel.internal.data.GenericRow; | ||
import io.delta.kernel.types.BooleanType; | ||
import io.delta.kernel.types.StringType; | ||
import io.delta.kernel.types.StructType; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** Delta log action representing an `DomainMetadata` action */ | ||
public class DomainMetadata { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** Full schema of the {@link DomainMetadata} action in the Delta Log. */ | ||
public static final StructType FULL_SCHEMA = | ||
new StructType() | ||
.add("domain", StringType.STRING, false /* nullable */) | ||
.add("configuration", StringType.STRING, false /* nullable */) | ||
.add("removed", BooleanType.BOOLEAN, false /* nullable */); | ||
|
||
public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) { | ||
if (vector.isNullAt(rowId)) { | ||
return null; | ||
} | ||
Comment on lines
+40
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (sorry again naive question) is this a typical pattern? something that returns There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does appear to be a pattern within Kernel. I too have wondered about this a bit. We seem to often first check if a column vector is null at a row id. If not, then we call IMO, in another PR, we can clarify the expectations of of all of our There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. making sure i wasn't crazy and yes obviously much prefer Option :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need a follow-up issue to ensure this isn't dropped? |
||
return new DomainMetadata( | ||
requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId), | ||
requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId), | ||
requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId)); | ||
} | ||
|
||
/** | ||
* Creates a {@link DomainMetadata} instance from a Row with the schema being {@link | ||
* DomainMetadata#FULL_SCHEMA}. | ||
* | ||
* @param row the Row object containing the DomainMetadata action | ||
* @return a DomainMetadata instance or null if the row is null | ||
* @throws IllegalArgumentException if the schema of the row does not match {@link | ||
* DomainMetadata#FULL_SCHEMA} | ||
*/ | ||
public static DomainMetadata fromRow(Row row) { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (row == null) { | ||
return null; | ||
} | ||
checkArgument( | ||
row.getSchema().equals(FULL_SCHEMA), | ||
"Expected schema: %s, found: %s", | ||
FULL_SCHEMA, | ||
row.getSchema()); | ||
return new DomainMetadata( | ||
requireNonNull(row, 0, "domain").getString(0), | ||
requireNonNull(row, 1, "configuration").getString(1), | ||
requireNonNull(row, 2, "removed").getBoolean(2)); | ||
} | ||
|
||
private final String domain; | ||
private final String configuration; | ||
private final boolean removed; | ||
|
||
/** | ||
* The domain metadata action contains a configuration string for a named metadata domain. Two | ||
* overlapping transactions conflict if they both contain a domain metadata action for the same | ||
* metadata domain. Per-domain conflict resolution logic can be implemented. | ||
* | ||
* @param domain A string used to identify a specific domain. | ||
* @param configuration A string containing configuration for the metadata domain. | ||
* @param removed If it is true it serves as a tombstone to logically delete a {@link | ||
* DomainMetadata} action. | ||
*/ | ||
public DomainMetadata(String domain, String configuration, boolean removed) { | ||
scottsand-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.domain = requireNonNull(domain, "domain is null"); | ||
this.configuration = requireNonNull(configuration, "configuration is null"); | ||
this.removed = removed; | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
public String getDomain() { | ||
return domain; | ||
} | ||
|
||
public String getConfiguration() { | ||
return configuration; | ||
} | ||
|
||
public boolean isRemoved() { | ||
return removed; | ||
} | ||
|
||
/** | ||
* Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}. | ||
* | ||
* @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA} | ||
*/ | ||
public Row toRow() { | ||
Map<Integer, Object> domainMetadataMap = new HashMap<>(); | ||
domainMetadataMap.put(0, domain); | ||
domainMetadataMap.put(1, configuration); | ||
domainMetadataMap.put(2, removed); | ||
|
||
return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return String.format( | ||
"DomainMetadata{domain='%s', configuration='%s', removed='%s'}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For |
||
domain, configuration, removed); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) return true; | ||
if (obj == null || getClass() != obj.getClass()) return false; | ||
DomainMetadata that = (DomainMetadata) obj; | ||
return removed == that.removed | ||
&& domain.equals(that.domain) | ||
&& configuration.equals(that.configuration); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return java.util.Objects.hash(domain, configuration, removed); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.