diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index e389a471c4ec..6c3b5592841a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -74,7 +74,6 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** * Default implementation of {@link FileStoreCommit}. @@ -751,10 +750,7 @@ public boolean tryCommitOnce( @Nullable String newStatsFileName) { long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; - Path newSnapshotPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); + Path newSnapshotPath = snapshotManager.snapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d94da91ef4c5..a1b9cc342ecc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -115,6 +115,16 @@ public List listAll() { return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } + /** List all schema IDs with branch. */ + public List listAllIdsWithBranch(String branchName) { + try { + return listVersionedFiles(fileIO, branchSchemaDirectory(branchName), SCHEMA_PREFIX) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** List all schema IDs. */ public List listAllIds() { try { @@ -497,22 +507,25 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } } - private Path schemaDirectory() { - return new Path(tableRoot + "/schema"); + public Path schemaDirectory() { + return branchSchemaDirectory(DEFAULT_MAIN_BRANCH); } @VisibleForTesting public Path toSchemaPath(long id) { - return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); + return branchSchemaPath(DEFAULT_MAIN_BRANCH, id); } public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); + return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema"); } public Path branchSchemaPath(String branchName, long schemaId) { return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + getBranchPath(fileIO, tableRoot, branchName) + + "/schema/" + + SCHEMA_PREFIX + + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7d4a4a7c27dc..206f54546422 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -544,6 +544,11 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + branchManager().replaceBranch(fromBranch); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 42bea3f6813e..dcb62dfcbea6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -182,6 +182,14 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void replaceBranch(String fromBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 876908394f2d..d01ecc95cdb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -111,6 +111,9 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + @Experimental + void replaceBranch(String fromBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index f3f06f89208a..70f9553e0abc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -33,9 +33,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; @@ -49,6 +52,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -69,19 +73,57 @@ public BranchManager( this.schemaManager = schemaManager; } + /** Commit specify branch to main. */ + public void commitMainBranch(String branchName) throws IOException { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } + /** Return the root Directory of branch. */ public Path branchDirectory() { return new Path(tablePath + "/branch"); } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { + public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) { + if (branchName.equals(DEFAULT_MAIN_BRANCH)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; } + public String defaultMainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (!StringUtils.isBlank(data)) { + return data; + } + } + return DEFAULT_MAIN_BRANCH; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); } /** Create empty branch. */ @@ -106,7 +148,7 @@ public void createBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -130,7 +172,7 @@ public void createBranch(String branchName, long snapshotId) { // Copy the corresponding snapshot and schema files into the branch directory fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshotId), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); @@ -138,17 +180,17 @@ public void createBranch(String branchName, long snapshotId) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } public void createBranch(String branchName, String tagName) { + String mainBranch = defaultMainBranch(); checkArgument( - !branchName.equals(DEFAULT_MAIN_BRANCH), + !branchName.equals(mainBranch), String.format( - "Branch name '%s' is the default branch and cannot be used.", - DEFAULT_MAIN_BRANCH)); + "Branch name '%s' is the default branch and cannot be used.", mainBranch)); checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); @@ -162,10 +204,10 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); @@ -173,7 +215,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -187,11 +229,107 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } + /** Replace specify branch to main branch. */ + public void replaceBranch(String branchName) { + String mainBranch = defaultMainBranch(); + checkArgument( + !branchName.equals(mainBranch), + String.format( + "Branch name '%s' is the default main branch and cannot be replaced repeatedly.", + mainBranch)); + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName); + try { + // 0. Cache previous tag,snapshot,schema directory. + Path tagDirectory = tagManager.tagDirectory(); + Path snapshotDirectory = snapshotManager.snapshotDirectory(); + Path schemaDirectory = schemaManager.schemaDirectory(); + // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the + // main branch to target branch. + calculateCopyMainToBranch(branchName); + // 2. Update the Main Branch File to the target branch. + commitMainBranch(branchName); + // 3.Drop the previous main branch, including snapshots, tags and schemas. + dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Calculate copy main branch to target branch. */ + private void calculateCopyMainToBranch(String branchName) throws IOException { + TableBranch fromBranch = + this.branches().stream() + .filter(branch -> branch.getBranchName().equals(branchName)) + .findFirst() + .orElse(null); + if (fromBranch == null) { + throw new RuntimeException(String.format("No branches found %s", branchName)); + } + Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); + // Copy tags. + List tags = tagManager.allTagNames(); + for (String tagName : tags) { + if (tagManager.tagExists(branchName, tagName)) { + // If it already exists, skip it directly. + continue; + } + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8( + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); + } + } + // Copy snapshots. + Iterator snapshots = snapshotManager.snapshots(); + while (snapshots.hasNext()) { + Snapshot snapshot = snapshots.next(); + if (snapshotManager.snapshotExists(branchName, snapshot.id())) { + // If it already exists, skip it directly. + continue; + } + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8( + snapshotManager.snapshotPath(snapshot.id()), + snapshotManager.snapshotPath(branchName, snapshot.id())); + } + } + + // Copy schemas. + List schemaIds = schemaManager.listAllIds(); + Set existsSchemas = new HashSet<>(schemaManager.listAllIdsWithBranch(branchName)); + for (Long schemaId : schemaIds) { + TableSchema tableSchema = schemaManager.schema(schemaId); + if (existsSchemas.contains(schemaId)) { + // If it already exists, skip it directly. + continue; + } + if (tableSchema.id() < fromSnapshot.schemaId()) { + fileIO.copyFileUtf8( + schemaManager.toSchemaPath(schemaId), + schemaManager.branchSchemaPath(branchName, schemaId)); + } + } + } + + /** Directly delete snapshot, tag , schema directory. */ + private void dropPreviousMainBranch( + Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { + // Delete tags. + fileIO.delete(tagDirectory, true); + + // Delete snapshots. + fileIO.delete(snapshotDirectory, true); + + // Delete schemas. + fileIO.delete(schemaDirectory, true); + } + /** Check if path exists. */ public boolean fileExists(Path path) { try { @@ -239,8 +377,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); - + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index dbbc8fffdc05..5a629d2479ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -78,7 +78,11 @@ public Path tablePath() { } public Path snapshotDirectory() { - return new Path(tablePath + "/snapshot"); + return snapshotDirectory(DEFAULT_MAIN_BRANCH); + } + + public Path snapshotDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot"); } public Path changelogDirectory() { @@ -90,28 +94,15 @@ public Path longLivedChangelogPath(long snapshotId) { } public Path snapshotPath(long snapshotId) { - return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + return snapshotPath(DEFAULT_MAIN_BRANCH, snapshotId); } - public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); - } - - public Path branchSnapshotPath(String branchName, long snapshotId) { + public Path snapshotPath(String branchName, long snapshotId) { return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); - } - - public Path snapshotPathByBranch(String branchName, long snapshotId) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotPath(snapshotId) - : branchSnapshotPath(branchName, snapshotId); - } - - public Path snapshotDirByBranch(String branchName) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotDirectory() - : branchSnapshotDirectory(branchName); + getBranchPath(fileIO, tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Snapshot snapshot(long snapshotId) { @@ -128,10 +119,21 @@ public Changelog longLivedChangelog(long snapshotId) { } public Snapshot snapshot(String branchName, long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + Path snapshotPath = snapshotPath(branchName, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); } + public boolean snapshotExists(String branchName, long snapshotId) { + Path path = snapshotPath(branchName, snapshotId); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + "Failed to determine if snapshot #" + snapshotId + " exists in path " + path, + e); + } + } + public boolean snapshotExists(long snapshotId) { Path path = snapshotPath(snapshotId); try { @@ -169,7 +171,7 @@ public boolean longLivedChangelogExists(long snapshotId) { public @Nullable Long latestSnapshotId(String branchName) { try { - return findLatest(snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); + return findLatest(snapshotDirectory(branchName), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } @@ -207,8 +209,7 @@ public boolean longLivedChangelogExists(long snapshotId) { public @Nullable Long earliestSnapshotId(String branchName) { try { - return findEarliest( - snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); + return findEarliest(snapshotDirectory(branchName), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find earliest snapshot id", e); } @@ -602,7 +603,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } public Long readHint(String fileName) { - return readHint(fileName, snapshotDirByBranch(DEFAULT_MAIN_BRANCH)); + return readHint(fileName, snapshotDirectory(DEFAULT_MAIN_BRANCH)); } public Long readHint(String fileName, Path dir) { @@ -633,7 +634,7 @@ public void commitLatestHint(long snapshotId) throws IOException { } public void commitLatestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, LATEST, snapshotDirectory(branchName)); } public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { @@ -649,7 +650,7 @@ public void commitEarliestHint(long snapshotId) throws IOException { } public void commitEarliestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, EARLIEST, snapshotDirectory(branchName)); } private void commitHint(long snapshotId, String fileName, Path dir) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 8b7818fed782..138e3ff9e154 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -45,6 +45,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -66,17 +67,22 @@ public TagManager(FileIO fileIO, Path tablePath) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return new Path(tablePath + "/tag"); + return tagDirectory(DEFAULT_MAIN_BRANCH); + } + + public Path tagDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag"); } /** Return the path of a tag. */ public Path tagPath(String tagName) { - return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); + return tagPath(DEFAULT_MAIN_BRANCH, tagName); } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + public Path tagPath(String branchName, String tagName) { + return new Path( + getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ @@ -226,6 +232,19 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } + /** Check if a tag exists in branch. */ + public boolean tagExists(String branchName, String tagName) { + Path path = tagPath(branchName, tagName); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed to determine if tag '%s' exists in path %s.", tagName, path), + e); + } + } + /** Check if a tag exists. */ public boolean tagExists(String tagName) { Path path = tagPath(tagName); @@ -276,11 +295,19 @@ public SortedMap> tags() { * @throws RuntimeException if an IOException occurs during retrieval of snapshots. */ public SortedMap> tags(Predicate filter) { + return tagsWithBranch(filter, null); + } + + public SortedMap> tagsWithBranch( + Predicate filter, String branchName) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { + + Path tagDirectory = + StringUtils.isBlank(branchName) ? tagDirectory() : tagDirectory(branchName); List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) .map(FileStatus::getPath) .collect(Collectors.toList()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 138a30d5bf87..abd7c7752793 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1019,15 +1019,14 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); Snapshot branchSnapshot = Snapshot.fromPath( - new TraceableFileIO(), - snapshotManager.branchSnapshotPath("test-branch", 2)); + new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2)); assertThat(branchSnapshot.equals(snapshot2)).isTrue(); // verify schema in test-branch is equal to schema 0 diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java new file mode 100644 index 000000000000..10ef4a67ae87 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace branch procedure for given branch. Usage: + * + *

+ *  CALL sys.replace_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName); + } + + private String[] innerCall(String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceBranch(branchName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index fe463161beb2..50c913a33c82 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -44,4 +44,5 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure org.apache.paimon.flink.procedure.MigrateFileProcedure org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure org.apache.paimon.flink.procedure.QueryServiceProcedure -org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure \ No newline at end of file +org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure +org.apache.paimon.flink.procedure.ReplaceBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 4f4f314966bb..dc245e4d4574 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -21,15 +21,22 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; import static org.assertj.core.api.Assertions.assertThat; @@ -113,7 +120,6 @@ void testCreateAndDeleteBranchWithSnapshotId() throws Exception { "CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId', 2)", database, tableName)); assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue(); - branchManager.branches(); callProcedure( String.format( @@ -160,4 +166,89 @@ void testCreateAndDeleteEmptyBranch() throws Exception { database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } + + @Test + void testReplaceBranchToTargetBranch() throws Exception { + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // Create tag2 + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + // Create replace_branch_name branch + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'replace_branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("replace_branch_name")).isTrue(); + + // Replace branch + callProcedure( + String.format( + "CALL sys.replace_branch('%s.%s', 'replace_branch_name')", + database, tableName)); + + // Check snapshot + SnapshotManager snapshotManager = table.snapshotManager(); + assertThat(snapshotManager.snapshotExists(3)).isFalse(); + + // Renew write + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // Add data, forward to replace branch + for (long i = 4; i < 14; i++) { + writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + List result = + getResult( + readBuilder.newRead(), + plan == null ? Collections.emptyList() : plan.splits(), + rowType); + List sortedActual = new ArrayList<>(result); + List expected = + Arrays.asList( + "+I[1, Hi]", + "+I[2, Hello]", + "+I[4, new.data_4]", + "+I[5, new.data_5]", + "+I[6, new.data_6]", + "+I[7, new.data_7]", + "+I[8, new.data_8]", + "+I[9, new.data_9]", + "+I[10, new.data_10]", + "+I[11, new.data_11]", + "+I[12, new.data_12]", + "+I[13, new.data_13]"); + Assert.assertEquals(expected, sortedActual); + + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + assertThat(tagManager.tagExists("tag3")).isTrue(); + } }