Skip to content

Commit

Permalink
Implement merge/replace branch in BranchManager #2153
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 3, 2024
1 parent 6a2c24d commit a5de1b2
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,28 @@ public Optional<TableSchema> latest(String branchName) {
}
}

/** List all schema with branch. */
public List<TableSchema> listAllWithBranch(String branchName) {
return listAllIdsWithBranch(branchName).stream()
.map(this::schema)
.collect(Collectors.toList());
}

/** List all schema. */
public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

/** List all schema IDs with branch. */
public List<Long> listAllIdsWithBranch(String branchName) {
try {
return listVersionedFiles(fileIO, branchSchemaDirectory(branchName), SCHEMA_PREFIX)
.collect(Collectors.toList());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/** List all schema IDs. */
public List<Long> listAllIds() {
try {
Expand Down Expand Up @@ -482,22 +499,25 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
}
}

private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
public Path schemaDirectory() {
return branchSchemaDirectory(DEFAULT_MAIN_BRANCH);
}

@VisibleForTesting
public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
return branchSchemaPath(DEFAULT_MAIN_BRANCH, id);
}

public Path branchSchemaDirectory(String branchName) {
return new Path(getBranchPath(tableRoot, branchName) + "/schema");
return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema");
}

public Path branchSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
getBranchPath(fileIO, tableRoot, branchName)
+ "/schema/"
+ SCHEMA_PREFIX
+ schemaId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void replaceBranch(String fromBranch) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support replaceBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

@Experimental
void replaceBranch(String fromBranch);

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
38 changes: 38 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tag;

/** {@link TableTag} has tag relevant information for table. */
public class TableTag {
private final String tagName;
private final long createTime;

public TableTag(String tagName, Long createTime) {
this.tagName = tagName;
this.createTime = createTime;
}

public String getTagName() {
return tagName;
}

public long getCreateTime() {
return createTime;
}
}
158 changes: 153 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.tag.TableTag;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Collectors;

Expand All @@ -45,6 +50,7 @@ public class BranchManager {

public static final String BRANCH_PREFIX = "branch-";
public static final String DEFAULT_MAIN_BRANCH = "main";
public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";

private final FileIO fileIO;
private final Path tablePath;
Expand All @@ -65,19 +71,36 @@ public BranchManager(
this.schemaManager = schemaManager;
}

/** Commit specify branch to main. */
public void commitMainBranch(String branchName) throws IOException {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
fileIO.delete(mainBranchFile, false);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
}

/** Return the root Directory of branch. */
public Path branchDirectory() {
return new Path(tablePath + "/branch");
}

/** Return the path string of a branch. */
public static String getBranchPath(Path tablePath, String branchName) {
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
if (StringUtils.isBlank(branchName)) {
return tablePath.toString();
}
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
branchName = forwardBranchName(fileIO, tablePath, branchName);
}
// No main branch replacement has occurred.
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
return tablePath.toString();
}
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

public void createBranch(String branchName, String tagName) {
Expand Down Expand Up @@ -110,7 +133,7 @@ public void createBranch(String branchName, String tagName) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand All @@ -124,11 +147,114 @@ public void deleteBranch(String branchName) {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
getBranchPath(tablePath, branchName)),
getBranchPath(fileIO, tablePath, branchName)),
e);
}
}

/** Replace specify branch to main branch. */
public void replaceBranch(String branchName) {
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName);
try {
// 0. Cache previous tag,snapshot,schema directory.
Path tagDirectory = tagManager.tagDirectory();
Path snapshotDirectory = snapshotManager.snapshotDirectory();
Path schemaDirectory = schemaManager.schemaDirectory();
// 1. Calculate and copy the snapshots, tags and schemas which should be copied from the
// main branch to target branch.
calculateCopyMainBranchToTargetBranch(branchName);
// 2. Update the Main Branch File to the target branch.
updateMainBranchToTargetBranch(branchName);
// 3.Drop the previous main branch, including snapshots, tags and schemas.
dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Calculate copy main branch to target branch. */
private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOException {
TableBranch fromBranch =
this.branches().stream()
.filter(branch -> branch.getBranchName().equals(branchName))
.findFirst()
.orElse(null);
if (fromBranch == null) {
throw new RuntimeException(String.format("No branches found %s", branchName));
}
// Copy tags.
List<TableTag> tags = tagManager.tableTags();
TableTag fromTag =
tags.stream()
.filter(
tableTag ->
tableTag.getTagName()
.equals(fromBranch.getCreatedFromTag()))
.findFirst()
.get();
for (TableTag tag : tags) {
if (tagManager.branchTagExists(branchName, tag.getTagName())) {
// If it already exists, skip it directly.
continue;
}
if (tag.getCreateTime() < fromTag.getCreateTime()) {
fileIO.copyFileUtf8(
tagManager.tagPath(tag.getTagName()),
tagManager.branchTagPath(branchName, tag.getTagName()));
}
}
// Copy snapshots.
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshotManager.branchSnapshotExists(branchName, snapshot.id())) {
// If it already exists, skip it directly.
continue;
}
if (snapshot.id() < fromSnapshot.id()) {
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
}
}

// Copy schemas.
List<Long> schemaIds = schemaManager.listAllIds();
Set<Long> existsSchemas = new HashSet<>(schemaManager.listAllIdsWithBranch(branchName));
for (Long schemaId : schemaIds) {
TableSchema tableSchema = schemaManager.schema(schemaId);
if (existsSchemas.contains(schemaId)) {
// If it already exists, skip it directly.
continue;
}
if (tableSchema.id() < fromSnapshot.schemaId()) {
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(schemaId),
schemaManager.branchSchemaPath(branchName, schemaId));
}
}
}

/** Update main branch to target branch. */
private void updateMainBranchToTargetBranch(String branchName) throws IOException {
commitMainBranch(branchName);
}

/** Directly delete snapshot, tag , schema directory. */
private void dropPreviousMainBranch(
Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException {
// Delete tags.
fileIO.delete(tagDirectory, true);

// Delete snapshots.
fileIO.delete(snapshotDirectory, true);

// Delete schemas.
fileIO.delete(schemaDirectory, true);
}

/** Check if path exists. */
public boolean fileExists(Path path) {
try {
Expand Down Expand Up @@ -169,7 +295,7 @@ public List<TableBranch> branches() {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));
fileIO, new Path(getBranchPath(fileIO, tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
checkArgument(!snapshotTags.isEmpty());
Snapshot snapshot = snapshotTags.firstKey();
Expand All @@ -184,4 +310,26 @@ public List<TableBranch> branches() {
throw new RuntimeException(e);
}
}

/** Forward branch name. */
public static String forwardBranchName(FileIO fileIO, Path tablePath, String branchName) {
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return DEFAULT_MAIN_BRANCH;
} else {
return data;
}
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return branchName;
}
}
Loading

0 comments on commit a5de1b2

Please sign in to comment.