Skip to content

Commit

Permalink
[core] Changelog decouple supports delta files (apache#3407)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Jun 11, 2024
1 parent a530d96 commit d35b1ee
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public SnapshotDeletion newSnapshotDeletion() {
manifestFileFactory().create(),
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler());
newStatsFileHandler(),
options.changelogProducer() != CoreOptions.ChangelogProducer.NONE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,14 @@ public void cleanUnusedManifestList(String manifestName, Set<String> skippingSet
}

protected void cleanUnusedManifests(
Snapshot snapshot, Set<String> skippingSet, boolean deleteChangelog) {
cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
Snapshot snapshot,
Set<String> skippingSet,
boolean deleteDataManifestLists,
boolean deleteChangelog) {
if (deleteDataManifestLists) {
cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);
}
if (deleteChangelog && snapshot.changelogManifestList() != null) {
cleanUnusedManifestList(snapshot.changelogManifestList(), skippingSet);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,72 @@ private Map<String, Path> getCandidateDeletingFiles() {

private List<String> getUsedFilesForChangelog(Changelog changelog) {
List<String> files = new ArrayList<>();
if (changelog.changelogManifestList() != null) {
files.add(changelog.changelogManifestList());
}

List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
try {
// try to read manifests
List<ManifestFileMeta> manifestFileMetas =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.changelogManifestList()));
if (manifestFileMetas == null) {
return Collections.emptyList();
// changelog manifest
List<ManifestFileMeta> changelogManifest = new ArrayList<>();
if (changelog.changelogManifestList() != null) {
files.add(changelog.changelogManifestList());
changelogManifest =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.changelogManifestList()));
if (changelogManifest != null) {
manifestFileMetas.addAll(changelogManifest);
}
}
List<String> manifestFileName =

// base manifest
if (manifestList.exists(changelog.baseManifestList())) {
files.add(changelog.baseManifestList());
List<ManifestFileMeta> baseManifest =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.baseManifestList()));
if (baseManifest != null) {
manifestFileMetas.addAll(baseManifest);
}
}

// delta manifest
List<ManifestFileMeta> deltaManifest = null;
if (manifestList.exists(changelog.deltaManifestList())) {
files.add(changelog.deltaManifestList());
deltaManifest =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.deltaManifestList()));
if (deltaManifest != null) {
manifestFileMetas.addAll(deltaManifest);
}
}

files.addAll(
manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList());
files.addAll(manifestFileName);
.collect(Collectors.toList()));

// data file
List<String> manifestFileName = new ArrayList<>();
if (changelog.changelogManifestList() != null) {
manifestFileName.addAll(
changelogManifest == null
? new ArrayList<>()
: changelogManifest.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList()));
} else {
manifestFileName.addAll(
deltaManifest == null
? new ArrayList<>()
: deltaManifest.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList()));
}

// try to read data files
List<String> dataFiles = retryReadingDataFiles(manifestFileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
Expand All @@ -39,24 +40,45 @@
/** Delete snapshot files. */
public class SnapshotDeletion extends FileDeletionBase<Snapshot> {

private final boolean produceChangelog;

public SnapshotDeletion(
FileIO fileIO,
FileStorePathFactory pathFactory,
ManifestFile manifestFile,
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler) {
StatsFileHandler statsFileHandler,
boolean produceChangelog) {
super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler);
this.produceChangelog = produceChangelog;
}

@Override
public void cleanUnusedDataFiles(Snapshot snapshot, Predicate<ManifestEntry> skipper) {
cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
if (changelogDecoupled && !produceChangelog) {
// Skip clean the 'APPEND' data files.If we do not have the file source information
// eg: the old version table file, we just skip clean this here, let it done by
// ExpireChangelogImpl
Predicate<ManifestEntry> enriched =
manifestEntry ->
skipper.test(manifestEntry)
|| (manifestEntry.file().fileSource().orElse(FileSource.APPEND)
== FileSource.APPEND);
cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched);
} else {
cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper);
}
}

@Override
public void cleanUnusedManifests(Snapshot snapshot, Set<String> skippingSet) {
cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled);
// delay clean the base and delta manifest lists when changelog decoupled enabled
cleanUnusedManifests(
snapshot,
skippingSet,
!changelogDecoupled || produceChangelog,
!changelogDecoupled);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate<ManifestEntr
@Override
public void cleanUnusedManifests(Snapshot taggedSnapshot, Set<String> skippingSet) {
// doesn't clean changelog files because they are handled by SnapshotDeletion
cleanUnusedManifests(taggedSnapshot, skippingSet, false);
cleanUnusedManifests(taggedSnapshot, skippingSet, true, false);
}

public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ private RollbackHelper rollbackHelper() {
tagManager(),
fileIO,
store().newSnapshotDeletion(),
store().newChangelogDeletion(),
store().newTagDeletion());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -51,18 +52,21 @@ public class RollbackHelper {
private final TagManager tagManager;
private final FileIO fileIO;
private final SnapshotDeletion snapshotDeletion;
private final ChangelogDeletion changelogDeletion;
private final TagDeletion tagDeletion;

public RollbackHelper(
SnapshotManager snapshotManager,
TagManager tagManager,
FileIO fileIO,
SnapshotDeletion snapshotDeletion,
ChangelogDeletion changelogDeletion,
TagDeletion tagDeletion) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.fileIO = fileIO;
this.snapshotDeletion = snapshotDeletion;
this.changelogDeletion = changelogDeletion;
this.tagDeletion = tagDeletion;
}

