From 4547aec9cefd392650b912ad35691601beb10408 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 20 Mar 2024 16:35:14 +0800 Subject: [PATCH 1/5] Introduce SplitGroup for SplitGenerator --- .../org/apache/paimon/io/DataFileMeta.java | 4 ++ .../source/AppendOnlySplitGenerator.java | 11 +++-- .../table/source/MergeTreeSplitGenerator.java | 24 ++++++++--- .../paimon/table/source/SplitGenerator.java | 24 ++++++++++- .../snapshot/IncrementalStartingScanner.java | 5 ++- .../source/snapshot/SnapshotReaderImpl.java | 41 +++++++------------ .../table/source/SplitGeneratorTest.java | 8 ++-- 7 files changed, 73 insertions(+), 44 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 3712411947f6..fc41c059fdd1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -260,6 +260,10 @@ public Optional fileFormat() { } } + public boolean rawConvertible() { + return level != 0 && Objects.equals(deleteRowCount, 0L); + } + public DataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); return new DataFileMeta( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java index 002072a38736..dbc341803075 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.append.AppendOnlyCompactManager.fileComparator; @@ -44,21 +45,23 @@ public AppendOnlySplitGenerator( } @Override - public List> splitForBatch(List input) { + public List splitForBatch(List input) { List files = new ArrayList<>(input); files.sort(fileComparator(bucketMode == BucketMode.UNAWARE)); Function weightFunc = file -> Math.max(file.fileSize(), openFileCost); - return BinPacking.packForOrdered(files, weightFunc, targetSplitSize); + return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream() + .map(SplitGroup::rawConvertibleGroup) + .collect(Collectors.toList()); } @Override - public List> splitForStreaming(List files) { + public List splitForStreaming(List files) { // When the bucket mode is unaware, we spit the files as batch, because unaware-bucket table // only contains one bucket (bucket 0). if (bucketMode == BucketMode.UNAWARE) { return splitForBatch(files); } else { - return Collections.singletonList(files); + return Collections.singletonList(SplitGroup.rawConvertibleGroup(files)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java index 9a06a53f4ce6..206c177334f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java @@ -61,11 +61,17 @@ public MergeTreeSplitGenerator( } @Override - public List> splitForBatch(List files) { - if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) { + public List splitForBatch(List files) { + boolean rawConvertible = files.stream().allMatch(DataFileMeta::rawConvertible); + boolean oneLevel = + files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1; + + if (rawConvertible && (deletionVectorsEnabled || mergeEngine == FIRST_ROW || oneLevel)) { Function weightFunc = file -> Math.max(file.fileSize(), openFileCost); - return BinPacking.packForOrdered(files, weightFunc, targetSplitSize); + return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream() + .map(SplitGroup::rawConvertibleGroup) + .collect(Collectors.toList()); } /* @@ -93,13 +99,19 @@ public List> splitForBatch(List files) { new IntervalPartition(files, keyComparator) .partition().stream().map(this::flatRun).collect(Collectors.toList()); - return packSplits(sections); + return packSplits(sections).stream() + .map( + f -> + f.size() == 1 && f.get(0).rawConvertible() + ? SplitGroup.rawConvertibleGroup(f) + : SplitGroup.nonRawConvertibleGroup(f)) + .collect(Collectors.toList()); } @Override - public List> splitForStreaming(List files) { + public List splitForStreaming(List files) { // We don't split streaming scan files - return Collections.singletonList(files); + return Collections.singletonList(SplitGroup.rawConvertibleGroup(files)); } private List> packSplits(List> sections) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java index f4aa7c7e2858..73cfa9826839 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SplitGenerator.java @@ -25,7 +25,27 @@ /** Generate splits from {@link DataFileMeta}s. */ public interface SplitGenerator { - List> splitForBatch(List files); + List splitForBatch(List files); - List> splitForStreaming(List files); + List splitForStreaming(List files); + + /** Split group. */ + class SplitGroup { + + public final List files; + public final boolean rawConvertible; + + private SplitGroup(List files, boolean rawConvertible) { + this.files = files; + this.rawConvertible = rawConvertible; + } + + public static SplitGroup rawConvertibleGroup(List files) { + return new SplitGroup(files, true); + } + + public static SplitGroup nonRawConvertibleGroup(List files) { + return new SplitGroup(files, false); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index 49ab3a87e764..602a6370a76f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -25,6 +25,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -65,7 +66,7 @@ public Result scan(SnapshotReader reader) { for (Map.Entry, List> entry : grouped.entrySet()) { BinaryRow partition = entry.getKey().getLeft(); int bucket = entry.getKey().getRight(); - for (List files : + for (SplitGenerator.SplitGroup splitGroup : reader.splitGenerator().splitForBatch(entry.getValue())) { // TODO pass deletion files result.add( @@ -73,7 +74,7 @@ public Result scan(SnapshotReader reader) { .withSnapshot(endingSnapshotId) .withPartition(partition) .withBucket(bucket) - .withDataFiles(files) + .withDataFiles(splitGroup.files) .build()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index aa28fa667467..fe48557e3ce8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -64,7 +64,6 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; @@ -278,18 +277,24 @@ private List generateSplits( .withPartition(partition) .withBucket(bucket) .isStreaming(isStreaming); - List> splitGroups = + List splitGroups = isStreaming ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); - for (List dataFiles : splitGroups) { - builder.withDataFiles(dataFiles) - .rawFiles(convertToRawFiles(partition, bucket, dataFiles)); - if (deletionVectors) { - IndexFileMeta deletionIndexFile = - indexFileHandler + + IndexFileMeta deletionIndexFile = + deletionVectors + ? indexFileHandler .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket) - .orElse(null); + .orElse(null) + : null; + for (SplitGenerator.SplitGroup splitGroup : splitGroups) { + List dataFiles = splitGroup.files; + builder.withDataFiles(dataFiles); + if (splitGroup.rawConvertible) { + builder.rawFiles(convertToRawFiles(partition, bucket, dataFiles)); + } + if (deletionVectors) { builder.withDataDeletionFiles( getDeletionFiles(dataFiles, deletionIndexFile)); } @@ -370,8 +375,7 @@ private Plan toChangesPlan( .withBucket(bucket) .withBeforeFiles(before) .withDataFiles(data) - .isStreaming(isStreaming) - .rawFiles(convertToRawFiles(part, bucket, data)); + .isStreaming(isStreaming); if (deletionVectors) { IndexFileMeta beforeDeletionIndex = indexFileHandler @@ -437,21 +441,6 @@ private List getDeletionFiles( private List convertToRawFiles( BinaryRow partition, int bucket, List dataFiles) { String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); - - // append only or deletionVectors files can be returned - if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) { - return makeRawTableFiles(bucketPath, dataFiles); - } - - int maxLevel = options.numLevels() - 1; - if (dataFiles.stream().map(DataFileMeta::level).allMatch(l -> l == maxLevel)) { - return makeRawTableFiles(bucketPath, dataFiles); - } - - return Collections.emptyList(); - } - - private List makeRawTableFiles(String bucketPath, List dataFiles) { return dataFiles.stream() .map(f -> makeRawTableFile(bucketPath, f)) .collect(Collectors.toList()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 1278339210de..1cc3e2bf27f6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -124,11 +124,11 @@ public void testMergeTree() { Collections.singletonList("6")); } - private List> toNames(List> splits) { - return splits.stream() + private List> toNames(List splitGroups) { + return splitGroups.stream() .map( - files -> - files.stream() + splitGroup -> + splitGroup.files.stream() .map(DataFileMeta::fileName) .collect(Collectors.toList())) .collect(Collectors.toList()); From bb36c4d4a5673e274d348e5c5257c5c33e301037 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Mon, 25 Mar 2024 15:59:27 +0800 Subject: [PATCH 2/5] add test case --- .../apache/paimon/io/DataFileTestUtils.java | 9 ++- .../table/source/SplitGeneratorTest.java | 76 +++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index 624237b38680..b902ae967773 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -72,10 +72,15 @@ public static DataFileMeta newFile() { public static DataFileMeta newFile( String name, int level, int minKey, int maxKey, long maxSequence) { + return newFile(name, level, minKey, maxKey, maxSequence, 0L); + } + + public static DataFileMeta newFile( + String name, int level, int minKey, int maxKey, long maxSequence, long deleteRowCount) { return new DataFileMeta( name, maxKey - minKey + 1, - 1, + maxKey - minKey + 1, row(minKey), row(maxKey), null, @@ -84,7 +89,7 @@ public static DataFileMeta newFile( maxSequence, 0, level, - 0L); + deleteRowCount); } public static BinaryRow row(int i) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 1cc3e2bf27f6..82663a72cb2e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.utils.Pair; import org.junit.jupiter.api.Test; @@ -31,8 +32,10 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; +import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.io.DataFileTestUtils.fromMinMax; +import static org.apache.paimon.io.DataFileTestUtils.newFile; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link AppendOnlySplitGenerator} and {@link MergeTreeSplitGenerator}. */ @@ -124,6 +127,64 @@ public void testMergeTree() { Collections.singletonList("6")); } + @Test + public void testSplitRawConvertible() { + Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); + MergeTreeSplitGenerator mergeTreeSplitGenerator = + new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE); + + // When level0 exists, should not be rawConvertible + List files1 = + Arrays.asList(newFile("1", 0, 0, 10, 10L), newFile("2", 0, 10, 20, 20L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files1))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false)); + + // When deleteRowCount > 0, should not be rawConvertible + List files2 = + Arrays.asList(newFile("1", 1, 0, 10, 10L, 1L), newFile("2", 1, 10, 20, 20L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files2))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false)); + + // No level0 and deleteRowCount == 0: + // All in one level, should be rawConvertible + List files3 = + Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 1, 10, 20, 20L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files3))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true)); + + // Not all in one level, should not be rawConvertible + List files4 = + Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 2, 10, 20, 20L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files4))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false)); + + // Not all in one level but with deletion vectors enabled, should be rawConvertible + MergeTreeSplitGenerator splitGeneratorWithDVEnabled = + new MergeTreeSplitGenerator(comparator, 100, 2, true, DEDUPLICATE); + assertThat(toNamesAndRawConvertible(splitGeneratorWithDVEnabled.splitForBatch(files4))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true)); + + // Not all in one level but with first row merge engine, should be rawConvertible + MergeTreeSplitGenerator splitGeneratorWithFirstRow = + new MergeTreeSplitGenerator(comparator, 100, 2, false, FIRST_ROW); + assertThat(toNamesAndRawConvertible(splitGeneratorWithFirstRow.splitForBatch(files4))) + .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true)); + + // Split with one file should be rawConvertible + List files5 = + Arrays.asList( + newFile("1", 1, 0, 10, 10L), + newFile("2", 2, 0, 12, 12L), + newFile("3", 3, 15, 60, 60L), + newFile("4", 4, 18, 40, 40L), + newFile("5", 5, 82, 85, 85L), + newFile("6", 6, 100, 200, 200L)); + assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files5))) + .containsExactlyInAnyOrder( + Pair.of(Arrays.asList("1", "2", "3", "4", "5"), false), + Pair.of(Collections.singletonList("6"), true)); + } + private List> toNames(List splitGroups) { return splitGroups.stream() .map( @@ -133,4 +194,19 @@ private List> toNames(List splitGroups) .collect(Collectors.toList())) .collect(Collectors.toList()); } + + private List, Boolean>> toNamesAndRawConvertible( + List splitGroups) { + return splitGroups.stream() + .map( + splitGroup -> { + List sortedFileNames = + splitGroup.files.stream() + .sorted(Comparator.comparing(DataFileMeta::fileName)) + .map(DataFileMeta::fileName) + .collect(Collectors.toList()); + return Pair.of(sortedFileNames, splitGroup.rawConvertible); + }) + .collect(Collectors.toList()); + } } From 2edefe1bbd95874125e1b2cd02a8489b63aa47a2 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 26 Mar 2024 11:54:48 +0800 Subject: [PATCH 3/5] move rawConvertible --- .../src/main/java/org/apache/paimon/io/DataFileMeta.java | 4 ---- .../paimon/table/source/MergeTreeSplitGenerator.java | 9 +++++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index fc41c059fdd1..3712411947f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -260,10 +260,6 @@ public Optional fileFormat() { } } - public boolean rawConvertible() { - return level != 0 && Objects.equals(deleteRowCount, 0L); - } - public DataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); return new DataFileMeta( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java index 206c177334f3..d172345ebfb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java @@ -62,7 +62,7 @@ public MergeTreeSplitGenerator( @Override public List splitForBatch(List files) { - boolean rawConvertible = files.stream().allMatch(DataFileMeta::rawConvertible); + boolean rawConvertible = files.stream().allMatch(this::rawConvertible); boolean oneLevel = files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1; @@ -102,7 +102,7 @@ public List splitForBatch(List files) { return packSplits(sections).stream() .map( f -> - f.size() == 1 && f.get(0).rawConvertible() + f.size() == 1 && rawConvertible(f.get(0)) ? SplitGroup.rawConvertibleGroup(f) : SplitGroup.nonRawConvertibleGroup(f)) .collect(Collectors.toList()); @@ -141,4 +141,9 @@ private List flatFiles(List> section) { section.forEach(files::addAll); return files; } + + private boolean rawConvertible(DataFileMeta dataFileMeta) { + return dataFileMeta.level() != 0 + && dataFileMeta.deleteRowCount().map(count -> count == 0L).orElse(false); + } } From d3c9c2badc7fd6113563f163b6aa0dac7ff6cdb0 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 26 Mar 2024 12:14:18 +0800 Subject: [PATCH 4/5] update --- .../paimon/table/source/MergeTreeSplitGenerator.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java index d172345ebfb8..4cf4a18031f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java @@ -62,7 +62,8 @@ public MergeTreeSplitGenerator( @Override public List splitForBatch(List files) { - boolean rawConvertible = files.stream().allMatch(this::rawConvertible); + boolean rawConvertible = + files.stream().allMatch(file -> file.level() != 0 && withoutDeleteRow(file)); boolean oneLevel = files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1; @@ -102,7 +103,7 @@ public List splitForBatch(List files) { return packSplits(sections).stream() .map( f -> - f.size() == 1 && rawConvertible(f.get(0)) + f.size() == 1 && withoutDeleteRow(f.get(0)) ? SplitGroup.rawConvertibleGroup(f) : SplitGroup.nonRawConvertibleGroup(f)) .collect(Collectors.toList()); @@ -142,8 +143,7 @@ private List flatFiles(List> section) { return files; } - private boolean rawConvertible(DataFileMeta dataFileMeta) { - return dataFileMeta.level() != 0 - && dataFileMeta.deleteRowCount().map(count -> count == 0L).orElse(false); + private boolean withoutDeleteRow(DataFileMeta dataFileMeta) { + return dataFileMeta.deleteRowCount().map(count -> count == 0L).orElse(false); } } From 44501bfd1f26477961ff70b14a4a405bfae94069 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Tue, 26 Mar 2024 12:31:40 +0800 Subject: [PATCH 5/5] fix failure test --- .../apache/paimon/table/source/snapshot/SnapshotReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java index e448d8684af1..a34f16a0305d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java @@ -100,7 +100,7 @@ public void testGetPrimaryKeyRawFiles() throws Exception { assertThat(dataSplit.dataFiles()).hasSize(1); DataFileMeta meta = dataSplit.dataFiles().get(0); String partition = dataSplit.partition().getString(0).toString(); - assertThat(dataSplit.convertToRawFiles()).isNotPresent(); + assertThat(dataSplit.convertToRawFiles()).isPresent(); } // write another file on level 0