
[Kernel] [CC Refactor #2] Add TableDescriptor and CommitCoordinatorClient API #3797

Merged
13 commits merged on Nov 1, 2024

Conversation


@scottsand-db commented Oct 23, 2024

This is a stacked PR. Please view this PR's diff here:

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Adds the new TableDescriptor and CommitCoordinatorClient APIs, and adds a new getCommitCoordinatorClient API to the Engine interface (with a default implementation that throws an UnsupportedOperationException).

How was this patch tested?

N/A trivial.

Does this PR introduce any user-facing changes?

Yes. See the above.

@allisonport-db left a comment

Mostly clarifying questions due to my lack of knowledge of CC.

Comment on lines +51 to +59
* Register the table represented by the given {@code logPath} at the provided {@code
* currentVersion} with the commit coordinator this commit coordinator client represents.
*
* <p>This API is called when the table is being converted from an existing file system table to a
* coordinated-commit table.
*
* <p>When a new coordinated-commit table is being created, the {@code currentVersion} will be -1
* and the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
Collaborator

For my understanding: you (the client, i.e. Spark/Kernel, etc.) call this first for some version N. Then, when commit is called with version N, the CCC recognizes that this is the same version and thus the commit needs to be immediately backfilled/written to the file system?

Contributor

Commit N would add the CC configuration to the table, so it'll be available in version N+1. It is not in version N, so the commit does not go through the newly added commit coordinator client but rather just through the file system, i.e. backfilling is not necessary.
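
To make that sequencing concrete, here is a hedged sketch of the upgrade flow; the helper itself and the exact registerTable parameter list (beyond what is quoted later in this thread) are assumptions, not the final Kernel API.

```java
// Hypothetical upgrade of an existing file-system table at version N to
// coordinated commits. Parameter lists here are illustrative assumptions.
void upgradeToCoordinatedCommits(
    Engine engine,
    CommitCoordinatorClient ccc,
    String logPath,
    TableIdentifier tableId,
    long currentVersion /* = N; -1 when creating a brand-new CC table */) {
  // 1. Register the table with the commit coordinator at version N.
  Map<String, String> ccTableConf =
      ccc.registerTable(engine, logPath, tableId, currentVersion);

  // 2. Commit version N itself as a plain file-system commit. That commit adds
  //    the CC configuration (ccTableConf) to the table, so the coordinator only
  //    takes over from version N + 1; no backfill is needed for version N.
}
```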

* @param tableDescriptor The descriptor for the table.
* @param commitVersion The version of the commit that is being committed.
* @param actions The set of actions to be committed
* @param updatedActions Additional information for the commit, including:
Collaborator

Sorry, a bunch of questions about CC writes, not necessarily specific to this PR. What are the updatedActions for, and why do they need to be separated from the other actions?

Collaborator Author

What are the updatedActions for and why do they need to be separated from the other actions?

Let's ask the feature owners: cc @dhruvarya-db and @sumeet-db and @prakharjain09

Contributor

The updated actions are the CommitInfo and the previous and current Metadata/Protocol. They are also included in the actions (Protocol and Metadata only if they changed), but we want to pass them separately for convenience, in case commit coordinator client implementations want to do something with them (for example, check if the Metadata of a table has changed).

Collaborator

Got it, thanks!

Collaborator

Also, actions is an iterator, i.e. it can only be traversed once, so the commit coordinator can't scan it to pull out these important updates (e.g. a schema or protocol change). That's why the API passes such updates explicitly.
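
As a hedged illustration of that point (the UpdatedActions accessor names and the actions iterator type below are assumptions based on this thread, not the finalized Kernel API):

```java
// Inside a hypothetical CommitCoordinatorClient implementation.
@Override
public CommitResponse commit(
    Engine engine,
    TableDescriptor tableDescriptor,
    long commitVersion,
    CloseableIterator<Row> actions,  // single-pass iterator of commit actions
    UpdatedActions updatedActions) { // CommitInfo + old/new Metadata/Protocol
  // The actions iterator can only be traversed once, so metadata/protocol
  // changes are read from updatedActions rather than by re-scanning actions.
  if (!updatedActions.getNewMetadata().equals(updatedActions.getOldMetadata())) {
    // e.g. invalidate any coordinator-side cache of the table's schema
  }
  // writeCommitFile is a hypothetical helper that writes the (possibly
  // unbackfilled) commit file and wraps its FileStatus in a CommitResponse.
  return writeCommitFile(engine, tableDescriptor, commitVersion, actions);
}
```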

* <li>Protocol changes
* </ul>
*
* @return {@link CommitResponse} containing the file status of the committed file. Note: If the
Collaborator

This is the unbackfilled file, right?

Collaborator

Not necessarily. It's acceptable for the CC client to return the backfilled file too.

*/
default CommitCoordinatorClient getCommitCoordinatorClient(
    String commitCoordinatorName, Map<String, String> commitCoordinatorConf) {
  throw new UnsupportedOperationException("Not implemented");
Collaborator

Is this temporarily with a default implementation or will we be keeping it like this?

What will be the expected behavior if an engine interface hasn't implemented this method but some user tries to read a CC table? Will it throw this exception? Or do we want to force all engine impls to override this?

Collaborator Author

temporarily with a default implementation

Yup! This is so we can merge this without having to go and update all implementations of Engine within this PR.

Collaborator

Is it right to assume that implementations will still be able to read other dynamic configurations when building the coordinator? E.g. the Delta-Spark getCCC interface also takes in a sparkSession, allowing for dynamic configuration of the client. Implementations of this method will still be able to read from some other configuration source (even though it is not explicitly being passed), right?

Collaborator Author

Is it right to assume that implementations will still be able to read other dynamic configurations when building the coordinator?

Absolutely. We leave it up to the engine to create the CCC. If the engine is aware of any DynamoDB configurations, it can use them!

Implementations of this method will still be able to read some other configuration source (even though it is not explicitly being passed) right?

Yes. I'd encourage you to look at the tracking issue #3817 and its future PRs, where you can see this being done.
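
For reference, a hedged sketch of what such an override could look like; the coordinator name, the client class, and reading extra settings from an engine-owned Hadoop Configuration are all illustrative assumptions.

```java
// Hypothetical Engine implementation overriding the new default method.
public class MyEngine implements Engine {
  private final Configuration hadoopConf; // engine-owned dynamic configuration

  public MyEngine(Configuration hadoopConf) {
    this.hadoopConf = hadoopConf;
  }

  @Override
  public CommitCoordinatorClient getCommitCoordinatorClient(
      String commitCoordinatorName, Map<String, String> commitCoordinatorConf) {
    if ("dynamodb".equals(commitCoordinatorName)) {
      // The engine is free to consult configuration that is not passed in
      // explicitly, e.g. an endpoint from its own Hadoop configuration.
      String endpoint = hadoopConf.get("my.dynamodb.endpoint"); // hypothetical key
      return new MyDynamoDBCommitCoordinatorClient(commitCoordinatorConf, endpoint);
    }
    throw new UnsupportedOperationException(
        "Unknown commit coordinator: " + commitCoordinatorName);
  }

  // Other Engine methods (getFileSystemClient, getJsonHandler, ...) omitted.
}
```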

@allisonport-db left a comment

Changes themselves LGTM

* and the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
*
* @param engine The {@link Engine} instance to use.
Contributor

Can we add some information on what the Engine would/should be used for during table registration?

Collaborator Author

@LukasRupprecht -- we don't want to prescribe what the engine should be used for. It can be used for any JSON reading or file system operations needed. Who knows how they implement their Commit Coordinator Client.

Collaborator Author

It's also just convention to pass in the engine, so that the client may use any future engine interfaces (think logging or metrics) without having to save a reference to the engine.
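
For example (illustrative only, assuming Kernel's existing getFileSystemClient()/listFrom() APIs), a commit coordinator client might list the delta log through the passed-in engine rather than holding its own file-system handle:

```java
// Illustrative only: use the Engine for whatever I/O the implementation needs,
// e.g. listing commit files starting from version 0 of the table. Assumes the
// enclosing method may throw/handle IOException.
try (CloseableIterator<FileStatus> deltaFiles =
    engine.getFileSystemClient().listFrom(logPath + "/00000000000000000000.json")) {
  // inspect existing commit files, if the implementation needs to
}
```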

/**
* Commit the given set of actions to the table represented by {@code tableDescriptor}.
*
* @param engine The {@link Engine} instance to use.
Contributor

Again, add some info on what we need the engine for (here it'd mainly be for writing the commit file). Same for the other APIs below.

Map<String, String> registerTable(
    Engine engine,
    String logPath,
    @Nullable TableIdentifier tableIdentifier,
Contributor

Why don't we use Optional like in TableDescriptor?

Collaborator Author

I've been flip-flopping on this myself.

Using Optional is not preferred in a Java public API ... but I've been considering refactoring and using Optional, actually.

Collaborator Author

@LukasRupprecht -- I'll use Optional 👍
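
A hedged sketch of the resulting signature (trailing parameters elided just as in the quoted diff; currentVersion is inferred from the registerTable javadoc earlier in this thread):

```java
Map<String, String> registerTable(
    Engine engine,
    String logPath,
    Optional<TableIdentifier> tableIdentifier, // was @Nullable TableIdentifier
    long currentVersion /* remaining parameters unchanged */);
```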

Comment on lines 27 to 28
* The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
* identifier, and table CC table configuration.
Contributor

Suggested change
- * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
- * identifier, and table CC table configuration.
+ * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
+ * identifier (if access is not path-based), and table CC table configuration.

Collaborator Author

Hm... that's not quite correct, right? It's not like you pass either one of the path OR the identifier, but not BOTH, to the CC. We are passing both ...

@LukasRupprecht commented Nov 1, 2024

I meant that if the access is path-based, there won't be a table identifier. This would explain why it is optional (because in some cases, there won't be one).

Collaborator Author

I'll include this change in PR 3 #3798
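
Summarizing this thread, a hedged sketch of what TableDescriptor roughly holds per the quoted javadoc; the field and accessor names are assumptions:

```java
// Hypothetical shape of TableDescriptor: a log path, an optional table
// identifier (absent for path-based access), and the CC table configuration.
public class TableDescriptor {
  private final String logPath;
  private final Optional<TableIdentifier> tableIdentifier;
  private final Map<String, String> tableConf;

  public TableDescriptor(
      String logPath,
      Optional<TableIdentifier> tableIdentifier,
      Map<String, String> tableConf) {
    this.logPath = logPath;
    this.tableIdentifier = tableIdentifier;
    this.tableConf = tableConf;
  }

  public String getLogPath() { return logPath; }
  public Optional<TableIdentifier> getTableIdentifier() { return tableIdentifier; }
  public Map<String, String> getTableConf() { return tableConf; }
}
```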

@scottsand-db merged commit 6ae4b62 into delta-io:master on Nov 1, 2024
19 checks passed