Skip to content

Commit

Permalink
get_metadata_with_encryption -> get_metadata_with_options
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Mar 11, 2025
1 parent cc27421 commit 90434d6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 40 deletions.
53 changes: 27 additions & 26 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ pub trait AsyncFileReader: Send {
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

/// Provides asynchronous access to the [`ParquetMetaData`] of encrypted parquet
/// files, like get_metadata does for unencrypted ones.
#[cfg(feature = "encryption")]
fn get_metadata_with_encryption(
&mut self,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, decrypting, etc...
///
/// By default calls `get_metadata()`
fn get_metadata_with_options<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
Expand All @@ -131,13 +133,14 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
self.as_mut().get_metadata()
}

#[cfg(feature = "encryption")]
fn get_metadata_with_encryption(
&mut self,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
self.as_mut()
.get_metadata_with_encryption(file_decryption_properties)
fn get_metadata_with_options<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
#[cfg(feature = "encryption")]
return self.as_mut().get_metadata_with_options(options);
#[cfg(not(feature = "encryption"))]
self.as_mut().get_metadata()
}
}

Expand All @@ -159,10 +162,10 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}

#[cfg(feature = "encryption")]
fn get_metadata_with_encryption(
&mut self,
file_decryption_properties: Option<FileDecryptionProperties>,
) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
fn get_metadata_with_options<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
Expand All @@ -181,7 +184,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let parquet_metadata_reader = ParquetMetaDataReader::decode_metadata_with_encryption(
&buf,
footer.is_encrypted_footer(),
file_decryption_properties.as_ref(),
options.file_decryption_properties.as_ref(),
)?;
Ok(Arc::new(parquet_metadata_reader))
}
Expand Down Expand Up @@ -235,9 +238,7 @@ impl ArrowReaderMetadata {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
// took an argument to fetch the page indexes.
#[cfg(feature = "encryption")]
let mut metadata = input
.get_metadata_with_encryption(options.file_decryption_properties.clone())
.await?;
let mut metadata = input.get_metadata_with_options(&options).await?;

#[cfg(not(feature = "encryption"))]
let mut metadata = input.get_metadata().await?;
Expand Down Expand Up @@ -1214,10 +1215,10 @@ mod tests {
}

#[cfg(feature = "encryption")]
fn get_metadata_with_encryption(
&mut self,
_file_decryption_properties: Option<FileDecryptionProperties>,
) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
fn get_metadata_with_options<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}
}
Expand Down
66 changes: 52 additions & 14 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@

use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use tokio::runtime::Handle;

use crate::arrow::async_reader::AsyncFileReader;
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Reads Parquet files in object storage using [`ObjectStore`].
///
/// ```no_run
Expand Down Expand Up @@ -179,16 +177,17 @@ impl AsyncFileReader for ParquetObjectReader {
}

#[cfg(feature = "encryption")]
fn get_metadata_with_encryption(
&mut self,
_file_decryption_properties: Option<FileDecryptionProperties>,
) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
fn get_metadata_with_options<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.with_decryption_properties(options.file_decryption_properties.as_ref())
.load_and_finish(self, file_size)
.await?;

Expand All @@ -206,16 +205,17 @@ mod tests {

use futures::TryStreamExt;

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::ParquetRecordBatchStreamBuilder;
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::ParquetError;
use arrow::util::test_util::parquet_test_data;
use futures::FutureExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::ParquetRecordBatchStreamBuilder;
use crate::errors::ParquetError;

async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
let res = parquet_test_data();
let store = LocalFileSystem::new_with_prefix(res).unwrap();
Expand All @@ -228,6 +228,44 @@ mod tests {
(meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

#[cfg(feature = "encryption")]
async fn get_encrypted_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
let res = parquet_test_data();
let store = LocalFileSystem::new_with_prefix(res).unwrap();

let meta = store
.head(&Path::from("uniform_encryption.parquet.encrypted"))
.await
.unwrap();

(meta, Arc::new(store) as Arc<dyn ObjectStore>)
}

#[tokio::test]
#[cfg(feature = "encryption")]
async fn test_encrypted() {
let (meta, store) = get_encrypted_meta_store().await;

let key_code: &[u8] = "0123456789012345".as_bytes();
let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
.build()
.unwrap();
let options =
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
let mut binding = ParquetObjectReader::new(store, meta);
let binding = binding.get_metadata_with_options(&options);

let object_reader = binding.await.unwrap();
// todo: this should pass
// let builder = ParquetRecordBatchStreamBuilder::new_with_options(object_reader, options)
// .await
// .unwrap();
// let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
//
// assert_eq!(batches.len(), 1);
// assert_eq!(batches[0].num_rows(), 8);
}

#[tokio::test]
async fn test_simple() {
let (meta, store) = get_meta_store().await;
Expand Down

0 comments on commit 90434d6

Please sign in to comment.