From 7dac4855195a98addc98d74f043141ad57b0a177 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Wed, 26 Mar 2025 17:47:10 -0700 Subject: [PATCH 01/10] Remove default implementation for get_metadata_with_options and explain why. --- parquet/src/arrow/async_reader/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 4478162e521e..d6b17d639a23 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -110,14 +110,12 @@ pub trait AsyncFileReader: Send { /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, decrypting, etc... /// - /// By default calls `get_metadata()` + /// No default here because options under encryption are significant, and we want + /// the end-user to be explicit. fn get_metadata_with_options<'a>( &'a mut self, options: &'a ArrowReaderOptions, - ) -> BoxFuture<'a, Result>> { - let _ = options; - self.get_metadata() - } + ) -> BoxFuture<'a, Result>>; } /// This allows Box to be used as an AsyncFileReader, From a86c499dc2cb170769eee3bc8d8cf183a3bf7547 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Wed, 26 Mar 2025 18:00:38 -0700 Subject: [PATCH 02/10] Improve comment about missing implementation. --- parquet/src/arrow/async_reader/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index d6b17d639a23..155d59527383 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -110,8 +110,8 @@ pub trait AsyncFileReader: Send { /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, decrypting, etc... /// - /// No default here because options under encryption are significant, and we want - /// the end-user to be explicit. + /// No default because options under encryption are significant. + /// We want the end-user to be explicit about what they want here. fn get_metadata_with_options<'a>( &'a mut self, options: &'a ArrowReaderOptions, From a784060f773286f032608aa31e5b8aa29e328980 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 16:28:26 -0700 Subject: [PATCH 03/10] Update get_metadata method to take an optional ArrowReaderOptions. --- parquet/src/arrow/async_reader/mod.rs | 79 +++++++------------------ parquet/src/arrow/async_reader/store.rs | 25 +++----- 2 files changed, 30 insertions(+), 74 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 155d59527383..c7fa226f0404 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -104,17 +104,10 @@ pub trait AsyncFileReader: Send { /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... - fn get_metadata(&mut self) -> BoxFuture<'_, Result>>; - - /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, - /// allowing fine-grained control over how metadata is sourced, in particular allowing - /// for caching, pre-fetching, catalog metadata, decrypting, etc... - /// - /// No default because options under encryption are significant. - /// We want the end-user to be explicit about what they want here. - fn get_metadata_with_options<'a>( + /// ArrowReaderOptions may be provided to supply decryption parameters + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>>; } @@ -128,15 +121,11 @@ impl AsyncFileReader for Box { self.as_mut().get_byte_ranges(ranges) } - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - self.as_mut().get_metadata() - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { - self.as_mut().get_metadata_with_options(options) + self.as_mut().get_metadata(options) } } @@ -157,9 +146,9 @@ impl AsyncFileReader for T { .boxed() } - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64; async move { @@ -170,37 +159,9 @@ impl AsyncFileReader for T { let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; let metadata_len = footer.metadata_length(); - self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64)) - .await?; - - let mut buf = Vec::with_capacity(metadata_len); - self.take(metadata_len as _).read_to_end(&mut buf).await?; - - let metadata_reader = ParquetMetaDataReader::new(); - - #[cfg(feature = "encryption")] - let metadata_reader = metadata_reader - .with_decryption_properties(options.file_decryption_properties.as_ref()); - - let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; - - Ok(Arc::new(parquet_metadata)) - } - .boxed() - } - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64; - async move { - self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?; - - let mut buf = [0_u8; FOOTER_SIZE]; - self.read_exact(&mut buf).await?; - - let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; - let metadata_len = footer.metadata_length(); - - if footer.is_encrypted_footer() { + if footer.is_encrypted_footer() && + (options.is_none() || options.unwrap().file_decryption_properties.is_none()) { return Err(general_err!( "Parquet file has an encrypted footer but decryption properties were not provided" )); @@ -212,7 +173,17 @@ impl AsyncFileReader for T { let mut buf = Vec::with_capacity(metadata_len); self.take(metadata_len as _).read_to_end(&mut buf).await?; - Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?)) + let mut metadata_reader = ParquetMetaDataReader::new(); + + #[cfg(feature = "encryption")] + if let Some(options) = options { + metadata_reader = metadata_reader + .with_decryption_properties(options.file_decryption_properties.as_ref()); + } + + let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; + + Ok(Arc::new(parquet_metadata)) } .boxed() } @@ -1182,13 +1153,9 @@ mod tests { futures::future::ready(Ok(self.data.slice(range))).boxed() } - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - futures::future::ready(Ok(self.metadata.clone())).boxed() - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - _options: &'a ArrowReaderOptions, + _options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { futures::future::ready(Ok(self.metadata.clone())).boxed() } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index a1e94efd1451..6b4fd797f943 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -163,33 +163,22 @@ impl AsyncFileReader for ParquetObjectReader { // an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to // `Self::get_bytes`. - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - Box::pin(async move { - let file_size = self.meta.size; - let metadata = ParquetMetaDataReader::new() - .with_column_indexes(self.preload_column_index) - .with_offset_indexes(self.preload_offset_index) - .with_prefetch_hint(self.metadata_size_hint) - .load_and_finish(self, file_size) - .await?; - Ok(Arc::new(metadata)) - }) - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { Box::pin(async move { let file_size = self.meta.size; - let metadata = ParquetMetaDataReader::new() + let mut metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) .with_prefetch_hint(self.metadata_size_hint); #[cfg(feature = "encryption")] - let metadata = - metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + if let Some(options) = options { + metadata = metadata + .with_decryption_properties(options.file_decryption_properties.as_ref()); + } let metadata = metadata.load_and_finish(self, file_size).await?; From 62d6a844309bcc8f9d5a2e5d2beb29941df78473 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 16:49:55 -0700 Subject: [PATCH 04/10] Fix handling of unencrypted footer. --- parquet/src/arrow/async_reader/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index c7fa226f0404..73bd516f5808 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -159,9 +159,10 @@ impl AsyncFileReader for T { let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; let metadata_len = footer.metadata_length(); + let have_decryptor = options.is_some() && + options.unwrap().file_decryption_properties.is_some(); - if footer.is_encrypted_footer() && - (options.is_none() || options.unwrap().file_decryption_properties.is_none()) { + if footer.is_encrypted_footer() && !have_decryptor { return Err(general_err!( "Parquet file has an encrypted footer but decryption properties were not provided" )); @@ -181,7 +182,10 @@ impl AsyncFileReader for T { .with_decryption_properties(options.file_decryption_properties.as_ref()); } - let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; + let parquet_metadata = match have_decryptor { + true => metadata_reader.decode_footer_metadata(&buf, &footer)?, + false => ParquetMetaDataReader::decode_metadata(&buf)? + }; Ok(Arc::new(parquet_metadata)) } From 32944d5cddf15946859e5b3830e6327f86daee01 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 17:03:58 -0700 Subject: [PATCH 05/10] Checking git tests, it seems I missed a couple dead calls. Fixed. --- parquet/src/arrow/async_reader/mod.rs | 2 +- parquet/tests/arrow_reader/encryption_async.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 73bd516f5808..30bda9dbaf83 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -209,7 +209,7 @@ impl ArrowReaderMetadata { ) -> Result { // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata // took an argument to fetch the page indexes. - let mut metadata = input.get_metadata_with_options(&options).await?; + let mut metadata = input.get_metadata(Some(&options)).await?; if options.page_index && metadata.column_index().is_none() diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index eeac10f574f4..d3ab8a383169 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); let mut reader = ParquetObjectReader::new(store, meta); - let metadata = reader.get_metadata_with_options(&options).await.unwrap(); + let metadata = reader.get_metadata(Some(&options)).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await .unwrap(); From d09313f2c8601ab87d764f0ddeaa20b25f12f0ce Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 17:36:12 -0700 Subject: [PATCH 06/10] Fix path with no encryption enabled. --- parquet/examples/read_with_rowgroup.rs | 2 +- parquet/src/arrow/async_reader/mod.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..52b3d112274d 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -35,7 +35,7 @@ async fn main() -> Result<()> { let mut file = File::open(&path).await.unwrap(); // The metadata could be cached in other places, this example only shows how to read - let metadata = file.get_metadata().await?; + let metadata = file.get_metadata(None).await?; for rg in metadata.row_groups() { let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all()); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 30bda9dbaf83..ecb11eb4d9ba 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -159,9 +159,13 @@ impl AsyncFileReader for T { let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; let metadata_len = footer.metadata_length(); + #[cfg(feature = "encryption")] let have_decryptor = options.is_some() && options.unwrap().file_decryption_properties.is_some(); + #[cfg(not(feature = "encryption"))] + let have_decryptor = options.is_some(); + if footer.is_encrypted_footer() && !have_decryptor { return Err(general_err!( "Parquet file has an encrypted footer but decryption properties were not provided" @@ -174,13 +178,15 @@ impl AsyncFileReader for T { let mut buf = Vec::with_capacity(metadata_len); self.take(metadata_len as _).read_to_end(&mut buf).await?; - let mut metadata_reader = ParquetMetaDataReader::new(); + let metadata_reader = ParquetMetaDataReader::new(); #[cfg(feature = "encryption")] - if let Some(options) = options { - metadata_reader = metadata_reader - .with_decryption_properties(options.file_decryption_properties.as_ref()); - } + let metadata_reader = match have_decryptor { + true => metadata_reader + .with_decryption_properties(options.unwrap().file_decryption_properties.as_ref()), + false => metadata_reader, + }; + let parquet_metadata = match have_decryptor { true => metadata_reader.decode_footer_metadata(&buf, &footer)?, From fbf2519d0c2690850b042a4535f2785e851e9183 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 17:43:39 -0700 Subject: [PATCH 07/10] Make have_decryptor a bit more bullet-proof. --- parquet/src/arrow/async_reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index ecb11eb4d9ba..135de8aa409f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -164,7 +164,7 @@ impl AsyncFileReader for T { options.unwrap().file_decryption_properties.is_some(); #[cfg(not(feature = "encryption"))] - let have_decryptor = options.is_some(); + let have_decryptor = options.is_some() && cfg!(feature = "encryption"); // always false if footer.is_encrypted_footer() && !have_decryptor { return Err(general_err!( From 6615e27485fd37b2b20451d05de27eec67652f1a Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Fri, 28 Mar 2025 18:15:35 -0700 Subject: [PATCH 08/10] Update expected error message in test_decrypting_without_decryption_properties_fails. --- parquet/tests/arrow_reader/encryption_async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index d3ab8a383169..5082e3e0e715 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -239,7 +239,7 @@ async fn test_decrypting_without_decryption_properties_fails() { assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" ); } From 2158f314a784c37b5e61aaab1dec0437c3ebc0cb Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 31 Mar 2025 13:26:10 +1300 Subject: [PATCH 09/10] Tidy up get_metadata --- parquet/src/arrow/async_reader/mod.rs | 26 ++++-------------------- parquet/src/file/metadata/reader.rs | 2 +- parquet/tests/arrow_reader/encryption.rs | 2 +- 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 135de8aa409f..4b6db3d523d7 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -159,18 +159,6 @@ impl AsyncFileReader for T { let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; let metadata_len = footer.metadata_length(); - #[cfg(feature = "encryption")] - let have_decryptor = options.is_some() && - options.unwrap().file_decryption_properties.is_some(); - - #[cfg(not(feature = "encryption"))] - let have_decryptor = options.is_some() && cfg!(feature = "encryption"); // always false - - if footer.is_encrypted_footer() && !have_decryptor { - return Err(general_err!( - "Parquet file has an encrypted footer but decryption properties were not provided" - )); - } self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64)) .await?; @@ -181,17 +169,11 @@ impl AsyncFileReader for T { let metadata_reader = ParquetMetaDataReader::new(); #[cfg(feature = "encryption")] - let metadata_reader = match have_decryptor { - true => metadata_reader - .with_decryption_properties(options.unwrap().file_decryption_properties.as_ref()), - false => metadata_reader, - }; - + let metadata_reader = metadata_reader.with_decryption_properties( + options.and_then(|o| o.file_decryption_properties.as_ref()), + ); - let parquet_metadata = match have_decryptor { - true => metadata_reader.decode_footer_metadata(&buf, &footer)?, - false => ParquetMetaDataReader::decode_metadata(&buf)? - }; + let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; Ok(Arc::new(parquet_metadata)) } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 8532b5966760..6a44fdd738b6 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -751,7 +751,7 @@ impl ParquetMetaDataReader { file_decryptor = Some(decryptor); } else { - return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided")); + return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided")); } } diff --git a/parquet/tests/arrow_reader/encryption.rs b/parquet/tests/arrow_reader/encryption.rs index 521212488a49..362a58772ac6 100644 --- a/parquet/tests/arrow_reader/encryption.rs +++ b/parquet/tests/arrow_reader/encryption.rs @@ -154,7 +154,7 @@ fn test_decrypting_without_decryption_properties_fails() { assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" ); } From d1a584382fb5c3c1a6e18efac5ba37b0011108f7 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Mon, 31 Mar 2025 17:42:09 -0700 Subject: [PATCH 10/10] Fix merge conflicts. --- parquet/tests/arrow_reader/encryption_async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index 16eef4a9679a..60eb97363dae 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); - let metadata = reader.get_metadata_with_options(Some(&options)).await.unwrap(); + let metadata = reader.get_metadata(Some(&options)).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await .unwrap();