Skip to content

Commit

Permalink
Implement replace branch in BranchManager
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 27, 2024
1 parent 46816e9 commit ab21525
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
wrapped.replaceBranch(fromBranch);
}

@Override
public ExpireSnapshots newExpireSnapshots() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.BranchManager.isMainBranch;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;

Expand Down Expand Up @@ -500,17 +499,13 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
}

public Path schemaDirectory() {
return isMainBranch(branch)
? new Path(tableRoot + "/schema")
: new Path(getBranchPath(tableRoot, branch) + "/schema");
return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema");
}

@VisibleForTesting
public Path toSchemaPath(long schemaId) {
return isMainBranch(branch)
? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId)
: new Path(
getBranchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId);
return new Path(
getBranchPath(fileIO, tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void replaceBranch(String fromBranch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support replaceBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

@Experimental
void replaceBranch(String fromBranch);

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
163 changes: 152 additions & 11 deletions paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;

Expand All @@ -49,6 +52,7 @@ public class BranchManager {

public static final String BRANCH_PREFIX = "branch-";
public static final String DEFAULT_MAIN_BRANCH = "main";
public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";

private final FileIO fileIO;
private final Path tablePath;
Expand All @@ -69,6 +73,12 @@ public BranchManager(
this.schemaManager = schemaManager;
}

/** Commit specify branch to main. */
public void commitMainBranch(String branchName) throws IOException {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
}

/** Return the root Directory of branch. */
public Path branchDirectory() {
return new Path(tablePath + "/branch");
Expand All @@ -79,13 +89,45 @@ public static boolean isMainBranch(String branch) {
}

/** Return the path string of a branch. */
public static String getBranchPath(Path tablePath, String branchName) {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
public static String getBranchPath(FileIO fileIO, Path tablePath, String branch) {
if (isMainBranch(branch)) {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return tablePath.toString();
} else {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data;
}
} else {
return tablePath.toString();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch;
}

public String defaultMainBranch() {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (!StringUtils.isBlank(data)) {
return data;
}
}
return DEFAULT_MAIN_BRANCH;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

/** Create empty branch. */
Expand All @@ -111,7 +153,7 @@ public void createBranch(String branchName) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand Down Expand Up @@ -143,17 +185,17 @@ public void createBranch(String branchName, long snapshotId) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}

public void createBranch(String branchName, String tagName) {
String mainBranch = defaultMainBranch();
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
"Branch name '%s' is the default branch and cannot be used.", mainBranch));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);
Expand All @@ -179,7 +221,7 @@ public void createBranch(String branchName, String tagName) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand All @@ -193,11 +235,111 @@ public void deleteBranch(String branchName) {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
getBranchPath(tablePath, branchName)),
getBranchPath(fileIO, tablePath, branchName)),
e);
}
}

/** Replace specify branch to main branch. */
public void replaceBranch(String branchName) {
String mainBranch = defaultMainBranch();
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default main branch and cannot be replaced repeatedly.",
mainBranch));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName);
try {
// 0. Cache previous tag,snapshot,schema directory.
Path tagDirectory = tagManager.tagDirectory();
Path snapshotDirectory = snapshotManager.snapshotDirectory();
Path schemaDirectory = schemaManager.schemaDirectory();
// 1. Calculate and copy the snapshots, tags and schemas which should be copied from the
// main to branch.
calculateCopyMainToBranch(branchName);
// 2. Update the Main Branch File to the target branch.
commitMainBranch(branchName);
// 3.Drop the previous main branch, including snapshots, tags and schemas.
dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Calculate copy main branch to target branch. */
private void calculateCopyMainToBranch(String branchName) throws IOException {
TableBranch fromBranch =
this.branches().stream()
.filter(branch -> branch.getBranchName().equals(branchName))
.findFirst()
.orElse(null);
if (fromBranch == null) {
throw new RuntimeException(String.format("No branches found %s", branchName));
}
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
// Copy tags.
List<String> tags = tagManager.allTagNames();
TagManager branchTagManager = tagManager.copyWithBranch(branchName);
for (String tagName : tags) {
if (branchTagManager.tagExists(tagName)) {
// If it already exists, skip it directly.
continue;
}
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
if (snapshot.id() < fromSnapshot.id()) {
fileIO.copyFileUtf8(tagManager.tagPath(tagName), branchTagManager.tagPath(tagName));
}
}
// Copy snapshots.
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
SnapshotManager branchSnapshotManager = snapshotManager.copyWithBranch(branchName);
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshot.id() >= fromSnapshot.id()) {
continue;
}
if (branchSnapshotManager.snapshotExists(snapshot.id())) {
// If it already exists, skip it directly.
continue;
}
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
branchSnapshotManager.snapshotPath(snapshot.id()));
}

// Copy schemas.
List<Long> schemaIds = schemaManager.listAllIds();
SchemaManager branchSchemaManager = schemaManager.copyWithBranch(branchName);
Set<Long> existsSchemas = new HashSet<>(branchSchemaManager.listAllIds());

for (Long schemaId : schemaIds) {
if (existsSchemas.contains(schemaId)) {
// If it already exists, skip it directly.
continue;
}
TableSchema tableSchema = schemaManager.schema(schemaId);
if (tableSchema.id() < fromSnapshot.schemaId()) {
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(schemaId),
branchSchemaManager.toSchemaPath(schemaId));
}
}
}

/** Directly delete snapshot, tag , schema directory. */
private void dropPreviousMainBranch(
Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException {
// Delete tags.
fileIO.delete(tagDirectory, true);

// Delete snapshots.
fileIO.delete(snapshotDirectory, true);

// Delete schemas.
fileIO.delete(schemaDirectory, true);
}

/** Check if path exists. */
public boolean fileExists(Path path) {
try {
Expand Down Expand Up @@ -246,8 +388,7 @@ public List<TableBranch> branches() {
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));

fileIO, new Path(getBranchPath(fileIO, tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.BranchManager.isMainBranch;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
Expand Down Expand Up @@ -90,35 +89,27 @@ public Path tablePath() {
}

public Path changelogDirectory() {
return isMainBranch(branch)
? new Path(tablePath + "/changelog")
: new Path(getBranchPath(tablePath, branch) + "/changelog");
return new Path(getBranchPath(fileIO, tablePath, branch) + "/changelog");
}

public Path longLivedChangelogPath(long snapshotId) {
return isMainBranch(branch)
? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId)
: new Path(
getBranchPath(tablePath, branch)
+ "/changelog/"
+ CHANGELOG_PREFIX
+ snapshotId);
return new Path(
getBranchPath(fileIO, tablePath, branch)
+ "/changelog/"
+ CHANGELOG_PREFIX
+ snapshotId);
}

public Path snapshotPath(long snapshotId) {
return isMainBranch(branch)
? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId)
: new Path(
getBranchPath(tablePath, branch)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
return new Path(
getBranchPath(fileIO, tablePath, branch)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Path snapshotDirectory() {
return isMainBranch(branch)
? new Path(tablePath + "/snapshot")
: new Path(getBranchPath(tablePath, branch) + "/snapshot");
return new Path(getBranchPath(fileIO, tablePath, branch) + "/snapshot");
}

public Snapshot snapshot(long snapshotId) {
Expand Down
Loading

0 comments on commit ab21525

Please sign in to comment.