Description
Is your feature request related to a problem or challenge?
There are at least three places where parquet statistics are extracted into `ArrayRef`s today:
- `ParquetExec` (pruning Row Groups): https://github.com/apache/datafusion/blob/465c89f7f16d48b030d4a384733567b91dab88fa/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L18-L17
- `ParquetExec` (pruning pages): https://github.com/apache/datafusion/blob/671cef85c550969ab2c86d644968a048cb181c0c/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L393-L392
- `ListingTable` (pruning files): https://github.com/apache/datafusion/blob/97148bd105fc2102b0444f2d67ef535937da5dfe/datafusion/core/src/datasource/file_format/parquet.rs#L295-L294
Not only are there three copies of this code, but they are all subtly different (e.g. #8295) and have varying degrees of test coverage.
Describe the solution you'd like
I would like one API with the following properties:
- Extracts statistics from one or more parquet files as `ArrayRef`s suitable to pass to `PruningPredicate` (see the sketch after this list)
- Does so correctly (applies the appropriate schema coercion / conversion rules)
- Does so quickly and efficiently (e.g. does not do the extraction once per row group), and is suitable for use with 1000s of parquet files
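To make the target concrete, here is a rough sketch of how such per-column arrays could back DataFusion's `PruningStatistics` trait. The `ExtractedStats` container and its fields are hypothetical (not part of this proposal), and the trait shape is assumed to match the current `datafusion::physical_optimizer::pruning` definition:

```rust
use std::collections::{HashMap, HashSet};
use arrow::array::{ArrayRef, BooleanArray};
use datafusion::common::{Column, ScalarValue};
use datafusion::physical_optimizer::pruning::PruningStatistics;

/// Hypothetical container: one min/max/null-count array per column, with
/// one entry per "container" (file, row group, or page) being pruned
struct ExtractedStats {
    num_containers: usize,
    min: HashMap<String, ArrayRef>,
    max: HashMap<String, ArrayRef>,
    null_counts: HashMap<String, ArrayRef>,
}

impl PruningStatistics for ExtractedStats {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        self.min.get(&column.name).cloned()
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.max.get(&column.name).cloned()
    }
    fn num_containers(&self) -> usize {
        self.num_containers
    }
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        self.null_counts.get(&column.name).cloned()
    }
    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // could also be derived from row group metadata
    }
    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}
```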
Describe alternatives you've considered
Some ideas from apache/arrow-rs#4328
Subtasks
- feat: API for collecting statistics/index for metadata of a parquet file + tests #10537
- Implement a benchmark for extracting arrow statistics from parquet #10606
- Incorrect statistics read for `i8` and `i16` columns in parquet #10585
- DataFusion ignores "column order" parquet statistics specification #10586
- DataFusion reads Date32 and Date64 parquet statistics in as Int32Array #10587
- Complete porting tests from `datafusion/core/src/datasource/physical_plan/parquet/statistics.rs` to `datafusion/core/tests/parquet/arrow_statistics.rs`
- Convert ParquetExec to use the new API for RowGroup pruning
- Convert ParquetExec to use the new API for PagePruning
- Convert `ListingTable` to use the new API for file pruning
- Incorrect statistics read for unsigned integer columns in parquet #10604
- Incorrect statistics read for binary columns in parquet #10605
- Incorrect statistics read for struct array in parquet #10609
- Improve performance of extracting statistics from parquet files #10626
- Minor: Add tests for extracting dictionary parquet statistics #10729
- Extract parquet statistics from timestamps with timezones #10758
- Extract parquet statistics from `f16` columns #10757
- Extract parquet statistics from `Time32` and `Time64` columns #10751
- Extract parquet statistics from `Interval` columns #10752
- Extract parquet statistics from `Duration` columns #10754
- Extract parquet statistics from `LargeBinary` columns #10753
- Extract parquet statistics from `LargeUtf8` columns #10756
- Extract parquet statistics from `Decimal256` columns #10755
Follow-on projects:
Here is a proposed API:
```rust
/// Statistics extracted from `Statistics` as Arrow `ArrayRef`s
///
/// # Note:
/// If the corresponding `Statistics` is not present, or has no information for
/// a column, a NULL is present in the corresponding array entry
pub struct ArrowStatistics {
    /// min values
    min: ArrayRef,
    /// max values
    max: ArrayRef,
    /// Row counts (UInt64Array)
    row_count: ArrayRef,
    /// Null Counts (UInt64Array)
    null_count: ArrayRef,
}

// (TODO accessors for min/max/row_count/null_count)

/// Extract `ArrowStatistics` from the parquet [`Statistics`]
pub fn parquet_stats_to_arrow(
    arrow_datatype: &DataType,
    statistics: impl IntoIterator<Item = Option<&Statistics>>,
) -> Result<ArrowStatistics> {
    todo!()
}
```
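For example, given the footer metadata for a file, a caller could pass one `Option<&Statistics>` per row group and convert them all in a single call. This is a hedged sketch: `parquet_stats_to_arrow` is the proposed function above, and resolving `col_idx` / the arrow type for a column is elided:

```rust
use arrow::datatypes::DataType;
use parquet::file::metadata::ParquetMetaData;

/// Build one statistics entry per row group for the column at `col_idx`,
/// then convert them all to arrow arrays in one call
fn row_group_stats_for_column(
    metadata: &ParquetMetaData,
    col_idx: usize,
    arrow_type: &DataType,
) -> Result<ArrowStatistics> {
    let stats = metadata
        .row_groups()
        .iter()
        .map(|rg| rg.column(col_idx).statistics());
    parquet_stats_to_arrow(arrow_type, stats)
}
```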
Maybe it would make sense to have something more builder-style:

```rust
struct ParquetStatisticsExtractor {
    ...
}

// create an extractor that can extract data from parquet files
let extractor = ParquetStatisticsExtractor::new(arrow_schema, parquet_schema);

// get parquet statistics (one for each row group) somehow:
let parquet_stats: Vec<&Statistics> = ...;

// extract min/max values for columns "a" and "b"
let col_a_stats = extractor.extract("a", parquet_stats.iter());
let col_b_stats = extractor.extract("b", parquet_stats.iter());
```

(This is similar to the existing API `parquet::arrow::parquet_to_arrow_schema`.)
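For reference, that existing schema-conversion API in the parquet crate has approximately this signature (quoted from memory, so treat as approximate):

```rust
pub fn parquet_to_arrow_schema(
    parquet_schema: &SchemaDescriptor,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema>
```

The proposed extractor follows the same pattern: pair a parquet schema with an arrow schema once, then reuse that mapping across many row groups or files.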
Note: `Statistics` above is the parquet crate's `parquet::file::statistics::Statistics`.
There is a version of this code in DataFusion that could perhaps be adapted: `datafusion/core/src/datasource/physical_plan/parquet/statistics.rs` (lines 179 to 186 at commit accce97).
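That existing code is essentially a large match over the physical `Statistics` variants. A simplified sketch of the shape (not the actual DataFusion implementation), extracting an `i64` min:

```rust
use parquet::file::statistics::Statistics;

// Simplified: pull the min out of int32/int64 statistics as an i64, if set
fn min_as_i64(s: &Statistics) -> Option<i64> {
    match s {
        Statistics::Int32(s) if s.has_min_max_set() => Some(*s.min() as i64),
        Statistics::Int64(s) if s.has_min_max_set() => Some(*s.min()),
        // other physical types each need their own (subtly different)
        // handling, which is exactly the duplication this issue consolidates
        _ => None,
    }
}
```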
Testing
I suggest we add a new module to the existing parquet test in https://github.com/apache/datafusion/blob/main/datafusion/core/tests/parquet_exec.rs
The tests should look like:
```rust
let record_batch = make_batch_with_relevant_datatype();
// write batch/batches to file
// open file / extract stats from metadata
// compare stats
```
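Fleshed out slightly, one such test might look like the following. This is a hedged sketch: `parquet_stats_to_arrow` is the proposed API above, and the `min()` accessor on the result is assumed (per the TODO in the proposal):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

let batch = RecordBatch::try_from_iter([(
    "a",
    Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])) as ArrayRef,
)])?;

// write the batch to an in-memory parquet file
let mut buf = vec![];
let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
writer.write(&batch)?;
writer.close()?;

// open the file and pull the statistics out of the footer metadata
let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?;
let stats = builder
    .metadata()
    .row_groups()
    .iter()
    .map(|rg| rg.column(0).statistics());

// convert to arrow arrays (one entry per row group) and compare
let arrow_stats = parquet_stats_to_arrow(&DataType::Int64, stats)?;
let min = arrow_stats.min(); // assumed accessor (see TODO in the proposal)
let min = min.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(min, &Int64Array::from(vec![Some(1)]));
```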
I can help write these tests.
I personally suggest:
- Make a PR with the basic API and a single basic type (like Int/UInt or String) and figure out the test pattern (I can definitely help here)
- Then we can fill out support for the rest of the types in a follow-on PR
cc @tustvold in case you have other ideas
Additional context
This code would likely eventually be good to have in the parquet crate -- see apache/arrow-rs#4328. However, I think we should initially do it in DataFusion so we can iterate faster and figure out the API before moving it upstream.
There are a bunch of related improvements that I think become much simpler with this feature: