Skip to content

Commit

Permalink
Fix cases when the reader is closed but the stream still tries to acc…
Browse files Browse the repository at this point in the history
…ess iterator that backs the stream
  • Loading branch information
Minutis committed Jun 12, 2024
1 parent 6612a57 commit 2f07bc0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import com.exacaster.deltafetch.search.ColumnValueFilter;
import com.exacaster.deltafetch.search.parquet.readsupport.MapReadSupport;
import com.exacaster.deltafetch.search.parquet.readsupport.ParquetIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Streams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.compat.FilterCompat;
Expand All @@ -16,6 +13,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -35,12 +33,24 @@ public ParquetLookupReader(Configuration conf, String path) {
public Stream<Map<String, Object>> find(List<ColumnValueFilter> filters, int limit) {
LOG.debug("Reading: {} with filters {}", path, filters);
try (var reader = prepareReader(filters)) {
return Streams.stream(Iterators.limit(new ParquetIterator(reader), limit));
return readItemsWithLimit(reader, limit).stream();
} catch (IOException e) {
throw new IllegalStateException("Failed building reader", e);
}
}

private List<Map<String, Object>> readItemsWithLimit(ParquetReader<Map<String, Object>> parquetReader, int limit) throws IOException {
List<Map<String, Object>> data = new ArrayList<>();
for (int i = 0; i < limit; i++) {
Map<String, Object> item = parquetReader.read();
if (item == null) {
break;
}
data.add(item);
}
return data;
}

private ParquetReader<Map<String, Object>> prepareReader(List<ColumnValueFilter> filters) throws IOException {
var readerBuilder = ParquetReader
.builder(new MapReadSupport(), new Path(path))
Expand Down

This file was deleted.

0 comments on commit 2f07bc0

Please sign in to comment.