Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Mar 11, 2024
1 parent f8f3cf7 commit e6fd09e
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {

@Nullable private int[][] keyProjectedFields;

@Nullable private List<Predicate> filtersForMerge;
@Nullable private List<Predicate> filtersForKeys;

@Nullable private List<Predicate> filtersForNonMerge;
@Nullable private List<Predicate> filtersForAll;

@Nullable private int[][] pushdownProjection;
@Nullable private int[][] outerProjection;
Expand Down Expand Up @@ -164,8 +164,8 @@ public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
// So for sections with overlapping runs, we only push down key filters.
// For sections with only one run, as each key only appears once, it is OK to push down
// value filters.
filtersForNonMerge = allFilters;
filtersForMerge = pkFilters;
filtersForAll = allFilters;
filtersForKeys = pkFilters;
return this;
}

Expand All @@ -191,7 +191,8 @@ private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit spli
split.partition(),
split.bucket(),
split.beforeFiles(),
split.beforeDeletionFiles().orElse(null)));
split.beforeDeletionFiles().orElse(null),
split.isStreaming()));
} else {
beforeSupplier =
() ->
Expand All @@ -211,7 +212,8 @@ private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit spli
split.partition(),
split.bucket(),
split.dataFiles(),
split.deletionFiles().orElse(null));
split.deletionFiles().orElse(null),
split.isStreaming());
} else {
dataSupplier =
() ->
Expand Down Expand Up @@ -245,9 +247,9 @@ private RecordReader<KeyValue> mergeRead(
// Sections are read by SortMergeReader, which sorts and merges records by keys.
// So we cannot project keys or else the sorting will be incorrect.
KeyValueFileReaderFactory overlappedSectionFactory =
readerFactoryBuilder.build(partition, bucket, false, filtersForMerge);
readerFactoryBuilder.build(partition, bucket, false, filtersForKeys);
KeyValueFileReaderFactory nonOverlappedSectionFactory =
readerFactoryBuilder.build(partition, bucket, false, filtersForNonMerge);
readerFactoryBuilder.build(partition, bucket, false, filtersForAll);

List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
Expand Down Expand Up @@ -279,10 +281,12 @@ private RecordReader<KeyValue> noMergeRead(
BinaryRow partition,
int bucket,
List<DataFileMeta> files,
@Nullable List<DeletionFile> deletionFiles)
@Nullable List<DeletionFile> deletionFiles,
boolean onlyFilterKey)
throws IOException {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket, true, filtersForNonMerge);
readerFactoryBuilder.build(
partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll);
List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
for (int i = 0; i < files.size(); i++) {
DataFileMeta file = files.get(i);
Expand Down

0 comments on commit e6fd09e

Please sign in to comment.