Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce deletion files to DataSplit #2988

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected IndexManifestFile.Factory indexManifestFileFactory() {
public IndexFileHandler newIndexFileHandler() {
return new IndexFileHandler(
snapshotManager(),
pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
new DeletionVectorsIndexFile(fileIO, pathFactory().indexFileFactory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ public KeyValueFileStoreRead newRead() {
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder(),
options,
newIndexFileHandler());
newReaderFactoryBuilder());
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -39,6 +40,18 @@ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletion
this.deletionVector = deletionVector;
}

/**
 * Wraps {@code reader} so that it honours the given deletion vector, if one is present.
 *
 * @param reader the underlying reader
 * @param dv the optional deletion vector; an empty optional is treated like {@code null}
 * @return the result of the {@code @Nullable}-accepting {@code create} overload
 */
public static <T> RecordReader<T> create(RecordReader<T> reader, Optional<DeletionVector> dv) {
    // Unwrap first so the call below binds to the nullable overload.
    DeletionVector unwrapped = dv.orElse(null);
    return create(reader, unwrapped);
}

/**
 * Wraps {@code reader} in an {@link ApplyDeletionVectorReader} when a deletion vector is given.
 *
 * @param reader the underlying reader
 * @param dv the deletion vector to apply, or {@code null} for none
 * @return {@code reader} itself when {@code dv} is {@code null}, otherwise a new
 *     {@link ApplyDeletionVectorReader} around it
 */
public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable DeletionVector dv) {
    if (dv != null) {
        return new ApplyDeletionVectorReader<>(reader, dv);
    }

    // Nothing to filter: hand back the original reader untouched.
    return reader;
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,21 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DeletionFile;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* The DeletionVector can efficiently record the positions of rows that are deleted in a file, which
Expand Down Expand Up @@ -91,4 +103,64 @@ static DeletionVector deserializeFromBytes(byte[] bytes) {
throw new RuntimeException("Unable to deserialize deletion vector", e);
}
}

/**
 * Reads a single {@link DeletionVector} from the location described by {@code deletionFile}.
 *
 * <p>The stream is positioned at {@code deletionFile.offset()}; an {@code int} length is read and
 * compared with {@code deletionFile.length()}, then an {@code int} magic number is read to select
 * the deserializer.
 *
 * @param fileIO file system access used to open the file
 * @param deletionFile path, offset and expected length of the serialized deletion vector
 * @return the deserialized deletion vector
 * @throws IOException if opening, seeking or reading the file fails
 * @throws RuntimeException if the stored length differs from {@code deletionFile.length()} or the
 *     magic number is not {@code BitmapDeletionVector.MAGIC_NUMBER}
 */
static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOException {
    Path path = new Path(deletionFile.path());
    // Closing the seekable stream is sufficient; the DataInputStream is only a thin view on it.
    try (SeekableInputStream input = fileIO.newInputStream(path)) {
        input.seek(deletionFile.offset());
        DataInputStream dis = new DataInputStream(input);

        // Sanity check: the length stored in the file must match the one recorded in the meta.
        int actualLength = dis.readInt();
        if (actualLength != deletionFile.length()) {
            throw new RuntimeException(
                    "Size not match, actual size: "
                            + actualLength
                            + ", expected size: "
                            + deletionFile.length());
        }

        int magicNum = dis.readInt();
        if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
            return BitmapDeletionVector.deserializeFromDataInput(dis);
        } else {
            throw new RuntimeException("Invalid magic number: " + magicNum);
        }
    }
}

/**
 * Returns a {@link Factory} that yields {@link Optional#empty()} for every file name.
 */
static Factory emptyFactory() {
    return ignoredFileName -> Optional.empty();
}

/**
 * Builds a {@link Factory} backed by the given maintainer.
 *
 * @param dvMaintainer the maintainer whose {@code deletionVectorOf} answers lookups, or
 *     {@code null}
 * @return a factory delegating to {@code dvMaintainer}, or {@link #emptyFactory()} when it is
 *     {@code null}
 */
