From 000334da01080490c6a9d3bef111f2c69cbff559 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sat, 8 Mar 2025 02:42:14 +0100 Subject: [PATCH] Review feedback --- parquet/src/arrow/arrow_reader/mod.rs | 81 +--------- parquet/src/arrow/async_reader/mod.rs | 98 +------------ parquet/src/file/metadata/reader.rs | 2 +- .../src/util/test_common/encryption_util.rs | 138 ++++++++++++++++++ parquet/src/util/test_common/mod.rs | 3 + 5 files changed, 150 insertions(+), 172 deletions(-) create mode 100644 parquet/src/util/test_common/encryption_util.rs diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index eb47012321b8..5d796effb693 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1020,6 +1020,8 @@ mod tests { use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; + #[cfg(feature = "encryption")] + use crate::util::test_common::encryption_util::verify_encryption_test_file_read; use crate::util::test_common::rand_gen::RandGen; #[test] @@ -1993,85 +1995,6 @@ mod tests { }; } - #[cfg(feature = "encryption")] - fn verify_encryption_test_file_read( - file: File, - decryption_properties: FileDecryptionProperties, - ) { - let options = ArrowReaderOptions::default() - .with_file_decryption_properties(decryption_properties.clone()); - let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); - let file_metadata = metadata.metadata.file_metadata(); - - let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); - let record_reader = builder.build().unwrap(); - - assert_eq!(file_metadata.num_rows(), 50); - assert_eq!(file_metadata.schema_descr().num_columns(), 8); - assert_eq!( - file_metadata.created_by().unwrap(), - "parquet-cpp-arrow version 19.0.0-SNAPSHOT" - ); - - metadata.metadata.row_groups().iter().for_each(|rg| { - assert_eq!(rg.num_columns(), 8); - assert_eq!(rg.num_rows(), 50); - }); - - let mut row_count = 0; - for batch in record_reader { - let batch = batch.unwrap(); - row_count += batch.num_rows(); - - let bool_col = batch.column(0).as_boolean(); - let time_col = batch - .column(1) - .as_primitive::(); - let list_col = batch.column(2).as_list::(); - let timestamp_col = batch - .column(3) - .as_primitive::(); - let f32_col = batch.column(4).as_primitive::(); - let f64_col = batch.column(5).as_primitive::(); - let binary_col = batch.column(6).as_binary::(); - let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); - - for (i, x) in bool_col.iter().enumerate() { - assert_eq!(x.unwrap(), i % 2 == 0); - } - for (i, x) in time_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as i32); - } - for (i, list_item) in list_col.iter().enumerate() { - let list_item = list_item.unwrap(); - let list_item = list_item.as_primitive::(); - assert_eq!(list_item.len(), 2); - assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64); - assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64); - } - for x in timestamp_col.iter() { - assert!(x.is_some()); - } - for (i, x) in f32_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f32 * 1.1f32); - } - for (i, x) in f64_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f64 * 1.1111111f64); - } - for (i, x) in binary_col.iter().enumerate() { - assert_eq!(x.is_some(), i % 2 == 0); - if let Some(x) = x { - assert_eq!(&x[0..7], b"parquet"); - } - } - for (i, x) in fixed_size_binary_col.iter().enumerate() { - assert_eq!(x.unwrap(), &[i as u8; 10]); - } - } - - assert_eq!(row_count, file_metadata.num_rows() as usize); - } - #[test] fn test_read_float32_float64_byte_stream_split() { let path = format!( diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index cfdfd00ec59a..ca66d20994b4 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1167,10 +1167,13 @@ mod tests { use crate::arrow::arrow_reader::{ ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector, }; + use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::ArrowWriter; use crate::file::metadata::ParquetMetaDataReader; use crate::file::properties::WriterProperties; + #[cfg(feature = "encryption")] + use crate::util::test_common::encryption_util::verify_encryption_test_file_read_async; use arrow::compute::kernels::cmp::eq; use arrow::error::Result as ArrowResult; use arrow_array::builder::{ListBuilder, StringBuilder}; @@ -2492,95 +2495,6 @@ mod tests { assert_eq!(result.len(), 1); } - #[cfg(feature = "encryption")] - async fn verify_encryption_test_file_read( - file: &mut File, - decryption_properties: FileDecryptionProperties, - ) { - let options = - ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); - - let metadata = ArrowReaderMetadata::load_async(file, options.clone()) - .await - .unwrap(); - let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options) - .await - .unwrap(); - let file_metadata = metadata.metadata.file_metadata(); - - let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( - file.try_clone().await.unwrap(), - arrow_reader_metadata.clone(), - ) - .build() - .unwrap(); - let record_batches = record_reader.try_collect::>().await.unwrap(); - - assert_eq!(file_metadata.num_rows(), 50); - assert_eq!(file_metadata.schema_descr().num_columns(), 8); - assert_eq!( - file_metadata.created_by().unwrap(), - "parquet-cpp-arrow version 19.0.0-SNAPSHOT" - ); - - metadata.metadata.row_groups().iter().for_each(|rg| { - assert_eq!(rg.num_columns(), 8); - assert_eq!(rg.num_rows(), 50); - }); - - let mut row_count = 0; - for batch in record_batches { - row_count += batch.num_rows(); - - let bool_col = batch.column(0).as_boolean(); - let time_col = batch - .column(1) - .as_primitive::(); - let list_col = batch.column(2).as_list::(); - let timestamp_col = batch - .column(3) - .as_primitive::(); - let f32_col = batch.column(4).as_primitive::(); - let f64_col = batch.column(5).as_primitive::(); - let binary_col = batch.column(6).as_binary::(); - let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); - - for (i, x) in bool_col.iter().enumerate() { - assert_eq!(x.unwrap(), i % 2 == 0); - } - for (i, x) in time_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as i32); - } - for (i, list_item) in list_col.iter().enumerate() { - let list_item = list_item.unwrap(); - let list_item = list_item.as_primitive::(); - assert_eq!(list_item.len(), 2); - assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64); - assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64); - } - for x in timestamp_col.iter() { - assert!(x.is_some()); - } - for (i, x) in f32_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f32 * 1.1f32); - } - for (i, x) in f64_col.iter().enumerate() { - assert_eq!(x.unwrap(), i as f64 * 1.1111111f64); - } - for (i, x) in binary_col.iter().enumerate() { - assert_eq!(x.is_some(), i % 2 == 0); - if let Some(x) = x { - assert_eq!(&x[0..7], b"parquet"); - } - } - for (i, x) in fixed_size_binary_col.iter().enumerate() { - assert_eq!(x.unwrap(), &[i as u8; 10]); - } - } - - assert_eq!(row_count, file_metadata.num_rows() as usize); - } - #[tokio::test] #[cfg(feature = "encryption")] async fn test_non_uniform_encryption_plaintext_footer() { @@ -2600,7 +2514,7 @@ mod tests { .build() .unwrap(); - verify_encryption_test_file_read(&mut file, decryption_properties).await; + verify_encryption_test_file_read_async(&mut file, decryption_properties).await; } #[tokio::test] @@ -2680,7 +2594,7 @@ mod tests { .build() .unwrap(); - verify_encryption_test_file_read(&mut file, decryption_properties).await; + verify_encryption_test_file_read_async(&mut file, decryption_properties).await; } #[tokio::test] @@ -2695,7 +2609,7 @@ mod tests { .build() .unwrap(); - verify_encryption_test_file_read(&mut file, decryption_properties).await; + verify_encryption_test_file_read_async(&mut file, decryption_properties).await; } #[tokio::test] diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index c5e134fe43c2..6e797e174964 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -697,7 +697,7 @@ impl ParquetMetaDataReader { /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/ #[cfg(feature = "encryption")] - pub fn decrypt_metadata( + pub(crate) fn decrypt_metadata( buf: &[u8], encrypted_footer: bool, file_decryption_properties: Option<&FileDecryptionProperties>, diff --git a/parquet/src/util/test_common/encryption_util.rs b/parquet/src/util/test_common/encryption_util.rs new file mode 100644 index 000000000000..c4e06df1d0bc --- /dev/null +++ b/parquet/src/util/test_common/encryption_util.rs @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, +}; +use crate::arrow::ParquetRecordBatchStreamBuilder; +use crate::encryption::decryption::FileDecryptionProperties; +use crate::file::metadata::FileMetaData; +use arrow_array::cast::AsArray; +use arrow_array::{types, RecordBatch}; +use futures::TryStreamExt; +use std::fs::File; + +pub(crate) fn verify_encryption_test_file_read( + file: File, + decryption_properties: FileDecryptionProperties, +) { + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap(); + let file_metadata = metadata.metadata.file_metadata(); + + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap(); + let record_reader = builder.build().unwrap(); + let record_batches = record_reader + .map(|x| x.unwrap()) + .collect::>(); + + verify_encryption_test_data(record_batches, file_metadata.clone(), metadata); +} + +pub(crate) async fn verify_encryption_test_file_read_async( + file: &mut tokio::fs::File, + decryption_properties: FileDecryptionProperties, +) { + let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); + + let metadata = ArrowReaderMetadata::load_async(file, options.clone()) + .await + .unwrap(); + let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options) + .await + .unwrap(); + let file_metadata = metadata.metadata.file_metadata(); + + let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata( + file.try_clone().await.unwrap(), + arrow_reader_metadata.clone(), + ) + .build() + .unwrap(); + let record_batches = record_reader.try_collect::>().await.unwrap(); + + verify_encryption_test_data(record_batches, file_metadata.clone(), metadata); +} + +/// Tests reading an encrypted file from the parquet-testing repository +fn verify_encryption_test_data( + record_batches: Vec, + file_metadata: FileMetaData, + metadata: ArrowReaderMetadata, +) { + assert_eq!(file_metadata.num_rows(), 50); + assert_eq!(file_metadata.schema_descr().num_columns(), 8); + + metadata.metadata.row_groups().iter().for_each(|rg| { + assert_eq!(rg.num_columns(), 8); + assert_eq!(rg.num_rows(), 50); + }); + + let mut row_count = 0; + for batch in record_batches { + let batch = batch; + row_count += batch.num_rows(); + + let bool_col = batch.column(0).as_boolean(); + let time_col = batch + .column(1) + .as_primitive::(); + let list_col = batch.column(2).as_list::(); + let timestamp_col = batch + .column(3) + .as_primitive::(); + let f32_col = batch.column(4).as_primitive::(); + let f64_col = batch.column(5).as_primitive::(); + let binary_col = batch.column(6).as_binary::(); + let fixed_size_binary_col = batch.column(7).as_fixed_size_binary(); + + for (i, x) in bool_col.iter().enumerate() { + assert_eq!(x.unwrap(), i % 2 == 0); + } + for (i, x) in time_col.iter().enumerate() { + assert_eq!(x.unwrap(), i as i32); + } + for (i, list_item) in list_col.iter().enumerate() { + let list_item = list_item.unwrap(); + let list_item = list_item.as_primitive::(); + assert_eq!(list_item.len(), 2); + assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64); + assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64); + } + for x in timestamp_col.iter() { + assert!(x.is_some()); + } + for (i, x) in f32_col.iter().enumerate() { + assert_eq!(x.unwrap(), i as f32 * 1.1f32); + } + for (i, x) in f64_col.iter().enumerate() { + assert_eq!(x.unwrap(), i as f64 * 1.1111111f64); + } + for (i, x) in binary_col.iter().enumerate() { + assert_eq!(x.is_some(), i % 2 == 0); + if let Some(x) = x { + assert_eq!(&x[0..7], b"parquet"); + } + } + for (i, x) in fixed_size_binary_col.iter().enumerate() { + assert_eq!(x.unwrap(), &[i as u8; 10]); + } + } + + assert_eq!(row_count, file_metadata.num_rows() as usize); +} diff --git a/parquet/src/util/test_common/mod.rs b/parquet/src/util/test_common/mod.rs index 8cfc1e6dd423..ac36118c3702 100644 --- a/parquet/src/util/test_common/mod.rs +++ b/parquet/src/util/test_common/mod.rs @@ -22,3 +22,6 @@ pub mod file_util; #[cfg(test)] pub mod rand_gen; + +#[cfg(all(test, feature = "encryption", feature = "arrow"))] +pub mod encryption_util;