Skip to content

Commit

Permalink
optimized code
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 7, 2024
1 parent 97bf9e7 commit 06b4805
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,6 @@ public Optional<TableSchema> latest(String branchName) {
}
}

/** List all schema with branch. */
public List<TableSchema> listAllWithBranch(String branchName) {
return listAllIdsWithBranch(branchName).stream()
.map(this::schema)
.collect(Collectors.toList());
}

/** List all schema. */
public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
Expand Down
38 changes: 0 additions & 38 deletions paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.tag.TableTag;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,11 +85,21 @@ public Path branchDirectory() {
/** Return the path string of a branch. */
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
branchName = forwardBranchName(fileIO, tablePath, branchName);
}
// No main branch replacement has occurred.
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
return tablePath.toString();
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return tablePath.toString();
} else {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data;
}
} else {
return tablePath.toString();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
}
Expand Down Expand Up @@ -160,9 +169,9 @@ public void replaceBranch(String branchName) {
Path schemaDirectory = schemaManager.schemaDirectory();
// 1. Calculate and copy the snapshots, tags and schemas which should be copied from the
// main branch to target branch.
calculateCopyMainBranchToTargetBranch(branchName);
calculateCopyMainToBranch(branchName);
// 2. Update the Main Branch File to the target branch.
updateMainBranchToTargetBranch(branchName);
commitMainBranch(branchName);
// 3.Drop the previous main branch, including snapshots, tags and schemas.
dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory);
} catch (IOException e) {
Expand All @@ -171,7 +180,7 @@ public void replaceBranch(String branchName) {
}

/** Calculate copy main branch to target branch. */
private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOException {
private void calculateCopyMainToBranch(String branchName) throws IOException {
TableBranch fromBranch =
this.branches().stream()
.filter(branch -> branch.getBranchName().equals(branchName))
Expand All @@ -180,30 +189,22 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
if (fromBranch == null) {
throw new RuntimeException(String.format("No branches found %s", branchName));
}
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
// Copy tags.
List<TableTag> tags = tagManager.tableTags();
TableTag fromTag =
tags.stream()
.filter(
tableTag ->
tableTag.getTagName()
.equals(fromBranch.getCreatedFromTag()))
.findFirst()
.get();
for (TableTag tag : tags) {
if (tagManager.branchTagExists(branchName, tag.getTagName())) {
List<String> tags = tagManager.allTagNames();
for (String tagName : tags) {
if (tagManager.tagExists(branchName, tagName)) {
// If it already exists, skip it directly.
continue;
}
if (tag.getCreateTime() < fromTag.getCreateTime()) {
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
if (snapshot.id() < fromSnapshot.id()) {
fileIO.copyFileUtf8(
tagManager.tagPath(tag.getTagName()),
tagManager.tagPath(branchName, tag.getTagName()));
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
}
}
// Copy snapshots.
Iterator<Snapshot> snapshots = snapshotManager.snapshots();
Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
while (snapshots.hasNext()) {
Snapshot snapshot = snapshots.next();
if (snapshotManager.snapshotExists(branchName, snapshot.id())) {
Expand Down Expand Up @@ -234,11 +235,6 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE
}
}

/** Update main branch to target branch. */
private void updateMainBranchToTargetBranch(String branchName) throws IOException {
commitMainBranch(branchName);
}

/** Directly delete snapshot, tag , schema directory. */
private void dropPreviousMainBranch(
Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException {
Expand Down Expand Up @@ -307,26 +303,4 @@ public List<TableBranch> branches() {
throw new RuntimeException(e);
}
}

/** Forward branch name. */
public static String forwardBranchName(FileIO fileIO, Path tablePath, String branchName) {
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return DEFAULT_MAIN_BRANCH;
} else {
return data;
}
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return branchName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,6 @@ public Iterator<Snapshot> snapshots() throws IOException {
.iterator();
}

public Iterator<Snapshot> snapshotsWithBranch(String branchName) throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(branchName), SNAPSHOT_PREFIX)
.map(snapshotId -> snapshot(branchName, snapshotId))
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}

/**
* If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may
* be deleted by other processes, so just skip this snapshot.
Expand Down Expand Up @@ -466,12 +459,6 @@ private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
.orElse(null);
}

public void deleteLatestHint(String branchName) throws IOException {
Path snapshotDir = snapshotDirectory(branchName);
Path hintFile = new Path(snapshotDir, LATEST);
fileIO.delete(hintFile, false);
}

public void commitLatestHint(long snapshotId) throws IOException {
commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH);
}
Expand Down
39 changes: 2 additions & 37 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TableTag;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +35,6 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;
Expand Down Expand Up @@ -199,8 +197,8 @@ private void doClean(
taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots));
}

/** Check if a tag exists. */
public boolean branchTagExists(String branchName, String tagName) {
/** Check if a tag exists in branch. */
public boolean tagExists(String branchName, String tagName) {
Path path = tagPath(branchName, tagName);
try {
return fileIO.exists(path);
Expand Down Expand Up @@ -249,11 +247,6 @@ public SortedMap<Snapshot, List<String>> tags() {
return tags(tagName -> true);
}

/** Get all tagged snapshots with names sorted by snapshot id. */
public SortedMap<Snapshot, List<String>> tagsWithBranch(String branchName) {
return tagsWithBranch(tagName -> true, branchName);
}

/**
* Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
* determines which tag names should be included in the result. Only snapshots with tag names
Expand Down Expand Up @@ -322,34 +315,6 @@ public List<String> allTagNames() {
return tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList());
}

public List<TableTag> tableTags() {
return branchTableTags(DEFAULT_MAIN_BRANCH);
}

public List<TableTag> branchTableTags(String branchName) {
List<TableTag> tags = new ArrayList<>();
try {

Path tagDirectory =
branchName.equals(DEFAULT_MAIN_BRANCH)
? tagDirectory()
: tagDirectory(branchName);

List<Pair<Path, Long>> paths =
listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());

for (Map.Entry<Path, Long> path : paths) {
String tagName = path.getKey().getName().substring(TAG_PREFIX.length());
tags.add(new TableTag(tagName, path.getValue()));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return tags;
}

private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
for (int i = 0; i < taggedSnapshots.size(); i++) {
if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) {
Expand Down

0 comments on commit 06b4805

Please sign in to comment.