Skip to content

Remove AsyncFileReader::get_metadata_with_options, add options to AsyncFileReader::get_metadata #7342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 1, 2025
Merged
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<()> {
let mut file = File::open(&path).await.unwrap();

// The metadata could be cached in other places, this example only shows how to read
let metadata = file.get_metadata().await?;
let metadata = file.get_metadata(None).await?;

for rg in metadata.row_groups() {
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
Expand Down
75 changes: 16 additions & 59 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,11 @@ pub trait AsyncFileReader: Send {
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, decrypting, etc...
///
/// By default calls `get_metadata()`
fn get_metadata_with_options<'a>(
/// ArrowReaderOptions may be provided to supply decryption parameters
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
let _ = options;
self.get_metadata()
}
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
Expand All @@ -127,15 +118,11 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
self.as_mut().get_byte_ranges(ranges)
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata_with_options(options)
self.as_mut().get_metadata(options)
}
}

Expand All @@ -156,9 +143,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
.boxed()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
Expand All @@ -169,6 +156,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {

let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();

self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

Expand All @@ -178,43 +166,16 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let metadata_reader = ParquetMetaDataReader::new();

#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader
.with_decryption_properties(options.file_decryption_properties.as_ref());
let metadata_reader = metadata_reader.with_decryption_properties(
options.and_then(|o| o.file_decryption_properties.as_ref()),
);

let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;

Ok(Arc::new(parquet_metadata))
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();

if footer.is_encrypted_footer() {
return Err(general_err!(
"Parquet file has an encrypted footer but decryption properties were not provided"
));
}

self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
}
.boxed()
}
}

impl ArrowReaderMetadata {
Expand All @@ -233,7 +194,7 @@ impl ArrowReaderMetadata {
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
// took an argument to fetch the page indexes.
let mut metadata = input.get_metadata_with_options(&options).await?;
let mut metadata = input.get_metadata(Some(&options)).await?;

if options.page_index
&& metadata.column_index().is_none()
Expand Down Expand Up @@ -1169,13 +1130,9 @@ mod tests {
futures::future::ready(Ok(self.data.slice(range))).boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
_options: &'a ArrowReaderOptions,
_options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}
Expand Down
28 changes: 7 additions & 21 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,35 +194,21 @@ impl AsyncFileReader for ParquetObjectReader {
// an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let metadata_reader = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);
let metadata = if let Some(file_size) = self.file_size {
metadata_reader.load_and_finish(self, file_size).await?
} else {
metadata_reader.load_via_suffix_and_finish(self).await?
};

Ok(Arc::new(metadata))
})
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let metadata = ParquetMetaDataReader::new()
let mut metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);

#[cfg(feature = "encryption")]
let metadata =
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
if let Some(options) = options {
metadata = metadata
.with_decryption_properties(options.file_decryption_properties.as_ref());
}

let metadata = if let Some(file_size) = self.file_size {
metadata.load_and_finish(self, file_size).await?
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ impl ParquetMetaDataReader {

file_decryptor = Some(decryptor);
} else {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
}
}

Expand Down
2 changes: 1 addition & 1 deletion parquet/tests/arrow_reader/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ fn test_decrypting_without_decryption_properties_fails() {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
);
}

Expand Down
4 changes: 2 additions & 2 deletions parquet/tests/arrow_reader/encryption_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn test_decrypting_without_decryption_properties_fails() {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
);
}

Expand Down Expand Up @@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() {
let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
let metadata = reader.get_metadata_with_options(&options).await.unwrap();
let metadata = reader.get_metadata(Some(&options)).await.unwrap();
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await
.unwrap();
Expand Down
Loading