static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) {
    if (dvMaintainer != null) {
        return dvMaintainer::deletionVectorOf;
    }

    return emptyFactory();
}

/**
 * Builds a {@link Factory} that reads deletion vectors from deletion files on demand.
 *
 * <p>{@code deletionFiles} is consumed positionally: the entry at index {@code i} belongs to
 * {@code files.get(i)}, and a {@code null} entry means that data file has no deletion file.
 *
 * @param fileIO file system access used when a deletion vector is actually read
 * @param files the data files
 * @param deletionFiles deletion files aligned by index with {@code files}, or {@code null}
 * @return {@link #emptyFactory()} when {@code deletionFiles} is {@code null}; otherwise a factory
 *     that returns an empty optional for unknown file names and reads the deletion vector via
 *     {@link #read(FileIO, DeletionFile)} for known ones
 */
static Factory factory(
        FileIO fileIO, List<DataFileMeta> files, @Nullable List<DeletionFile> deletionFiles) {
    if (deletionFiles == null) {
        return emptyFactory();
    }

    // Index the non-null deletion files by the name of the data file they belong to.
    Map<String, DeletionFile> deletionByDataFile = new HashMap<>();
    for (int pos = 0; pos < files.size(); pos++) {
        DeletionFile candidate = deletionFiles.get(pos);
        if (candidate == null) {
            continue;
        }
        deletionByDataFile.put(files.get(pos).fileName(), candidate);
    }

    return dataFileName -> {
        DeletionFile match = deletionByDataFile.get(dataFileName);
        if (match != null) {
            // The file is only opened here, at lookup time.
            return Optional.of(DeletionVector.read(fileIO, match));
        }

        return Optional.empty();
    };
}

/**
 * Interface to create {@link DeletionVector}.
 *
 * <p>Given the name of a file, an implementation returns the deletion vector associated with it,
 * or an empty {@link Optional} when there is none. Implementations may perform I/O, hence the
 * checked {@link IOException}.
 */
