Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] [CC Refactor #3] Add ConfigurationProvider and AbstractCommitCoordinatorBuilder APIs #3798

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.config;

import io.delta.kernel.annotation.Evolving;
import java.util.NoSuchElementException;
import java.util.Optional;

/**
* A generic interface for retrieving configuration values.
*
* @since 3.3.0
*/
@Evolving
public interface ConfigurationProvider {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scottsand-db This class doesn't provide any method to iterate all configs. Does that work for UC Commit Coordinator? Don't we need to iterate over all configs in that? Will let @sumeet-db confirm.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prakharjain09 -- Yup, great callout. We can update APIs when we get there. IMO there's no rush or requirement to add it now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need a getAll / getAllByPrefix API for UC Commit Coordinator Builder when we get there.

/**
* Retrieves the configuration value for the given key.
*
* @param key the configuration key
* @return the configuration value associated with the key, if it exists
* @throws NoSuchElementException if the key is not found
*/
String get(String key) throws NoSuchElementException;
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved

/**
* Retrieves the configuration value for the given key. If it doesn't exist, returns {@link
* Optional#empty}.
*
* @param key the configuration key
* @return the configuration value associated with the key, if it exists
*/
Optional<String> getOptional(String key);

/**
* Retrieves the configuration value for the given key, ensuring that the value is not null.
*
* @param key the configuration key
* @return the configuration value associated with the key, guaranteed to be non-null
* @throws NoSuchElementException if the key is not found in the configuration
* @throws IllegalStateException if the key exists but its value is null
*/
default String getNonNull(String key) throws NoSuchElementException, IllegalStateException {
final String value = get(key);
if (value == null) throw new IllegalStateException(String.format("%s is null", key));
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.config.ConfigurationProvider;
import io.delta.kernel.internal.DeltaErrors;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.NoSuchElementException;

/**
* A builder class to create a {@link CommitCoordinatorClient} (CCC).
*
* <p>Subclasses of this class must provide a no-argument constructor.
*
* <p>Note: These builder classes are optional. The {@link
* io.delta.kernel.engine.Engine#getCommitCoordinatorClient} API does not prescribe how to create
* the underlying CCC and does not require a builder.
*
* <p>The benefit of implementing a builder for your CCC is that your Engine may then invoke {@link
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
* #buildCommitCoordinatorClient} to (1) instantiate your builder, and then (2) build a new CCC via
* reflection.
*
* <p>In short, this builder provides the ability for users to specify at runtime which builder
* implementation (e.g. x.y.z.FooCCBuilder) to use for a given commit coordinator name (e.g. "foo"),
* without imposing any restrictions on how to construct CCCs (i.e. these builders are optional).
*
* @since 3.3.0
*/
@Evolving
public abstract class AbstractCommitCoordinatorBuilder {

///////////////////////////
// Static Helper Methods //
///////////////////////////

/**
* Builds the underlying {@link CommitCoordinatorClient} associated with the given commit
* coordinator name.
*
* <ul>
* <li>Determines the specific builder configuration lookup key.
* <li>Grabs the corresponding builder className for that key from the provided sessionConfig.
* <li>Instantiates a new instance of that {@link AbstractCommitCoordinatorBuilder}.
* <li>Invokes the builder's build method, passing along the sessionConfig and commit
* coordinator config map.
* </ul>
*
* @param commitCoordinatorName the commit coordinator name
* @param sessionConfig The session-level configuration that may represent environment-specific
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig is used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make them implement ConfigurationProvider or how can we, e.g., pass a SparkConf as a ConfigurationProvider?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They need to pass in something that is a ConfigurationProvider (i.e. that implements the interface). You can easily create a wrapper around a SparkConf or a HadoopConf to implemeent the ConfigurationProvider interface

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you need both hadoopConf and sparkConf?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question. You can union them to implement the get interface.

When we go to implement the UC CCC using this API, we can adjust methods as needed.

* to look up the right builder className to instantiate for the given commit coordinator
* name. It can also be used by builders to lookup per-session configuration values unique to
* that builder.
* @param commitCoordinatorConf the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
* configuration properties for describing the Delta table to the commit-coordinator.
* @return the {@link CommitCoordinatorClient} corresponding to the given commit coordinator name
*/
public static CommitCoordinatorClient buildCommitCoordinatorClient(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the dev experience for using this method will be:

CommitCoordinatorClient client = AbstractCommitCoordinatorBuilder
                .buildCommitCoordinatorClient(
                    "dynamodb",
                    sessionConfig,
                    coordinatorConf
                );

Somehow, it doesn't feel intuitive that this code will look up the builder in the conf and do stuff

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How's this method different than CommitCoordinatorProvider.getBuilder(... /* load builder dynamically from conf */).build(...)? And why do you prefer this one? 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow, it doesn't feel intuitive that this code will look up the builder in the conf and do stuff

Really? I quite like it. Feel free to let me know what doesn't feel right about this. This is what we agreed on in the sync on Tuesday Oct 22 @ 1:00pm

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further, this API feels very similar to the previous API of

CommitCoordinatorClient client = CommitCoordinatorProvider
  .getCommitCoordinatorClient(
    hadoopConf,
    name,
    commitCoordinatorConf);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping we could prevent patterns like the following by moving buildCommitCoordinatorClient to a separate DefaultCommitCoordinatorClientBuilder that DynamoDBCCClientBuilder doesn't need to extend:

CommitCoordinatorClient client = DynamoDBCCClientBuilder.buildCommitCoordinatorClient(
    "dynamodb", 
    config, 
    coordConf);

String commitCoordinatorName,
ConfigurationProvider sessionConfig,
Map<String, String> commitCoordinatorConf) {
final String builderConfKey = getCommitCoordinatorBuilderConfKey(commitCoordinatorName);

final String builderClassName;
try {
builderClassName = sessionConfig.getNonNull(builderConfKey);
} catch (NoSuchElementException | IllegalStateException ex) {
throw DeltaErrors.unknownCommitCoordinator(commitCoordinatorName, builderConfKey);
}

try {
return Class.forName(builderClassName)
.asSubclass(AbstractCommitCoordinatorBuilder.class)
.getConstructor()
.newInstance()
.build(sessionConfig, commitCoordinatorConf);
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw DeltaErrors.couldNotInstantiateCommitCoordinatorClient(
commitCoordinatorName, builderClassName, e);
}
}

/** Returns the builder configuration key for the given commit coordinator name */
public static String getCommitCoordinatorBuilderConfKey(String ccName) {
return String.format("io.delta.kernel.commitCoordinatorBuilder.%s.impl", ccName);
}

////////////////////
// Member Methods //
////////////////////

/** Subclasses of this class must provide a no-argument constructor. */
public AbstractCommitCoordinatorBuilder() {}

/** @return the commit coordinator name */
public abstract String getName();

/**
* Build the {@link CommitCoordinatorClient}. This may be a new instance or a cached one.
*
* @param sessionConfig The session-level configuration that may represent environment-specific
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig can be
* used by builders to lookup per-session configuration values unique to that builder.
* @param commitCoordinatorConf the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_COORDINATOR_CONF} which represents
* the configuration properties the commit coordinator client needs to be correctly
* instantiated and to communicate to the commit coordinator.
* @return the {@link CommitCoordinatorClient}
*/
public abstract CommitCoordinatorClient build(
ConfigurationProvider sessionConfig, Map<String, String> commitCoordinatorConf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,11 @@ GetCommitsResponse getCommits(
*
* @param engine The {@link Engine} instance to use, if needed.
* @param tableDescriptor The descriptor for the table.
* @param version The version until which the commit coordinator client should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
* was called. If it is {@link Optional#empty()}, then the commit coordinator client should
* backfill from the beginning of the table.
* @param version The version (inclusive) until which the commit coordinator client should
* backfill.
* @param lastKnownBackfilledVersion The last known version (inclusive) that was backfilled before
* this API was called. If it is {@link Optional#empty()}, then the commit coordinator client
* should backfill from the beginning of the table.
* @throws IOException if there is an IO error while backfilling the commits.
*/
void backfillToVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
* identifier, and table CC configuration.
*
* <p>The table identifier is not required for path-based tables.
*
* @since 3.3.0
*/
@Evolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,26 @@ public static KernelException invalidVersionRange(long startVersion, long endVer
return new KernelException(message);
}

/* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */
/* ------------------------ COORDINATED COMMITS EXCEPTIONS - START ------------------------ */

public static KernelException unknownCommitCoordinator(String ccName, String ccBuilderConfKey) {
return new KernelException(
String.format(
"Unknown commit coordinator: '%s'. Please ensure that session config '%s' is set.",
ccName, ccBuilderConfKey));
}

public static KernelException couldNotInstantiateCommitCoordinatorClient(
String ccName, String ccBuilderClassName, Exception ex) {
return new KernelException(
String.format(
"Could not instantiate Commit Coordinator Client for '%s' using builder class '%s'.",
ccName, ccBuilderClassName),
ex);
}

/* ------------------------ COORDINATED COMMITS EXCEPTIONS - END ------------------------ */
/* ------------------------ PROTOCOL EXCEPTIONS - START ----------------------------- */

public static KernelException unsupportedReaderProtocol(
String tablePath, int tableReaderVersion) {
Expand Down Expand Up @@ -179,6 +198,8 @@ public static KernelException unsupportedWriterFeature(String tablePath, String
return new KernelException(message);
}

/* ------------------------ PROTOCOL EXCEPTIONS - END ----------------------------- */

public static KernelException columnInvariantsNotSupported() {
String message =
"This version of Delta Kernel does not support writing to tables with "
Expand Down
Loading