From 08b2114ec1d7b86ab2eae06d7ba71d21ae432f86 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 11 Mar 2024 16:34:53 +0800 Subject: [PATCH 1/2] [core] Introduce deletion files to DataSplit --- .../org/apache/paimon/AbstractFileStore.java | 1 + .../org/apache/paimon/KeyValueFileStore.java | 4 +- .../ApplyDeletionVectorReader.java | 13 ++ .../deletionvectors/DeletionVector.java | 27 +++ .../DeletionVectorsMaintainer.java | 8 +- .../apache/paimon/index/HashIndexFile.java | 4 + .../apache/paimon/index/IndexFileHandler.java | 9 + .../paimon/io/KeyValueFileReaderFactory.java | 4 + .../operation/KeyValueFileStoreRead.java | 171 ++++++++++-------- .../operation/KeyValueFileStoreWrite.java | 12 +- .../paimon/table/AbstractFileStoreTable.java | 3 +- .../table/PrimaryKeyFileStoreTable.java | 3 +- .../apache/paimon/table/source/DataSplit.java | 76 ++++++-- .../paimon/table/source/DeletionFile.java | 139 ++++++++++++++ .../table/source/MergeTreeSplitGenerator.java | 14 +- .../org/apache/paimon/table/source/Split.java | 9 + .../source/snapshot/SnapshotReaderImpl.java | 82 +++++++-- .../table/source/SplitGeneratorTest.java | 10 +- .../source/TestChangelogDataReadWrite.java | 4 +- .../org/apache/paimon/spark/ScanHelper.scala | 16 +- 20 files changed, 485 insertions(+), 124 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index f0dc463012ba..6413cd088ede 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -143,6 +143,7 @@ protected IndexManifestFile.Factory indexManifestFileFactory() { public IndexFileHandler newIndexFileHandler() { return new IndexFileHandler( snapshotManager(), + pathFactory().indexFileFactory(), indexManifestFileFactory().create(), new HashIndexFile(fileIO, pathFactory().indexFileFactory()), new DeletionVectorsIndexFile(fileIO, pathFactory().indexFileFactory())); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 21e6ac60e69c..3f4023b9d6de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -130,9 +130,7 @@ public KeyValueFileStoreRead newRead() { newKeyComparator(), userDefinedSeqComparator(), mfFactory, - newReaderFactoryBuilder(), - options, - newIndexFileHandler()); + newReaderFactoryBuilder()); } public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java index 32d6da8617f5..3bba07506338 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Optional; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -39,6 +40,18 @@ public ApplyDeletionVectorReader(RecordReader reader, DeletionVector deletion this.deletionVector = deletionVector; } + public static RecordReader create(RecordReader reader, Optional dv) { + return create(reader, dv.orElse(null)); + } + + public static RecordReader create(RecordReader reader, @Nullable DeletionVector dv) { + if (dv == null) { + return reader; + } + + return new ApplyDeletionVectorReader<>(reader, dv); + } + @Nullable @Override public RecordIterator readBatch() throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java index c30df42abe42..50aa8f64ea09 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java @@ -18,6 +18,11 @@ package org.apache.paimon.deletionvectors; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.table.source.DeletionFile; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -91,4 +96,26 @@ static DeletionVector deserializeFromBytes(byte[] bytes) { throw new RuntimeException("Unable to deserialize deletion vector", e); } } + + static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOException { + Path path = new Path(deletionFile.path()); + try (SeekableInputStream input = fileIO.newInputStream(path)) { + input.seek(deletionFile.offset()); + DataInputStream dis = new DataInputStream(input); + int actualLength = dis.readInt(); + if (actualLength != deletionFile.length()) { + throw new RuntimeException( + "Size not match, actual size: " + + actualLength + + ", expert size: " + + deletionFile.length()); + } + int magicNum = dis.readInt(); + if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) { + return BitmapDeletionVector.deserializeFromDataInput(dis); + } else { + throw new RuntimeException("Invalid magic number: " + magicNum); + } + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java index 878c76841d6a..c014037d195a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Optional; +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; + /** Maintainer of deletionVectors index. */ public class DeletionVectorsMaintainer { @@ -48,11 +50,7 @@ private DeletionVectorsMaintainer( snapshotId == null ? null : fileHandler - .scan( - snapshotId, - DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, - partition, - bucket) + .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket) .orElse(null); this.deletionVectors = indexFile == null diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java index 4919f08f398f..91efb9b654c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java @@ -37,6 +37,10 @@ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) { super(fileIO, pathFactory); } + public Path path(String fileName) { + return pathFactory.toPath(fileName); + } + public IntIterator read(String fileName) throws IOException { return readInts(fileIO, pathFactory.toPath(fileName)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index d460097f05b3..97cdbf43e781 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -22,10 +22,12 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; import org.apache.paimon.utils.IntIterator; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.PathFactory; import org.apache.paimon.utils.SnapshotManager; import java.io.IOException; @@ -43,16 +45,19 @@ public class IndexFileHandler { private final SnapshotManager snapshotManager; + private final PathFactory pathFactory; private final IndexManifestFile indexManifestFile; private final HashIndexFile hashIndex; private final DeletionVectorsIndexFile deletionVectorsIndex; public IndexFileHandler( SnapshotManager snapshotManager, + PathFactory pathFactory, IndexManifestFile indexManifestFile, HashIndexFile hashIndex, DeletionVectorsIndexFile deletionVectorsIndex) { this.snapshotManager = snapshotManager; + this.pathFactory = pathFactory; this.indexManifestFile = indexManifestFile; this.hashIndex = hashIndex; this.deletionVectorsIndex = deletionVectorsIndex; @@ -102,6 +107,10 @@ public List scan(long snapshotId, String indexType, BinaryRo return result; } + public Path filePath(IndexFileMeta file) { + return pathFactory.toPath(file.fileName()); + } + public List readHashIndexList(IndexFileMeta file) { return IntIterator.toIntList(readHashIndex(file)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 3fa19681e4c3..b946756aeee2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -282,5 +282,9 @@ private void applyProjection() { projectedKeyType = Projection.of(keyProjection).project(keyType); projectedValueType = Projection.of(valueProjection).project(valueType); } + + public FileIO fileIO() { + return fileIO; + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index e7fbbea45cec..f1e7a80a2975 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -23,9 +23,10 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.DropDeleteReader; @@ -44,6 +45,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.ProjectedRow; @@ -67,6 +69,7 @@ public class KeyValueFileStoreRead implements FileStoreRead { private final TableSchema tableSchema; + private final FileIO fileIO; private final KeyValueFileReaderFactory.Builder readerFactoryBuilder; private final Comparator keyComparator; private final MergeFunctionFactory mfFactory; @@ -75,14 +78,12 @@ public class KeyValueFileStoreRead implements FileStoreRead { @Nullable private int[][] keyProjectedFields; - @Nullable private List filtersForOverlappedSection; + @Nullable private List filtersForKeys; - @Nullable private List filtersForNonOverlappedSection; + @Nullable private List filtersForAll; @Nullable private int[][] pushdownProjection; @Nullable private int[][] outerProjection; - private final CoreOptions options; - private final IndexFileHandler indexFileHandler; private boolean forceKeepDelete = false; @@ -94,19 +95,16 @@ public KeyValueFileStoreRead( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionFactory mfFactory, - KeyValueFileReaderFactory.Builder readerFactoryBuilder, - CoreOptions options, - IndexFileHandler indexFileHandler) { + KeyValueFileReaderFactory.Builder readerFactoryBuilder) { this.tableSchema = schemaManager.schema(schemaId); this.readerFactoryBuilder = readerFactoryBuilder; + this.fileIO = readerFactoryBuilder.fileIO(); this.keyComparator = keyComparator; this.mfFactory = mfFactory; this.userDefinedSeqComparator = userDefinedSeqComparator; this.mergeSorter = new MergeSorter( CoreOptions.fromMap(tableSchema.options()), keyType, valueType, null); - this.options = options; - this.indexFileHandler = indexFileHandler; } public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) { @@ -166,8 +164,8 @@ public FileStoreRead withFilter(Predicate predicate) { // So for sections with overlapping runs, we only push down key filters. // For sections with only one run, as each key only appears once, it is OK to push down // value filters. - filtersForNonOverlappedSection = allFilters; - filtersForOverlappedSection = pkFilters; + filtersForAll = allFilters; + filtersForKeys = pkFilters; return this; } @@ -183,40 +181,59 @@ public RecordReader createReader(DataSplit split) throws IOException { private RecordReader createReaderWithoutOuterProjection(DataSplit split) throws IOException { - if (options.deletionVectorsEnabled()) { - indexFileHandler - .scan( - split.snapshotId(), - DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, - split.partition(), - split.bucket()) - .ifPresent( - fileMeta -> - readerFactoryBuilder.withDeletionVectorSupplier( - filename -> - indexFileHandler.readDeletionVector( - fileMeta, filename))); + ReaderSupplier beforeSupplier = null; + if (split.beforeFiles().size() > 0) { + if (split.isStreaming() || split.beforeDeletionFiles().isPresent()) { + beforeSupplier = + () -> + new ReverseReader( + noMergeRead( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles().orElse(null), + split.isStreaming())); + } else { + beforeSupplier = + () -> + mergeRead( + split.partition(), + split.bucket(), + split.beforeFiles(), + false); + } } + + ReaderSupplier dataSupplier; + if (split.isStreaming() || split.deletionFiles().isPresent()) { + dataSupplier = + () -> + noMergeRead( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + split.isStreaming()); + } else { + dataSupplier = + () -> + mergeRead( + split.partition(), + split.bucket(), + split.dataFiles(), + split.beforeFiles().isEmpty() && forceKeepDelete); + } + if (split.isStreaming()) { - KeyValueFileReaderFactory readerFactory = - readerFactoryBuilder.build( - split.partition(), split.bucket(), true, filtersForOverlappedSection); - ReaderSupplier beforeSupplier = - () -> new ReverseReader(streamingConcat(split.beforeFiles(), readerFactory)); - ReaderSupplier dataSupplier = - () -> streamingConcat(split.dataFiles(), readerFactory); - return split.beforeFiles().isEmpty() + return beforeSupplier == null ? dataSupplier.get() : ConcatRecordReader.create(Arrays.asList(beforeSupplier, dataSupplier)); } else { - return split.beforeFiles().isEmpty() - ? batchMergeRead( - split.partition(), split.bucket(), split.dataFiles(), forceKeepDelete) + return beforeSupplier == null + ? dataSupplier.get() : DiffReader.readDiff( - batchMergeRead( - split.partition(), split.bucket(), split.beforeFiles(), false), - batchMergeRead( - split.partition(), split.bucket(), split.dataFiles(), false), + beforeSupplier.get(), + dataSupplier.get(), keyComparator, userDefinedSeqComparator, mergeSorter, @@ -224,40 +241,33 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli } } - private RecordReader batchMergeRead( + private RecordReader mergeRead( BinaryRow partition, int bucket, List files, boolean keepDelete) throws IOException { // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForOverlappedSection); + readerFactoryBuilder.build(partition, bucket, false, filtersForKeys); KeyValueFileReaderFactory nonOverlappedSectionFactory = - readerFactoryBuilder.build( - partition, bucket, false, filtersForNonOverlappedSection); + readerFactoryBuilder.build(partition, bucket, false, filtersForAll); - RecordReader reader; - if (options.deletionVectorsEnabled()) { - reader = streamingConcat(files, nonOverlappedSectionFactory); - } else { - List> sectionReaders = new ArrayList<>(); - MergeFunctionWrapper mergeFuncWrapper = - new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection)); - for (List section : - new IntervalPartition(files, keyComparator).partition()) { - sectionReaders.add( - () -> - MergeTreeReaders.readerForSection( - section, - section.size() > 1 - ? overlappedSectionFactory - : nonOverlappedSectionFactory, - keyComparator, - userDefinedSeqComparator, - mergeFuncWrapper, - mergeSorter)); - } - reader = ConcatRecordReader.create(sectionReaders); + List> sectionReaders = new ArrayList<>(); + MergeFunctionWrapper mergeFuncWrapper = + new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection)); + for (List section : new IntervalPartition(files, keyComparator).partition()) { + sectionReaders.add( + () -> + MergeTreeReaders.readerForSection( + section, + section.size() > 1 + ? overlappedSectionFactory + : nonOverlappedSectionFactory, + keyComparator, + userDefinedSeqComparator, + mergeFuncWrapper, + mergeSorter)); } + RecordReader reader = ConcatRecordReader.create(sectionReaders); if (!keepDelete) { reader = new DropDeleteReader(reader); @@ -267,17 +277,34 @@ private RecordReader batchMergeRead( return keyProjectedFields == null ? reader : projectKey(reader, keyProjectedFields); } - private RecordReader streamingConcat( - List files, KeyValueFileReaderFactory readerFactory) throws IOException { + private RecordReader noMergeRead( + BinaryRow partition, + int bucket, + List files, + @Nullable List deletionFiles, + boolean onlyFilterKey) + throws IOException { + KeyValueFileReaderFactory readerFactory = + readerFactoryBuilder.build( + partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll); List> suppliers = new ArrayList<>(); - for (DataFileMeta file : files) { + for (int i = 0; i < files.size(); i++) { + DataFileMeta file = files.get(i); + DeletionFile deletionFile = deletionFiles == null ? null : deletionFiles.get(i); suppliers.add( () -> { // We need to check extraFiles to be compatible with Paimon 0.2. // See comments on DataFileMeta#extraFiles. String fileName = changelogFile(file).orElse(file.fileName()); - return readerFactory.createRecordReader( - file.schemaId(), fileName, file.fileSize(), file.level()); + RecordReader reader = + readerFactory.createRecordReader( + file.schemaId(), fileName, file.fileSize(), file.level()); + if (deletionFile != null) { + DeletionVector deletionVector = + DeletionVector.read(fileIO, deletionFile); + reader = ApplyDeletionVectorReader.create(reader, deletionVector); + } + return reader; }); } return ConcatRecordReader.create(suppliers); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index ce4a27813923..441f8f841003 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -252,10 +252,11 @@ private MergeTreeCompactRewriter createRewriter( Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, Levels levels, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { - if (deletionVectorsMaintainer != null) { - readerFactoryBuilder.withDeletionVectorSupplier( - deletionVectorsMaintainer::deletionVectorOf); + @Nullable DeletionVectorsMaintainer dvMaintainer) { + KeyValueFileReaderFactory.Builder readerFactoryBuilder = + this.readerFactoryBuilder.copyWithoutProjection(); + if (dvMaintainer != null) { + readerFactoryBuilder.withDeletionVectorSupplier(dvMaintainer::deletionVectorOf); } KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = @@ -288,7 +289,6 @@ private MergeTreeCompactRewriter createRewriter( } lookupReaderFactory = readerFactoryBuilder - .copyWithoutProjection() .withValueProjection(new int[0][]) .build(partition, bucket); processor = new ContainsValueProcessor(); @@ -317,7 +317,7 @@ private MergeTreeCompactRewriter createRewriter( mergeSorter, wrapperFactory, lookupStrategy.produceChangelog, - deletionVectorsMaintainer); + dvMaintainer); } else { return new MergeTreeCompactRewriter( readerFactory, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index c2756fbfc573..39b368982ae1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -149,7 +149,8 @@ public SnapshotReader newSnapshotReader(String branchName) { nonPartitionFilterConsumer(), DefaultValueAssigner.create(tableSchema), store().pathFactory(), - name()); + name(), + store().newIndexFileHandler()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 781e71e86865..7b30fb832ef0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -119,7 +119,8 @@ protected SplitGenerator splitGenerator() { return new MergeTreeSplitGenerator( store().newKeyComparator(), store().options().splitTargetSize(), - store().options().splitOpenFileCost()); + store().options().splitOpenFileCost(), + store().options().deletionVectorsEnabled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 047841837e6d..fbd5f723e975 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -27,6 +27,8 @@ import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.utils.SerializationUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -42,15 +44,17 @@ /** Input splits. Needed by most batch computation engines. */ public class DataSplit implements Split { - private static final long serialVersionUID = 5L; + private static final long serialVersionUID = 6L; private long snapshotId = 0; private boolean isStreaming = false; private List beforeFiles = new ArrayList<>(); + @Nullable private List beforeDeletionFiles; private BinaryRow partition; private int bucket = -1; private List dataFiles; + @Nullable private List dataDeletionFiles; private List rawFiles = Collections.emptyList(); @@ -72,10 +76,19 @@ public List beforeFiles() { return beforeFiles; } + public Optional> beforeDeletionFiles() { + return Optional.ofNullable(beforeDeletionFiles); + } + public List dataFiles() { return dataFiles; } + @Override + public Optional> deletionFiles() { + return Optional.ofNullable(dataDeletionFiles); + } + public boolean isStreaming() { return isStreaming; } @@ -114,14 +127,24 @@ public boolean equals(Object o) { return bucket == split.bucket && Objects.equals(partition, split.partition) && Objects.equals(beforeFiles, split.beforeFiles) + && Objects.equals(beforeDeletionFiles, split.beforeDeletionFiles) && Objects.equals(dataFiles, split.dataFiles) + && Objects.equals(dataDeletionFiles, split.dataDeletionFiles) && isStreaming == split.isStreaming && Objects.equals(rawFiles, split.rawFiles); } @Override public int hashCode() { - return Objects.hash(partition, bucket, beforeFiles, dataFiles, isStreaming, rawFiles); + return Objects.hash( + partition, + bucket, + beforeFiles, + beforeDeletionFiles, + dataFiles, + dataDeletionFiles, + isStreaming, + rawFiles); } private void writeObject(ObjectOutputStream out) throws IOException { @@ -137,7 +160,9 @@ private void assign(DataSplit other) { this.partition = other.partition; this.bucket = other.bucket; this.beforeFiles = other.beforeFiles; + this.beforeDeletionFiles = other.beforeDeletionFiles; this.dataFiles = other.dataFiles; + this.dataDeletionFiles = other.dataDeletionFiles; this.isStreaming = other.isStreaming; this.rawFiles = other.rawFiles; } @@ -153,11 +178,15 @@ public void serialize(DataOutputView out) throws IOException { dataFileSer.serialize(file, out); } + DeletionFile.serializeList(out, beforeDeletionFiles); + out.writeInt(dataFiles.size()); for (DataFileMeta file : dataFiles) { dataFileSer.serialize(file, out); } + DeletionFile.serializeList(out, dataDeletionFiles); + out.writeBoolean(isStreaming); out.writeInt(rawFiles.size()); @@ -178,12 +207,16 @@ public static DataSplit deserialize(DataInputView in) throws IOException { beforeFiles.add(dataFileSer.deserialize(in)); } + List beforeDeletionFiles = DeletionFile.deserializeList(in); + int fileNumber = in.readInt(); List dataFiles = new ArrayList<>(fileNumber); for (int i = 0; i < fileNumber; i++) { dataFiles.add(dataFileSer.deserialize(in)); } + List dataDeletionFiles = DeletionFile.deserializeList(in); + boolean isStreaming = in.readBoolean(); int rawFileNum = in.readInt(); @@ -192,15 +225,22 @@ public static DataSplit deserialize(DataInputView in) throws IOException { rawFiles.add(RawFile.deserialize(in)); } - return builder() - .withSnapshot(snapshotId) - .withPartition(partition) - .withBucket(bucket) - .withBeforeFiles(beforeFiles) - .withDataFiles(dataFiles) - .isStreaming(isStreaming) - .rawFiles(rawFiles) - .build(); + DataSplit.Builder builder = + builder() + .withSnapshot(snapshotId) + .withPartition(partition) + .withBucket(bucket) + .withBeforeFiles(beforeFiles) + .withDataFiles(dataFiles) + .isStreaming(isStreaming) + .rawFiles(rawFiles); + if (beforeDeletionFiles != null) { + builder.withBeforeDeletionFiles(beforeDeletionFiles); + } + if (dataDeletionFiles != null) { + builder.withDataDeletionFiles(dataDeletionFiles); + } + return builder.build(); } public static Builder builder() { @@ -228,12 +268,22 @@ public Builder withBucket(int bucket) { } public Builder withBeforeFiles(List beforeFiles) { - this.split.beforeFiles = beforeFiles; + this.split.beforeFiles = new ArrayList<>(beforeFiles); + return this; + } + + public Builder withBeforeDeletionFiles(List beforeDeletionFiles) { + this.split.beforeDeletionFiles = new ArrayList<>(beforeDeletionFiles); return this; } public Builder withDataFiles(List dataFiles) { - this.split.dataFiles = dataFiles; + this.split.dataFiles = new ArrayList<>(dataFiles); + return this; + } + + public Builder withDataDeletionFiles(List dataDeletionFiles) { + this.split.dataDeletionFiles = new ArrayList<>(dataDeletionFiles); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java new file mode 100644 index 000000000000..5f2d47164c44 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.annotation.Public; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Deletion file for data file, the first 4 bytes are length, should, the following is the bitmap + * content. + * + *
    + *
  • The first 4 bytes are length, should equal to {@link #length()}. + *
  • Next 4 bytes are the magic number, should be equal to 1581511376. + *
  • The remaining content should be a RoaringBitmap. + *
+ */ +@Public +public class DeletionFile { + + private final String path; + private final long offset; + private final long length; + + public DeletionFile(String path, long offset, long length) { + this.path = path; + this.offset = offset; + this.length = length; + } + + /** Path of the file. */ + public String path() { + return path; + } + + /** Starting offset of data in the file. */ + public long offset() { + return offset; + } + + /** Length of data in the file. */ + public long length() { + return length; + } + + public static void serialize(DataOutputView out, @Nullable DeletionFile file) + throws IOException { + if (file == null) { + out.write(0); + } else { + out.write(1); + out.writeUTF(file.path); + out.writeLong(file.offset); + out.writeLong(file.length); + } + } + + public static void serializeList(DataOutputView out, @Nullable List files) + throws IOException { + if (files == null) { + out.write(0); + } else { + out.write(1); + out.writeInt(files.size()); + for (DeletionFile file : files) { + serialize(out, file); + } + } + } + + @Nullable + public static DeletionFile deserialize(DataInputView in) throws IOException { + if (in.readByte() == 0) { + return null; + } + + String path = in.readUTF(); + long offset = in.readLong(); + long length = in.readLong(); + return new DeletionFile(path, offset, length); + } + + @Nullable + public static List deserializeList(DataInputView in) throws IOException { + List files = null; + if (in.readByte() == 1) { + int size = in.readInt(); + files = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + files.add(DeletionFile.deserialize(in)); + } + } + return files; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof DeletionFile)) { + return false; + } + + DeletionFile other = (DeletionFile) o; + return Objects.equals(path, other.path) && offset == other.offset && length == other.length; + } + + @Override + public int hashCode() { + return Objects.hash(path, offset, length); + } + + @Override + public String toString() { + return String.format("{path = %s, offset = %d, length = %d}", path, offset, length); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java index cd024e1e7048..7cf1ed24d9c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MergeTreeSplitGenerator.java @@ -40,15 +40,27 @@ public class MergeTreeSplitGenerator implements SplitGenerator { private final long openFileCost; + private final boolean deletionVectorsEnabled; + public MergeTreeSplitGenerator( - Comparator keyComparator, long targetSplitSize, long openFileCost) { + Comparator keyComparator, + long targetSplitSize, + long openFileCost, + boolean deletionVectorsEnabled) { this.keyComparator = keyComparator; this.targetSplitSize = targetSplitSize; this.openFileCost = openFileCost; + this.deletionVectorsEnabled = deletionVectorsEnabled; } @Override public List> splitForBatch(List files) { + if (deletionVectorsEnabled) { + Function weightFunc = + file -> Math.max(file.fileSize(), openFileCost); + return BinPacking.packForOrdered(files, weightFunc, targetSplitSize); + } + /* * The generator aims to parallel the scan execution by slicing the files of each bucket * into multiple splits. The generation has one constraint: files with intersected key diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java index ee96943a4f19..adefb868fc9d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java @@ -42,4 +42,13 @@ public interface Split extends Serializable { default Optional> convertToRawFiles() { return Optional.empty(); } + + /** + * Return the deletion file of the data file, indicating which row in the data file was deleted. + * + *

If there is no corresponding deletion file, the element will be null. + */ + default Optional> deletionFiles() { + return Optional.empty(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 82b68b6e6792..241f42395628 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -24,6 +24,8 @@ import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -35,6 +37,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.Split; @@ -42,6 +45,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TypeUtils; @@ -59,6 +63,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; @@ -68,18 +73,19 @@ public class SnapshotReaderImpl implements SnapshotReader { private final FileStoreScan scan; private final TableSchema tableSchema; private final CoreOptions options; + private final boolean deletionVectors; private final SnapshotManager snapshotManager; private final ConsumerManager consumerManager; private final SplitGenerator splitGenerator; private final BiConsumer nonPartitionFilterConsumer; private final DefaultValueAssigner defaultValueAssigner; private final FileStorePathFactory pathFactory; + private final String tableName; + private final IndexFileHandler indexFileHandler; private ScanMode scanMode = ScanMode.ALL; private RecordComparator lazyPartitionComparator; - private final String tableName; - public SnapshotReaderImpl( FileStoreScan scan, TableSchema tableSchema, @@ -89,10 +95,12 @@ public SnapshotReaderImpl( BiConsumer nonPartitionFilterConsumer, DefaultValueAssigner defaultValueAssigner, FileStorePathFactory pathFactory, - String tableName) { + String tableName, + IndexFileHandler indexFileHandler) { this.scan = scan; this.tableSchema = tableSchema; this.options = options; + this.deletionVectors = options.deletionVectorsEnabled(); this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); @@ -102,6 +110,7 @@ public SnapshotReaderImpl( this.pathFactory = pathFactory; this.tableName = tableName; + this.indexFileHandler = indexFileHandler; } @Override @@ -287,10 +296,18 @@ private List generateSplits( ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); for (List dataFiles : splitGroups) { - splits.add( - builder.withDataFiles(dataFiles) - .rawFiles(convertToRawFiles(partition, bucket, dataFiles)) - .build()); + builder.withDataFiles(dataFiles) + .rawFiles(convertToRawFiles(partition, bucket, dataFiles)); + if (deletionVectors) { + IndexFileMeta deletionIndexFile = + indexFileHandler + .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket) + .orElse(null); + builder.withDataDeletionFiles( + getDeletionFiles(dataFiles, deletionIndexFile)); + } + + splits.add(builder.build()); } } } @@ -324,12 +341,13 @@ public Plan readChanges() { Map>> dataFiles = groupByPartFiles(plan.files(FileKind.ADD)); - return toChangesPlan(true, plan, beforeFiles, dataFiles); + return toChangesPlan(true, plan, plan.snapshotId() - 1, beforeFiles, dataFiles); } private Plan toChangesPlan( boolean isStreaming, FileStoreScan.Plan plan, + long beforeSnapshotId, Map>> beforeFiles, Map>> dataFiles) { List splits = new ArrayList<>(); @@ -358,7 +376,7 @@ private Plan toChangesPlan( // deduplicate before.removeIf(data::remove); - DataSplit split = + DataSplit.Builder builder = DataSplit.builder() .withSnapshot(plan.snapshotId()) .withPartition(part) @@ -366,9 +384,20 @@ private Plan toChangesPlan( .withBeforeFiles(before) .withDataFiles(data) .isStreaming(isStreaming) - .rawFiles(convertToRawFiles(part, bucket, data)) - .build(); - splits.add(split); + .rawFiles(convertToRawFiles(part, bucket, data)); + if (deletionVectors) { + IndexFileMeta beforeDeletionIndex = + indexFileHandler + .scan(beforeSnapshotId, DELETION_VECTORS_INDEX, part, bucket) + .orElse(null); + IndexFileMeta deletionIndex = + indexFileHandler + .scan(plan.snapshotId(), DELETION_VECTORS_INDEX, part, bucket) + .orElse(null); + builder.withBeforeDeletionFiles(getDeletionFiles(before, beforeDeletionIndex)); + builder.withDataDeletionFiles(getDeletionFiles(data, deletionIndex)); + } + splits.add(builder.build()); } } @@ -400,7 +429,7 @@ public Plan readIncrementalDiff(Snapshot before) { groupByPartFiles(plan.files(FileKind.ADD)); Map>> beforeFiles = groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD)); - return toChangesPlan(false, plan, beforeFiles, dataFiles); + return toChangesPlan(false, plan, before.id(), beforeFiles, dataFiles); } private RecordComparator partitionComparator() { @@ -413,12 +442,35 @@ private RecordComparator partitionComparator() { return lazyPartitionComparator; } + private List getDeletionFiles( + List dataFiles, @Nullable IndexFileMeta indexFileMeta) { + List deletionFiles = new ArrayList<>(dataFiles.size()); + Map> deletionRanges = + indexFileMeta == null ? null : indexFileMeta.deletionVectorsRanges(); + for (DataFileMeta file : dataFiles) { + if (deletionRanges != null) { + Pair range = deletionRanges.get(file.fileName()); + if (range != null) { + deletionFiles.add( + new DeletionFile( + indexFileHandler.filePath(indexFileMeta).toString(), + range.getKey(), + range.getValue())); + continue; + } + } + deletionFiles.add(null); + } + + return deletionFiles; + } + private List convertToRawFiles( BinaryRow partition, int bucket, List dataFiles) { String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); - // append only files can be returned - if (tableSchema.primaryKeys().isEmpty()) { + // append only or deletionVectors files can be returned + if (tableSchema.primaryKeys().isEmpty() || deletionVectors) { return makeRawTableFiles(bucketPath, dataFiles); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 1d0bb649f6e1..663d0dad8922 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -105,11 +105,17 @@ public void testMergeTree() { fromMinMax("5", 82, 85), fromMinMax("6", 100, 200)); Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); - assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 2).splitForBatch(files))) + assertThat( + toNames( + new MergeTreeSplitGenerator(comparator, 100, 2, false) + .splitForBatch(files))) .containsExactlyInAnyOrder( Arrays.asList("1", "2", "4", "3", "5"), Collections.singletonList("6")); - assertThat(toNames(new MergeTreeSplitGenerator(comparator, 100, 30).splitForBatch(files))) + assertThat( + toNames( + new MergeTreeSplitGenerator(comparator, 100, 30, false) + .splitForBatch(files))) .containsExactlyInAnyOrder( Arrays.asList("1", "2", "4", "3"), Collections.singletonList("5"), diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 244449f9baa1..bbd272547fc9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -138,9 +138,7 @@ private TableRead createRead( ignore -> avro, pathFactory, EXTRACTOR, - new CoreOptions(new HashMap<>())), - new CoreOptions(new HashMap<>()), - null); + new CoreOptions(new HashMap<>()))); return new KeyValueTableRead(read, null) { @Override diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala index 7b49e9de9a42..01a2ca165411 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala @@ -20,7 +20,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.io.DataFileMeta -import org.apache.paimon.table.source.{DataSplit, RawFile, Split} +import org.apache.paimon.table.source.{DataSplit, DeletionFile, RawFile, Split} import org.apache.spark.sql.SparkSession @@ -33,6 +33,8 @@ trait ScanHelper { val coreOptions: CoreOptions + private lazy val deletionVectors: Boolean = coreOptions.deletionVectorsEnabled() + private lazy val openCostInBytes: Long = coreOptions.splitOpenFileCost() private lazy val leafNodeDefaultParallelism: Int = { @@ -61,15 +63,18 @@ trait ScanHelper { var currentSplit: Option[DataSplit] = None val currentDataFiles = new ArrayBuffer[DataFileMeta] + val currentDeletionFiles = new ArrayBuffer[DeletionFile] val currentRawFiles = new ArrayBuffer[RawFile] var currentSize = 0L def closeDataSplit(): Unit = { if (currentSplit.nonEmpty && currentDataFiles.nonEmpty) { - val newSplit = copyDataSplit(currentSplit.get, currentDataFiles, currentRawFiles) + val newSplit = + copyDataSplit(currentSplit.get, currentDataFiles, currentDeletionFiles, currentRawFiles) newSplits += newSplit } currentDataFiles.clear() + currentDeletionFiles.clear() currentRawFiles.clear() currentSize = 0 } @@ -86,6 +91,9 @@ trait ScanHelper { } currentSize += file.fileSize + openCostInBytes currentDataFiles += file + if (deletionVectors) { + currentDeletionFiles += split.deletionFiles().get().get(idx) + } if (hasRawFiles) { currentRawFiles += split.convertToRawFiles().get().get(idx) } @@ -107,6 +115,7 @@ trait ScanHelper { private def copyDataSplit( split: DataSplit, dataFiles: Seq[DataFileMeta], + deletionFiles: Seq[DeletionFile], rawFiles: Seq[RawFile]): DataSplit = { val builder = DataSplit .builder() @@ -115,6 +124,9 @@ trait ScanHelper { .withBucket(split.bucket()) .withDataFiles(dataFiles.toList.asJava) .rawFiles(rawFiles.toList.asJava) + if (deletionVectors) { + builder.withDataDeletionFiles(deletionFiles.toList.asJava) + } builder.build() } From 339ef74eaf78ef461e13ce807e640410a6d5ac78 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 12 Mar 2024 13:49:40 +0800 Subject: [PATCH 2/2] unify deletion vector apply --- .../deletionvectors/DeletionVector.java | 45 +++++++++++++++++++ .../paimon/io/KeyValueFileReaderFactory.java | 34 +++++--------- .../operation/KeyValueFileStoreRead.java | 29 ++++++------ .../operation/KeyValueFileStoreWrite.java | 10 ++--- .../paimon/table/query/LocalTableQuery.java | 5 ++- .../snapshot/IncrementalStartingScanner.java | 1 + .../paimon/io/KeyValueFileReadWriteTest.java | 3 +- .../paimon/mergetree/ContainsLevelsTest.java | 3 +- .../paimon/mergetree/LookupLevelsTest.java | 3 +- .../paimon/mergetree/MergeTreeTestBase.java | 7 ++- 10 files changed, 90 insertions(+), 50 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java index 50aa8f64ea09..be20d3891410 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java @@ -21,11 +21,18 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.source.DeletionFile; +import javax.annotation.Nullable; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * The DeletionVector can efficiently record the positions of rows that are deleted in a file, which @@ -118,4 +125,42 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx } } } + + static Factory emptyFactory() { + return fileName -> Optional.empty(); + } + + static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) { + if (dvMaintainer == null) { + return emptyFactory(); + } + return dvMaintainer::deletionVectorOf; + } + + static Factory factory( + FileIO fileIO, List files, @Nullable List deletionFiles) { + if (deletionFiles == null) { + return emptyFactory(); + } + Map fileToDeletion = new HashMap<>(); + for (int i = 0; i < files.size(); i++) { + DeletionFile deletionFile = deletionFiles.get(i); + if (deletionFile != null) { + fileToDeletion.put(files.get(i).fileName(), deletionFile); + } + } + return fileName -> { + DeletionFile deletionFile = fileToDeletion.get(fileName); + if (deletionFile == null) { + return Optional.empty(); + } + + return Optional.of(DeletionVector.read(fileIO, deletionFile)); + }; + } + + /** Interface to create {@link DeletionVector}. */ + interface Factory { + Optional create(String fileName) throws IOException; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index b946756aeee2..63fef31fc142 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; import java.util.function.Supplier; /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */ @@ -63,9 +62,7 @@ public class KeyValueFileReaderFactory { private final Map bulkFormatMappings; private final BinaryRow partition; - - // FileName to its corresponding deletion vector - private final @Nullable Function> deletionVectorSupplier; + private final DeletionVector.Factory dvFactory; private KeyValueFileReaderFactory( FileIO fileIO, @@ -77,7 +74,7 @@ private KeyValueFileReaderFactory( DataFilePathFactory pathFactory, long asyncThreshold, BinaryRow partition, - @Nullable Function> deletionVectorSupplier) { + DeletionVector.Factory dvFactory) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.schemaId = schemaId; @@ -88,7 +85,7 @@ private KeyValueFileReaderFactory( this.asyncThreshold = asyncThreshold; this.partition = partition; this.bulkFormatMappings = new HashMap<>(); - this.deletionVectorSupplier = deletionVectorSupplier; + this.dvFactory = dvFactory; } public RecordReader createRecordReader( @@ -134,13 +131,9 @@ private RecordReader createRecordReader( bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); - if (deletionVectorSupplier != null) { - Optional optionalDeletionVector = - deletionVectorSupplier.apply(fileName); - if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) { - recordReader = - new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get()); - } + Optional deletionVector = dvFactory.create(fileName); + if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { + recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get()); } return recordReader; } @@ -185,7 +178,6 @@ public static class Builder { private int[][] valueProjection; private RowType projectedKeyType; private RowType projectedValueType; - private @Nullable Function> deletionVectorSupplier; private Builder( FileIO fileIO, @@ -238,12 +230,6 @@ public Builder withValueProjection(int[][] projection) { return this; } - public Builder withDeletionVectorSupplier( - Function> deletionVectorSupplier) { - this.deletionVectorSupplier = deletionVectorSupplier; - return this; - } - public RowType keyType() { return keyType; } @@ -252,13 +238,15 @@ public RowType projectedValueType() { return projectedValueType; } - public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) { - return build(partition, bucket, true, Collections.emptyList()); + public KeyValueFileReaderFactory build( + BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) { + return build(partition, bucket, dvFactory, true, Collections.emptyList()); } public KeyValueFileReaderFactory build( BinaryRow partition, int bucket, + DeletionVector.Factory dvFactory, boolean projectKeys, @Nullable List filters) { int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection; @@ -275,7 +263,7 @@ public KeyValueFileReaderFactory build( pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), partition, - deletionVectorSupplier); + dvFactory); } private void applyProjection() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index f1e7a80a2975..a166a8526aa4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -23,7 +23,6 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; @@ -247,9 +246,11 @@ private RecordReader mergeRead( // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForKeys); + readerFactoryBuilder.build( + partition, bucket, DeletionVector.emptyFactory(), false, filtersForKeys); KeyValueFileReaderFactory nonOverlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForAll); + readerFactoryBuilder.build( + partition, bucket, DeletionVector.emptyFactory(), false, filtersForAll); List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = @@ -284,27 +285,23 @@ private RecordReader noMergeRead( @Nullable List deletionFiles, boolean onlyFilterKey) throws IOException { + DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles); KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build( - partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll); + partition, + bucket, + dvFactory, + true, + onlyFilterKey ? filtersForKeys : filtersForAll); List> suppliers = new ArrayList<>(); - for (int i = 0; i < files.size(); i++) { - DataFileMeta file = files.get(i); - DeletionFile deletionFile = deletionFiles == null ? null : deletionFiles.get(i); + for (DataFileMeta file : files) { suppliers.add( () -> { // We need to check extraFiles to be compatible with Paimon 0.2. // See comments on DataFileMeta#extraFiles. String fileName = changelogFile(file).orElse(file.fileName()); - RecordReader reader = - readerFactory.createRecordReader( - file.schemaId(), fileName, file.fileSize(), file.level()); - if (deletionFile != null) { - DeletionVector deletionVector = - DeletionVector.read(fileIO, deletionFile); - reader = ApplyDeletionVectorReader.create(reader, deletionVector); - } - return reader; + return readerFactory.createRecordReader( + file.schemaId(), fileName, file.fileSize(), file.level()); }); } return ConcatRecordReader.create(suppliers); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 441f8f841003..41b77d612885 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -29,6 +29,7 @@ import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; @@ -255,10 +256,9 @@ private MergeTreeCompactRewriter createRewriter( @Nullable DeletionVectorsMaintainer dvMaintainer) { KeyValueFileReaderFactory.Builder readerFactoryBuilder = this.readerFactoryBuilder.copyWithoutProjection(); - if (dvMaintainer != null) { - readerFactoryBuilder.withDeletionVectorSupplier(dvMaintainer::deletionVectorOf); - } - KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); + DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer); + KeyValueFileReaderFactory readerFactory = + readerFactoryBuilder.build(partition, bucket, dvFactory); KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); @@ -290,7 +290,7 @@ private MergeTreeCompactRewriter createRewriter( lookupReaderFactory = readerFactoryBuilder .withValueProjection(new int[0][]) - .build(partition, bucket); + .build(partition, bucket, dvFactory); processor = new ContainsValueProcessor(); wrapperFactory = new FirstRowMergeFunctionWrapperFactory(); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index e10184fefc43..99ad909ff751 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; @@ -123,7 +124,9 @@ public void refreshFiles( private void newLookupLevels(BinaryRow partition, int bucket, List dataFiles) { Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels()); - KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket); + // TODO pass DeletionVector factory + KeyValueFileReaderFactory factory = + readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory()); Options options = this.options.toConfiguration(); LookupLevels lookupLevels = new LookupLevels<>( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index 024525c208ce..17df85832de0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -67,6 +67,7 @@ public Result scan(SnapshotReader reader) { int bucket = entry.getKey().getRight(); for (List files : reader.splitGenerator().splitForBatch(entry.getValue())) { + // TODO pass deletion files result.add( DataSplit.builder() .withSnapshot(endingSnapshotId) diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index df6110ce645c..05f260097a4d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FieldStats; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIO; @@ -294,7 +295,7 @@ private KeyValueFileReaderFactory createReaderFactory( if (valueProjection != null) { builder.withValueProjection(valueProjection); } - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private void assertData( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 631a6e47f9dd..58d9dbe904d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -251,7 +252,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index e7df8fa8da65..00d8eeb5a8e4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -331,7 +332,7 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - return builder.build(BinaryRow.EMPTY_ROW, 0); + return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } private SchemaManager createSchemaManager(Path path) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 8e8f315c95d9..0e28ce970b3d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileStatus; @@ -170,8 +171,10 @@ public List valueFields(TableSchema schema) { } }, new CoreOptions(new HashMap<>())); - readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); - compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0); + readerFactory = + readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); + compactReaderFactory = + readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); Map pathFactoryMap = new HashMap<>(); pathFactoryMap.put(identifier, pathFactory);