Skip to content

Commit 23f3efd

Browse files
committed
Update the parquet code `prune_pages_in_one_row_group` to use the `StatisticsExtractor`
1 parent f204869 commit 23f3efd

File tree

5 files changed

+242
-357
lines changed

5 files changed

+242
-357
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

+11-40
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use crate::datasource::listing::PartitionedFile;
2525
use crate::datasource::physical_plan::file_stream::FileStream;
2626
use crate::datasource::physical_plan::{
27-
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
27+
parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
2828
FileScanConfig,
2929
};
3030
use crate::{
@@ -39,13 +39,11 @@ use crate::{
3939
},
4040
};
4141

42-
use arrow::datatypes::{DataType, SchemaRef};
42+
use arrow::datatypes::SchemaRef;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
4444

4545
use itertools::Itertools;
4646
use log::debug;
47-
use parquet::basic::{ConvertedType, LogicalType};
48-
use parquet::schema::types::ColumnDescriptor;
4947

5048
mod access_plan;
5149
mod metrics;
@@ -225,7 +223,7 @@ pub struct ParquetExec {
225223
/// Optional predicate for pruning row groups (derived from `predicate`)
226224
pruning_predicate: Option<Arc<PruningPredicate>>,
227225
/// Optional predicate for pruning pages (derived from `predicate`)
228-
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
226+
page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
229227
/// Optional hint for the size of the parquet metadata
230228
metadata_size_hint: Option<usize>,
231229
/// Optional user defined parquet file reader factory
@@ -381,19 +379,12 @@ impl ParquetExecBuilder {
381379
})
382380
.filter(|p| !p.always_true());
383381

384-
let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
385-
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
386-
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
387-
Err(e) => {
388-
debug!(
389-
"Could not create page pruning predicate for '{:?}': {}",
390-
pruning_predicate, e
391-
);
392-
predicate_creation_errors.add(1);
393-
None
394-
}
395-
}
396-
});
382+
let page_pruning_predicate = predicate
383+
.as_ref()
384+
.map(|predicate_expr| {
385+
PagePruningAccessPlanFilter::new(predicate_expr, file_schema.clone())
386+
})
387+
.map(Arc::new);
397388

398389
let (projected_schema, projected_statistics, projected_output_ordering) =
399390
base_config.project();
@@ -739,7 +730,7 @@ impl ExecutionPlan for ParquetExec {
739730

740731
fn should_enable_page_index(
741732
enable_page_index: bool,
742-
page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
733+
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
743734
) -> bool {
744735
enable_page_index
745736
&& page_pruning_predicate.is_some()
@@ -749,26 +740,6 @@ fn should_enable_page_index(
749740
.unwrap_or(false)
750741
}
751742

752-
// Convert parquet column schema to arrow data type, and just consider the
753-
// decimal data type.
754-
pub(crate) fn parquet_to_arrow_decimal_type(
755-
parquet_column: &ColumnDescriptor,
756-
) -> Option<DataType> {
757-
let type_ptr = parquet_column.self_type_ptr();
758-
match type_ptr.get_basic_info().logical_type() {
759-
Some(LogicalType::Decimal { scale, precision }) => {
760-
Some(DataType::Decimal128(precision as u8, scale as i8))
761-
}
762-
_ => match type_ptr.get_basic_info().converted_type() {
763-
ConvertedType::DECIMAL => Some(DataType::Decimal128(
764-
type_ptr.get_precision() as u8,
765-
type_ptr.get_scale() as i8,
766-
)),
767-
_ => None,
768-
},
769-
}
770-
}
771-
772743
#[cfg(test)]
773744
mod tests {
774745
// See also `parquet_exec` integration test
@@ -798,7 +769,7 @@ mod tests {
798769
};
799770
use arrow::datatypes::{Field, Schema, SchemaBuilder};
800771
use arrow::record_batch::RecordBatch;
801-
use arrow_schema::Fields;
772+
use arrow_schema::{DataType, Fields};
802773
use datafusion_common::{assert_contains, ScalarValue};
803774
use datafusion_expr::{col, lit, when, Expr};
804775
use datafusion_physical_expr::planner::logical2physical;

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! [`ParquetOpener`] for opening Parquet files
1919
20-
use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
20+
use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
2121
use crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter;
2222
use crate::datasource::physical_plan::parquet::{
2323
row_filter, should_enable_page_index, ParquetAccessPlan,
@@ -46,7 +46,7 @@ pub(super) struct ParquetOpener {
4646
pub limit: Option<usize>,
4747
pub predicate: Option<Arc<dyn PhysicalExpr>>,
4848
pub pruning_predicate: Option<Arc<PruningPredicate>>,
49-
pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
49+
pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
5050
pub table_schema: SchemaRef,
5151
pub metadata_size_hint: Option<usize>,
5252
pub metrics: ExecutionPlanMetricsSet,

0 commit comments

Comments
 (0)