diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 0001d5b19867..d35ece4e31d5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -774,6 +774,7 @@ public boolean tryCommitOnce( @Nullable String newStatsFileName) { long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; + Path newSnapshotPath = branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotManager.snapshotPath(newSnapshotId) diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 1881c7350900..e4b09df3893b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,12 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + privilegeChecker.assertCanInsert(identifier); + wrapped.replaceBranch(fromBranch); + } + @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 228d30d5cb08..cb478d7bad28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -68,7 +68,6 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -500,17 +499,13 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } public Path schemaDirectory() { - return isMainBranch(branch) - ? new Path(tableRoot + "/schema") - : new Path(getBranchPath(tableRoot, branch) + "/schema"); + return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema"); } @VisibleForTesting public Path toSchemaPath(long schemaId) { - return isMainBranch(branch) - ? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId) - : new Path( - getBranchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); + return new Path( + getBranchPath(fileIO, tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 655e81431690..82cc47ad5a47 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -525,6 +525,11 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + branchManager().replaceBranch(fromBranch); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 42bea3f6813e..dcb62dfcbea6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -182,6 +182,14 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void replaceBranch(String fromBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 876908394f2d..d01ecc95cdb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -111,6 +111,9 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + @Experimental + void replaceBranch(String fromBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 41099dfac46b..c19f18361286 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -33,9 +33,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; @@ -49,6 +52,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -69,6 +73,12 @@ public BranchManager( this.schemaManager = schemaManager; } + /** Commit specify branch to main. */ + public void commitMainBranch(String branchName) throws IOException { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } + /** Return the root Directory of branch. */ public Path branchDirectory() { return new Path(tablePath + "/branch"); @@ -79,13 +89,45 @@ public static boolean isMainBranch(String branch) { } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { - return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; + public static String getBranchPath(FileIO fileIO, Path tablePath, String branch) { + if (isMainBranch(branch)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch; + } + + public String defaultMainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (!StringUtils.isBlank(data)) { + return data; + } + } + return DEFAULT_MAIN_BRANCH; + } catch (IOException e) { + throw new RuntimeException(e); + } } /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); } /** Create empty branch. */ @@ -111,7 +153,7 @@ public void createBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -143,17 +185,17 @@ public void createBranch(String branchName, long snapshotId) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } public void createBranch(String branchName, String tagName) { + String mainBranch = defaultMainBranch(); checkArgument( !isMainBranch(branchName), String.format( - "Branch name '%s' is the default branch and cannot be used.", - DEFAULT_MAIN_BRANCH)); + "Branch name '%s' is the default branch and cannot be used.", mainBranch)); checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); @@ -179,7 +221,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -193,11 +235,109 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } + /** Replace specify branch to main branch. */ + public void replaceBranch(String branchName) { + String mainBranch = defaultMainBranch(); + checkArgument( + !isMainBranch(branchName), + String.format( + "Branch name '%s' is the default main branch and cannot be replaced repeatedly.", + mainBranch)); + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName); + try { + // 0. Cache previous tag,snapshot,schema directory. + Path tagDirectory = tagManager.tagDirectory(); + Path snapshotDirectory = snapshotManager.snapshotDirectory(); + Path schemaDirectory = schemaManager.schemaDirectory(); + // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the + // main to branch. + calculateCopyMainToBranch(branchName); + // 2. Update the Main Branch File to the target branch. + commitMainBranch(branchName); + // 3.Drop the previous main branch, including snapshots, tags and schemas. + dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Calculate copy main branch to target branch. */ + private void calculateCopyMainToBranch(String branchName) throws IOException { + TableBranch fromBranch = + this.branches().stream() + .filter(branch -> branch.getBranchName().equals(branchName)) + .findFirst() + .orElse(null); + if (fromBranch == null) { + throw new RuntimeException(String.format("No branches found %s", branchName)); + } + Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); + // Copy tags. + List tags = tagManager.allTagNames(); + for (String tagName : tags) { + if (tagManager.copyWithBranch(branchName).tagExists(tagName)) { + // If it already exists, skip it directly. + continue; + } + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8( + tagManager.tagPath(tagName), + tagManager.copyWithBranch(branchName).tagPath(tagName)); + } + } + // Copy snapshots. + Iterator snapshots = snapshotManager.snapshots(); + while (snapshots.hasNext()) { + Snapshot snapshot = snapshots.next(); + if (snapshotManager.copyWithBranch(branchName).snapshotExists(snapshot.id())) { + // If it already exists, skip it directly. + continue; + } + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8( + snapshotManager.snapshotPath(snapshot.id()), + snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id())); + } + } + + // Copy schemas. + List schemaIds = schemaManager.listAllIds(); + Set existsSchemas = + new HashSet<>(schemaManager.copyWithBranch(branchName).listAllIds()); + for (Long schemaId : schemaIds) { + TableSchema tableSchema = schemaManager.schema(schemaId); + if (existsSchemas.contains(schemaId)) { + // If it already exists, skip it directly. + continue; + } + if (tableSchema.id() < fromSnapshot.schemaId()) { + fileIO.copyFileUtf8( + schemaManager.toSchemaPath(schemaId), + schemaManager.copyWithBranch(branchName).toSchemaPath(schemaId)); + } + } + } + + /** Directly delete snapshot, tag , schema directory. */ + private void dropPreviousMainBranch( + Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { + // Delete tags. + fileIO.delete(tagDirectory, true); + + // Delete snapshots. + fileIO.delete(snapshotDirectory, true); + + // Delete schemas. + fileIO.delete(schemaDirectory, true); + } + /** Check if path exists. */ public boolean fileExists(Path path) { try { @@ -246,8 +386,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); - + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 63679b86acea..9813debe4761 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -90,35 +89,27 @@ public Path tablePath() { } public Path changelogDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog") - : new Path(getBranchPath(tablePath, branch) + "/changelog"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/changelog"); } public Path longLivedChangelogPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/changelog/" - + CHANGELOG_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/changelog/" + + CHANGELOG_PREFIX + + snapshotId); } public Path snapshotPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/snapshot/" - + SNAPSHOT_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Path snapshotDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot") - : new Path(getBranchPath(tablePath, branch) + "/snapshot"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index c96bbdd568a7..78f70a4e3912 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -79,16 +78,12 @@ public TagManager copyWithBranch(String branchName) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/tag") - : new Path(getBranchPath(tablePath, branch) + "/tag"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag"); } - /** Return the path of a tag in branch. */ + /** Return the path of a tag. */ public Path tagPath(String tagName) { - return isMainBranch(branch) - ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName) - : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ @@ -237,7 +232,7 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a branch tag exists. */ + /** Check if a tag exists. */ public boolean tagExists(String tagName) { Path path = tagPath(tagName); try { @@ -286,11 +281,18 @@ public SortedMap> tags() { * @throws RuntimeException if an IOException occurs during retrieval of snapshots. */ public SortedMap> tags(Predicate filter) { + return tagsWithBranch(filter, null); + } + + public SortedMap> tagsWithBranch( + Predicate filter, String branchName) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { + + Path tagDirectory = tagDirectory(); List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) .map(FileStatus::getPath) .collect(Collectors.toList()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java new file mode 100644 index 000000000000..10ef4a67ae87 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace branch procedure for given branch. Usage: + * + *

+ *  CALL sys.replace_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName); + } + + private String[] innerCall(String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceBranch(branchName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 848dd317d43b..33a43009d69b 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -51,3 +51,4 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure +org.apache.paimon.flink.procedure.ReplaceBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 007d1ac5c147..209b0d2e7bda 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -21,15 +21,22 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; import static org.assertj.core.api.Assertions.assertThat; @@ -115,7 +122,6 @@ void testCreateAndDeleteBranchWithSnapshotId() throws Exception { "CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId', 2)", database, tableName)); assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue(); - branchManager.branches(); callProcedure( String.format( @@ -163,4 +169,90 @@ void testCreateAndDeleteEmptyBranch() throws Exception { database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } + + @Test + void testReplaceBranch() throws Exception { + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // Create tag2 + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + // Create replace_branch_name branch + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'replace_branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("replace_branch_name")).isTrue(); + + // Replace branch + callProcedure( + String.format( + "CALL sys.replace_branch('%s.%s', 'replace_branch_name')", + database, tableName)); + + // Check snapshot + SnapshotManager snapshotManager = table.snapshotManager(); + assertThat(snapshotManager.snapshotExists(3)).isFalse(); + + // Renew write + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // Add data, forward to replace branch + for (long i = 4; i < 14; i++) { + writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + List result = + getResult( + readBuilder.newRead(), + plan == null ? Collections.emptyList() : plan.splits(), + rowType); + List sortedActual = new ArrayList<>(result); + List expected = + Arrays.asList( + "+I[1, Hi]", + "+I[2, Hello]", + "+I[4, new.data_4]", + "+I[5, new.data_5]", + "+I[6, new.data_6]", + "+I[7, new.data_7]", + "+I[8, new.data_8]", + "+I[9, new.data_9]", + "+I[10, new.data_10]", + "+I[11, new.data_11]", + "+I[12, new.data_12]", + "+I[13, new.data_13]"); + Assert.assertEquals(expected, sortedActual); + + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + assertThat(tagManager.tagExists("tag3")).isTrue(); + } }