Expand All @@ -72,24 +76,26 @@ public void cleanLargerThan(Snapshot retainedSnapshot) {
List<Snapshot> cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot);
List<Changelog> cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot);
List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);
Set<Long> cleanedIds = new HashSet<>();

// clean manifests
// this can be used for snapshots and tags manifests cleaning both
Set<String> manifestsSkippingSet = snapshotDeletion.manifestSkippingSet(retainedSnapshot);

for (Snapshot snapshot : cleanedSnapshots) {
snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
cleanedIds.add(snapshot.id());
}

for (Changelog changelog : cleanedChangelogs) {
if (changelog.changelogManifestList() != null) {
snapshotDeletion.cleanUnusedManifestList(
changelog.changelogManifestList(), new HashSet<>());
}
changelogDeletion.cleanUnusedManifests(changelog, manifestsSkippingSet);
cleanedIds.add(changelog.id());
}

cleanedTags.removeAll(cleanedSnapshots);
for (Snapshot snapshot : cleanedTags) {
if (cleanedIds.contains(snapshot.id())) {
continue;
}
tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
}

Expand Down Expand Up @@ -122,7 +128,9 @@ private List<Snapshot> cleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
// when deleting non-existing data files
for (Snapshot snapshot : toBeCleaned) {
snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList());
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
if (snapshot.changelogManifestList() != null) {
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
}
}

// delete directories
Expand All @@ -149,9 +157,8 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh

// delete data files of changelog
for (Changelog changelog : toBeCleaned) {
if (changelog.changelogManifestList() != null) {
snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList());
}
// clean the deleted file
changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> false);
}

// delete directories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
return new ContinuousFromSnapshotStartingScanner(
snapshotManager,
consumer.get().nextSnapshot(),
options.changelogProducer() != ChangelogProducer.NONE,
options.changelogLifecycleDecoupled());
}
}
Expand Down Expand Up @@ -152,7 +151,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
? new ContinuousFromTimestampStartingScanner(
snapshotManager,
startupMillis,
options.changelogProducer() != ChangelogProducer.NONE,
options.changelogLifecycleDecoupled())
: new StaticFromTimestampStartingScanner(snapshotManager, startupMillis);
case FROM_FILE_CREATION_TIME:
Expand All @@ -164,7 +162,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
? new ContinuousFromSnapshotStartingScanner(
snapshotManager,
options.scanSnapshotId(),
options.changelogProducer() != ChangelogProducer.NONE,
options.changelogLifecycleDecoupled())
: new StaticFromSnapshotStartingScanner(
snapshotManager, options.scanSnapshotId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,13 @@
*/
public class ContinuousFromSnapshotStartingScanner extends AbstractStartingScanner {

private final boolean changelogAsFollowup;
private final boolean changelogDecoupled;

public ContinuousFromSnapshotStartingScanner(
SnapshotManager snapshotManager,
long snapshotId,
boolean changelogAsFollowup,
boolean changelogDecoupled) {
SnapshotManager snapshotManager, long snapshotId, boolean changelogDecoupled) {
super(snapshotManager);
this.startingSnapshotId = snapshotId;
this.changelogDecoupled = changelogDecoupled;
this.changelogAsFollowup = changelogAsFollowup;
}

@Override
Expand All @@ -54,7 +49,7 @@ public Result scan(SnapshotReader snapshotReader) {

private Long getEarliestId() {
Long earliestId;
if (changelogAsFollowup && changelogDecoupled) {
if (changelogDecoupled) {
Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId();
earliestId =
earliestChangelogId == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ public class ContinuousFromTimestampStartingScanner extends AbstractStartingScan
private final boolean startFromChangelog;

public ContinuousFromTimestampStartingScanner(
SnapshotManager snapshotManager,
long startupMillis,
boolean changelogAsFollowup,
boolean changelogDecoupled) {
SnapshotManager snapshotManager, long startupMillis, boolean changelogDecoupled) {
super(snapshotManager);
this.startupMillis = startupMillis;
this.startFromChangelog = changelogAsFollowup && changelogDecoupled;
this.startFromChangelog = changelogDecoupled;
this.startingSnapshotId =
this.snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@ public class NextSnapshotFetcher {
public static final Logger LOG = LoggerFactory.getLogger(NextSnapshotFetcher.class);
private final SnapshotManager snapshotManager;
private final boolean changelogDecoupled;
// Only support changelog as follow-up now.
private final boolean changelogAsFollowup;

public NextSnapshotFetcher(
SnapshotManager snapshotManager,
boolean changelogDecoupled,
boolean changelogAsFollowup) {
this.snapshotManager = snapshotManager;
this.changelogDecoupled = changelogDecoupled;
this.changelogAsFollowup = changelogAsFollowup;
}

@Nullable
Expand All @@ -59,9 +56,7 @@ public Snapshot getNextSnapshot(long nextSnapshotId) {
return null;
}

if (!changelogAsFollowup
|| !changelogDecoupled
|| !snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
if (!changelogDecoupled || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) {
throw new OutOfRangeException(
String.format(
"The snapshot with id %d has expired. You can: "
Expand Down
Loading

0 comments on commit d35b1ee

Please sign in to comment.