parquet: Add option to cache file metadata #12548

Closed · wants to merge 1 commit
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
@@ -365,6 +365,11 @@ config_namespace! {
/// multiple parquet files with schemas containing compatible types but different metadata
pub skip_metadata: bool, default = true

/// (reading) If true, caches the Parquet file-level metadata so it does not need
/// to be parsed on every query. This should typically be true when running more than
/// one (short) query on a table.
pub cache_metadata: bool, default = false

/// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
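A note on how the new flag would be toggled from user code: a minimal sketch, assuming the field surfaces under the standard `datafusion.execution.parquet` config prefix like its `skip_metadata` sibling above (the file path and table name are illustrative):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Assumption: the new field is exposed as
    // `datafusion.execution.parquet.cache_metadata`.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.cache_metadata", true);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical file; after the first query, subsequent ones
    // should reuse the parsed footer instead of re-reading it.
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}
```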
1 change: 1 addition & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -168,6 +168,7 @@ impl ParquetOptions {
enable_page_index: _,
pruning: _,
skip_metadata: _,
cache_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
reorder_filters: _,
63 changes: 59 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -52,6 +52,8 @@ use datafusion_common::{
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
@@ -60,6 +62,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use dashmap::DashMap;
use hashbrown::HashMap;
use log::debug;
use object_store::buffered::BufWriter;
@@ -80,7 +83,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::{
can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
can_expr_be_pushed_down_with_schemas, CachedParquetFileReaderFactory,
DefaultParquetFileReaderFactory, ParquetExecBuilder, ParquetFileReaderFactory,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{StreamExt, TryStreamExt};
@@ -140,7 +144,9 @@ impl FileFormatFactory for ParquetFormatFactory {
};

Ok(Arc::new(
ParquetFormat::default().with_options(parquet_options),
ParquetFormat::default()
.with_options(parquet_options)
.with_runtime_env(Some(state.runtime_env().clone())),
))
}

@@ -171,6 +177,8 @@ impl fmt::Debug for ParquetFormatFactory {
#[derive(Debug, Default)]
pub struct ParquetFormat {
options: TableParquetOptions,
runtime_env: Option<Arc<RuntimeEnv>>,
reader_factory: DashMap<ObjectStoreUrl, Arc<dyn ParquetFileReaderFactory>>,
}

impl ParquetFormat {
@@ -182,6 +190,7 @@ impl ParquetFormat {
/// Activate statistics based row group level pruning
/// - If `None`, defaults to value on `config_options`
pub fn with_enable_pruning(mut self, enable: bool) -> Self {
self.reader_factory.clear();
self.options.global.pruning = enable;
self
}
@@ -198,6 +207,7 @@
///
/// - If `None`, defaults to value on `config_options`
pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
self.reader_factory.clear();
self.options.global.metadata_size_hint = size_hint;
self
}
@@ -213,6 +223,7 @@
///
/// - If `None`, defaults to value on `config_options`
pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
self.reader_factory.clear();
self.options.global.skip_metadata = skip_metadata;
self
}
@@ -225,6 +236,7 @@

/// Set Parquet options for the ParquetFormat
pub fn with_options(mut self, options: TableParquetOptions) -> Self {
self.reader_factory.clear();
self.options = options;
self
}
@@ -234,6 +246,18 @@
&self.options
}

/// Sets the runtime environment used to resolve object stores for this format.
pub fn with_runtime_env(mut self, runtime_env: Option<Arc<RuntimeEnv>>) -> Self {
self.reader_factory.clear();
self.runtime_env = runtime_env;
self
}

/// Returns the runtime environment set via [`Self::with_runtime_env`], if any.
pub fn runtime_env(&self) -> Option<&Arc<RuntimeEnv>> {
self.runtime_env.as_ref()
}

/// Return `true` if should use view types.
///
/// If this returns true, DataFusion will instruct the parquet reader
@@ -253,9 +277,37 @@
///
/// Refer to [`Self::force_view_types`].
pub fn with_force_view_types(mut self, use_views: bool) -> Self {
self.reader_factory.clear();
self.options.global.schema_force_view_types = use_views;
self
}

/// Returns the [`ParquetFileReaderFactory`] for the given object store URL, if a
/// [runtime environment is set](`Self::with_runtime_env`).
///
/// The factory is created lazily on first access and cached for subsequent calls.
pub fn reader_factory(
&self,
object_store_url: ObjectStoreUrl,
) -> Result<Option<Arc<dyn ParquetFileReaderFactory>>> {
let cache_metadata = self.options.global.cache_metadata;
self.runtime_env
.as_ref()
.map(|runtime_env| {
let store = runtime_env.object_store(&object_store_url)?;
Ok(Arc::clone(
&self
.reader_factory
.entry(object_store_url.clone())
.or_insert_with(|| match cache_metadata {
false => {
Arc::new(DefaultParquetFileReaderFactory::new(store))
}
true => Arc::new(CachedParquetFileReaderFactory::new(store)),
}),
))
})
.transpose()
}
}
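
To make the lifecycle of the cached factories concrete, here is a minimal usage sketch of the accessors added above; the import paths follow the crate layout in this diff, and it relies on the local filesystem store that `RuntimeEnv` registers by default:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion_common::config::TableParquetOptions;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;

fn demo(runtime_env: Arc<RuntimeEnv>) -> datafusion_common::Result<()> {
    let mut options = TableParquetOptions::default();
    options.global.cache_metadata = true; // opt in to the caching factory

    let format = ParquetFormat::default()
        .with_options(options)
        .with_runtime_env(Some(runtime_env));

    // One factory is kept per object store URL; repeated calls return the
    // same Arc, so every scan of that store shares one metadata cache.
    let url = ObjectStoreUrl::local_filesystem();
    let first = format.reader_factory(url.clone())?;
    let second = format.reader_factory(url)?;
    assert!(first.is_some() && second.is_some());
    Ok(())
}
```

Note that every `with_*` setter calls `self.reader_factory.clear()`, so a reconfigured format never hands out a factory built from stale options.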

/// Clears all metadata (Schema level and field level) on an iterator
@@ -378,9 +430,14 @@
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let object_store_url = conf.object_store_url.clone();
let mut builder =
ParquetExecBuilder::new_with_options(conf, self.options.clone());

if let Some(reader_factory) = self.reader_factory(object_store_url)? {
builder = builder.with_parquet_file_reader_factory(reader_factory);
}

// If enable pruning then combine the filters to build the predicate.
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
@@ -1303,8 +1360,6 @@ mod tests {
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::BoxStream;
use log::error;
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -60,7 +60,10 @@ use crate::datasource::schema_adapter::{
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use reader::{
CachedParquetFileReaderFactory, DefaultParquetFileReaderFactory,
ParquetFileReaderFactory,
};
pub use row_filter::can_expr_be_pushed_down_with_schemas;
pub use writer::plan_to_parquet;

103 changes: 99 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -20,14 +20,16 @@

use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
use bytes::Bytes;
use dashmap::DashMap;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::future::{BoxFuture, Either};
use futures::FutureExt;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

/// Interface for reading parquet files.
///
@@ -65,7 +67,8 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
/// 3. Does not cache metadata
/// 4. Does not coalesce I/O operations.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
@@ -78,6 +81,34 @@ impl DefaultParquetFileReaderFactory {
}
}

/// Caching implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand (but may cache them in the future).
/// 3. Caches file metadata
/// 4. Does not coalesce I/O operations.
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
/// The parquet metadata for each file, keyed by the file name
/// (e.g. `file1.parquet`).
///
/// There are two layers of `Arc`: the outer one allows sharing the lock while a future
/// is executing, while the inner one shares the metadata between readers once it is cached.
metadata: DashMap<String, Arc<OnceLock<Arc<ParquetMetaData>>>>,
}

impl CachedParquetFileReaderFactory {
/// Create a new `CachedParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
store,
metadata: DashMap::new(),
}
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
@@ -86,9 +117,12 @@ impl DefaultParquetFileReaderFactory {
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
///
/// It will cache file metadata if `metadata_cache` is set.
pub(crate) struct ParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
pub metadata_cache: Option<Arc<OnceLock<Arc<ParquetMetaData>>>>,
}

impl AsyncFileReader for ParquetFileReader {
@@ -115,7 +149,24 @@
fn get_metadata(
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
self.inner.get_metadata()
Box::pin(match &self.metadata_cache {
None => Either::Left(self.inner.get_metadata()),
Some(metadata_cache) => Either::Right(match metadata_cache.get() {
Some(metadata) => {
Either::Left(std::future::ready(Ok(Arc::clone(metadata))))
}
None => {
let metadata_cache = Arc::clone(&metadata_cache);
Either::Right(self.inner.get_metadata().inspect(move |metadata| {
if let Ok(metadata) = metadata {
// TODO: use OnceLock::try_insert once
// https://github.com/rust-lang/rust/issues/116693 is stabilized
metadata_cache.get_or_init(|| Arc::clone(metadata));
}
}))
}
}),
})
}
}
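
The two-layer `Arc<OnceLock<_>>` scheme above is subtle enough to warrant a self-contained toy (a stand-in, not this PR's code): the outer `Arc` is cloned out of the `DashMap` entry so the shard lock is released before any slow work runs, while the `OnceLock` still guarantees at-most-once initialization per key:

```rust
use std::sync::{Arc, OnceLock};

use dashmap::DashMap;

// Stand-in for Arc<ParquetMetaData>.
type Meta = Arc<String>;

fn get_or_parse(cache: &DashMap<String, Arc<OnceLock<Meta>>>, file: &str) -> Meta {
    // Clone the outer Arc so the DashMap shard lock is dropped here,
    // before the expensive work (or an .await in the real reader) happens.
    let slot = Arc::clone(
        cache
            .entry(file.to_string())
            .or_insert_with(|| Arc::new(OnceLock::new()))
            .value(),
    );
    // The first caller pays the parse cost; later callers read the cached value.
    Arc::clone(slot.get_or_init(|| Arc::new(format!("parsed metadata of {file}"))))
}
```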

@@ -142,6 +193,50 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
Ok(Box::new(ParquetFileReader {
inner,
file_metrics,
metadata_cache: None,
}))
}
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
fn create_reader(
&self,
partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
let filename = file_meta
.location()
.parts()
.last()
.expect("No path in location")
.as_ref()
.to_string();

// TODO: cache metrics?
let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
);
let object_store = Arc::clone(&self.store);
let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);

if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
};

let metadata = Arc::clone(
self.metadata
.entry(filename)
.or_insert_with(|| Arc::new(OnceLock::new()))
.value(),
);
Ok(Box::new(ParquetFileReader {
inner,
file_metrics,
metadata_cache: Some(metadata),
}))
}
}
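
Finally, for callers that construct scans directly rather than through `ParquetFormat`, a sketch of wiring the factory in by hand; the `LocalFileSystem` store is an illustrative assumption, and `with_parquet_file_reader_factory` is the builder hook used in `file_format/parquet.rs` above:

```rust
use std::sync::Arc;

use object_store::local::LocalFileSystem;
// Path per the re-export added in physical_plan/parquet/mod.rs above.
use datafusion::datasource::physical_plan::parquet::CachedParquetFileReaderFactory;

fn caching_factory() -> Arc<CachedParquetFileReaderFactory> {
    let store = Arc::new(LocalFileSystem::new());
    Arc::new(CachedParquetFileReaderFactory::new(store))
}

// Usage, given a `builder: ParquetExecBuilder`:
//     let builder = builder.with_parquet_file_reader_factory(caching_factory());
```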