interface Factory {
    Optional<DeletionVector> create(String fileName) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

/** Maintainer of deletionVectors index. */
public class DeletionVectorsMaintainer {

Expand All @@ -48,11 +50,7 @@ private DeletionVectorsMaintainer(
snapshotId == null
? null
: fileHandler
.scan(
snapshotId,
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
partition,
bucket)
.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
this.deletionVectors =
indexFile == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
super(fileIO, pathFactory);
}

/**
 * Resolves the path of the index file with the given name through the path factory.
 *
 * @param fileName name of the index file
 * @return the path produced by {@code pathFactory.toPath(fileName)}
 */
public Path path(String fileName) {
    Path resolved = pathFactory.toPath(fileName);
    return resolved;
}

/**
 * Opens the index file with the given name and returns an iterator over its ints.
 *
 * @param fileName name of the index file
 * @return the iterator produced by {@code readInts} for the resolved path
 * @throws IOException if reading the file fails
 */
public IntIterator read(String fileName) throws IOException {
    Path indexPath = pathFactory.toPath(fileName);
    return readInts(fileIO, indexPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
Expand All @@ -43,16 +45,19 @@
public class IndexFileHandler {

private final SnapshotManager snapshotManager;
private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
private final DeletionVectorsIndexFile deletionVectorsIndex;

public IndexFileHandler(
SnapshotManager snapshotManager,
PathFactory pathFactory,
IndexManifestFile indexManifestFile,
HashIndexFile hashIndex,
DeletionVectorsIndexFile deletionVectorsIndex) {
this.snapshotManager = snapshotManager;
this.pathFactory = pathFactory;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
this.deletionVectorsIndex = deletionVectorsIndex;
Expand Down Expand Up @@ -102,6 +107,10 @@ public List<IndexManifestEntry> scan(long snapshotId, String indexType, BinaryRo
return result;
}

/**
 * Resolves the path of the given index file through this handler's path factory.
 *
 * @param file metadata of the index file
 * @return the path for {@code file.fileName()}
 */
public Path filePath(IndexFileMeta file) {
    return this.pathFactory.toPath(file.fileName());
}

/**
 * Reads the hash index of the given file and collects it into a list.
 *
 * @param file metadata of the hash index file
 * @return the values of {@code readHashIndex(file)} as a list
 */
public List<Integer> readHashIndexList(IndexFileMeta file) {
    IntIterator hashes = readHashIndex(file);
    return IntIterator.toIntList(hashes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
Expand All @@ -63,9 +62,7 @@ public class KeyValueFileReaderFactory {

private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;

// FileName to its corresponding deletion vector
private final @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;
private final DeletionVector.Factory dvFactory;

private KeyValueFileReaderFactory(
FileIO fileIO,
Expand All @@ -77,7 +74,7 @@ private KeyValueFileReaderFactory(
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
@Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
DeletionVector.Factory dvFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -88,7 +85,7 @@ private KeyValueFileReaderFactory(
this.asyncThreshold = asyncThreshold;
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.deletionVectorSupplier = deletionVectorSupplier;
this.dvFactory = dvFactory;
}

public RecordReader<KeyValue> createRecordReader(
Expand Down Expand Up @@ -134,13 +131,9 @@ private RecordReader<KeyValue> createRecordReader(
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
if (deletionVectorSupplier != null) {
Optional<DeletionVector> optionalDeletionVector =
deletionVectorSupplier.apply(fileName);
if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) {
recordReader =
new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get());
}
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
}
return recordReader;
}
Expand Down Expand Up @@ -185,7 +178,6 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private Builder(
FileIO fileIO,
Expand Down Expand Up @@ -238,12 +230,6 @@ public Builder withValueProjection(int[][] projection) {
return this;
}

/**
 * Sets the function that maps a file name to its optional deletion vector.
 *
 * <p>NOTE(review): the enclosing diff hunk shrinks from 12 to 6 lines, so this setter appears to
 * be removed by this change in favour of passing a {@code DeletionVector.Factory} to
 * {@code build} - confirm against the merged file.
 *
 * @param deletionVectorSupplier function from file name to optional deletion vector
 * @return this builder
 */
public Builder withDeletionVectorSupplier(
Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.deletionVectorSupplier = deletionVectorSupplier;
return this;
}

/** Returns the key row type held by this builder. */
public RowType keyType() {
    return this.keyType;
}
Expand All @@ -252,13 +238,15 @@ public RowType projectedValueType() {
return projectedValueType;
}

public KeyValueFileReaderFactory build(BinaryRow partition, int bucket) {
return build(partition, bucket, true, Collections.emptyList());
public KeyValueFileReaderFactory build(
BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) {
return build(partition, bucket, dvFactory, true, Collections.emptyList());
}

public KeyValueFileReaderFactory build(
BinaryRow partition,
int bucket,
DeletionVector.Factory dvFactory,
boolean projectKeys,
@Nullable List<Predicate> filters) {
int[][] keyProjection = projectKeys ? this.keyProjection : fullKeyProjection;
Expand All @@ -275,12 +263,16 @@ public KeyValueFileReaderFactory build(
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
deletionVectorSupplier);
dvFactory);
}

/**
 * Recomputes {@code projectedKeyType} and {@code projectedValueType} by applying the current key
 * and value projections to the key and value types.
 */
private void applyProjection() {
    this.projectedKeyType = Projection.of(this.keyProjection).project(this.keyType);
    this.projectedValueType = Projection.of(this.valueProjection).project(this.valueType);
}

/** Returns the {@code FileIO} held by this builder. */
public FileIO fileIO() {
    return this.fileIO;
}
}
}
Loading
Loading