Skip to content

Commit

Permalink
remove merge branch
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 3, 2024
1 parent 412a1ff commit 8c63952
Show file tree
Hide file tree
Showing 8 changed files with 0 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,6 @@ public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
}

@Override
public void mergeBranch(String fromBranch) {
branchManager().mergeBranch(fromBranch);
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,6 @@ default void replaceBranch(String fromBranch) {
this.getClass().getSimpleName()));
}

@Override
default void mergeBranch(String fromBranch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mergeBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
3 changes: 0 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ public interface Table extends Serializable {
@Experimental
void replaceBranch(String fromBranch);

@Experimental
void mergeBranch(String fromBranch);

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -153,96 +152,6 @@ public void deleteBranch(String branchName) {
}
}

/** Merge specify branch into main. */
public void mergeBranch(String fromBranch) {
checkArgument(!StringUtils.isBlank(fromBranch), "Branch name '%s' is blank.", fromBranch);
checkArgument(branchExists(fromBranch), "Branch name '%s' not exists.", fromBranch);
try {
TableBranch tableFromBranch =
this.branches().stream()
.filter(branch -> branch.getBranchName().equals(fromBranch))
.findFirst()
.orElse(null);
if (tableFromBranch == null) {
throw new RuntimeException(String.format("No branches found %s", fromBranch));
}
if (cleanMainBranch(tableFromBranch)) {
copyBranchToMain(fromBranch);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void copyBranchToMain(String fromBranch) throws IOException {
// Copy the corresponding tag, snapshot and schema files into the branch directory
SortedMap<Snapshot, List<String>> tags = tagManager.tagsWithBranch(fromBranch);

for (Map.Entry<Snapshot, List<String>> tagEntry : tags.entrySet()) {
for (String tagName : tagEntry.getValue()) {
fileIO.copyFileUtf8(
tagManager.branchTagPath(fromBranch, tagName), tagManager.tagPath(tagName));
}
}
Iterator<Snapshot> snapshotIterator = snapshotManager.snapshotsWithBranch(fromBranch);
while (snapshotIterator.hasNext()) {
Snapshot snapshot = snapshotIterator.next();
fileIO.copyFileUtf8(
snapshotManager.branchSnapshotPath(fromBranch, snapshot.id()),
snapshotManager.snapshotPath(snapshot.id()));
}

List<Long> schemaIds = schemaManager.listAllIdsWithBranch(fromBranch);
for (Long schemaId : schemaIds) {
fileIO.copyFileUtf8(
schemaManager.branchSchemaPath(fromBranch, schemaId),
schemaManager.toSchemaPath(schemaId));
}
}

/**
* Delete all the snapshots, tags and schemas in the main branch that are created after the
* created tag for the branch.
*/
private boolean cleanMainBranch(TableBranch fromBranch) throws IOException {
// clean tags.
List<TableTag> tags = tagManager.tableTags();
TableTag fromTag =
tags.stream()
.filter(
tableTag ->
tableTag.getTagName()
.equals(fromBranch.getCreatedFromTag()))
.findFirst()
.get();
for (TableTag tag : tags) {
if (tag.getCreateTime() >= fromTag.getCreateTime()) {
fileIO.delete(tagManager.tagPath(tag.getTagName()), true);
}
}
// clean snapshots.
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshot.id() >= fromSnapshot.id()) {
fileIO.delete(snapshotManager.snapshotPath(snapshot.id()), true);
}
}
// Clean latest file.
snapshotManager.deleteLatestHint();

// clean schemas.
List<Long> schemaIds = schemaManager.listAllIds();
for (Long schemaId : schemaIds) {
TableSchema tableSchema = schemaManager.schema(schemaId);
if (tableSchema.id() >= fromSnapshot.schemaId()) {
fileIO.delete(schemaManager.toSchemaPath(schemaId), true);
}
}
return true;
}

/** Replace specify branch to main branch. */
public void replaceBranch(String branchName) {
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,6 @@ private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
.orElse(null);
}

public void deleteLatestHint() throws IOException {
deleteLatestHint(DEFAULT_MAIN_BRANCH);
}

public void deleteLatestHint(String branchName) throws IOException {
Path snapshotDir = snapshotDirByBranch(branchName);
Path hintFile = new Path(snapshotDir, LATEST);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,4 @@ org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
org.apache.paimon.flink.procedure.MergeBranchProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
Original file line number Diff line number Diff line change
Expand Up @@ -87,71 +87,6 @@ void testCreateAndDeleteBranch() throws Exception {
assertThat(branchManager.branchExists("branch_name")).isFalse();
}

@Test
void testMergeBranchToTargetBranch() throws Exception {
init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

TagManager tagManager = new TagManager(table.fileIO(), table.location());
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("merge_branch_name")).isTrue();

// 4-5 snapshots
writeData(rowData(4L, BinaryString.fromString("new.data_4")));
writeData(rowData(5L, BinaryString.fromString("new.data_5")));

callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
assertThat(tagManager.tagExists("tag3")).isTrue();

callProcedure(
String.format(
"CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
database, tableName));
assertThat(branchManager.branchExists("merge_branch_name")).isTrue();
assertThat(tagManager.tagExists("tag3")).isFalse();

// Check snapshot after merge branch
SnapshotManager snapshotManager = table.snapshotManager();
assertThat(snapshotManager.snapshotExists(3)).isFalse();
assertThat(snapshotManager.snapshotExists(4)).isFalse();
assertThat(snapshotManager.snapshotExists(5)).isFalse();

// 3-4 snapshots
writeData(rowData(6L, BinaryString.fromString("new.data_6")));
writeData(rowData(7L, BinaryString.fromString("new.data_7")));

assertThat(snapshotManager.snapshotExists(3)).isTrue();
assertThat(snapshotManager.snapshotExists(4)).isTrue();
}

@Test
void testReplaceBranchToTargetBranch() throws Exception {
init(warehouse);
Expand Down

0 comments on commit 8c63952

Please sign in to comment.