
Commit 620f0bb

parquet: Add support for user-provided metadata loaders
This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.
1 parent a35d007 commit 620f0bb

4 files changed: 58 additions, 15 deletions
datafusion/core/src/datasource/physical_plan/parquet/mod.rs (3 additions, 1 deletion)

@@ -60,7 +60,9 @@ use crate::datasource::schema_adapter::{
 pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
-pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use reader::{
+    DefaultParquetFileReaderFactory, ParquetFileReader, ParquetFileReaderFactory,
+};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;

datafusion/core/src/datasource/physical_plan/parquet/opener.rs (7 additions, 7 deletions)

@@ -21,7 +21,7 @@ use crate::datasource::file_format::coerce_file_schema_to_view_type;
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
-    row_filter, should_enable_page_index, ParquetAccessPlan,
+    row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileReader,
 };
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -35,7 +35,6 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
-use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use std::sync::Arc;
@@ -87,7 +86,7 @@ impl FileOpener for ParquetOpener {
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

-        let mut reader: Box<dyn AsyncFileReader> =
+        let mut reader: Box<dyn ParquetFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -118,8 +117,7 @@ impl FileOpener for ParquetOpener {
         Ok(Box::pin(async move {
             let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

-            let metadata =
-                ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
+            let metadata = reader.load_metadata(options.clone()).await?;
             let mut schema = metadata.schema().clone();
             // read with view types
             if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
@@ -133,8 +131,10 @@ impl FileOpener for ParquetOpener {
             let metadata =
                 ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
+            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+                reader.upcast(),
+                metadata,
+            );

             let file_schema = builder.schema().clone();

datafusion/core/src/datasource/physical_plan/parquet/reader.rs (46 additions, 5 deletions)

@@ -23,6 +23,7 @@ use bytes::Bytes;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::future::BoxFuture;
 use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::fmt::Debug;
@@ -57,9 +58,49 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>>;
 }

+/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
+pub trait ParquetFileReader: AsyncFileReader + Send + 'static {
+    /// Returns a [`AsyncFileReader`] trait object
+    ///
+    /// This can usually be implemented as `Box::new(*self)`
+    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;
+
+    /// Parses the file's metadata
+    ///
+    /// The default implementation is:
+    ///
+    /// ```
+    /// Box::pin(ArrowReaderMetadata::load_async(self, options))
+    /// ```
+    fn load_metadata(
+        &mut self,
+        options: ArrowReaderOptions,
+    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>>;
+}
+
+macro_rules! impl_ParquetFileReader {
+    ($type:ty) => {
+        impl ParquetFileReader for $type {
+            fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
+                Box::new(*self)
+            }
+
+            fn load_metadata(
+                &mut self,
+                options: ArrowReaderOptions,
+            ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
+                Box::pin(ArrowReaderMetadata::load_async(self, options))
+            }
+        }
+    };
+}
+
+impl_ParquetFileReader!(ParquetObjectReader);
+impl_ParquetFileReader!(DefaultParquetFileReader);
+
 /// Default implementation of [`ParquetFileReaderFactory`]
 ///
 /// This implementation:
@@ -86,12 +127,12 @@ impl DefaultParquetFileReaderFactory {
 /// This implementation does not coalesce I/O operations or cache bytes. Such
 /// optimizations can be done either at the object store level or by providing a
 /// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
+pub(crate) struct DefaultParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     pub inner: ParquetObjectReader,
 }

-impl AsyncFileReader for ParquetFileReader {
+impl AsyncFileReader for DefaultParquetFileReader {
     fn get_bytes(
         &mut self,
         range: Range<usize>,
@@ -126,7 +167,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         file_meta: FileMeta,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
-    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+    ) -> datafusion_common::Result<Box<dyn ParquetFileReader>> {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             file_meta.location().as_ref(),
@@ -139,7 +180,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
            inner = inner.with_footer_size_hint(hint)
        };

-        Ok(Box::new(ParquetFileReader {
+        Ok(Box::new(DefaultParquetFileReader {
            inner,
            file_metrics,
        }))

datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs (2 additions, 2 deletions)

@@ -416,7 +416,7 @@ mod tests {
     use std::sync::Arc;

     use super::*;
-    use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
+    use crate::datasource::physical_plan::parquet::reader::DefaultParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;

     use arrow::datatypes::DataType::Decimal128;
@@ -1516,7 +1516,7 @@ mod tests {
        let metrics = ExecutionPlanMetricsSet::new();
        let file_metrics =
            ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
-        let reader = ParquetFileReader {
+        let reader = DefaultParquetFileReader {
            inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
            file_metrics: file_metrics.clone(),
        };
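
Usage note: the commit message's example, caching the Page Index so it is not re-parsed on every file open, could be built on the new ParquetFileReader trait roughly as sketched below. This is a minimal sketch and not part of the commit: the CachedParquetFileReader type and its shared cached slot are hypothetical, and a matching ParquetFileReaderFactory would be responsible for constructing it and handing each reader the cache entry for its file.

use std::ops::Range;
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;

// Path follows the re-export added in mod.rs above; adjust if it differs.
use datafusion::datasource::physical_plan::parquet::ParquetFileReader;

/// Hypothetical wrapper (not part of this commit): delegates byte-range I/O to
/// the inner `ParquetObjectReader`, but serves a previously parsed
/// `ArrowReaderMetadata` (which carries the Page Index) from a shared cache slot.
pub struct CachedParquetFileReader {
    pub inner: ParquetObjectReader,
    /// One slot per file; a custom `ParquetFileReaderFactory` would key a map
    /// of these by file path and pass the matching slot in `create_reader`.
    pub cached: Arc<Mutex<Option<ArrowReaderMetadata>>>,
}

impl AsyncFileReader for CachedParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.inner.get_bytes(range)
    }

    fn get_metadata(
        &mut self,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata()
    }
}

impl ParquetFileReader for CachedParquetFileReader {
    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static> {
        Box::new(*self)
    }

    fn load_metadata(
        &mut self,
        options: ArrowReaderOptions,
    ) -> BoxFuture<'_, parquet::errors::Result<ArrowReaderMetadata>> {
        Box::pin(async move {
            // Reuse the previously parsed footer and Page Index when available.
            if let Some(metadata) = self.cached.lock().unwrap().clone() {
                return Ok(metadata);
            }
            let metadata =
                ArrowReaderMetadata::load_async(&mut self.inner, options).await?;
            *self.cached.lock().unwrap() = Some(metadata.clone());
            Ok(metadata)
        })
    }
}

Because load_metadata returns an ArrowReaderMetadata, which wraps the decoded ParquetMetaData (including the Page Index when loaded with the page-index option), caching at this level avoids both the footer fetch and the index decoding on subsequent opens.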
