From 8c63952557b7c448531f0b3fdbdf7ca2b1c75381 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sun, 3 Mar 2024 22:49:45 +0800 Subject: [PATCH] remove merge branch --- .../paimon/table/AbstractFileStoreTable.java | 5 - .../apache/paimon/table/ReadonlyTable.java | 8 -- .../java/org/apache/paimon/table/Table.java | 3 - .../apache/paimon/utils/BranchManager.java | 91 ------------------- .../apache/paimon/utils/SnapshotManager.java | 4 - .../flink/procedure/MergeBranchProcedure.java | 54 ----------- .../org.apache.paimon.factories.Factory | 1 - .../flink/action/BranchActionITCase.java | 65 ------------- 8 files changed, 231 deletions(-) delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index cf9414a67315..0fcab535a359 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -461,11 +461,6 @@ public void replaceBranch(String fromBranch) { branchManager().replaceBranch(fromBranch); } - @Override - public void mergeBranch(String fromBranch) { - branchManager().mergeBranch(fromBranch); - } - @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 91849a00492b..5c38f35532d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -157,14 +157,6 @@ default void replaceBranch(String fromBranch) { this.getClass().getSimpleName())); } - @Override - default void mergeBranch(String fromBranch) { - throw new UnsupportedOperationException( - String.format( - "Readonly Table %s does not support mergeBranch.", - this.getClass().getSimpleName())); - } - @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 61c4d8193901..8c3b467c8557 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -99,9 +99,6 @@ public interface Table extends Serializable { @Experimental void replaceBranch(String fromBranch); - @Experimental - void mergeBranch(String fromBranch); - /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index a3bebaabe298..96e07805d7e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -36,7 +36,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; @@ -153,96 +152,6 @@ public void deleteBranch(String branchName) { } } - /** Merge specify branch into main. */ - public void mergeBranch(String fromBranch) { - checkArgument(!StringUtils.isBlank(fromBranch), "Branch name '%s' is blank.", fromBranch); - checkArgument(branchExists(fromBranch), "Branch name '%s' not exists.", fromBranch); - try { - TableBranch tableFromBranch = - this.branches().stream() - .filter(branch -> branch.getBranchName().equals(fromBranch)) - .findFirst() - .orElse(null); - if (tableFromBranch == null) { - throw new RuntimeException(String.format("No branches found %s", fromBranch)); - } - if (cleanMainBranch(tableFromBranch)) { - copyBranchToMain(fromBranch); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void copyBranchToMain(String fromBranch) throws IOException { - // Copy the corresponding tag, snapshot and schema files into the branch directory - SortedMap> tags = tagManager.tagsWithBranch(fromBranch); - - for (Map.Entry> tagEntry : tags.entrySet()) { - for (String tagName : tagEntry.getValue()) { - fileIO.copyFileUtf8( - tagManager.branchTagPath(fromBranch, tagName), tagManager.tagPath(tagName)); - } - } - Iterator snapshotIterator = snapshotManager.snapshotsWithBranch(fromBranch); - while (snapshotIterator.hasNext()) { - Snapshot snapshot = snapshotIterator.next(); - fileIO.copyFileUtf8( - snapshotManager.branchSnapshotPath(fromBranch, snapshot.id()), - snapshotManager.snapshotPath(snapshot.id())); - } - - List schemaIds = schemaManager.listAllIdsWithBranch(fromBranch); - for (Long schemaId : schemaIds) { - fileIO.copyFileUtf8( - schemaManager.branchSchemaPath(fromBranch, schemaId), - schemaManager.toSchemaPath(schemaId)); - } - } - - /** - * Delete all the snapshots, tags and schemas in the main branch that are created after the - * created tag for the branch. - */ - private boolean cleanMainBranch(TableBranch fromBranch) throws IOException { - // clean tags. - List tags = tagManager.tableTags(); - TableTag fromTag = - tags.stream() - .filter( - tableTag -> - tableTag.getTagName() - .equals(fromBranch.getCreatedFromTag())) - .findFirst() - .get(); - for (TableTag tag : tags) { - if (tag.getCreateTime() >= fromTag.getCreateTime()) { - fileIO.delete(tagManager.tagPath(tag.getTagName()), true); - } - } - // clean snapshots. - Iterator snapshots = snapshotManager.snapshots(); - Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); - while (snapshots.hasNext()) { - Snapshot snapshot = snapshots.next(); - if (snapshot.id() >= fromSnapshot.id()) { - fileIO.delete(snapshotManager.snapshotPath(snapshot.id()), true); - } - } - // Clean latest file. - snapshotManager.deleteLatestHint(); - - // clean schemas. - List schemaIds = schemaManager.listAllIds(); - for (Long schemaId : schemaIds) { - TableSchema tableSchema = schemaManager.schema(schemaId); - if (tableSchema.id() >= fromSnapshot.schemaId()) { - fileIO.delete(schemaManager.toSchemaPath(schemaId), true); - } - } - return true; - } - /** Replace specify branch to main branch. */ public void replaceBranch(String branchName) { checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index d321065bc63b..74b143776c52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -478,10 +478,6 @@ private Long findByListFiles(BinaryOperator reducer, String branchName) .orElse(null); } - public void deleteLatestHint() throws IOException { - deleteLatestHint(DEFAULT_MAIN_BRANCH); - } - public void deleteLatestHint(String branchName) throws IOException { Path snapshotDir = snapshotDirByBranch(branchName); Path hintFile = new Path(snapshotDir, LATEST); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java deleted file mode 100644 index e7eb3eb33bb8..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.procedure; - -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.table.Table; - -import org.apache.flink.table.procedure.ProcedureContext; - -/** - * Merge branch procedure for given branch. Usage: - * - *

- *  CALL sys.merge_branch('tableId', 'branchName')
- * 
- */ -public class MergeBranchProcedure extends ProcedureBase { - - public static final String IDENTIFIER = "merge_branch"; - - @Override - public String identifier() { - return IDENTIFIER; - } - - public String[] call(ProcedureContext procedureContext, String tableId, String branchName) - throws Catalog.TableNotExistException { - return innerCall(tableId, branchName); - } - - private String[] innerCall(String tableId, String branchName) - throws Catalog.TableNotExistException { - Table table = catalog.getTable(Identifier.fromString(tableId)); - table.mergeBranch(branchName); - return new String[] {"Success"}; - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index c182bf4daec3..50c913a33c82 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -45,5 +45,4 @@ org.apache.paimon.flink.procedure.MigrateFileProcedure org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure org.apache.paimon.flink.procedure.QueryServiceProcedure org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure -org.apache.paimon.flink.procedure.MergeBranchProcedure org.apache.paimon.flink.procedure.ReplaceBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 06b3d7fbe92f..f661d91e07c4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -87,71 +87,6 @@ void testCreateAndDeleteBranch() throws Exception { assertThat(branchManager.branchExists("branch_name")).isFalse(); } - @Test - void testMergeBranchToTargetBranch() throws Exception { - init(warehouse); - - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - FileStoreTable table = - createFileStoreTable( - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyMap()); - - StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); - write = writeBuilder.newWrite(); - commit = writeBuilder.newCommit(); - - // 3 snapshots - writeData(rowData(1L, BinaryString.fromString("Hi"))); - writeData(rowData(2L, BinaryString.fromString("Hello"))); - writeData(rowData(3L, BinaryString.fromString("Paimon"))); - - TagManager tagManager = new TagManager(table.fileIO(), table.location()); - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); - assertThat(tagManager.tagExists("tag2")).isTrue(); - - BranchManager branchManager = table.branchManager(); - callProcedure( - String.format( - "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')", - database, tableName)); - assertThat(branchManager.branchExists("merge_branch_name")).isTrue(); - - // 4-5 snapshots - writeData(rowData(4L, BinaryString.fromString("new.data_4"))); - writeData(rowData(5L, BinaryString.fromString("new.data_5"))); - - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); - assertThat(tagManager.tagExists("tag3")).isTrue(); - - callProcedure( - String.format( - "CALL sys.merge_branch('%s.%s', 'merge_branch_name')", - database, tableName)); - assertThat(branchManager.branchExists("merge_branch_name")).isTrue(); - assertThat(tagManager.tagExists("tag3")).isFalse(); - - // Check snapshot after merge branch - SnapshotManager snapshotManager = table.snapshotManager(); - assertThat(snapshotManager.snapshotExists(3)).isFalse(); - assertThat(snapshotManager.snapshotExists(4)).isFalse(); - assertThat(snapshotManager.snapshotExists(5)).isFalse(); - - // 3-4 snapshots - writeData(rowData(6L, BinaryString.fromString("new.data_6"))); - writeData(rowData(7L, BinaryString.fromString("new.data_7"))); - - assertThat(snapshotManager.snapshotExists(3)).isTrue(); - assertThat(snapshotManager.snapshotExists(4)).isTrue(); - } - @Test void testReplaceBranchToTargetBranch() throws Exception { init(warehouse);