Commit 90f89e0

Optimize Parquet RowGroup pruning, update StatisticsExtractor API (#10802)
1 parent: cfbfc03

8 files changed (+245, -195 lines)
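In short, this commit replaces the per-statistic API, where callers built one StatisticsConverter per RequestedStatistics variant (Min, Max, NullCount) and called extract() for each, with a single converter per column that produces mins, maxes, and null counts for all row groups in one pass. A minimal sketch of the new shape, assuming a local Parquet file and a column named "value" (the path, function name, and column name are placeholders, not part of this commit):

use arrow::array::ArrayRef;
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use datafusion::error::Result;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use std::fs::File;

/// Extract per-row-group min/max/null-count arrays for one column.
fn extract_stats(path: &str) -> Result<(ArrayRef, ArrayRef, ArrayRef)> {
    let reader = ArrowReaderBuilder::try_new(File::open(path)?)?;
    // One converter per column, reused for every kind of statistic
    let converter = StatisticsConverter::try_new(
        "value",
        reader.schema(),         // Arrow schema
        reader.parquet_schema(), // Parquet SchemaDescriptor
    )?;
    let metadata = reader.metadata();
    let row_groups = metadata.row_groups();
    // Each array has one entry per row group; a null entry means "unknown"
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    let null_counts = converter.row_group_null_counts(row_groups.iter())?;
    Ok((mins, maxes, null_counts))
}

Row counts are a property of the row group rather than of any one column, so they moved to the associated function StatisticsConverter::row_group_row_counts, as the diffs below show.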

datafusion-examples/examples/parquet_index.rs

Lines changed: 8 additions & 12 deletions

@@ -25,7 +25,7 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{
-    parquet::{RequestedStatistics, StatisticsConverter},
+    parquet::StatisticsConverter,
     {FileScanConfig, ParquetExec},
 };
 use datafusion::datasource::TableProvider;
@@ -518,21 +518,17 @@ impl ParquetMetadataIndexBuilder {
 
         // extract the parquet statistics from the file's footer
         let metadata = reader.metadata();
+        let row_groups = metadata.row_groups();
 
         // Extract the min/max values for each row group from the statistics
-        let row_counts = StatisticsConverter::row_counts(reader.metadata())?;
-        let value_column_mins = StatisticsConverter::try_new(
+        let converter = StatisticsConverter::try_new(
             "value",
-            RequestedStatistics::Min,
             reader.schema(),
-        )?
-        .extract(reader.metadata())?;
-        let value_column_maxes = StatisticsConverter::try_new(
-            "value",
-            RequestedStatistics::Max,
-            reader.schema(),
-        )?
-        .extract(reader.metadata())?;
+            reader.parquet_schema(),
+        )?;
+        let row_counts = StatisticsConverter::row_group_row_counts(row_groups.iter())?;
+        let value_column_mins = converter.row_group_mins(row_groups.iter())?;
+        let value_column_maxes = converter.row_group_maxes(row_groups.iter())?;
 
         // In a real system you would have to handle nulls, which represent
         // unknown statistics. All statistics are known in this example
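The closing comment of this hunk is worth underlining: in the arrays the converter returns, a null entry means the statistic is unknown, not zero. A hypothetical sketch of honoring that convention when testing which row groups can be skipped for a predicate like value >= threshold on an Int64 column (the helper name and predicate are illustrative; the real pruning path goes through PruningPredicate):

use arrow::array::{Array, ArrayRef, AsArray};
use arrow::datatypes::Int64Type;

/// For the predicate `value >= threshold`, a row group can be skipped only
/// when its maximum is known (non-null) and still falls below the threshold.
fn can_skip(maxes: &ArrayRef, threshold: i64) -> Vec<bool> {
    let maxes = maxes.as_primitive::<Int64Type>();
    (0..maxes.len())
        .map(|i| !maxes.is_null(i) && maxes.value(i) < threshold)
        .collect()
}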

datafusion/core/benches/parquet_statistic.rs

Lines changed: 9 additions & 26 deletions

@@ -24,9 +24,7 @@ use arrow_schema::{
     Field, Schema,
 };
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
-use datafusion::datasource::physical_plan::parquet::{
-    RequestedStatistics, StatisticsConverter,
-};
+use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
 use parquet::arrow::{arrow_reader::ArrowReaderBuilder, ArrowWriter};
 use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
@@ -159,41 +157,26 @@ fn criterion_benchmark(c: &mut Criterion) {
         let file = file.reopen().unwrap();
         let reader = ArrowReaderBuilder::try_new(file).unwrap();
         let metadata = reader.metadata();
+        let row_groups = metadata.row_groups();
 
         let mut group =
             c.benchmark_group(format!("Extract statistics for {}", dtype.clone()));
         group.bench_function(
             BenchmarkId::new("extract_statistics", dtype.clone()),
             |b| {
                 b.iter(|| {
-                    let _ = StatisticsConverter::try_new(
-                        "col",
-                        RequestedStatistics::Min,
-                        reader.schema(),
-                    )
-                    .unwrap()
-                    .extract(metadata)
-                    .unwrap();
-
-                    let _ = StatisticsConverter::try_new(
-                        "col",
-                        RequestedStatistics::Max,
-                        reader.schema(),
-                    )
-                    .unwrap()
-                    .extract(reader.metadata())
-                    .unwrap();
-
-                    let _ = StatisticsConverter::try_new(
+                    let converter = StatisticsConverter::try_new(
                         "col",
-                        RequestedStatistics::NullCount,
                         reader.schema(),
+                        reader.parquet_schema(),
                     )
-                    .unwrap()
-                    .extract(reader.metadata())
                     .unwrap();
 
-                    let _ = StatisticsConverter::row_counts(reader.metadata()).unwrap();
+                    let _ = converter.row_group_mins(row_groups.iter()).unwrap();
+                    let _ = converter.row_group_maxes(row_groups.iter()).unwrap();
+                    let _ = converter.row_group_null_counts(row_groups.iter()).unwrap();
+                    let _ = StatisticsConverter::row_group_row_counts(row_groups.iter())
+                        .unwrap();
                 })
             },
         );

datafusion/core/src/datasource/physical_plan/parquet/metrics.rs

Lines changed: 2 additions & 2 deletions

@@ -29,11 +29,11 @@ use crate::physical_plan::metrics::{
 pub struct ParquetFileMetrics {
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: Count,
-    /// Number of row groups whose bloom filters were checked and matched
+    /// Number of row groups whose bloom filters were checked and matched (not pruned)
     pub row_groups_matched_bloom_filter: Count,
     /// Number of row groups pruned by bloom filters
     pub row_groups_pruned_bloom_filter: Count,
-    /// Number of row groups whose statistics were checked and matched
+    /// Number of row groups whose statistics were checked and matched (not pruned)
     pub row_groups_matched_statistics: Count,
     /// Number of row groups pruned by statistics
     pub row_groups_pruned_statistics: Count,
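The reworded comments pin down how the counter pairs relate: each row group whose statistics (or bloom filter) are evaluated is counted exactly once, as either matched (it may contain matching rows and stays in the scan) or pruned (it is skipped). A small, hypothetical sketch of that invariant, assuming the Count type from DataFusion's public metrics module:

use datafusion::physical_plan::metrics::Count;

/// Hypothetical check: matched + pruned equals the number of row groups
/// whose statistics were actually evaluated.
fn assert_counters_consistent(matched: &Count, pruned: &Count, evaluated: usize) {
    assert_eq!(matched.value() + pruned.value(), evaluated);
}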

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -64,7 +64,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
-pub use statistics::{RequestedStatistics, StatisticsConverter};
+pub use statistics::StatisticsConverter;
 pub use writer::plan_to_parquet;
 
 /// Execution plan for reading one or more Parquet files.
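For downstream users, the visible API break is the import: RequestedStatistics no longer exists. A sketch of the corresponding change to a use declaration, with the pre-commit form kept as a comment for contrast:

// Before this commit:
// use datafusion::datasource::physical_plan::parquet::{
//     RequestedStatistics, StatisticsConverter,
// };

// After this commit:
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;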

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 59 additions & 50 deletions

@@ -17,26 +17,23 @@
 
 use arrow::{array::ArrayRef, datatypes::Schema};
 use arrow_array::BooleanArray;
-use arrow_schema::FieldRef;
-use datafusion_common::{Column, ScalarValue};
+use datafusion_common::{Column, Result, ScalarValue};
 use parquet::basic::Type;
 use parquet::data_type::Decimal;
-use parquet::file::metadata::ColumnChunkMetaData;
 use parquet::schema::types::SchemaDescriptor;
 use parquet::{
     arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
     bloom_filter::Sbbf,
     file::metadata::RowGroupMetaData,
 };
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 use crate::datasource::listing::FileRange;
-use crate::datasource::physical_plan::parquet::statistics::{
-    max_statistics, min_statistics, parquet_column,
-};
+use crate::datasource::physical_plan::parquet::statistics::parquet_column;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
-use super::{ParquetAccessPlan, ParquetFileMetrics};
+use super::{ParquetAccessPlan, ParquetFileMetrics, StatisticsConverter};
 
 /// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
 ///
@@ -113,32 +110,37 @@ impl RowGroupAccessPlanFilter {
         metrics: &ParquetFileMetrics,
     ) {
         assert_eq!(groups.len(), self.access_plan.len());
-        for (idx, metadata) in groups.iter().enumerate() {
-            if !self.access_plan.should_scan(idx) {
-                continue;
-            }
-            let pruning_stats = RowGroupPruningStatistics {
-                parquet_schema,
-                row_group_metadata: metadata,
-                arrow_schema,
-            };
-            match predicate.prune(&pruning_stats) {
-                Ok(values) => {
-                    // NB: false means don't scan row group
-                    if !values[0] {
+        // Indexes of row groups still to scan
+        let row_group_indexes = self.access_plan.row_group_indexes();
+        let row_group_metadatas = row_group_indexes
+            .iter()
+            .map(|&i| &groups[i])
+            .collect::<Vec<_>>();
+
+        let pruning_stats = RowGroupPruningStatistics {
+            parquet_schema,
+            row_group_metadatas,
+            arrow_schema,
+        };
+
+        // try to prune the row groups in a single call
+        match predicate.prune(&pruning_stats) {
+            Ok(values) => {
+                // values[i] is false means the predicate could not be true for row group i
+                for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
+                    if !value {
+                        self.access_plan.skip(*idx);
                         metrics.row_groups_pruned_statistics.add(1);
-                        self.access_plan.skip(idx);
-                        continue;
+                    } else {
+                        metrics.row_groups_matched_statistics.add(1);
                     }
                 }
-                // stats filter array could not be built
-                // don't prune this row group
-                Err(e) => {
-                    log::debug!("Error evaluating row group predicate values {e}");
-                    metrics.predicate_evaluation_errors.add(1);
-                }
             }
-            metrics.row_groups_matched_statistics.add(1);
+            // stats filter array could not be built, so we can't prune
+            Err(e) => {
+                log::debug!("Error evaluating row group predicate values {e}");
+                metrics.predicate_evaluation_errors.add(1);
+            }
         }
     }
 
@@ -337,49 +339,55 @@
     }
 }
 
-/// Wraps [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
-///
-/// Note: This should be implemented for an array of [`RowGroupMetaData`] instead
-/// of per row-group
+/// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a SchemaDescriptor,
-    row_group_metadata: &'a RowGroupMetaData,
+    row_group_metadatas: Vec<&'a RowGroupMetaData>,
    arrow_schema: &'a Schema,
 }
 
 impl<'a> RowGroupPruningStatistics<'a> {
-    /// Lookups up the parquet column by name
-    fn column(&self, name: &str) -> Option<(&ColumnChunkMetaData, &FieldRef)> {
-        let (idx, field) = parquet_column(self.parquet_schema, self.arrow_schema, name)?;
-        Some((self.row_group_metadata.column(idx), field))
+    /// Return an iterator over the row group metadata
+    fn metadata_iter(&'a self) -> impl Iterator<Item = &'a RowGroupMetaData> + 'a {
+        self.row_group_metadatas.iter().copied()
+    }
+
+    fn statistics_converter<'b>(
+        &'a self,
+        column: &'b Column,
+    ) -> Result<StatisticsConverter<'a>> {
+        StatisticsConverter::try_new(&column.name, self.arrow_schema, self.parquet_schema)
     }
 }
 
 impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
-        let (column, field) = self.column(&column.name)?;
-        min_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_mins(self.metadata_iter()))
+            .ok()
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
-        let (column, field) = self.column(&column.name)?;
-        max_statistics(field.data_type(), std::iter::once(column.statistics())).ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_maxes(self.metadata_iter()))
+            .ok()
    }
 
     fn num_containers(&self) -> usize {
-        1
+        self.row_group_metadatas.len()
     }
 
     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let (c, _) = self.column(&column.name)?;
-        let scalar = ScalarValue::UInt64(Some(c.statistics()?.null_count()));
-        scalar.to_array().ok()
+        self.statistics_converter(column)
+            .and_then(|c| c.row_group_null_counts(self.metadata_iter()))
+            .ok()
    }
 
-    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
-        let (c, _) = self.column(&column.name)?;
-        let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
-        scalar.to_array().ok()
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        // row counts are the same for all columns in a row group
+        StatisticsConverter::row_group_row_counts(self.metadata_iter())
+            .ok()
+            .map(|counts| Arc::new(counts) as ArrayRef)
    }
 
     fn contained(
@@ -406,6 +414,7 @@ mod tests {
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
+    use parquet::file::metadata::ColumnChunkMetaData;
     use parquet::{
         basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
         schema::types::SchemaDescPtr,
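This restructuring is the optimization named in the commit title: instead of wrapping each RowGroupMetaData separately and calling PruningPredicate::prune once per row group, the filter now exposes all candidate row groups as the containers of a single PruningStatistics, so the predicate is evaluated once, vectorized across every row group. A hedged sketch of that flow (hypothetical helper; the real method updates a ParquetAccessPlan and metrics instead of returning indexes):

use datafusion::common::Result;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

/// Evaluate the predicate once over all containers (row groups) and
/// return the indexes that may still contain matching rows.
fn groups_to_scan<S: PruningStatistics>(
    predicate: &PruningPredicate,
    stats: &S,
) -> Result<Vec<usize>> {
    // One prune() call; values[i] == false means row group i cannot match
    let values = predicate.prune(stats)?;
    Ok(values
        .iter()
        .enumerate()
        .filter_map(|(i, &keep)| keep.then_some(i))
        .collect())
}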
