Skip to content

Commit

Permalink
Improve ParquetExec and related documentation (apache#10647)
Browse files Browse the repository at this point in the history
* Improve ParquetExec and related documentation

* Improve documentation

* Update datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs

Co-authored-by: Oleks V <[email protected]>

* fix link

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
2 people authored and findepi committed Jul 16, 2024
1 parent c4c4226 commit 2ebb30b
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 24 deletions.
127 changes: 114 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
pub use metrics::ParquetFileMetrics;
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
/// Execution plan for reading one or more Parquet files.
///
/// ```text
/// ▲
/// │
/// │ Produce a stream of
/// │ RecordBatches
/// │
/// ┌───────────────────────┐
/// │ │
/// │ ParquetExec │
/// │ │
/// └───────────────────────┘
/// ▲
/// │ Asynchronously read from one
/// │ or more parquet files via
/// │ ObjectStore interface
/// │
/// │
/// .───────────────────.
/// │ )
/// │`───────────────────'│
/// │ ObjectStore │
/// │.───────────────────.│
/// │ )
/// `───────────────────'
///
/// ```
/// # Features
///
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the file metadata via
/// [`ParquetFileReaderFactory`] and applies any predicates
/// and projections to determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
Expand All @@ -86,9 +163,9 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
/// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
Expand Down Expand Up @@ -190,11 +267,13 @@ impl ParquetExec {

/// Optional user defined parquet file reader factory.
///
/// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
/// implementation for data access operations.
/// You can use [`ParquetFileReaderFactory`] to more precisely control how
/// data is read from parquet files (e.g. skip re-reading metadata, coalesce
/// I/O operations, etc).
///
/// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
/// to this factory instead of `ObjectStore`.
/// The default reader factory reads directly from an [`ObjectStore`]
/// instance using individual I/O operations for the footer and then for
/// each page.
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
Expand Down Expand Up @@ -643,11 +722,21 @@ fn should_enable_page_index(
.unwrap_or(false)
}

/// Factory of parquet file readers.
/// Interface for reading parquet files.
///
/// Provides means to implement custom data access interface.
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached data, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
/// Provides an `AsyncFileReader` for reading data from a parquet file specified
///
/// # Arguments
/// * partition_index - Index of the partition (for reporting metrics)
/// * file_meta - The file to be read
/// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
/// * metrics - Execution metrics
fn create_reader(
&self,
partition_index: usize,
Expand All @@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
) -> Result<Box<dyn AsyncFileReader + Send>>;
}

/// Default parquet reader factory.
/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
/// Create a factory.
/// Create a new `DefaultParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
Expand Down
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! Schema Adapter provides a method of translating the RecordBatches that come out of the
//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! Adapter provides a method of translating the RecordBatches that come out of the
//! physical format into how they should be used by DataFusion. For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.

Expand All @@ -26,35 +28,38 @@ use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
/// Factory for creating [`SchemaAdapter`]
///
/// Provides means to implement custom schema adaptation.
/// This interface provides a way to implement custom schema adaptation logic
/// for ParquetExec (for example, to fill missing columns with default value
/// other than null)
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
/// 1. Before reading the file, we have to map projected column indexes from the
/// table schema to the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
/// 2. After reading a record batch map the read columns back to the expected
/// columns indexes and insert null-valued columns wherever the file schema was
/// missing a column present in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
Expand All @@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
Expand Down

0 comments on commit 2ebb30b

Please sign in to comment.