Skip to content

Parquet: Support reading Parquet metadata via suffix range requests #7334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits on
Mar 31, 2025
10 changes: 10 additions & 0 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ impl<T: AsyncFileReader> MetadataFetch for &mut T {
}
}

/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`] via suffix
/// requests (fetching the last bytes of the file), without knowing the file size in advance
///
/// This avoids a separate call to discover the file length when the underlying
/// storage supports suffix range requests.
pub trait MetadataSuffixFetch: MetadataFetch {
/// Return a future that fetches the last `suffix` bytes of the file asynchronously
///
/// Note the returned type is a boxed future, often created by
/// [`FutureExt::boxed`]. See the trait documentation for an example
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
}

/// An asynchronous interface to load [`ParquetMetaData`] from an async source
pub struct MetadataLoader<F> {
/// Function that fetches byte ranges asynchronously
Expand Down
109 changes: 84 additions & 25 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::AsyncFileReader;
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use object_store::{path::Path, ObjectStore};
use object_store::{GetOptions, GetRange};
use tokio::runtime::Handle;

/// Reads Parquet files in object storage using [`ObjectStore`].
Expand All @@ -45,29 +46,29 @@ use tokio::runtime::Handle;
/// println!("Found Blob with {}B at {}", meta.size, meta.location);
///
/// // Show Parquet metadata
/// let reader = ParquetObjectReader::new(storage_container, meta);
/// let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
/// print_parquet_metadata(&mut stdout(), builder.metadata());
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
path: Path,
file_size: Option<usize>,
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
runtime: Option<Handle>,
}

impl ParquetObjectReader {
/// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`ObjectMeta`]
///
/// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
/// Creates a new [`ParquetObjectReader`] for the provided [`ObjectStore`] and [`Path`].
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self {
store,
meta,
path,
file_size: None,
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
Expand All @@ -84,6 +85,22 @@ impl ParquetObjectReader {
}
}

/// Provide the byte size of this file.
///
/// When set, the reader issues only bounded range requests. When unset, the
/// reader falls back to suffix range requests to locate the metadata.
///
/// Supplying the size up front is an important optimization: it avoids extra
/// round trips when the underlying store does not support suffix range requests.
///
/// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
pub fn with_file_size(self, file_size: usize) -> Self {
    // Record the known length; metadata loading consults it to choose between
    // bounded-range and suffix-range fetch strategies.
    let mut updated = self;
    updated.file_size = Some(file_size);
    updated
}

/// Load the Column Index as part of [`Self::get_metadata`]
pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
Self {
Expand Down Expand Up @@ -125,7 +142,7 @@ impl ParquetObjectReader {
{
match &self.runtime {
Some(handle) => {
let path = self.meta.location.clone();
let path = self.path.clone();
let store = Arc::clone(&self.store);
handle
.spawn(async move { f(&store, &path).await })
Expand All @@ -138,13 +155,27 @@ impl ParquetObjectReader {
)
.boxed()
}
None => f(&self.store, &self.meta.location)
.map_err(|e| e.into())
.boxed(),
None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
}
}
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
let options = GetOptions {
range: Some(GetRange::Suffix(suffix)),
..Default::default()
};
self.spawn(|store, path| {
async move {
let resp = store.get_opts(path, options).await?;
Ok::<_, ParquetError>(resp.bytes().await?)
}
.boxed()
})
}
}

impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.spawn(|store, path| store.get_range(path, range))
Expand All @@ -165,13 +196,16 @@ impl AsyncFileReader for ParquetObjectReader {
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
let metadata_reader = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
.with_prefetch_hint(self.metadata_size_hint);
let metadata = if let Some(file_size) = self.file_size {
metadata_reader.load_and_finish(self, file_size).await?
} else {
metadata_reader.load_via_suffix_and_finish(self).await?
};

Ok(Arc::new(metadata))
})
}
Expand All @@ -181,7 +215,6 @@ impl AsyncFileReader for ParquetObjectReader {
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
Expand All @@ -191,7 +224,11 @@ impl AsyncFileReader for ParquetObjectReader {
let metadata =
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());

let metadata = metadata.load_and_finish(self, file_size).await?;
let metadata = if let Some(file_size) = self.file_size {
metadata.load_and_finish(self, file_size).await?
} else {
metadata.load_via_suffix_and_finish(self).await?
};

Ok(Arc::new(metadata))
})
Expand Down Expand Up @@ -231,7 +268,22 @@ mod tests {
#[tokio::test]
async fn test_simple() {
let (meta, store) = get_meta_store().await;
let object_reader = ParquetObjectReader::new(store, meta);
let object_reader =
ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 8);
}

#[tokio::test]
async fn test_simple_without_file_length() {
let (meta, store) = get_meta_store().await;
let object_reader = ParquetObjectReader::new(store, meta.location);

let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
Expand All @@ -247,7 +299,8 @@ mod tests {
let (mut meta, store) = get_meta_store().await;
meta.location = Path::from("I don't exist.parquet");

let object_reader = ParquetObjectReader::new(store, meta);
let object_reader =
ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
// Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
match ParquetRecordBatchStreamBuilder::new(object_reader).await {
Ok(_) => panic!("expected failure"),
Expand Down Expand Up @@ -280,7 +333,9 @@ mod tests {

let initial_actions = num_actions.load(Ordering::Relaxed);

let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
let reader = ParquetObjectReader::new(store, meta.location)
.with_file_size(meta.size)
.with_runtime(rt.handle().clone());

let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
Expand All @@ -306,7 +361,9 @@ mod tests {

let (meta, store) = get_meta_store().await;

let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
let reader = ParquetObjectReader::new(store, meta.location)
.with_file_size(meta.size)
.with_runtime(rt.handle().clone());

let current_id = std::thread::current().id();

Expand All @@ -329,7 +386,9 @@ mod tests {

let (meta, store) = get_meta_store().await;

let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());
let mut reader = ParquetObjectReader::new(store, meta.location)
.with_file_size(meta.size)
.with_runtime(rt.handle().clone());

rt.shutdown_background();

Expand Down
Loading
Loading