diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 6a3dc23fd4..ea0a4b3010 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -18,6 +18,7 @@ import static java.lang.String.format; import io.delta.kernel.exceptions.*; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.types.DataType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.DataFileStatus; @@ -274,6 +275,34 @@ public static KernelException invalidConfigurationValueException( return new InvalidConfigurationValueException(key, value, helpMessage); } + public static KernelException domainMetadataUnsupported() { + String message = + "Found DomainMetadata action(s) but table feature 'domainMetadata' " + + "is not supported on this table."; + return new KernelException(message); + } + + public static KernelException duplicateDomainMetadataAction( + String domain, DomainMetadata action1, DomainMetadata action2) { + String message = + String.format( + "Multiple actions detected for domain '%s' in single transaction: '%s' and '%s'. " + + "Only one action per domain is allowed.", + domain, action1.toString(), action2.toString()); + return new KernelException(message); + } + + public static ConcurrentWriteException concurrentDomainMetadataAction( + DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) { + String message = + String.format( + "A concurrent writer added a domainMetadata action for the same domain: %s. " + + "No domain-specific conflict resolution is available for this domain. " + + "Attempted domainMetadata: %s. 
Winning domainMetadata: %s", + domainMetadataAttempt.getDomain(), domainMetadataAttempt, winningDomainMetadata); + return new ConcurrentWriteException(message); + } + /* ------------------------ HELPER METHODS ----------------------------- */ private static String formatTimestamp(long millisSinceEpochUTC) { return new Timestamp(millisSinceEpochUTC).toInstant().toString(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 67dca98e89..5dd6cd2d2d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -23,6 +23,7 @@ import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; @@ -31,6 +32,7 @@ import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; import io.delta.kernel.types.StructType; +import java.util.Map; import java.util.Optional; /** Implementation of {@link Snapshot}. */ @@ -83,6 +85,17 @@ public Protocol getProtocol() { return protocol; } + /** + * Get the domain metadata map from the log replay, which lazily loads and replays a history of + * domain metadata actions, resolving them to produce the current state of the domain metadata. + * + * @return A map where the keys are domain names and the values are {@link DomainMetadata} + * objects. 
+ */ + public Map getDomainMetadataMap() { + return logReplay.getDomainMetadataMap(); + } + public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { long minFileRetentionTimestampMillis = System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index eeeffc7cc7..2f91a4650c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -39,6 +39,7 @@ public class TableFeatures { add("columnMapping"); add("typeWidening-preview"); add("typeWidening"); + add("domainMetadata"); } }); @@ -57,6 +58,12 @@ public class TableFeatures { } }); + /** The feature name for domain metadata. */ + public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; + + /** The minimum writer version required to support domain metadata. */ + public static final int DOMAIN_METADATA_MIN_WRITER_VERSION_REQUIRED = 7; + //////////////////// // Helper Methods // //////////////////// @@ -93,7 +100,7 @@ public static void validateReadSupportedTable( *
  • protocol writer version 1. *
  • protocol writer version 2 only with appendOnly feature enabled. *
  • protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code - * columnMapping}, {@code typeWidening} feature enabled. + * columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled. * * * @param protocol Table protocol @@ -125,20 +132,8 @@ public static void validateWriteSupportedTable( throw unsupportedWriterProtocol(tablePath, minWriterVersion); case 7: for (String writerFeature : protocol.getWriterFeatures()) { - switch (writerFeature) { - // Only supported writer features as of today in Kernel - case "appendOnly": - break; - case "inCommitTimestamp": - break; - case "columnMapping": - break; - case "typeWidening-preview": - break; - case "typeWidening": - break; - default: - throw unsupportedWriterFeature(tablePath, writerFeature); + if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) { + throw unsupportedWriterFeature(tablePath, writerFeature); } } break; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index b11e4e8d0a..86f2321e20 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -54,6 +54,7 @@ public class TransactionBuilderImpl implements TransactionBuilder { private Optional> partitionColumns = Optional.empty(); private Optional setTxnOpt = Optional.empty(); private Optional> tableProperties = Optional.empty(); + private List domainMetadatas = new ArrayList<>(); public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation operation) { this.table = table; @@ -93,6 +94,16 @@ public TransactionBuilder withTableProperties(Engine engine, Map return this; } + /** + * Internal API to set the domain metadata for the transaction. Visible for testing. 
+ * + * @param domainMetadatas List of domain metadata to be added to the transaction. + */ + public TransactionBuilder withDomainMetadatas(List domainMetadatas) { + this.domainMetadatas = new ArrayList<>(domainMetadatas); + return this; + } + @Override public Transaction build(Engine engine) { SnapshotImpl snapshot; @@ -156,7 +167,8 @@ public Transaction build(Engine engine) { setTxnOpt, shouldUpdateMetadata, shouldUpdateProtocol, - table.getClock()); + table.getClock(), + domainMetadatas); } /** Validate the given parameters for the transaction. */ diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 7b90d2be1d..602de9f6c2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -32,11 +32,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; -import io.delta.kernel.internal.util.Clock; -import io.delta.kernel.internal.util.ColumnMapping; -import io.delta.kernel.internal.util.FileNames; -import io.delta.kernel.internal.util.InCommitTimestampUtils; -import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.internal.util.*; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; @@ -73,6 +69,7 @@ public class TransactionImpl implements Transaction { private final Optional setTxnOpt; private final boolean shouldUpdateProtocol; private final Clock clock; + private final List domainMetadatas; private Metadata metadata; private boolean shouldUpdateMetadata; @@ -90,7 +87,8 @@ public TransactionImpl( Optional setTxnOpt, boolean shouldUpdateMetadata, boolean shouldUpdateProtocol, - Clock clock) { + Clock clock, 
+ List domainMetadatas) { this.isNewTable = isNewTable; this.dataPath = dataPath; this.logPath = logPath; @@ -103,6 +101,7 @@ public TransactionImpl( this.shouldUpdateMetadata = shouldUpdateMetadata; this.shouldUpdateProtocol = shouldUpdateProtocol; this.clock = clock; + this.domainMetadatas = domainMetadatas; } @Override @@ -221,6 +220,12 @@ private TransactionCommitResult doCommit( } setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); + // Reject duplicate domains and verify that the protocol supports domain metadata + DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol); + + domainMetadatas.forEach( + dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow()))); + try (CloseableIterator stageDataIter = dataActions.iterator()) { // Create a new CloseableIterator that will return the metadata actions followed by the // data actions. @@ -269,6 +274,10 @@ public Optional getSetTxnOpt() { return setTxnOpt; } + public List getDomainMetadatas() { + return domainMetadatas; + } + /** * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can * result in an additional file read and that this will only happen if ICT is enabled. diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java new file mode 100644 index 0000000000..77daa14731 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java @@ -0,0 +1,127 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.actions; + +import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import java.util.HashMap; +import java.util.Map; + +/** Delta log action representing a {@code DomainMetadata} action. */ +public class DomainMetadata { + /** Full schema of the {@link DomainMetadata} action in the Delta Log. 
*/ + public static final StructType FULL_SCHEMA = + new StructType() + .add("domain", StringType.STRING, false /* nullable */) + .add("configuration", StringType.STRING, false /* nullable */) + .add("removed", BooleanType.BOOLEAN, false /* nullable */); + + public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) { + if (vector.isNullAt(rowId)) { + return null; + } + return new DomainMetadata( + requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId), + requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId), + requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId)); + } + + public static DomainMetadata fromRow(Row row) { + if (row == null) { + return null; + } + assert (row.getSchema().equals(FULL_SCHEMA)); + return new DomainMetadata( + requireNonNull(row, 0, "domain").getString(0), + requireNonNull(row, 1, "configuration").getString(1), + requireNonNull(row, 2, "removed").getBoolean(2)); + } + + private final String domain; + private final String configuration; + private final boolean removed; + + /** + * The domain metadata action contains a configuration string for a named metadata domain. Two + * overlapping transactions conflict if they both contain a domain metadata action for the same + * metadata domain. Per-domain conflict resolution logic can be implemented. + * + * @param domain A string used to identify a specific domain. + * @param configuration A string containing configuration for the metadata domain. + * @param removed If it is true it serves as a tombstone to logically delete a {@link + * DomainMetadata} action. 
+ */ + public DomainMetadata(String domain, String configuration, boolean removed) { + this.domain = requireNonNull(domain, "domain is null"); + this.configuration = requireNonNull(configuration, "configuration is null"); + this.removed = removed; + } + + public String getDomain() { + return domain; + } + + public String getConfiguration() { + return configuration; + } + + public boolean isRemoved() { + return removed; + } + + /** + * Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}. + * + * @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA} + */ + public Row toRow() { + Map domainMetadataMap = new HashMap<>(); + domainMetadataMap.put(0, domain); + domainMetadataMap.put(1, configuration); + domainMetadataMap.put(2, removed); + + return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap); + } + + @Override + public String toString() { + return String.format( + "DomainMetadata{domain='%s', configuration='%s', removed='%s'}", + domain, configuration, removed); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + DomainMetadata that = (DomainMetadata) obj; + return removed == that.removed + && domain.equals(that.domain) + && configuration.equals(that.configuration); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(domain, configuration, removed); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java index 94626ea5f7..1f32226a0e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java @@ -18,6 +18,7 @@ import io.delta.kernel.data.Row; import io.delta.kernel.internal.data.GenericRow; import 
io.delta.kernel.types.StructType; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,7 +33,9 @@ public class SingleAction { .add("add", AddFile.FULL_SCHEMA) .add("remove", RemoveFile.FULL_SCHEMA) .add("metaData", Metadata.FULL_SCHEMA) - .add("protocol", Protocol.FULL_SCHEMA); + .add("protocol", Protocol.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. @@ -48,7 +51,9 @@ public class SingleAction { // .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting domain metadata/row tracking enabled tables, we should add the // schema for domain metadata fields here. @@ -61,7 +66,8 @@ public class SingleAction { .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) .add("cdc", new StructType()) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. 
@@ -71,6 +77,7 @@ public class SingleAction { private static final int METADATA_ORDINAL = FULL_SCHEMA.indexOf("metaData"); private static final int PROTOCOL_ORDINAL = FULL_SCHEMA.indexOf("protocol"); private static final int COMMIT_INFO_ORDINAL = FULL_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = FULL_SCHEMA.indexOf("domainMetadata"); public static Row createAddFileSingleAction(Row addFile) { Map singleActionValueMap = new HashMap<>(); @@ -102,6 +109,11 @@ public static Row createCommitInfoSingleAction(Row commitInfo) { return new GenericRow(FULL_SCHEMA, singleActionValueMap); } + public static Row createDomainMetadataSingleAction(Row domainMetadata) { + return new GenericRow( + FULL_SCHEMA, Collections.singletonMap(DOMAIN_METADATA_ORDINAL, domainMetadata)); + } + public static Row createTxnSingleAction(Row txn) { Map singleActionValueMap = new HashMap<>(); singleActionValueMap.put(TXN_ORDINAL, txn); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index 1846fd2259..33808db8be 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -15,9 +15,10 @@ */ package io.delta.kernel.internal.replay; +import static io.delta.kernel.internal.DeltaErrors.concurrentDomainMetadataAction; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; -import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA; +import static io.delta.kernel.internal.actions.SingleAction.*; import static io.delta.kernel.internal.util.FileNames.deltaFile; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static 
io.delta.kernel.internal.util.Preconditions.checkState; @@ -29,7 +30,9 @@ import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.internal.*; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.SetTransaction; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -48,6 +51,8 @@ public class ConflictChecker { private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData"); private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn"); private static final int COMMITINFO_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = + CONFLICT_RESOLUTION_SCHEMA.indexOf("domainMetadata"); // Snapshot of the table read by the transaction that encountered the conflict // (a.k.a the losing transaction) @@ -109,6 +114,7 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL)); handleMetadata(batch.getColumnVector(METADATA_ORDINAL)); handleTxn(batch.getColumnVector(TXN_ORDINAL)); + handleDomainMetadata(batch.getColumnVector(DOMAIN_METADATA_ORDINAL)); }); } catch (IOException ioe) { throw new UncheckedIOException("Error reading actions from winning commits.", ioe); @@ -191,6 +197,39 @@ private void handleMetadata(ColumnVector metadataVector) { } } + /** + * Checks whether each of the current transaction's {@link DomainMetadata} conflicts with the + * winning transaction at any domain. + * + *
      + *
    1. Accept the current transaction if its set of metadata domains does not overlap with the + * winning transaction's set of metadata domains. + *
    2. Otherwise, fail the current transaction unless each conflicting domain is associated with + * a domain-specific way of resolving the conflict. + *
    + * + * @param domainMetadataVector domainMetadata rows from the winning transactions + */ + private void handleDomainMetadata(ColumnVector domainMetadataVector) { + // Build a domain metadata map from the winning transaction. + Map winningTxnDomainMetadataMap = new HashMap<>(); + DomainMetadataUtils.populateDomainMetadataMap( + domainMetadataVector, winningTxnDomainMetadataMap); + + for (DomainMetadata currentTxnDM : this.transaction.getDomainMetadatas()) { + // For each domain metadata action in the current transaction, check if it has a conflict with + // the winning transaction. + String domainName = currentTxnDM.getDomain(); + DomainMetadata winningTxnDM = winningTxnDomainMetadataMap.get(domainName); + if (winningTxnDM != null) { + // Conflict - check if the conflict can be resolved. + // Currently, we don't have any domain-specific way of resolving the conflict. + // Domain-specific ways of resolving the conflict can be added here (e.g. for Row Tracking). + throw concurrentDomainMetadataAction(currentTxnDM, winningTxnDM); + } + } + } + /** * Get the commit info from the winning transactions. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java index d1b3d55ee7..11ff3879aa 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java @@ -78,6 +78,8 @@ public class CreateCheckpointIterator implements CloseableIterator txnAppIdToVersion = new HashMap<>(); + // Current state of all domains we have seen in {@link DomainMetadata} during the log replay. We + // traverse the log in reverse, so remembering the domains we have seen is enough for creating a + // checkpoint. 
+ private final Set domainSeen = new HashSet<>(); + // Metadata about the checkpoint to store in `_last_checkpoint` file private long numberOfAddActions = 0; // final number of add actions survived in the checkpoint @@ -234,6 +241,11 @@ private boolean prepareNext() { final ColumnVector txnVector = getVector(actionsBatch, TXN_ORDINAL); processTxn(txnVector, selectionVectorBuffer); + // Step 5: Process the domain metadata + final ColumnVector domainMetadataDomainNameVector = + getVector(actionsBatch, DOMAIN_METADATA_DOMAIN_NAME_ORDINAL); + processDomainMetadata(domainMetadataDomainNameVector, selectionVectorBuffer); + Optional selectionVector = Optional.of(createSelectionVector(selectionVectorBuffer, actionsBatch.getSize())); toReturnNext = Optional.of(new FilteredColumnarBatch(actionsBatch, selectionVector)); @@ -352,6 +364,37 @@ private void processTxn(ColumnVector txnVector, boolean[] selectionVectorBuffer) } } + /** + * Processes domain metadata actions during checkpoint creation. During the reverse log replay, + * for each domain, we only keep the first (latest) domain metadata action encountered by + * selecting them in the selection vector, and ignore any older ones for the same domain by + * unselecting them. + * + * @param domainMetadataVector Column vector containing domain names of domain metadata actions. + * @param selectionVectorBuffer The selection vector to attach to the batch to indicate which + * records to write to the checkpoint and which ones not to. + */ + private void processDomainMetadata( + ColumnVector domainMetadataVector, boolean[] selectionVectorBuffer) { + final int vectorSize = domainMetadataVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + if (domainMetadataVector.isNullAt(rowId)) { + continue; // selectionVector will be `false` at rowId by default + } + + final String domain = domainMetadataVector.getString(rowId); + if (domainSeen.contains(domain)) { + // We do a reverse log replay. 
The latest domainMetadata seen for a given domain wins and + // should be written to the checkpoint. Anything after the first one shouldn't be in + // checkpoint. + unselect(selectionVectorBuffer, rowId); + } else { + select(selectionVectorBuffer, rowId); + domainSeen.add(domain); + } + } + } + private void unselect(boolean[] selectionVectorBuffer, int rowId) { // Just use the java assert (which are enabled in tests) for sanity checks. This should // never happen. Given this is going to be on the hot path, we want to avoid cost in diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 170dad9ac1..f81ce16c99 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -27,13 +27,18 @@ import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; /** @@ -74,6 +79,10 @@ private static StructType getAddSchema(boolean shouldReadStats) { return shouldReadStats ? 
AddFile.SCHEMA_WITH_STATS : AddFile.SCHEMA_WITHOUT_STATS; } + /** Read schema when searching for just the domain metadata */ + public static final StructType DOMAIN_METADATA_READ_SCHEMA = + new StructType().add("domainMetadata", DomainMetadata.FULL_SCHEMA); + public static String SIDECAR_FIELD_NAME = "sidecar"; public static String ADDFILE_FIELD_NAME = "add"; public static String REMOVEFILE_FIELD_NAME = "remove"; @@ -109,6 +118,7 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) { private final Path dataPath; private final LogSegment logSegment; private final Tuple2 protocolAndMetadata; + private final Lazy> domainMetadataMap; public LogReplay( Path logPath, @@ -122,6 +132,8 @@ public LogReplay( this.dataPath = dataPath; this.logSegment = logSegment; this.protocolAndMetadata = loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion); + // Lazy loading of domain metadata only when needed + this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine)); } ///////////////// @@ -140,6 +152,10 @@ public Optional getLatestTransactionIdentifier(Engine engine, String appli return loadLatestTransactionVersion(engine, applicationId); } + public Map getDomainMetadataMap() { + return domainMetadataMap.get(); + } + /** * Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in * the table. @@ -296,4 +312,40 @@ private Optional loadLatestTransactionVersion(Engine engine, String applic return Optional.empty(); } + + /** + * Retrieves a map of domainName to {@link DomainMetadata} from the log files. + * + *

    Loading domain metadata requires an additional round of log replay so this is done lazily + * only when domain metadata is requested. We might want to merge this into {@link + * #loadTableProtocolAndMetadata}. + * + * @param engine The engine used to process the log files. + * @return A map where the keys are domain names and the values are the corresponding {@link + * DomainMetadata} objects. + * @throws UncheckedIOException if an I/O error occurs while closing the iterator. + */ + private Map loadDomainMetadataMap(Engine engine) { + try (CloseableIterator reverseIter = + new ActionsIterator( + engine, + logSegment.allLogFilesReversed(), + DOMAIN_METADATA_READ_SCHEMA, + Optional.empty() /* checkpointPredicate */)) { + Map domainMetadataMap = new HashMap<>(); + while (reverseIter.hasNext()) { + final ColumnarBatch columnarBatch = reverseIter.next().getColumnarBatch(); + assert (columnarBatch.getSchema().equals(DOMAIN_METADATA_READ_SCHEMA)); + + final ColumnVector dmVector = columnarBatch.getColumnVector(0); + + // We are performing a reverse log replay. This function ensures that only the first + // encountered domain metadata for each domain is added to the map. + DomainMetadataUtils.populateDomainMetadataMap(dmVector, domainMetadataMap); + } + return domainMetadataMap; + } catch (IOException ex) { + throw new UncheckedIOException("Could not close iterator", ex); + } + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java new file mode 100644 index 0000000000..0c18a66ae6 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java @@ -0,0 +1,103 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.util; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.TableFeatures; +import io.delta.kernel.internal.actions.DomainMetadata; +import io.delta.kernel.internal.actions.Protocol; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DomainMetadataUtils { + + private DomainMetadataUtils() { + // Empty private constructor to prevent instantiation + } + + /** + * Populate the map of domain metadata from actions. When encountering duplicate domain metadata + * actions for the same domain, this method preserves the first seen entry and skips subsequent + * entries. This matters for reverse log replay, where the entries seen first come from newer + * commits and must take precedence over the older entries seen afterwards. 
+ * + * @param domainMetadataActionVector A {@link ColumnVector} containing the domain metadata rows + * @param domainMetadataMap The existing map to be populated with domain metadata entries, where + * the key is the domain name and the value is the domain metadata + */ + public static void populateDomainMetadataMap( + ColumnVector domainMetadataActionVector, Map domainMetadataMap) { + final int vectorSize = domainMetadataActionVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + DomainMetadata dm = DomainMetadata.fromColumnVector(domainMetadataActionVector, rowId); + if (dm != null && !domainMetadataMap.containsKey(dm.getDomain())) { + // We only add the domain metadata if its domain name is not already present in the map + domainMetadataMap.put(dm.getDomain(), dm); + } + } + } + + /** + * Checks if the table protocol supports the "domainMetadata" writer feature. + * + * @param protocol the protocol to check + * @return true if the "domainMetadata" feature is supported, false otherwise + */ + public static boolean isDomainMetadataSupported(Protocol protocol) { + List writerFeatures = protocol.getWriterFeatures(); + if (writerFeatures == null) { + return false; + } + return writerFeatures.contains(TableFeatures.DOMAIN_METADATA_FEATURE_NAME) + && protocol.getMinWriterVersion() + >= TableFeatures.DOMAIN_METADATA_MIN_WRITER_VERSION_REQUIRED; + } + + /** + * Validates the list of domain metadata actions before committing them. It ensures that + * + *

<ol> + *
<li>domain metadata actions are only present when supported by the table protocol + *
<li>there are no duplicate domain metadata actions for the same domain in the provided + * actions. + * </ol>
    + * + * @param domainMetadataActions The list of domain metadata actions to validate + * @param protocol The protocol to check for domain metadata support + */ + public static void validateDomainMetadatas( + List domainMetadataActions, Protocol protocol) { + if (domainMetadataActions.isEmpty()) return; + + // The list of domain metadata is non-empty, so the protocol must support domain metadata + if (!isDomainMetadataSupported(protocol)) { + throw DeltaErrors.domainMetadataUnsupported(); + } + + Map domainMetadataMap = new HashMap<>(); + for (DomainMetadata domainMetadata : domainMetadataActions) { + String domain = domainMetadata.getDomain(); + if (domainMetadataMap.containsKey(domain)) { + throw DeltaErrors.duplicateDomainMetadataAction( + domain, domainMetadataMap.get(domain), domainMetadata); + } + domainMetadataMap.put(domain, domainMetadata); + } + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index 2dfb034ea9..a98d39840d 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -68,7 +68,8 @@ class TableFeaturesSuite extends AnyFunSuite { checkSupported(createTestProtocol(minWriterVersion = 7)) } - Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening") + Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening", + "domainMetadata") .foreach { supportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") { checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature)) @@ -77,7 +78,7 @@ class TableFeaturesSuite extends AnyFunSuite { Seq("invariants", "checkConstraints", "generatedColumns", "allowColumnDefaults", "changeDataFeed", "identityColumns", 
"deletionVectors", "rowTracking", "timestampNtz", - "domainMetadata", "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", + "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", "vacuumProtocolCheck").foreach { unsupportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $unsupportedWriterFeature") { checkUnsupported(createTestProtocol(minWriterVersion = 7, unsupportedWriterFeature)) diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet.crc new file mode 100644 index 0000000000..2f99896a06 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet.crc new file mode 100644 index 0000000000..2d6fb4befb Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/.test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000000.json.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 0000000000..f78d578b77 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000000.json.crc differ diff --git 
a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000001.json.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000001.json.crc new file mode 100644 index 0000000000..74116dc9a4 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000001.json.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000002.json.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000002.json.crc new file mode 100644 index 0000000000..d586b96c52 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000002.json.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.checkpoint.parquet.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.checkpoint.parquet.crc new file mode 100644 index 0000000000..b886bbf07c Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.checkpoint.parquet.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.json.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.json.crc new file mode 100644 index 0000000000..8c18893417 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000003.json.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000004.json.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000004.json.crc new file mode 100644 index 0000000000..8529eeaf90 
Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/.00000000000000000004.json.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/._last_checkpoint.crc b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/._last_checkpoint.crc new file mode 100644 index 0000000000..3868edc105 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/._last_checkpoint.crc differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000000.json b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..a40e63bd38 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1730671956424,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.checkpointInterval\":\"3\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0-SNAPSHOT","txnId":"90158bef-da23-4500-aafc-d2932e80f8cb"}} +{"metaData":{"id":"04e4bf27-b577-4f7d-b002-08b3bbc00ce5","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"3"},"createdTime":1730671956256}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000001.json b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000001.json new file mode 100644 index 
0000000000..c5c14e5749 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1730671958509,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.feature.domainmetadata\":\"enabled\"}"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0-SNAPSHOT","txnId":"4b1dc72c-432c-4753-b9d3-68ab89f3cb91"}} +{"metaData":{"id":"04e4bf27-b577-4f7d-b002-08b3bbc00ce5","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"3"},"createdTime":1730671956256}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":7,"writerFeatures":["domainMetadata","appendOnly","invariants"]}} diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000002.json b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000002.json new file mode 100644 index 0000000000..36fc046ac4 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000002.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1730671958797,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0-SNAPSHOT","txnId":"9a6324de-800e-4c8f-9ce3-7766d0462474"}} +{"domainMetadata":{"domain":"testDomain1","configuration":"{\"key1\":\"1\"}","removed":false}} +{"domainMetadata":{"domain":"testDomain2","configuration":"","removed":false}} +{"domainMetadata":{"domain":"testDomain3","configuration":"","removed":false}} diff --git 
a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.checkpoint.parquet b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.checkpoint.parquet new file mode 100644 index 0000000000..01aa31cd40 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.checkpoint.parquet differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.json b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.json new file mode 100644 index 0000000000..c328eadd40 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000003.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1730671959801,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"2","numOutputBytes":"956"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0-SNAPSHOT","txnId":"54aecdaa-e039-40c8-ac9b-bcd7f30183fa"}} +{"add":{"path":"test%25file%25prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1730671959767,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}"}} +{"add":{"path":"test%25file%25prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1730671959767,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}} diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000004.json 
b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000004.json new file mode 100644 index 0000000000..3cba799cc3 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/00000000000000000004.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1730671962209,"operation":"Manual Update","operationParameters":{},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.3.0-SNAPSHOT","txnId":"063e4cd4-4091-4a30-a089-4e8a0fc5c0ac"}} +{"domainMetadata":{"domain":"testDomain1","configuration":"{\"key1\":\"10\"}","removed":false}} +{"domainMetadata":{"domain":"testDomain2","configuration":"","removed":true}} diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/_last_checkpoint b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/_last_checkpoint new file mode 100644 index 0000000000..760af3338a --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/_delta_log/_last_checkpoint @@ -0,0 +1 @@ 
+{"version":3,"size":7,"sizeInBytes":16337,"numOfAddFiles":2,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}},{"name":"clusteringProvider","type":"string","nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type"
:"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"t
ype":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"domainMetadata","type":{"type":"struct","fields":[{"name":"domain","type":"string","nullable":true,"metadata":{}},{"name":"configuration","type":"string","nullable":true,"metadata":{}},{"name":"removed","type":"boolean","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"checksum":"7a97cc187ffb0604cc50e51bddb3cbfa"} diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet new file mode 100644 index 0000000000..bc184bfa94 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00000-48cf7913-43ae-45bf-ab2c-94eb2fe77358-c000.snappy.parquet differ diff --git a/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet new file mode 100644 index 0000000000..737d58aed4 Binary files /dev/null and b/kernel/kernel-defaults/src/test/resources/kernel-domain-metadata/test%file%prefix-part-00001-071539c0-ef9e-478c-b550-035c6b5a31c2-c000.snappy.parquet differ diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala 
b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala new file mode 100644 index 0000000000..9222956f88 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala @@ -0,0 +1,433 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults + +import io.delta.kernel._ +import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase +import io.delta.kernel.engine.Engine +import io.delta.kernel.exceptions._ +import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl} +import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} + +import java.util.Collections +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { + + private def assertDomainMetadata( + snapshot: SnapshotImpl, + expectedValue: Map[String, DomainMetadata]): Unit = { + assert(expectedValue === snapshot.getDomainMetadataMap.asScala) + } + + private def assertDomainMetadata( + table: Table, + engine: Engine, + expectedValue: Map[String, DomainMetadata]): Unit = { + // Get the latest snapshot of the table + val snapshot = 
table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + assertDomainMetadata(snapshot, expectedValue) + } + + private def createTxnWithDomainMetadatas( + engine: Engine, + tablePath: String, + domainMetadatas: Seq[DomainMetadata]): Transaction = { + + val txnBuilder = createWriteTxnBuilder(TableImpl.forPath(engine, tablePath)) + .asInstanceOf[TransactionBuilderImpl] + + txnBuilder.withDomainMetadatas(domainMetadatas.asJava).build(engine) + } + + private def commitDomainMetadataAndVerify( + engine: Engine, + tablePath: String, + domainMetadatas: Seq[DomainMetadata], + expectedValue: Map[String, DomainMetadata]): Unit = { + // Create the transaction with domain metadata and commit + val txn = createTxnWithDomainMetadatas(engine, tablePath, domainMetadatas) + txn.commit(engine, emptyIterable()) + + // Verify the final state includes the expected domain metadata + val table = Table.forPath(engine, tablePath) + assertDomainMetadata(table, engine, expectedValue) + } + + private def setDomainMetadataSupport(engine: Engine, tablePath: String): Unit = { + val protocol = new Protocol( + 1, // minReaderVersion + 7, // minWriterVersion + Collections.emptyList(), // readerFeatures + Seq("domainMetadata").asJava // writerFeatures + ) + + val protocolAction = SingleAction.createProtocolSingleAction(protocol.toRow) + val txn = createTxn(engine, tablePath, isNewTable = false, testSchema, Seq.empty) + txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(protocolAction).asJava.iterator()))) + } + + private def createTableWithDomainMetadataSupported(engine: Engine, tablePath: String): Unit = { + // Create an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + } + + private def validateDomainMetadataConflictResolution( + engine: Engine, + tablePath: String, + currentTxn1DomainMetadatas: 
Seq[DomainMetadata], + winningTxn2DomainMetadatas: Seq[DomainMetadata], + winningTxn3DomainMetadatas: Seq[DomainMetadata], + expectedConflict: Boolean): Unit = { + // Create table with domain metadata support + createTableWithDomainMetadataSupported(engine, tablePath) + val table = Table.forPath(engine, tablePath) + + /** + * Txn1: i.e. the current transaction that comes later than winning transactions. + * Txn2: i.e. the winning transaction that was committed first. + * Txn3: i.e. the winning transaction that was committed secondly. + * + * Note tx is the timestamp. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS or FAIL). + */ + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, currentTxn1DomainMetadatas) + + val txn2 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn2DomainMetadatas) + txn2.commit(engine, emptyIterable()) + + val txn3 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn3DomainMetadatas) + txn3.commit(engine, emptyIterable()) + + if (expectedConflict) { + // We expect the commit of txn1 to fail because of the conflicting DM actions + val ex = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + ex.getMessage.contains( + "A concurrent writer added a domainMetadata action for the same domain" + ) + ) + } else { + // We expect the commit of txn1 to succeed + txn1.commit(engine, emptyIterable()) + // Verify the final state includes merged domain metadata + val expectedMetadata = + (winningTxn2DomainMetadatas ++ winningTxn3DomainMetadatas ++ currentTxn1DomainMetadatas) + .groupBy(_.getDomain) + .mapValues(_.last) + assertDomainMetadata(table, engine, expectedMetadata) + } + } + + test("create table w/o domain metadata") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + + // Create 
an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Verify that the table doesn't have any domain metadata + assertDomainMetadata(table, engine, Map.empty) + } + } + + test("table w/o domain metadata support fails domain metadata commits") { + withTempDirAndEngine { (tablePath, engine) => + // Create an empty table + // Its minWriterVersion is 2 and doesn't have 'domainMetadata' in its writerFeatures + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + val dm1 = new DomainMetadata("domain1", "", false) + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1)) + + // We expect the commit to fail because the table doesn't support domain metadata + val e = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + e.getMessage + .contains( + "Found DomainMetadata action(s) but table feature 'domainMetadata' " + + "is not supported on this table." 
+ ) + ) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + + // Commit domain metadata again and expect success + val txn2 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1)) + txn2.commit(engine, emptyIterable()) + } + } + + test("multiple DomainMetadatas for the same domain should fail in single transaction") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1_1 = new DomainMetadata("domain1", """{"key1":"1"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}"""", false) + + val txn = createTxnWithDomainMetadatas(engine, tablePath, List(dm1_1, dm2, dm1_2)) + + val e = intercept[KernelException] { + txn.commit(engine, emptyIterable()) + } + assert( + e.getMessage.contains( + "Multiple actions detected for domain 'domain1' in single transaction" + ) + ) + } + } + + test("latest domain metadata overwriting existing ones") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", false) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2, dm3, dm1_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + } + } + + test("domain metadata persistence across log replay") { + withTempDirAndEngine { (tablePath, engine) => + 
createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1, dm2), + expectedValue = Map("domain1" -> dm1, "domain2" -> dm2) + ) + + // Restart the table and verify the domain metadata + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata(table2, engine, Map("domain1" -> dm1, "domain2" -> dm2)) + } + } + + test("only the latest domain metadata per domain is stored in checkpoints") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", true) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2), Map("domain1" -> dm1, "domain2" -> dm2)), + (Seq(dm3), Map("domain1" -> dm1, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm1_2, dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + + // Checkpoint the table + val latestVersion = table.getLatestSnapshot(engine).getVersion(engine) + table.checkpoint(engine, latestVersion) + + // Verify that only the latest domain metadata is persisted in the checkpoint + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata( + table2, + engine, + Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2) + ) + } + } + + test("Conflict resolution - one of three concurrent txns has 
DomainMetadata") { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action. + * Txn2: does NOT include DomainMetadata action. + * Txn3: does NOT include DomainMetadata action. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). + */ + val dm1 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq.empty, + winningTxn3DomainMetadatas = Seq.empty, + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/o conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain3". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain1". 
+ * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (FAIL). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflict domains - 2" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: includes a DomainMetadata action for "domain1". + * Txn2: includes a DomainMetadata action for "domain1" (same domain as Txn1). + * Txn3: includes a DomainMetadata action for "domain2". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (FAIL).
+ */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain1", "", false) + val dm3 = new DomainMetadata("domain2", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test("Integration test - read a golden table with checkpoints and log files") { + withTempDirAndEngine((tablePath, engine) => { + val path = getTestResourceFilePath("kernel-domain-metadata") + val snapshot = latestSnapshot(path).asInstanceOf[SnapshotImpl] + + // Replaying the golden table requires reading 1 checkpoint file and 1 log file. + // The expected state of the domain metadata after replay is: + // testDomain1: "{\"key1\":\"10\"}", removed = false (from 03.checkpoint) + // testDomain2: "", removed = true (from 03.checkpoint; the removed entry is still expected in the map) + // testDomain3: "", removed = false (from 04.json) + + val dm1 = new DomainMetadata("testDomain1", """{"key1":"10"}""", false) + val dm2 = new DomainMetadata("testDomain2", "", true) + val dm3 = new DomainMetadata("testDomain3", "", false) + + assertDomainMetadata( + snapshot, + Map("testDomain1" -> dm1, "testDomain2" -> dm2, "testDomain3" -> dm3) + ) + }) + } +}