Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 11, 2024
1 parent e6fd09e commit 423295b
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected IndexManifestFile.Factory indexManifestFileFactory() {
public IndexFileHandler newIndexFileHandler() {
return new IndexFileHandler(
snapshotManager(),
pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
new DeletionVectorsIndexFile(fileIO, pathFactory().indexFileFactory()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand All @@ -39,6 +40,18 @@ public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletion
this.deletionVector = deletionVector;
}

public static <T> RecordReader<T> create(RecordReader<T> reader, Optional<DeletionVector> dv) {
return create(reader, dv.orElse(null));
}

public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable DeletionVector dv) {
if (dv == null) {
return reader;
}

return new ApplyDeletionVectorReader<>(reader, dv);
}

@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx
try (SeekableInputStream input = fileIO.newInputStream(path)) {
input.seek(deletionFile.offset());
DataInputStream dis = new DataInputStream(input);
int actualLength = dis.readInt();
if (actualLength != deletionFile.length()) {
throw new RuntimeException(
"Size not match, actual size: "
+ actualLength
+ ", expert size: "
+ deletionFile.length());
}
int magicNum = dis.readInt();
if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
return BitmapDeletionVector.deserializeFromDataInput(dis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;

/** Maintainer of deletionVectors index. */
public class DeletionVectorsMaintainer {

Expand All @@ -48,11 +50,7 @@ private DeletionVectorsMaintainer(
snapshotId == null
? null
: fileHandler
.scan(
snapshotId,
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
partition,
bucket)
.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
this.deletionVectors =
indexFile == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
super(fileIO, pathFactory);
}

public Path path(String fileName) {
return pathFactory.toPath(fileName);
}

public IntIterator read(String fileName) throws IOException {
return readInts(fileIO, pathFactory.toPath(fileName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;

import java.io.IOException;
Expand All @@ -43,16 +45,19 @@
public class IndexFileHandler {

private final SnapshotManager snapshotManager;
private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
private final DeletionVectorsIndexFile deletionVectorsIndex;

public IndexFileHandler(
SnapshotManager snapshotManager,
PathFactory pathFactory,
IndexManifestFile indexManifestFile,
HashIndexFile hashIndex,
DeletionVectorsIndexFile deletionVectorsIndex) {
this.snapshotManager = snapshotManager;
this.pathFactory = pathFactory;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
this.deletionVectorsIndex = deletionVectorsIndex;
Expand Down Expand Up @@ -102,6 +107,10 @@ public List<IndexManifestEntry> scan(long snapshotId, String indexType, BinaryRo
return result;
}

public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}

public List<Integer> readHashIndexList(IndexFileMeta file) {
return IntIterator.toIntList(readHashIndex(file));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
import org.apache.paimon.fs.FileIO;
Expand All @@ -42,6 +44,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
Expand All @@ -60,6 +64,9 @@ public class KeyValueFileReaderFactory {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;

// FileName to its corresponding deletion vector
private final @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private KeyValueFileReaderFactory(
FileIO fileIO,
SchemaManager schemaManager,
Expand All @@ -69,7 +76,8 @@ private KeyValueFileReaderFactory(
BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition) {
BinaryRow partition,
@Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
Expand All @@ -80,6 +88,7 @@ private KeyValueFileReaderFactory(
this.asyncThreshold = asyncThreshold;
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.deletionVectorSupplier = deletionVectorSupplier;
}

public RecordReader<KeyValue> createRecordReader(
Expand Down Expand Up @@ -125,6 +134,14 @@ private RecordReader<KeyValue> createRecordReader(
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
if (deletionVectorSupplier != null) {
Optional<DeletionVector> optionalDeletionVector =
deletionVectorSupplier.apply(fileName);
if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) {
recordReader =
new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get());
}
}
return recordReader;
}

Expand Down Expand Up @@ -168,6 +185,7 @@ public static class Builder {
private int[][] valueProjection;
private RowType projectedKeyType;
private RowType projectedValueType;
private @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;

private Builder(
FileIO fileIO,
Expand Down Expand Up @@ -220,6 +238,12 @@ public Builder withValueProjection(int[][] projection) {
return this;
}

public Builder withDeletionVectorSupplier(
Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
this.deletionVectorSupplier = deletionVectorSupplier;
return this;
}

public RowType keyType() {
return keyType;
}
Expand Down Expand Up @@ -250,7 +274,8 @@ public KeyValueFileReaderFactory build(
formatDiscover, extractor, keyProjection, valueProjection, filters),
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition);
partition,
deletionVectorSupplier);
}

private void applyProjection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private RecordReader<KeyValue> noMergeRead(
if (deletionFile != null) {
DeletionVector deletionVector =
DeletionVector.read(fileIO, deletionFile);
reader = new ApplyDeletionVectorReader<>(reader, deletionVector);
reader = ApplyDeletionVectorReader.create(reader, deletionVector);
}
return reader;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,12 @@ private MergeTreeCompactRewriter createRewriter(
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
Levels levels,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
@Nullable DeletionVectorsMaintainer dvMaintainer) {
KeyValueFileReaderFactory.Builder readerFactoryBuilder =
this.readerFactoryBuilder.copyWithoutProjection();
if (dvMaintainer != null) {
readerFactoryBuilder.withDeletionVectorSupplier(dvMaintainer::deletionVectorOf);
}
KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
Expand Down Expand Up @@ -289,7 +294,6 @@ private MergeTreeCompactRewriter createRewriter(
}
lookupReaderFactory =
readerFactoryBuilder
.copyWithoutProjection()
.withValueProjection(new int[0][])
.build(partition, bucket);
processor = new ContainsValueProcessor();
Expand Down Expand Up @@ -318,7 +322,7 @@ private MergeTreeCompactRewriter createRewriter(
mergeSorter,
wrapperFactory,
lookupStrategy,
deletionVectorsMaintainer);
dvMaintainer);
} else {
return new MergeTreeCompactRewriter(
readerFactory,
Expand All @@ -327,7 +331,7 @@ private MergeTreeCompactRewriter createRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
deletionVectorsMaintainer);
dvMaintainer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public SnapshotReader newSnapshotReader(String branchName) {
nonPartitionFilterConsumer(),
DefaultValueAssigner.create(tableSchema),
store().pathFactory(),
name());
name(),
store().newIndexFileHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ protected SplitGenerator splitGenerator() {
return new MergeTreeSplitGenerator(
store().newKeyComparator(),
store().options().splitTargetSize(),
store().options().splitOpenFileCost());
store().options().splitOpenFileCost(),
store().options().deletionVectorsEnabled());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ private void assign(DataSplit other) {
this.partition = other.partition;
this.bucket = other.bucket;
this.beforeFiles = other.beforeFiles;
this.beforeDeletionFiles = other.beforeDeletionFiles;
this.dataFiles = other.dataFiles;
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
this.rawFiles = other.rawFiles;
}
Expand All @@ -176,11 +178,15 @@ public void serialize(DataOutputView out) throws IOException {
dataFileSer.serialize(file, out);
}

DeletionFile.serializeList(out, beforeDeletionFiles);

out.writeInt(dataFiles.size());
for (DataFileMeta file : dataFiles) {
dataFileSer.serialize(file, out);
}

DeletionFile.serializeList(out, dataDeletionFiles);

out.writeBoolean(isStreaming);

out.writeInt(rawFiles.size());
Expand All @@ -201,12 +207,16 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
beforeFiles.add(dataFileSer.deserialize(in));
}

List<DeletionFile> beforeDeletionFiles = DeletionFile.deserializeList(in);

int fileNumber = in.readInt();
List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
for (int i = 0; i < fileNumber; i++) {
dataFiles.add(dataFileSer.deserialize(in));
}

List<DeletionFile> dataDeletionFiles = DeletionFile.deserializeList(in);

boolean isStreaming = in.readBoolean();

int rawFileNum = in.readInt();
Expand All @@ -215,15 +225,22 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
rawFiles.add(RawFile.deserialize(in));
}

return builder()
.withSnapshot(snapshotId)
.withPartition(partition)
.withBucket(bucket)
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
.rawFiles(rawFiles)
.build();
DataSplit.Builder builder =
builder()
.withSnapshot(snapshotId)
.withPartition(partition)
.withBucket(bucket)
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
.rawFiles(rawFiles);
if (beforeDeletionFiles != null) {
builder.withBeforeDeletionFiles(beforeDeletionFiles);
}
if (dataDeletionFiles != null) {
builder.withDataDeletionFiles(dataDeletionFiles);
}
return builder.build();
}

public static Builder builder() {
Expand Down Expand Up @@ -251,22 +268,22 @@ public Builder withBucket(int bucket) {
}

public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
this.split.beforeFiles = beforeFiles;
this.split.beforeFiles = new ArrayList<>(beforeFiles);
return this;
}

public Builder withBeforeDeletionFiles(List<DeletionFile> beforeDeletionFiles) {
this.split.beforeDeletionFiles = beforeDeletionFiles;
this.split.beforeDeletionFiles = new ArrayList<>(beforeDeletionFiles);
return this;
}

public Builder withDataFiles(List<DataFileMeta> dataFiles) {
this.split.dataFiles = dataFiles;
this.split.dataFiles = new ArrayList<>(dataFiles);
return this;
}

public Builder withDataDeletionFiles(List<DeletionFile> dataDeletionFiles) {
this.split.dataDeletionFiles = dataDeletionFiles;
this.split.dataDeletionFiles = new ArrayList<>(dataDeletionFiles);
return this;
}

Expand Down
Loading

0 comments on commit 423295b

Please sign in to comment.