-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Added Domain Metadata support to Delta Kernel #3835
base: master
Are you sure you want to change the base?
Changes from all commits
94260ef
7612613
2ac3985
807c896
84d4ae6
509bc27
83f3b1e
7d7032e
dadc5a6
0c188c7
7948fdf
6195e7f
fd06f6d
3e30a41
cec85cf
8edcc9a
baca1c5
7e0a172
127de8c
c5f3672
b2d6546
cd7ddeb
68c77d8
5afec36
e358878
4f56a2f
189ec75
a4e3104
b0e4a65
f89199c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import io.delta.kernel.engine.CommitCoordinatorClientHandler; | ||
import io.delta.kernel.engine.Engine; | ||
import io.delta.kernel.internal.actions.CommitInfo; | ||
import io.delta.kernel.internal.actions.DomainMetadata; | ||
import io.delta.kernel.internal.actions.Metadata; | ||
import io.delta.kernel.internal.actions.Protocol; | ||
import io.delta.kernel.internal.fs.Path; | ||
|
@@ -31,6 +32,7 @@ | |
import io.delta.kernel.internal.snapshot.LogSegment; | ||
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; | ||
import io.delta.kernel.types.StructType; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
/** Implementation of {@link Snapshot}. */ | ||
|
@@ -83,6 +85,17 @@ public Protocol getProtocol() { | |
return protocol; | ||
} | ||
|
||
/** | ||
* Get the domain metadata map from the log replay, which lazily loads and replays a history of | ||
* domain metadata actions, resolving them to produce the current state of the domain metadata. | ||
* | ||
* @return A map where the keys are domain names and the values are {@link DomainMetadata} | ||
* objects. | ||
*/ | ||
public Map<String, DomainMetadata> getDomainMetadataMap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please add docs (see getLatestTransactionVersion for an example). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like this is used just in tests? Do we see a need for it when writing to the table? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've added a doc for this. Currently, it is only used in tests. I imagine we will need this when writing to the table in the future. For example, in Row Tracking, we’ll need to get its domain metadata from the snapshot to retrieve the previous |
||
return logReplay.getDomainMetadataMap(); | ||
} | ||
|
||
public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { | ||
long minFileRetentionTimestampMillis = | ||
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.actions; | ||
|
||
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import io.delta.kernel.data.ColumnVector; | ||
import io.delta.kernel.data.Row; | ||
import io.delta.kernel.internal.data.GenericRow; | ||
import io.delta.kernel.types.BooleanType; | ||
import io.delta.kernel.types.StringType; | ||
import io.delta.kernel.types.StructType; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** Delta log action representing an `DomainMetadata` action */ | ||
public class DomainMetadata { | ||
/** Full schema of the {@link DomainMetadata} action in the Delta Log. */ | ||
public static final StructType FULL_SCHEMA = | ||
new StructType() | ||
.add("domain", StringType.STRING, false /* nullable */) | ||
.add("configuration", StringType.STRING, false /* nullable */) | ||
.add("removed", BooleanType.BOOLEAN, false /* nullable */); | ||
|
||
public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) { | ||
if (vector.isNullAt(rowId)) { | ||
return null; | ||
} | ||
return new DomainMetadata( | ||
requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId), | ||
requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId), | ||
requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId)); | ||
} | ||
|
||
public static DomainMetadata fromRow(Row row) { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (row == null) { | ||
return null; | ||
} | ||
assert (row.getSchema().equals(FULL_SCHEMA)); | ||
return new DomainMetadata( | ||
requireNonNull(row, 0, "domain").getString(0), | ||
requireNonNull(row, 1, "configuration").getString(1), | ||
requireNonNull(row, 2, "removed").getBoolean(2)); | ||
} | ||
|
||
private final String domain; | ||
private final String configuration; | ||
private final boolean removed; | ||
|
||
/** | ||
* The domain metadata action contains a configuration string for a named metadata domain. Two | ||
* overlapping transactions conflict if they both contain a domain metadata action for the same | ||
* metadata domain. Per-domain conflict resolution logic can be implemented. | ||
* | ||
* @param domain A string used to identify a specific domain. | ||
* @param configuration A string containing configuration for the metadata domain. | ||
* @param removed If it is true it serves as a tombstone to logically delete a {@link | ||
* DomainMetadata} action. | ||
*/ | ||
public DomainMetadata(String domain, String configuration, boolean removed) { | ||
scottsand-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.domain = requireNonNull(domain, "domain is null"); | ||
this.configuration = requireNonNull(configuration, "configuration is null"); | ||
this.removed = removed; | ||
} | ||
|
||
public String getDomain() { | ||
return domain; | ||
} | ||
|
||
public String getConfiguration() { | ||
return configuration; | ||
} | ||
|
||
public boolean isRemoved() { | ||
return removed; | ||
} | ||
|
||
/** | ||
* Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}. | ||
* | ||
* @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA} | ||
*/ | ||
public Row toRow() { | ||
Map<Integer, Object> domainMetadataMap = new HashMap<>(); | ||
domainMetadataMap.put(0, domain); | ||
domainMetadataMap.put(1, configuration); | ||
domainMetadataMap.put(2, removed); | ||
|
||
return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
qiyuandong-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return String.format( | ||
"DomainMetadata{domain='%s', configuration='%s', removed='%s'}", | ||
domain, configuration, removed); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) return true; | ||
if (obj == null || getClass() != obj.getClass()) return false; | ||
DomainMetadata that = (DomainMetadata) obj; | ||
return removed == that.removed | ||
&& domain.equals(that.domain) | ||
&& configuration.equals(that.configuration); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return java.util.Objects.hash(domain, configuration, removed); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this error is thrown when a commit (already on the filesystem) contains duplicate entries for the same metadata. In this case the table is corrupted and we should use `InvalidTableException`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error is thrown in two cases:
`Transaction.commit()` (write path). As discussed in another thread, I think we can possibly remove the duplicate check in DM conflict resolution, and in that case this error is only thrown in `Transaction.commit()`.