Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce deletion files to DataSplit #2988

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected IndexManifestFile.Factory indexManifestFileFactory() {
public IndexFileHandler newIndexFileHandler() {
return new IndexFileHandler(
snapshotManager(),
pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
new DeletionVectorsIndexFile(fileIO, pathFactory().indexFileFactory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ public KeyValueFileStoreRead newRead() {
newKeyComparator(),
userDefinedSeqComparator(),
mfFactory,
newReaderFactoryBuilder(),
options,
newIndexFileHandler());
newReaderFactoryBuilder());
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -39,6 +40,18 @@ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletion
this.deletionVector = deletionVector;
}

public static <T> RecordReader<T> create(RecordReader<T> reader, Optional<DeletionVector> dv) {
return create(reader, dv.orElse(null));
}

public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable DeletionVector dv) {
if (dv == null) {
return reader;
}

return new ApplyDeletionVectorReader<>(reader, dv);
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.table.source.DeletionFile;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -91,4 +96,26 @@ static DeletionVector deserializeFromBytes(byte[] bytes) {
throw new RuntimeException("Unable to deserialize deletion vector", e);
}
}

static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOException {
Path path = new Path(deletionFile.path());
try (SeekableInputStream input = fileIO.newInputStream(path)) {
input.seek(deletionFile.offset());
DataInputStream dis = new DataInputStream(input);
int actualLength = dis.readInt();
if (actualLength != deletionFile.length()) {
throw new RuntimeException(
"Size not match, actual size: "
+ actualLength
+ ", expert size: "
+ deletionFile.length());
}
int magicNum = dis.readInt();
if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
return BitmapDeletionVector.deserializeFromDataInput(dis);
} else {
throw new RuntimeException("Invalid magic number: " + magicNum);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

/** Maintainer of deletionVectors index. */
public class DeletionVectorsMaintainer {

Expand All @@ -48,11 +50,7 @@ private DeletionVectorsMaintainer(
snapshotId == null
? null
: fileHandler
.scan(
snapshotId,
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
partition,
bucket)
.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
this.deletionVectors =
indexFile == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
super(fileIO, pathFactory);
}

public Path path(String fileName) {
return pathFactory.toPath(fileName);
}

public IntIterator read(String fileName) throws IOException {
return readInts(fileIO, pathFactory.toPath(fileName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
Expand All @@ -43,16 +45,19 @@
public class IndexFileHandler {

private final SnapshotManager snapshotManager;
private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
private final DeletionVectorsIndexFile deletionVectorsIndex;

public IndexFileHandler(
SnapshotManager snapshotManager,
PathFactory pathFactory,
IndexManifestFile indexManifestFile,
HashIndexFile hashIndex,
DeletionVectorsIndexFile deletionVectorsIndex) {
this.snapshotManager = snapshotManager;
this.pathFactory = pathFactory;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
this.deletionVectorsIndex = deletionVectorsIndex;
Expand Down Expand Up @@ -102,6 +107,10 @@ public List<IndexManifestEntry> scan(long snapshotId, String indexType, BinaryRo
return result;
}

public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}

public List<Integer> readHashIndexList(IndexFileMeta file) {
return IntIterator.toIntList(readHashIndex(file));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,5 +282,9 @@ private void applyProjection() {
projectedKeyType = Projection.of(keyProjection).project(keyType);
projectedValueType = Projection.of(valueProjection).project(valueType);
}

public FileIO fileIO() {
return fileIO;
}
}
}
Loading
Loading