From d35b1eec418822ded38c0f4d3ba379d6d287c68a Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 11 Jun 2024 10:17:59 +0800 Subject: [PATCH] [core] Changelog decouple supports delta files (#3407) --- .../org/apache/paimon/AbstractFileStore.java | 3 +- .../paimon/operation/FileDeletionBase.java | 11 ++- .../paimon/operation/OrphanFilesClean.java | 75 ++++++++++++++---- .../paimon/operation/SnapshotDeletion.java | 28 ++++++- .../apache/paimon/operation/TagDeletion.java | 2 +- .../paimon/table/AbstractFileStoreTable.java | 1 + .../apache/paimon/table/RollbackHelper.java | 25 +++--- .../table/source/AbstractDataTableScan.java | 3 - ...ContinuousFromSnapshotStartingScanner.java | 9 +-- ...ontinuousFromTimestampStartingScanner.java | 7 +- .../paimon/utils/NextSnapshotFetcher.java | 7 +- .../java/org/apache/paimon/TestFileStore.java | 76 +++++++++++++++++-- .../paimon/operation/ExpireSnapshotsTest.java | 2 +- .../operation/OrphanFilesCleanTest.java | 9 ++- .../table/PrimaryKeyFileStoreTableTest.java | 12 +-- ...nuousFromTimestampStartingScannerTest.java | 23 +++--- .../flink/ContinuousFileStoreITCase.java | 12 ++- 17 files changed, 222 insertions(+), 83 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 8d9eca29584b..ca8b90994d23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -214,7 +214,8 @@ public SnapshotDeletion newSnapshotDeletion() { manifestFileFactory().create(), manifestListFactory().create(), newIndexFileHandler(), - newStatsFileHandler()); + newStatsFileHandler(), + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index c0b5c289cf7f..9d923f46d069 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -307,9 +307,14 @@ public void cleanUnusedManifestList(String manifestName, Set skippingSet } protected void cleanUnusedManifests( - Snapshot snapshot, Set skippingSet, boolean deleteChangelog) { - cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); - cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); + Snapshot snapshot, + Set skippingSet, + boolean deleteDataManifestLists, + boolean deleteChangelog) { + if (deleteDataManifestLists) { + cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); + cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); + } if (deleteChangelog && snapshot.changelogManifestList() != null) { cleanUnusedManifestList(snapshot.changelogManifestList(), skippingSet); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index e78174db7ac9..16d854ea43a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -212,25 +212,72 @@ private Map getCandidateDeletingFiles() { private List getUsedFilesForChangelog(Changelog changelog) { List files = new ArrayList<>(); - if (changelog.changelogManifestList() != null) { - files.add(changelog.changelogManifestList()); - } - + List manifestFileMetas = new ArrayList<>(); try { // try to read manifests - List manifestFileMetas = - retryReadingFiles( - () -> - manifestList.readWithIOException( - changelog.changelogManifestList())); - if (manifestFileMetas == null) { - return Collections.emptyList(); + // changelog manifest + List changelogManifest = new ArrayList<>(); + if (changelog.changelogManifestList() != null) { + files.add(changelog.changelogManifestList()); + changelogManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.changelogManifestList())); + if (changelogManifest != null) { + manifestFileMetas.addAll(changelogManifest); + } } - List manifestFileName = + + // base manifest + if (manifestList.exists(changelog.baseManifestList())) { + files.add(changelog.baseManifestList()); + List baseManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.baseManifestList())); + if (baseManifest != null) { + manifestFileMetas.addAll(baseManifest); + } + } + + // delta manifest + List deltaManifest = null; + if (manifestList.exists(changelog.deltaManifestList())) { + files.add(changelog.deltaManifestList()); + deltaManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.deltaManifestList())); + if (deltaManifest != null) { + manifestFileMetas.addAll(deltaManifest); + } + } + + files.addAll( manifestFileMetas.stream() .map(ManifestFileMeta::fileName) - .collect(Collectors.toList()); - files.addAll(manifestFileName); + .collect(Collectors.toList())); + + // data file + List manifestFileName = new ArrayList<>(); + if (changelog.changelogManifestList() != null) { + manifestFileName.addAll( + changelogManifest == null + ? new ArrayList<>() + : changelogManifest.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList())); + } else { + manifestFileName.addAll( + deltaManifest == null + ? new ArrayList<>() + : deltaManifest.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList())); + } // try to read data files List dataFiles = retryReadingDataFiles(manifestFileName); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index f7fbdfac20e2..e83056638639 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; @@ -39,24 +40,45 @@ /** Delete snapshot files. */ public class SnapshotDeletion extends FileDeletionBase { + private final boolean produceChangelog; + public SnapshotDeletion( FileIO fileIO, FileStorePathFactory pathFactory, ManifestFile manifestFile, ManifestList manifestList, IndexFileHandler indexFileHandler, - StatsFileHandler statsFileHandler) { + StatsFileHandler statsFileHandler, + boolean produceChangelog) { super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); + this.produceChangelog = produceChangelog; } @Override public void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper) { - cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); + if (changelogDecoupled && !produceChangelog) { + // Skip clean the 'APPEND' data files.If we do not have the file source information + // eg: the old version table file, we just skip clean this here, let it done by + // ExpireChangelogImpl + Predicate enriched = + manifestEntry -> + skipper.test(manifestEntry) + || (manifestEntry.file().fileSource().orElse(FileSource.APPEND) + == FileSource.APPEND); + cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched); + } else { + cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); + } } @Override public void cleanUnusedManifests(Snapshot snapshot, Set skippingSet) { - cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled); + // delay clean the base and delta manifest lists when changelog decoupled enabled + cleanUnusedManifests( + snapshot, + skippingSet, + !changelogDecoupled || produceChangelog, + !changelogDecoupled); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 01db29ec94c8..3b1174223676 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -85,7 +85,7 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate skippingSet) { // doesn't clean changelog files because they are handled by SnapshotDeletion - cleanUnusedManifests(taggedSnapshot, skippingSet, false); + cleanUnusedManifests(taggedSnapshot, skippingSet, true, false); } public Predicate dataFileSkipper(Snapshot fromSnapshot) throws Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 6f723f834f1c..bf8857e721c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -570,6 +570,7 @@ private RollbackHelper rollbackHelper() { tagManager(), fileIO, store().newSnapshotDeletion(), + store().newChangelogDeletion(), store().newTagDeletion()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index 90801caf0978..bd608cdcaf4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.utils.SnapshotManager; @@ -51,6 +52,7 @@ public class RollbackHelper { private final TagManager tagManager; private final FileIO fileIO; private final SnapshotDeletion snapshotDeletion; + private final ChangelogDeletion changelogDeletion; private final TagDeletion tagDeletion; public RollbackHelper( @@ -58,11 +60,13 @@ public RollbackHelper( TagManager tagManager, FileIO fileIO, SnapshotDeletion snapshotDeletion, + ChangelogDeletion changelogDeletion, TagDeletion tagDeletion) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.fileIO = fileIO; this.snapshotDeletion = snapshotDeletion; + this.changelogDeletion = changelogDeletion; this.tagDeletion = tagDeletion; } @@ -72,6 +76,7 @@ public void cleanLargerThan(Snapshot retainedSnapshot) { List cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot); List cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot); List cleanedTags = cleanTagsDataFiles(retainedSnapshot); + Set cleanedIds = new HashSet<>(); // clean manifests // this can be used for snapshots and tags manifests cleaning both @@ -79,17 +84,18 @@ public void cleanLargerThan(Snapshot retainedSnapshot) { for (Snapshot snapshot : cleanedSnapshots) { snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); + cleanedIds.add(snapshot.id()); } for (Changelog changelog : cleanedChangelogs) { - if (changelog.changelogManifestList() != null) { - snapshotDeletion.cleanUnusedManifestList( - changelog.changelogManifestList(), new HashSet<>()); - } + changelogDeletion.cleanUnusedManifests(changelog, manifestsSkippingSet); + cleanedIds.add(changelog.id()); } - cleanedTags.removeAll(cleanedSnapshots); for (Snapshot snapshot : cleanedTags) { + if (cleanedIds.contains(snapshot.id())) { + continue; + } tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); } @@ -122,7 +128,9 @@ private List cleanSnapshotsDataFiles(Snapshot retainedSnapshot) { // when deleting non-existing data files for (Snapshot snapshot : toBeCleaned) { snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList()); - snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + if (snapshot.changelogManifestList() != null) { + snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + } } // delete directories @@ -149,9 +157,8 @@ private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh // delete data files of changelog for (Changelog changelog : toBeCleaned) { - if (changelog.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); - } + // clean the deleted file + changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> false); } // delete directories diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 66b4dd8e438c..8300db4c71fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -122,7 +122,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return new ContinuousFromSnapshotStartingScanner( snapshotManager, consumer.get().nextSnapshot(), - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()); } } @@ -152,7 +151,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { ? new ContinuousFromTimestampStartingScanner( snapshotManager, startupMillis, - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()) : new StaticFromTimestampStartingScanner(snapshotManager, startupMillis); case FROM_FILE_CREATION_TIME: @@ -164,7 +162,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { ? new ContinuousFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId(), - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java index 38c01f35d245..d8e614222857 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java @@ -27,18 +27,13 @@ */ public class ContinuousFromSnapshotStartingScanner extends AbstractStartingScanner { - private final boolean changelogAsFollowup; private final boolean changelogDecoupled; public ContinuousFromSnapshotStartingScanner( - SnapshotManager snapshotManager, - long snapshotId, - boolean changelogAsFollowup, - boolean changelogDecoupled) { + SnapshotManager snapshotManager, long snapshotId, boolean changelogDecoupled) { super(snapshotManager); this.startingSnapshotId = snapshotId; this.changelogDecoupled = changelogDecoupled; - this.changelogAsFollowup = changelogAsFollowup; } @Override @@ -54,7 +49,7 @@ public Result scan(SnapshotReader snapshotReader) { private Long getEarliestId() { Long earliestId; - if (changelogAsFollowup && changelogDecoupled) { + if (changelogDecoupled) { Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); earliestId = earliestChangelogId == null diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java index 7e39e0859781..941174835537 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java @@ -37,13 +37,10 @@ public class ContinuousFromTimestampStartingScanner extends AbstractStartingScan private final boolean startFromChangelog; public ContinuousFromTimestampStartingScanner( - SnapshotManager snapshotManager, - long startupMillis, - boolean changelogAsFollowup, - boolean changelogDecoupled) { + SnapshotManager snapshotManager, long startupMillis, boolean changelogDecoupled) { super(snapshotManager); this.startupMillis = startupMillis; - this.startFromChangelog = changelogAsFollowup && changelogDecoupled; + this.startFromChangelog = changelogDecoupled; this.startingSnapshotId = this.snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java index 87f1fb84984c..2044083924b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java @@ -32,8 +32,6 @@ public class NextSnapshotFetcher { public static final Logger LOG = LoggerFactory.getLogger(NextSnapshotFetcher.class); private final SnapshotManager snapshotManager; private final boolean changelogDecoupled; - // Only support changelog as follow-up now. - private final boolean changelogAsFollowup; public NextSnapshotFetcher( SnapshotManager snapshotManager, @@ -41,7 +39,6 @@ public NextSnapshotFetcher( boolean changelogAsFollowup) { this.snapshotManager = snapshotManager; this.changelogDecoupled = changelogDecoupled; - this.changelogAsFollowup = changelogAsFollowup; } @Nullable @@ -59,9 +56,7 @@ public Snapshot getNextSnapshot(long nextSnapshotId) { return null; } - if (!changelogAsFollowup - || !changelogDecoupled - || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { + if (!changelogDecoupled || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { throw new OutOfRangeException( String.format( "The snapshot with id %d has expired. You can: " diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 91b08635fc96..2b0c66380956 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -26,6 +26,8 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -607,6 +609,12 @@ private static Set getSnapshotFileInUse( FileStorePathFactory pathFactory, ManifestList manifestList) { Set result = new HashSet<>(); + SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); + CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); + boolean produceChangelog = + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; + // The option from the table may not align with the expiration config + boolean changelogDecoupled = snapshotManager.earliestLongLivedChangelogId() != null; Path snapshotPath = snapshotManager.snapshotPath(snapshotId); Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath); @@ -634,6 +642,27 @@ private static Set getSnapshotFileInUse( entry.file().fileName())); } + // Add 'DELETE' 'APPEND' file in snapshot + // These 'delete' files can be merged by the plan#splits, + // so it's not shown in the entries above. + // In other words, these files are not used (by snapshot or changelog) now, + // but it can only be cleaned after this snapshot expired, so we should add it to the file + // use list. + if (changelogDecoupled && !produceChangelog) { + entries = scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files(); + for (ManifestEntry entry : entries) { + // append delete file are delayed to delete + if (entry.kind() == FileKind.DELETE + && entry.file().fileSource().orElse(FileSource.APPEND) + == FileSource.APPEND) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + } + } + return result; } @@ -645,6 +674,10 @@ private static Set getChangelogFileInUse( FileStorePathFactory pathFactory, ManifestList manifestList) { Set result = new HashSet<>(); + SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); + CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); + boolean produceChangelog = + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId); Changelog changelog = Changelog.fromPath(fileIO, changelogPath); @@ -653,23 +686,50 @@ private static Set getChangelogFileInUse( result.add(changelogPath); // manifest lists + if (!produceChangelog) { + result.add(pathFactory.toManifestListPath(changelog.baseManifestList())); + result.add(pathFactory.toManifestListPath(changelog.deltaManifestList())); + } if (changelog.changelogManifestList() != null) { result.add(pathFactory.toManifestListPath(changelog.changelogManifestList())); } // manifests - List manifests = new ArrayList<>(); - manifests.addAll(changelog.changelogManifests(manifestList)); + List manifests = + new ArrayList<>(changelog.changelogManifests(manifestList)); + if (!produceChangelog) { + manifests.addAll(changelog.dataManifests(manifestList)); + } manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); // data file - List entries = scan.withManifestList(manifests).plan().files(); - for (ManifestEntry entry : entries) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + // not all manifests contains useful data file + // (1) produceChangelog = 'true': data file in changelog manifests + // (2) produceChangelog = 'false': 'APPEND' data file in delta manifests + + // delta file + if (!produceChangelog) { + for (ManifestEntry entry : + scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) { + if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + } + } else { + // changelog + for (ManifestEntry entry : + scan.withManifestList(changelog.changelogManifests(manifestList)) + .plan() + .files()) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } } return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 7c725164ef68..6af2d3051a97 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -85,7 +85,7 @@ public void beforeEach() throws Exception { TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), TestKeyValueGenerator.getPrimaryKeys( TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), - Collections.emptyMap(), + store.options().toMap(), null)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index 129f928c9c65..7ca2b58d7bf1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -59,6 +59,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; @@ -328,8 +330,10 @@ private void validateSnapshot(Snapshot snapshot, List data) throws Exc assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data)); } - @Test - public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception { + @ValueSource(strings = {"none", "input"}) + @ParameterizedTest(name = "changelog-producer = {0}") + public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) + throws Exception { // recreate the table with another option this.write.close(); this.commit.close(); @@ -338,6 +342,7 @@ public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception { options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); + options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); FileStoreTable table = createFileStoreTable(rowType, options); String commitUser = UUID.randomUUID().toString(); this.table = table; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 8026becfb9bd..095511ae1d35 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -65,6 +65,8 @@ import org.apache.paimon.utils.CompatibilityTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.file.Files; @@ -1491,15 +1493,15 @@ public void testLookupWithDropDelete() throws Exception { "1|2|200|binary|varbinary|mapKey:mapVal|multiset")); } - @Test - public void testRollbackToTagWithChangelogDecoupled() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testRollbackToTagWithChangelogDecoupled(String changelogProducer) throws Exception { int commitTimes = ThreadLocalRandom.current().nextInt(100) + 6; FileStoreTable table = createFileStoreTable( options -> - options.set( - CoreOptions.CHANGELOG_PRODUCER, - CoreOptions.ChangelogProducer.INPUT)); + options.setString( + CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer)); prepareRollbackTable(commitTimes, table); int t1 = 1; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java index 51c2c10e7260..46093f2cba0d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java @@ -29,6 +29,8 @@ import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.UUID; @@ -65,8 +67,7 @@ public void testScan() throws Exception { long timestamp = snapshotManager.snapshot(3).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner( - snapshotManager, timestamp, false, false); + new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); @@ -80,7 +81,7 @@ public void testNoSnapshot() { SnapshotManager snapshotManager = table.snapshotManager(); ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, System.currentTimeMillis(), false, false); + snapshotManager, System.currentTimeMillis(), false); assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class); } @@ -100,8 +101,7 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { long timestamp = snapshotManager.snapshot(1).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner( - snapshotManager, timestamp, false, false); + new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); // next snapshot @@ -111,14 +111,15 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { commit.close(); } - @Test - public void testScanFromChangelog() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testScanFromChangelog(String changelogProducer) throws Exception { Options options = new Options(); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MIN, 10); - options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); + options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); FileStoreTable table = createFileStoreTable( true, @@ -158,20 +159,20 @@ public void testScanFromChangelog() throws Exception { ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true); + snapshotManager, snapshotManager.snapshot(3).timeMillis(), true); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true); + snapshotManager, snapshotManager.snapshot(2).timeMillis(), true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(2); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true); + snapshotManager, snapshotManager.changelog(1).timeMillis(), true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(1); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index c60b26f224ef..de3ab0c5f698 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -33,6 +33,8 @@ import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -604,16 +606,18 @@ public void testScanFromOldSchema() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c")); } - @Test - public void testScanFromChangelog() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testScanFromChangelog(String changelogProducer) throws Exception { batchSql( "CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n" - + " WITH ('changelog-producer'='input', 'bucket' = '1', \n" + + " WITH ('changelog-producer'='%s', 'bucket' = '1', \n" + " 'snapshot.num-retained.max' = '2',\n" + " 'snapshot.num-retained.min' = '1',\n" + " 'changelog.num-retained.max' = '3',\n" + " 'changelog.num-retained.min' = '1'\n" - + ")"); + + ")", + changelogProducer); batchSql("INSERT INTO T3 VALUES ('1', '2', '3')"); batchSql("INSERT INTO T3 VALUES ('4', '5', '6')");