From 5db7bfb39c9eb0eb2d58636bcf9a2734cbfb04f5 Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Tue, 28 May 2024 15:35:23 +0800 Subject: [PATCH] Implement replace branch in BranchManager (#2911) --- .../privilege/PrivilegedFileStoreTable.java | 6 + .../apache/paimon/schema/SchemaManager.java | 5 +- .../paimon/table/AbstractFileStoreTable.java | 5 + .../apache/paimon/table/ReadonlyTable.java | 8 + .../java/org/apache/paimon/table/Table.java | 3 + .../apache/paimon/utils/BranchManager.java | 163 ++++++++++++++++-- .../apache/paimon/utils/SnapshotManager.java | 33 ++-- .../org/apache/paimon/utils/TagManager.java | 20 +-- .../procedure/ReplaceBranchProcedure.java | 54 ++++++ .../org.apache.paimon.factories.Factory | 1 + .../flink/action/BranchActionITCase.java | 94 +++++++++- 11 files changed, 341 insertions(+), 51 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 1881c7350900..e4b09df3893b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,12 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + privilegeChecker.assertCanInsert(identifier); + wrapped.replaceBranch(fromBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index e95919517f61..cb478d7bad28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -68,7 +68,6 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -500,13 +499,13 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } public Path schemaDirectory() { - return new Path(getBranchPath(tableRoot, branch) + "/schema"); + return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema"); } @VisibleForTesting public Path toSchemaPath(long schemaId) { return new Path( - getBranchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); + getBranchPath(fileIO, tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index b7f62f4ab8d3..17846f1a5cae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -530,6 +530,11 @@ public void mergeBranch(String branchName) { branchManager().mergeBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + branchManager().replaceBranch(fromBranch); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 211bce3be684..326506201e34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -190,6 +190,14 @@ default void mergeBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void replaceBranch(String fromBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index aeca29d63cbb..227f2836d931 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -115,6 +115,9 @@ public interface Table extends Serializable { @Experimental void mergeBranch(String branchName); + @Experimental + void replaceBranch(String fromBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 1361eefed978..785de6008cbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -34,9 +34,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,6 +54,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -71,6 +75,12 @@ public BranchManager( this.schemaManager = schemaManager; } + /** Commit specify branch to main. */ + public void commitMainBranch(String branchName) throws IOException { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } + /** Return the root Directory of branch. */ public Path branchDirectory() { return new Path(tablePath + "/branch"); @@ -81,13 +91,45 @@ public static boolean isMainBranch(String branch) { } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { - return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; + public static String getBranchPath(FileIO fileIO, Path tablePath, String branch) { + if (isMainBranch(branch)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch; + } + + public String defaultMainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (!StringUtils.isBlank(data)) { + return data; + } + } + return DEFAULT_MAIN_BRANCH; + } catch (IOException e) { + throw new RuntimeException(e); + } } /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); } /** Create empty branch. */ @@ -113,7 +155,7 @@ public void createBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -145,17 +187,17 @@ public void createBranch(String branchName, long snapshotId) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } public void createBranch(String branchName, String tagName) { + String mainBranch = defaultMainBranch(); checkArgument( !isMainBranch(branchName), String.format( - "Branch name '%s' is the default branch and cannot be used.", - DEFAULT_MAIN_BRANCH)); + "Branch name '%s' is the default branch and cannot be used.", mainBranch)); checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); @@ -181,7 +223,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -195,11 +237,111 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } + /** Replace specify branch to main branch. */ + public void replaceBranch(String branchName) { + String mainBranch = defaultMainBranch(); + checkArgument( + !isMainBranch(branchName), + String.format( + "Branch name '%s' is the default main branch and cannot be replaced repeatedly.", + mainBranch)); + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName); + try { + // 0. Cache previous tag,snapshot,schema directory. + Path tagDirectory = tagManager.tagDirectory(); + Path snapshotDirectory = snapshotManager.snapshotDirectory(); + Path schemaDirectory = schemaManager.schemaDirectory(); + // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the + // main to branch. + calculateCopyMainToBranch(branchName); + // 2. Update the Main Branch File to the target branch. + commitMainBranch(branchName); + // 3.Drop the previous main branch, including snapshots, tags and schemas. + dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Calculate copy main branch to target branch. */ + private void calculateCopyMainToBranch(String branchName) throws IOException { + TableBranch fromBranch = + this.branches().stream() + .filter(branch -> branch.getBranchName().equals(branchName)) + .findFirst() + .orElse(null); + if (fromBranch == null) { + throw new RuntimeException(String.format("No branches found %s", branchName)); + } + Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); + // Copy tags. + List tags = tagManager.allTagNames(); + TagManager branchTagManager = tagManager.copyWithBranch(branchName); + for (String tagName : tags) { + if (branchTagManager.tagExists(tagName)) { + // If it already exists, skip it directly. + continue; + } + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8(tagManager.tagPath(tagName), branchTagManager.tagPath(tagName)); + } + } + // Copy snapshots. + Iterator snapshots = snapshotManager.snapshots(); + SnapshotManager branchSnapshotManager = snapshotManager.copyWithBranch(branchName); + while (snapshots.hasNext()) { + Snapshot snapshot = snapshots.next(); + if (snapshot.id() >= fromSnapshot.id()) { + continue; + } + if (branchSnapshotManager.snapshotExists(snapshot.id())) { + // If it already exists, skip it directly. + continue; + } + fileIO.copyFileUtf8( + snapshotManager.snapshotPath(snapshot.id()), + branchSnapshotManager.snapshotPath(snapshot.id())); + } + + // Copy schemas. + List schemaIds = schemaManager.listAllIds(); + SchemaManager branchSchemaManager = schemaManager.copyWithBranch(branchName); + Set existsSchemas = new HashSet<>(branchSchemaManager.listAllIds()); + + for (Long schemaId : schemaIds) { + if (existsSchemas.contains(schemaId)) { + // If it already exists, skip it directly. + continue; + } + TableSchema tableSchema = schemaManager.schema(schemaId); + if (tableSchema.id() < fromSnapshot.schemaId()) { + fileIO.copyFileUtf8( + schemaManager.toSchemaPath(schemaId), + branchSchemaManager.toSchemaPath(schemaId)); + } + } + } + + /** Directly delete snapshot, tag , schema directory. */ + private void dropPreviousMainBranch( + Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { + // Delete tags. + fileIO.delete(tagDirectory, true); + + // Delete snapshots. + fileIO.delete(snapshotDirectory, true); + + // Delete schemas. + fileIO.delete(schemaDirectory, true); + } + /** Check if path exists. */ public boolean fileExists(Path path) { try { @@ -316,8 +458,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); - + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 3e82108f8665..51c915d9cd38 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -90,35 +89,27 @@ public Path tablePath() { } public Path changelogDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog") - : new Path(getBranchPath(tablePath, branch) + "/changelog"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/changelog"); } public Path longLivedChangelogPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/changelog/" - + CHANGELOG_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/changelog/" + + CHANGELOG_PREFIX + + snapshotId); } public Path snapshotPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/snapshot/" - + SNAPSHOT_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Path snapshotDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot") - : new Path(getBranchPath(tablePath, branch) + "/snapshot"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 9a4a351420a7..59f3545c60af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -79,21 +78,12 @@ public TagManager copyWithBranch(String branchName) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/tag") - : new Path(getBranchPath(tablePath, branch) + "/tag"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag"); } - - /** Return the path of tag directory in branch. */ - public Path branchTagDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag"); - } - - /** Return the path of a tag in branch. */ + + /** Return the path of a tag. */ public Path tagPath(String tagName) { - return isMainBranch(branch) - ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName) - : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ @@ -242,7 +232,7 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a branch tag exists. */ + /** Check if a tag exists. */ public boolean tagExists(String tagName) { Path path = tagPath(tagName); try { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java new file mode 100644 index 000000000000..10ef4a67ae87 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace branch procedure for given branch. Usage: + * + *

+ *  CALL sys.replace_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName); + } + + private String[] innerCall(String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceBranch(branchName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 848dd317d43b..33a43009d69b 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -51,3 +51,4 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure +org.apache.paimon.flink.procedure.ReplaceBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 007d1ac5c147..209b0d2e7bda 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -21,15 +21,22 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; import static org.assertj.core.api.Assertions.assertThat; @@ -115,7 +122,6 @@ void testCreateAndDeleteBranchWithSnapshotId() throws Exception { "CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId', 2)", database, tableName)); assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue(); - branchManager.branches(); callProcedure( String.format( @@ -163,4 +169,90 @@ void testCreateAndDeleteEmptyBranch() throws Exception { database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } + + @Test + void testReplaceBranch() throws Exception { + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // Create tag2 + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + // Create replace_branch_name branch + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'replace_branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("replace_branch_name")).isTrue(); + + // Replace branch + callProcedure( + String.format( + "CALL sys.replace_branch('%s.%s', 'replace_branch_name')", + database, tableName)); + + // Check snapshot + SnapshotManager snapshotManager = table.snapshotManager(); + assertThat(snapshotManager.snapshotExists(3)).isFalse(); + + // Renew write + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // Add data, forward to replace branch + for (long i = 4; i < 14; i++) { + writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + List result = + getResult( + readBuilder.newRead(), + plan == null ? Collections.emptyList() : plan.splits(), + rowType); + List sortedActual = new ArrayList<>(result); + List expected = + Arrays.asList( + "+I[1, Hi]", + "+I[2, Hello]", + "+I[4, new.data_4]", + "+I[5, new.data_5]", + "+I[6, new.data_6]", + "+I[7, new.data_7]", + "+I[8, new.data_8]", + "+I[9, new.data_9]", + "+I[10, new.data_10]", + "+I[11, new.data_11]", + "+I[12, new.data_12]", + "+I[13, new.data_13]"); + Assert.assertEquals(expected, sortedActual); + + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + assertThat(tagManager.tagExists("tag3")).isTrue(); + } }