
Commit 4884c08

Use upstream StatisticsConverter (#11479)

1 parent 9dd2cfc

File tree

10 files changed: +31 -5554 lines changed


datafusion-examples/examples/parquet_index.rs (+2 -4)

@@ -25,12 +25,10 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::catalog::Session;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{
-    parquet::StatisticsConverter,
-    {FileScanConfig, ParquetExec},
-};
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::datasource::TableProvider;
 use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use datafusion::parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
 };
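With the new import path, the example reads row group statistics through the converter that now lives in the parquet crate (re-exported as datafusion::parquet). Below is a minimal sketch of that usage, assuming the upstream try_new / row_group_mins / row_group_maxes API; the function name and the column name "value" are illustrative and not part of this commit:

// Sketch only: read per-row-group min/max statistics for one column via the
// upstream converter. The column name "value" is an assumption for illustration.
use arrow_schema::Schema;
use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use datafusion::parquet::errors::Result;
use datafusion::parquet::file::metadata::ParquetMetaData;

fn row_group_min_max(arrow_schema: &Schema, metadata: &ParquetMetaData) -> Result<()> {
    // Map the Arrow column onto the file's parquet schema descriptor.
    let converter = StatisticsConverter::try_new(
        "value",
        arrow_schema,
        metadata.file_metadata().schema_descr(),
    )?;
    // One entry per row group, returned as Arrow arrays.
    let mins = converter.row_group_mins(metadata.row_groups().iter())?;
    let maxes = converter.row_group_maxes(metadata.row_groups().iter())?;
    println!("row group mins: {mins:?}, maxes: {maxes:?}");
    Ok(())
}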

datafusion/core/Cargo.toml (-4)

@@ -217,10 +217,6 @@ name = "sort"
 harness = false
 name = "topk_aggregate"

-[[bench]]
-harness = false
-name = "parquet_statistic"
-
 [[bench]]
 harness = false
 name = "map_query_sql"

datafusion/core/benches/parquet_statistic.rs (-287)

This file was deleted.

datafusion/core/src/datasource/file_format/parquet.rs (+2 -3)

@@ -75,12 +75,11 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;

-use crate::datasource::physical_plan::parquet::{
-    ParquetExecBuilder, StatisticsConverter,
-};
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::statistics::StatisticsConverter;

 /// Initial writing buffer size. Note this is just a size hint for efficiency. It
 /// will grow beyond the set value if needed.

datafusion/core/src/datasource/physical_plan/parquet/mod.rs (-2)

@@ -52,7 +52,6 @@ mod page_filter;
 mod reader;
 mod row_filter;
 mod row_group_filter;
-mod statistics;
 mod writer;

 use crate::datasource::schema_adapter::{
@@ -62,7 +61,6 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
-pub use statistics::StatisticsConverter;
 pub use writer::plan_to_parquet;

 /// Execution plan for reading one or more Parquet files.

datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs (+3 -5)

@@ -17,15 +17,16 @@

 //! Contains code to filter entire pages

+use super::metrics::ParquetFileMetrics;
 use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
-use crate::datasource::physical_plan::parquet::StatisticsConverter;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use arrow::array::BooleanArray;
 use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use arrow_schema::Schema;
 use datafusion_common::ScalarValue;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
+use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
 use parquet::format::PageLocation;
 use parquet::schema::types::SchemaDescriptor;
@@ -36,8 +37,6 @@ use parquet::{
 use std::collections::HashSet;
 use std::sync::Arc;

-use super::metrics::ParquetFileMetrics;
-
 /// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
 ///
 /// It does so by evaluating statistics from the [`ParquetColumnIndex`] and
@@ -377,7 +376,7 @@ impl<'a> PagesPruningStatistics<'a> {
         converter: StatisticsConverter<'a>,
         parquet_metadata: &'a ParquetMetaData,
     ) -> Option<Self> {
-        let Some(parquet_column_index) = converter.parquet_index() else {
+        let Some(parquet_column_index) = converter.parquet_column_index() else {
             trace!(
                 "Column {:?} not in parquet file, skipping",
                 converter.arrow_field()
@@ -432,7 +431,6 @@ impl<'a> PagesPruningStatistics<'a> {
         Some(vec)
     }
 }
-
 impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
     fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
         match self.converter.data_page_mins(