Skip to content

Commit

Permalink
[Spark][Kernel] Remove duplicate CoordinatedCommitsUtils.java from Dy…
Browse files Browse the repository at this point in the history
…namoDBCommitCoordinator (#3653)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other

## Description

- Remove duplicate CoordinatedCommitsUtils.java from
DynamoDBCommitCoordinator package.
- Move the common utils to non-test package.

## How was this patch tested?

Compile

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
sumeet-db committed Sep 12, 2024
1 parent 920f185 commit 1aaf10b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 82 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private GetCommitsResultInternal getCommitsImpl(
Long.parseLong(item.get(DynamoDBTableEntryConstants.TABLE_LATEST_VERSION).getN());
AttributeValue allStoredCommits = item.get(DynamoDBTableEntryConstants.COMMITS);
ArrayList<Commit> commits = new ArrayList<>();
Path unbackfilledCommitsPath = new Path(logPath, CoordinatedCommitsUtils.COMMIT_SUBDIR);
Path unbackfilledCommitsPath = CoordinatedCommitsUtils.commitDirPath(logPath);
for(AttributeValue attr: allStoredCommits.getL()) {
java.util.Map<String, AttributeValue> commitMap = attr.getM();
long commitVersion =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,24 @@ public class CoordinatedCommitsUtils {
private CoordinatedCommitsUtils() {}

/** The subdirectory in which to store the unbackfilled commit files. */
final static String COMMIT_SUBDIR = "_commits";
private static final String COMMIT_SUBDIR = "_commits";

/** The configuration key for the coordinated commits owner. */
private static final String COORDINATED_COMMITS_COORDINATOR_CONF_KEY =
/** The configuration key for the coordinated commits owner name. */
private static final String COORDINATED_COMMITS_COORDINATOR_NAME_KEY =
"delta.coordinatedCommits.commitCoordinator-preview";

/**
* Creates a new unbackfilled delta file path for the given commit version.
* The path is of the form `tablePath/_delta_log/_commits/00000000000000000001.uuid.json`.
*/
public static Path generateUnbackfilledDeltaFilePath(
Path logPath,
long version) {
String uuid = UUID.randomUUID().toString();
Path basePath = new Path(logPath, COMMIT_SUBDIR);
return new Path(basePath, String.format("%020d.%s.json", version, uuid));
}

/**
* Returns the path to the backfilled delta file for the given commit version.
* The path is of the form `tablePath/_delta_log/00000000000000000001.json`.
Expand Down Expand Up @@ -108,10 +120,10 @@ public static Path commitDirPath(Path logPath) {
return new Path(logPath, COMMIT_SUBDIR);
}

private static String getCoordinator(AbstractMetadata metadata) {
public static String getCoordinator(AbstractMetadata metadata) {
String coordinator = metadata
.getConfiguration()
.get(COORDINATED_COMMITS_COORDINATOR_CONF_KEY);
.get(COORDINATED_COMMITS_COORDINATOR_NAME_KEY);
return coordinator != null ? coordinator : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

import io.delta.storage.LogStore
import io.delta.storage.commit.CoordinatedCommitsUtils
import io.delta.storage.commit.actions.AbstractMetadata
import io.delta.storage.commit.actions.AbstractProtocol
import org.apache.hadoop.conf.Configuration
Expand Down

0 comments on commit 1aaf10b

Please sign in to comment.