Skip to content

Commit

Permalink
[Kernel] [CC Refactor #2] Add TableDescriptor and `CommitCoordinato…
Browse files Browse the repository at this point in the history
…rClient` API (#3797)

This is a stacked PR. Please view this PR's diff here:
-
scottsand-db/delta@delta_kernel_cc_1...delta_kernel_cc_2

#### Which Delta project/connector is this regarding?
- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds new `TableDescriptor` and `CommitCoordinatorClient` API. Adds a new
`getCommitCoordinatorClient` API to the `Engine` (with a default
implementation that throws an exception).

## How was this patch tested?

N/A trivial.

## Does this PR introduce _any_ user-facing changes?

Yes. See the above.
  • Loading branch information
scottsand-db authored Nov 1, 2024
1 parent 024dadb commit 6ae4b62
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TableIdentifier that = (TableIdentifier) o;
final TableIdentifier that = (TableIdentifier) o;
return Arrays.equals(getNamespace(), that.getNamespace()) && getName().equals(that.getName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import io.delta.kernel.TableIdentifier;
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;

/**
* The CommitCoordinatorClient is responsible for communicating with the commit coordinator and
* backfilling commits. It has four main APIs that need to be implemented:
*
* <ul>
* <li>{@link #registerTable}: Determine the table config during commit coordinator registration.
* <li>{@link #commit}: Commit a new version of the table.
* <li>{@link #getCommits}: Tracks and returns unbackfilled commits.
* <li>{@link #backfillToVersion}: Ensure that commits are backfilled if/when needed.
* </ul>
*
* @since 3.3.0
*/
@Evolving
public interface CommitCoordinatorClient {

/**
* Register the table represented by the given {@code logPath} at the provided {@code
* currentVersion} with the commit coordinator this commit coordinator client represents.
*
* <p>This API is called when the table is being converted from an existing file system table to a
* coordinated-commit table.
*
* <p>When a new coordinated-commit table is being created, the {@code currentVersion} will be -1
* and the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
*
* @param engine The {@link Engine} instance to use, if needed.
* @param logPath The path to the delta log of the table that should be converted.
* @param tableIdentifier The table identifier for the table, or {@link Optional#empty()} if the
* table doesn't use any identifier (i.e. it is path-based).
* @param currentVersion The version of the table just before conversion. currentVersion + 1
* represents the commit that will do the conversion. This must be backfilled atomically.
* currentVersion + 2 represents the first commit after conversion. This will go through the
* CommitCoordinatorClient and the client is free to choose when it wants to backfill this
* commit.
* @param currentMetadata The metadata of the table at currentVersion
* @param currentProtocol The protocol of the table at currentVersion
* @return A map of key-value pairs which is issued by the commit coordinator to uniquely identify
* the table. This should be stored in the table's metadata for table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}. This information
* needs to be passed to the {@link #commit}, {@link #getCommits}, and {@link
* #backfillToVersion} APIs to identify the table.
*/
Map<String, String> registerTable(
Engine engine,
String logPath,
Optional<TableIdentifier> tableIdentifier,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol);

/**
* Commit the given set of actions to the table represented by {@code tableDescriptor}.
*
* @param engine The {@link Engine} instance to use. This gives client implementations access to
* {@link io.delta.kernel.engine.JsonHandler#writeJsonFileAtomically} in order to write the
* given set of actions to an unbackfilled Delta file.
* @param tableDescriptor The descriptor for the table.
* @param commitVersion The version of the commit that is being committed.
* @param actions The set of actions to be committed
* @param updatedActions Additional information for the commit, including:
* <ul>
* <li>Commit info
* <li>Metadata changes
* <li>Protocol changes
* </ul>
*
* @return {@link CommitResponse} containing the file status of the committed file. Note: If the
* commit is already backfilled, the file status may be omitted, and the client can retrieve
* this information independently.
* @throws CommitFailedException if the commit operation fails
*/
CommitResponse commit(
Engine engine,
TableDescriptor tableDescriptor,
long commitVersion,
CloseableIterator<Row> actions,
UpdatedActions updatedActions)
throws CommitFailedException;

/**
* Get the unbackfilled commits for the table represented by the given tableDescriptor. Commits
* older than startVersion (if given) or newer than endVersion (if given) are ignored. The
* returned commits are contiguous and in ascending version order.
*
* <p>Note that the first version returned by this API may not be equal to startVersion. This
* happens when some versions starting from startVersion have already been backfilled and so the
* commit coordinator may have stopped tracking them.
*
* <p>The returned latestTableVersion is the maximum commit version ratified by the commit
* coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
* coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
*
* @param engine The {@link Engine} instance to use, if needed.
* @param tableDescriptor The descriptor for the table.
* @param startVersion The minimum version of the commit that should be returned, or {@link
* Optional#empty()} if there is no minimum.
* @param endVersion The maximum version of the commit that should be returned, or {@link
* Optional#empty()} if there is no maximum.
* @return {@link GetCommitsResponse} which has a list of {@link
* io.delta.kernel.engine.coordinatedcommits.Commit}s and the latestTableVersion which is
* tracked by the {@link CommitCoordinatorClient}.
*/
GetCommitsResponse getCommits(
Engine engine,
TableDescriptor tableDescriptor,
Optional<Long> startVersion,
Optional<Long> endVersion);

/**
* Backfill all commits up to {@code version} and notify the commit coordinator.
*
* <p>If this API returns successfully, that means the backfill must have been completed, although
* the commit coordinator may not be aware of it yet.
*
* @param engine The {@link Engine} instance to use, if needed.
* @param tableDescriptor The descriptor for the table.
* @param version The version until which the commit coordinator client should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
* was called. If it is {@link Optional#empty()}, then the commit coordinator client should
* backfill from the beginning of the table.
* @throws IOException if there is an IO error while backfilling the commits.
*/
void backfillToVersion(
Engine engine,
TableDescriptor tableDescriptor,
long version,
Optional<Long> lastKnownBackfilledVersion)
throws IOException;

/**
* Checks if this CommitCoordinatorClient is semantically equal to another
* CommitCoordinatorClient.
*/
boolean semanticEquals(CommitCoordinatorClient other);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.TableIdentifier;
import io.delta.kernel.annotation.Evolving;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
* identifier, and table CC configuration.
*
* @since 3.3.0
*/
@Evolving
public class TableDescriptor {

private final String logPath;
private final Optional<TableIdentifier> tableIdOpt;
private final Map<String, String> tableConf;

public TableDescriptor(
String logPath, Optional<TableIdentifier> tableIdOpt, Map<String, String> tableConf) {
this.logPath = requireNonNull(logPath, "logPath is null");
this.tableIdOpt = requireNonNull(tableIdOpt, "tableIdOpt is null");
this.tableConf = requireNonNull(tableConf, "tableConf is null");
}

/** Returns the Delta log path of the table. */
public String getLogPath() {
return logPath;
}

/** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */
public Optional<TableIdentifier> getTableIdentifierOpt() {
return tableIdOpt;
}

/**
* Returns the Coordinated Commits table configuration.
*
* <p>This is the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
* configuration properties for describing the Delta table to commit-coordinator.
*/
public Map<String, String> getTableConf() {
return tableConf;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TableDescriptor that = (TableDescriptor) o;
return getLogPath().equals(that.getLogPath())
&& tableIdOpt.equals(that.tableIdOpt)
&& getTableConf().equals(that.getTableConf());
}

@Override
public int hashCode() {
return Objects.hash(getLogPath(), tableIdOpt, getTableConf());
}

@Override
public String toString() {
return "TableDescriptor{"
+ "logPath='"
+ logPath
+ '\''
+ ", tableIdOpt="
+ tableIdOpt
+ ", tableConf="
+ tableConf
+ '}';
}
}
18 changes: 18 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.delta.kernel.engine;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.coordinatedcommits.CommitCoordinatorClient;
import java.util.Map;

/**
Expand Down Expand Up @@ -56,6 +57,23 @@ public interface Engine {
*/
ParquetHandler getParquetHandler();

/**
* Retrieves a {@link CommitCoordinatorClient} for the specified commit coordinator name.
*
* @param commitCoordinatorName The name (identifier) of the underlying commit coordinator client
* to instantiate
* @param commitCoordinatorConf The configuration settings for the underlying commit coordinator
* client, taken directly from the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}
* @return A {@link CommitCoordinatorClient} implementation corresponding to the specified commit
* coordinator name
* @since 3.3.0
*/
default CommitCoordinatorClient getCommitCoordinatorClient(
String commitCoordinatorName, Map<String, String> commitCoordinatorConf) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits

import org.scalatest.funsuite.AnyFunSuite
import io.delta.kernel.TableIdentifier
import java.util.Optional

import scala.collection.JavaConverters._

class TableDescriptorSuite extends AnyFunSuite {

test("TableDescriptor should throw NullPointerException for null constructor arguments") {
assertThrows[NullPointerException] {
new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava)
}
assertThrows[NullPointerException] {
new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava)
}
assertThrows[NullPointerException] {
new TableDescriptor("/delta/logPath", Optional.empty(), null)
}
}

test("TableDescriptor should return the correct logPath, tableIdOpt, and tableConf") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava

val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)

assert(tableDescriptor.getLogPath == logPath)
assert(tableDescriptor.getTableIdentifierOpt == tableIdOpt)
assert(tableDescriptor.getTableConf == tableConf)
}

test("TableDescriptors with the same values should be equal") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava

val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf)
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf)

assert(tableDescriptor1 == tableDescriptor2)
assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode)
}

test("TableDescriptor with different values should not be equal") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf1 = Map("key1" -> "value1").asJava
val tableConf2 = Map("key1" -> "value2").asJava

val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf1)
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf2)

assert(tableDescriptor1 != tableDescriptor2)
}

test("TableDescriptor toString format") {
val logPath = "/delta/logPath"
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
val tableConf = Map("key1" -> "value1").asJava

val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)
val expectedString = "TableDescriptor{logPath='/delta/logPath', " +
"tableIdOpt=Optional[TableIdentifier{catalog.schema.table}], " +
"tableConf={key1=value1}}"
assert(tableDescriptor.toString == expectedString)
}
}

0 comments on commit 6ae4b62

Please sign in to comment.