diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 16da9c2d771a3..6655125ea8766 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
 pub use metrics::ParquetFileMetrics;
 pub use statistics::{RequestedStatistics, StatisticsConverter};
 
-/// Execution plan for scanning one or more Parquet partitions
+/// Execution plan for reading one or more Parquet files.
+///
+/// ```text
+///             ▲
+///             │
+///             │  Produce a stream of
+///             │  RecordBatches
+///             │
+/// ┌───────────────────────┐
+/// │                       │
+/// │      ParquetExec      │
+/// │                       │
+/// └───────────────────────┘
+///             ▲
+///             │  Asynchronously read from one
+///             │  or more parquet files via
+///             │  ObjectStore interface
+///             │
+///             │
+///   .───────────────────.
+///  │                     )
+///  │`───────────────────'│
+///  │     ObjectStore     │
+///  │.───────────────────.│
+///  │                     )
+///   `───────────────────'
+///
+/// ```
+/// # Features
+///
+/// Supports the following optimizations:
+///
+/// * Concurrent reads: reads one or more files in parallel as multiple
+/// partitions, including concurrently reading multiple row groups from a single
+/// file.
+///
+/// * Predicate pushdown: skips row groups and pages based on
+/// min/max/null_counts in the row group metadata, the page index and bloom
+/// filters.
+///
+/// * Projection pushdown: reads and decodes only the columns required.
+///
+/// * Limit pushdown: stops execution early once the required number of rows
+/// has been read.
+///
+/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
+/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
+/// details.
+///
+/// * Schema adapters: read parquet files with different schemas into a unified
+/// table schema. This can be used to implement "schema evolution". See
+/// [`SchemaAdapterFactory`] for more details.
+///
+/// * metadata_size_hint: controls the number of bytes read from the end of the
+/// file in the initial I/O when using the default [`ParquetFileReaderFactory`].
+/// If a custom reader is used, it supplies the metadata directly and this
+/// parameter is ignored. See [`Self::with_parquet_file_reader_factory`] for
+/// more details.
+///
+/// # Execution Overview
+///
+/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// configured to open parquet files with a [`ParquetOpener`].
+///
+/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
+/// the file.
+///
+/// * Step 3: The `ParquetOpener` gets the file metadata via
+/// [`ParquetFileReaderFactory`] and applies any predicates
+/// and projections to determine what pages must be read.
+///
+/// * Step 4: The stream begins reading data, fetching the required pages
+/// and incrementally decoding them.
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
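The features and execution overview documented above are normally exercised through the higher-level DataFrame API, which plans a `ParquetExec` for the scan. As a point of reference, a minimal sketch of that path, assuming a `tokio` runtime; the file path is a placeholder:

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Plans a scan over the file; the resulting physical plan contains a ParquetExec.
    let df = ctx
        .read_parquet("data/example.parquet", ParquetReadOptions::default())
        .await?;
    // Executing the plan produces a stream of RecordBatches, as in the diagram
    // above; collect() gathers them into a Vec.
    let batches = df.collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```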
@@ -86,9 +163,9 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups
+    /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages
+    /// Optional predicate for pruning pages (derived from `predicate`)
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
@@ -190,11 +267,13 @@ impl ParquetExec {
     /// Optional user defined parquet file reader factory.
     ///
-    /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
-    /// implementation for data access operations.
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
+    /// I/O operations, etc.).
     ///
-    /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
-    /// to this factory instead of `ObjectStore`.
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and then for
+    /// each page.
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
     ) -> Self {
@@ -643,11 +722,21 @@ fn should_enable_page_index(
         .unwrap_or(false)
 }
 
-/// Factory of parquet file readers.
+/// Interface for reading parquet files.
 ///
-/// Provides means to implement custom data access interface.
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+    /// Provides an `AsyncFileReader` for reading data from the parquet file
+    /// specified by `file_meta`.
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first I/O reads this many bytes from the footer
+    /// * metrics - Execution metrics
     fn create_reader(
         &self,
         partition_index: usize,
@@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
-/// Default parquet reader factory.
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
 #[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
 }
 
 impl DefaultParquetFileReaderFactory {
-    /// Create a factory.
+    /// Create a new `DefaultParquetFileReaderFactory`.
     pub fn new(store: Arc<dyn ObjectStore>) -> Self {
         Self { store }
     }
 }
 
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
 pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
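To make the reader-factory hooks above concrete, here is a minimal sketch of a custom `ParquetFileReaderFactory` that wraps another factory (for example the default one) and logs each file it opens before delegating. The `LoggingReaderFactory` name and the exact import paths are assumptions for illustration; the `create_reader` signature follows the trait as shown in this diff:

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
use datafusion::datasource::physical_plan::FileMeta;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use parquet::arrow::async_reader::AsyncFileReader;

/// Hypothetical factory that logs each file open and then delegates the actual
/// I/O to an inner factory (e.g. a `DefaultParquetFileReaderFactory`).
#[derive(Debug)]
pub struct LoggingReaderFactory {
    inner: Arc<dyn ParquetFileReaderFactory>,
}

impl ParquetFileReaderFactory for LoggingReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // Observe which file this partition is opening, then delegate.
        println!(
            "partition {partition_index} opening {}",
            file_meta.object_meta.location
        );
        self.inner
            .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
    }
}
```

A factory like this would then be installed on the plan with something along the lines of `exec.with_parquet_file_reader_factory(Arc::new(factory))`, per the method documented above.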
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 36d33379b8877..1838a3354b9ce 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
+//!
+//! These adapters provide a method of translating the RecordBatches that come out of the
 //! physical format into how they should be used by DataFusion. For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.
@@ -26,27 +28,29 @@ use datafusion_common::plan_err;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
 ///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with a default value
+/// other than null).
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter`.
     fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have been
 /// obtained from merging multiple file-level schemas.
 ///
 /// This is useful for enabling schema evolution in partitioned datasets.
 ///
 /// This has to be done in two stages (see the sketch after this hunk).
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from the
+/// table schema to the file schema.
 ///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+/// 2. After reading a record batch, map the read columns back to the expected
+/// column indexes and insert null-valued columns wherever the file schema was
+/// missing a column present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
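A sketch of how the two stages above fit together, using only the `map_schema` and `map_batch` methods documented in the following hunks; `adapt_one_batch` and the `read_batch` closure are illustrative placeholders for the real scan machinery:

```rust
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::SchemaAdapter;
use datafusion_common::Result;

/// Hypothetical driver: adapt a single batch read from one file.
fn adapt_one_batch(
    adapter: &dyn SchemaAdapter,
    file_schema: &Schema,
    read_batch: impl FnOnce(&[usize]) -> Result<RecordBatch>,
) -> Result<RecordBatch> {
    // Stage 1: before reading, map the projected table columns onto the file
    // schema, obtaining a projection and a mapper for later use.
    let (mapper, projection) = adapter.map_schema(file_schema)?;

    // Read only the projected columns from the file (elided).
    let file_batch = read_batch(&projection)?;

    // Stage 2: after reading, cast/reorder the columns and fill in columns
    // missing from the file so the batch matches the table schema.
    mapper.map_batch(file_batch)
}
```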
@@ -54,7 +58,8 @@ pub trait SchemaAdapter: Send + Sync {
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns
+    /// from the file schema to the table schema.
     ///
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
     /// to the table schema where possible.
@@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
+/// Maps, and casts if necessary, the columns of a `RecordBatch` read from a
+/// file so that they match the table schema.
 pub trait SchemaMapper: Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
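Finally, a sketch of how the three traits fit together in a custom adapter. This hypothetical `NullFillingAdapterFactory` matches columns by name and fills columns missing from the file with nulls (roughly the default behaviour described above) and performs no type casting. It assumes `map_batch` is the only required method of `SchemaMapper`, as shown in this diff, and that the module is importable as `datafusion::datasource::schema_adapter`; both may differ in other DataFusion versions:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion_common::Result;

/// Hypothetical factory that creates name-matching, null-filling adapters.
#[derive(Debug)]
struct NullFillingAdapterFactory;

impl SchemaAdapterFactory for NullFillingAdapterFactory {
    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(NullFillingAdapter { table_schema })
    }
}

struct NullFillingAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for NullFillingAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        // Stage 1 helper: locate the table column in the file schema by name.
        file_schema
            .index_of(self.table_schema.field(index).name())
            .ok()
    }

    fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        // Stage 1: project only the file columns that also exist in the table.
        let projection: Vec<usize> = (0..file_schema.fields().len())
            .filter(|i| {
                self.table_schema
                    .index_of(file_schema.field(*i).name())
                    .is_ok()
            })
            .collect();
        let mapper: Arc<dyn SchemaMapper> = Arc::new(NullFillingMapper {
            table_schema: Arc::clone(&self.table_schema),
        });
        Ok((mapper, projection))
    }
}

struct NullFillingMapper {
    table_schema: SchemaRef,
}

impl SchemaMapper for NullFillingMapper {
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // Stage 2: rearrange the file columns into table-schema order and fill
        // any column missing from the file with nulls (no casting attempted).
        let file_schema = batch.schema();
        let columns: Vec<ArrayRef> = self
            .table_schema
            .fields()
            .iter()
            .map(|field| match file_schema.index_of(field.name()) {
                Ok(i) => batch.column(i).clone(),
                Err(_) => new_null_array(field.data_type(), batch.num_rows()),
            })
            .collect();
        Ok(RecordBatch::try_new(Arc::clone(&self.table_schema), columns)?)
    }
}
```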