diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..52b3d112274d 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -35,7 +35,7 @@ async fn main() -> Result<()> { let mut file = File::open(&path).await.unwrap(); // The metadata could be cached in other places, this example only shows how to read - let metadata = file.get_metadata().await?; + let metadata = file.get_metadata(None).await?; for rg in metadata.row_groups() { let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all()); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index b0d5b0a71030..5d5a7036eefb 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -101,20 +101,11 @@ pub trait AsyncFileReader: Send { /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... - fn get_metadata(&mut self) -> BoxFuture<'_, Result>>; - - /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, - /// allowing fine-grained control over how metadata is sourced, in particular allowing - /// for caching, pre-fetching, catalog metadata, decrypting, etc... - /// - /// By default calls `get_metadata()` - fn get_metadata_with_options<'a>( + /// ArrowReaderOptions may be provided to supply decryption parameters + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, - ) -> BoxFuture<'a, Result>> { - let _ = options; - self.get_metadata() - } + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, Result>>; } /// This allows Box to be used as an AsyncFileReader, @@ -127,15 +118,11 @@ impl AsyncFileReader for Box { self.as_mut().get_byte_ranges(ranges) } - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - self.as_mut().get_metadata() - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { - self.as_mut().get_metadata_with_options(options) + self.as_mut().get_metadata(options) } } @@ -156,9 +143,9 @@ impl AsyncFileReader for T { .boxed() } - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64; async move { @@ -169,6 +156,7 @@ impl AsyncFileReader for T { let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; let metadata_len = footer.metadata_length(); + self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64)) .await?; @@ -178,8 +166,9 @@ impl AsyncFileReader for T { let metadata_reader = ParquetMetaDataReader::new(); #[cfg(feature = "encryption")] - let metadata_reader = metadata_reader - .with_decryption_properties(options.file_decryption_properties.as_ref()); + let metadata_reader = metadata_reader.with_decryption_properties( + options.and_then(|o| o.file_decryption_properties.as_ref()), + ); let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; @@ -187,34 +176,6 @@ impl AsyncFileReader for T { } .boxed() } - - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64; - async move { - self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?; - - let mut buf = [0_u8; FOOTER_SIZE]; - self.read_exact(&mut buf).await?; - - let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; - let metadata_len = footer.metadata_length(); - - if footer.is_encrypted_footer() { - return Err(general_err!( - "Parquet file has an encrypted footer but decryption properties were not provided" - )); - } - - self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64)) - .await?; - - let mut buf = Vec::with_capacity(metadata_len); - self.take(metadata_len as _).read_to_end(&mut buf).await?; - - Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?)) - } - .boxed() - } } impl ArrowReaderMetadata { @@ -233,7 +194,7 @@ impl ArrowReaderMetadata { ) -> Result { // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata // took an argument to fetch the page indexes. - let mut metadata = input.get_metadata_with_options(&options).await?; + let mut metadata = input.get_metadata(Some(&options)).await?; if options.page_index && metadata.column_index().is_none() @@ -1169,13 +1130,9 @@ mod tests { futures::future::ready(Ok(self.data.slice(range))).boxed() } - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - futures::future::ready(Ok(self.metadata.clone())).boxed() - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - _options: &'a ArrowReaderOptions, + _options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { futures::future::ready(Ok(self.metadata.clone())).boxed() } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 9934d2b93d24..dff7d9362aec 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -194,35 +194,21 @@ impl AsyncFileReader for ParquetObjectReader { // an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to // `Self::get_bytes`. - fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { - Box::pin(async move { - let metadata_reader = ParquetMetaDataReader::new() - .with_column_indexes(self.preload_column_index) - .with_offset_indexes(self.preload_offset_index) - .with_prefetch_hint(self.metadata_size_hint); - let metadata = if let Some(file_size) = self.file_size { - metadata_reader.load_and_finish(self, file_size).await? - } else { - metadata_reader.load_via_suffix_and_finish(self).await? - }; - - Ok(Arc::new(metadata)) - }) - } - - fn get_metadata_with_options<'a>( + fn get_metadata<'a>( &'a mut self, - options: &'a ArrowReaderOptions, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { Box::pin(async move { - let metadata = ParquetMetaDataReader::new() + let mut metadata = ParquetMetaDataReader::new() .with_column_indexes(self.preload_column_index) .with_offset_indexes(self.preload_offset_index) .with_prefetch_hint(self.metadata_size_hint); #[cfg(feature = "encryption")] - let metadata = - metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + if let Some(options) = options { + metadata = metadata + .with_decryption_properties(options.file_decryption_properties.as_ref()); + } let metadata = if let Some(file_size) = self.file_size { metadata.load_and_finish(self, file_size).await? diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index c2423ecc1bdc..f0eedd2bb99d 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -834,7 +834,7 @@ impl ParquetMetaDataReader { file_decryptor = Some(decryptor); } else { - return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided")); + return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided")); } } diff --git a/parquet/tests/arrow_reader/encryption.rs b/parquet/tests/arrow_reader/encryption.rs index 521212488a49..362a58772ac6 100644 --- a/parquet/tests/arrow_reader/encryption.rs +++ b/parquet/tests/arrow_reader/encryption.rs @@ -154,7 +154,7 @@ fn test_decrypting_without_decryption_properties_fails() { assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" ); } diff --git a/parquet/tests/arrow_reader/encryption_async.rs b/parquet/tests/arrow_reader/encryption_async.rs index 9c9b3245823d..60eb97363dae 100644 --- a/parquet/tests/arrow_reader/encryption_async.rs +++ b/parquet/tests/arrow_reader/encryption_async.rs @@ -239,7 +239,7 @@ async fn test_decrypting_without_decryption_properties_fails() { assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided" ); } @@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() { let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size); - let metadata = reader.get_metadata_with_options(&options).await.unwrap(); + let metadata = reader.get_metadata(Some(&options)).await.unwrap(); let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) .await .unwrap();