diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 9a841ef7bfd9..59e12e45af3a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -109,13 +109,6 @@ public Optional latest(String branchName) { } } - /** List all schema with branch. */ - public List listAllWithBranch(String branchName) { - return listAllIdsWithBranch(branchName).stream() - .map(this::schema) - .collect(Collectors.toList()); - } - /** List all schema. */ public List listAll() { return listAllIds().stream().map(this::schema).collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java b/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java deleted file mode 100644 index 9d82fbedcec1..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.tag; - -/** {@link TableTag} has tag relevant information for table. */ -public class TableTag { - private final String tagName; - private final long createTime; - - public TableTag(String tagName, Long createTime) { - this.tagName = tagName; - this.createTime = createTime; - } - - public String getTagName() { - return tagName; - } - - public long getCreateTime() { - return createTime; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 04e7ba4364be..7560534b8af2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -26,7 +26,6 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; -import org.apache.paimon.tag.TableTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,11 +85,21 @@ public Path branchDirectory() { /** Return the path string of a branch. */ public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) { if (branchName.equals(DEFAULT_MAIN_BRANCH)) { - branchName = forwardBranchName(fileIO, tablePath, branchName); - } - // No main branch replacement has occurred. - if (branchName.equals(DEFAULT_MAIN_BRANCH)) { - return tablePath.toString(); + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } } return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; } @@ -160,9 +169,9 @@ public void replaceBranch(String branchName) { Path schemaDirectory = schemaManager.schemaDirectory(); // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the // main branch to target branch. - calculateCopyMainBranchToTargetBranch(branchName); + calculateCopyMainToBranch(branchName); // 2. Update the Main Branch File to the target branch. - updateMainBranchToTargetBranch(branchName); + commitMainBranch(branchName); // 3.Drop the previous main branch, including snapshots, tags and schemas. dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); } catch (IOException e) { @@ -171,7 +180,7 @@ public void replaceBranch(String branchName) { } /** Calculate copy main branch to target branch. */ - private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOException { + private void calculateCopyMainToBranch(String branchName) throws IOException { TableBranch fromBranch = this.branches().stream() .filter(branch -> branch.getBranchName().equals(branchName)) @@ -180,30 +189,22 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE if (fromBranch == null) { throw new RuntimeException(String.format("No branches found %s", branchName)); } + Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); // Copy tags. - List tags = tagManager.tableTags(); - TableTag fromTag = - tags.stream() - .filter( - tableTag -> - tableTag.getTagName() - .equals(fromBranch.getCreatedFromTag())) - .findFirst() - .get(); - for (TableTag tag : tags) { - if (tagManager.branchTagExists(branchName, tag.getTagName())) { + List tags = tagManager.allTagNames(); + for (String tagName : tags) { + if (tagManager.tagExists(branchName, tagName)) { // If it already exists, skip it directly. continue; } - if (tag.getCreateTime() < fromTag.getCreateTime()) { + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + if (snapshot.id() < fromSnapshot.id()) { fileIO.copyFileUtf8( - tagManager.tagPath(tag.getTagName()), - tagManager.tagPath(branchName, tag.getTagName())); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); } } // Copy snapshots. Iterator snapshots = snapshotManager.snapshots(); - Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); while (snapshots.hasNext()) { Snapshot snapshot = snapshots.next(); if (snapshotManager.snapshotExists(branchName, snapshot.id())) { @@ -234,11 +235,6 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE } } - /** Update main branch to target branch. */ - private void updateMainBranchToTargetBranch(String branchName) throws IOException { - commitMainBranch(branchName); - } - /** Directly delete snapshot, tag , schema directory. */ private void dropPreviousMainBranch( Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { @@ -307,26 +303,4 @@ public List branches() { throw new RuntimeException(e); } } - - /** Forward branch name. */ - public static String forwardBranchName(FileIO fileIO, Path tablePath, String branchName) { - if (branchName.equals(DEFAULT_MAIN_BRANCH)) { - Path path = new Path(tablePath, MAIN_BRANCH_FILE); - try { - if (fileIO.exists(path)) { - String data = fileIO.readFileUtf8(path); - if (StringUtils.isBlank(data)) { - return DEFAULT_MAIN_BRANCH; - } else { - return data; - } - } else { - return DEFAULT_MAIN_BRANCH; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return branchName; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 66d90c4ea4bb..bd97a6a57be5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -252,13 +252,6 @@ public Iterator snapshots() throws IOException { .iterator(); } - public Iterator snapshotsWithBranch(String branchName) throws IOException { - return listVersionedFiles(fileIO, snapshotDirectory(branchName), SNAPSHOT_PREFIX) - .map(snapshotId -> snapshot(branchName, snapshotId)) - .sorted(Comparator.comparingLong(Snapshot::id)) - .iterator(); - } - /** * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may * be deleted by other processes, so just skip this snapshot. @@ -466,12 +459,6 @@ private Long findByListFiles(BinaryOperator reducer, String branchName) .orElse(null); } - public void deleteLatestHint(String branchName) throws IOException { - Path snapshotDir = snapshotDirectory(branchName); - Path hintFile = new Path(snapshotDir, LATEST); - fileIO.delete(hintFile, false); - } - public void commitLatestHint(long snapshotId) throws IOException { commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 6af17ec8444e..b320819b507b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,7 +26,6 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TableTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +35,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Predicate; @@ -199,8 +197,8 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a tag exists. */ - public boolean branchTagExists(String branchName, String tagName) { + /** Check if a tag exists in branch. */ + public boolean tagExists(String branchName, String tagName) { Path path = tagPath(branchName, tagName); try { return fileIO.exists(path); @@ -249,11 +247,6 @@ public SortedMap> tags() { return tags(tagName -> true); } - /** Get all tagged snapshots with names sorted by snapshot id. */ - public SortedMap> tagsWithBranch(String branchName) { - return tagsWithBranch(tagName -> true, branchName); - } - /** * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate * determines which tag names should be included in the result. Only snapshots with tag names @@ -322,34 +315,6 @@ public List allTagNames() { return tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList()); } - public List tableTags() { - return branchTableTags(DEFAULT_MAIN_BRANCH); - } - - public List branchTableTags(String branchName) { - List tags = new ArrayList<>(); - try { - - Path tagDirectory = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? tagDirectory() - : tagDirectory(branchName); - - List> paths = - listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) - .map(status -> Pair.of(status.getPath(), status.getModificationTime())) - .collect(Collectors.toList()); - - for (Map.Entry path : paths) { - String tagName = path.getKey().getName().substring(TAG_PREFIX.length()); - tags.add(new TableTag(tagName, path.getValue())); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return tags; - } - private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { for (int i = 0; i < taggedSnapshots.size(); i++) { if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {