Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Mar 8, 2025
1 parent ee7643e commit 27a1071
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 172 deletions.
81 changes: 2 additions & 79 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,8 @@ mod tests {
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
#[cfg(feature = "encryption")]
use crate::util::test_common::encryption_util::verify_encryption_test_file_read;
use crate::util::test_common::rand_gen::RandGen;

#[test]
Expand Down Expand Up @@ -1993,85 +1995,6 @@ mod tests {
};
}

/// Reads an encrypted Parquet test file synchronously with the supplied
/// decryption properties and asserts its metadata and column contents match
/// the known fixture (50 rows, 8 columns,
/// created by "parquet-cpp-arrow version 19.0.0-SNAPSHOT").
#[cfg(feature = "encryption")]
fn verify_encryption_test_file_read(
    file: File,
    decryption_properties: FileDecryptionProperties,
) {
    let options = ArrowReaderOptions::default()
        .with_file_decryption_properties(decryption_properties.clone());
    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
    let file_metadata = metadata.metadata.file_metadata();

    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let record_reader = builder.build().unwrap();

    assert_eq!(file_metadata.num_rows(), 50);
    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
    );

    // Every row group in the fixture holds all 50 rows across all 8 columns.
    metadata.metadata.row_groups().iter().for_each(|rg| {
        assert_eq!(rg.num_columns(), 8);
        assert_eq!(rg.num_rows(), 50);
    });

    let mut row_count = 0;
    for batch in record_reader {
        let batch = batch.unwrap();
        row_count += batch.num_rows();

        // Columns in fixture order: bool, time32(ms), list<i64>,
        // timestamp(ns), f32, f64, binary, fixed-size binary(10).
        let bool_col = batch.column(0).as_boolean();
        let time_col = batch
            .column(1)
            .as_primitive::<types::Time32MillisecondType>();
        let list_col = batch.column(2).as_list::<i32>();
        let timestamp_col = batch
            .column(3)
            .as_primitive::<types::TimestampNanosecondType>();
        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
        let binary_col = batch.column(6).as_binary::<i32>();
        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();

        for (i, x) in bool_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i % 2 == 0);
        }
        for (i, x) in time_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as i32);
        }
        for (i, list_item) in list_col.iter().enumerate() {
            let list_item = list_item.unwrap();
            let list_item = list_item.as_primitive::<types::Int64Type>();
            assert_eq!(list_item.len(), 2);
            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
        }
        for x in timestamp_col.iter() {
            assert!(x.is_some());
        }
        for (i, x) in f32_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
        }
        for (i, x) in f64_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
        }
        // Binary column alternates value / null; non-null values start with "parquet".
        for (i, x) in binary_col.iter().enumerate() {
            assert_eq!(x.is_some(), i % 2 == 0);
            if let Some(x) = x {
                assert_eq!(&x[0..7], b"parquet");
            }
        }
        for (i, x) in fixed_size_binary_col.iter().enumerate() {
            assert_eq!(x.unwrap(), &[i as u8; 10]);
        }
    }

    assert_eq!(row_count, file_metadata.num_rows() as usize);
}

#[test]
fn test_read_float32_float64_byte_stream_split() {
let path = format!(
Expand Down
98 changes: 6 additions & 92 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,10 +1167,13 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
};
use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::properties::WriterProperties;
#[cfg(feature = "encryption")]
use crate::util::test_common::encryption_util::verify_encryption_test_file_read_async;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{ListBuilder, StringBuilder};
Expand Down Expand Up @@ -2492,95 +2495,6 @@ mod tests {
assert_eq!(result.len(), 1);
}

/// Async counterpart of `verify_encryption_test_file_read`: reads an
/// encrypted Parquet test file via the async stream builder and asserts its
/// metadata and column contents match the known fixture (50 rows, 8 columns).
#[cfg(feature = "encryption")]
async fn verify_encryption_test_file_read(
    file: &mut File,
    decryption_properties: FileDecryptionProperties,
) {
    let options =
        ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

    // NOTE(review): the footer metadata is loaded twice with identical
    // options here — the two results should be equivalent.
    let metadata = ArrowReaderMetadata::load_async(file, options.clone())
        .await
        .unwrap();
    let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options)
        .await
        .unwrap();
    let file_metadata = metadata.metadata.file_metadata();

    let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
        file.try_clone().await.unwrap(),
        arrow_reader_metadata.clone(),
    )
    .build()
    .unwrap();
    let record_batches = record_reader.try_collect::<Vec<_>>().await.unwrap();

    assert_eq!(file_metadata.num_rows(), 50);
    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
    );

    // Every row group in the fixture holds all 50 rows across all 8 columns.
    metadata.metadata.row_groups().iter().for_each(|rg| {
        assert_eq!(rg.num_columns(), 8);
        assert_eq!(rg.num_rows(), 50);
    });

    let mut row_count = 0;
    for batch in record_batches {
        row_count += batch.num_rows();

        // Columns in fixture order: bool, time32(ms), list<i64>,
        // timestamp(ns), f32, f64, binary, fixed-size binary(10).
        let bool_col = batch.column(0).as_boolean();
        let time_col = batch
            .column(1)
            .as_primitive::<types::Time32MillisecondType>();
        let list_col = batch.column(2).as_list::<i32>();
        let timestamp_col = batch
            .column(3)
            .as_primitive::<types::TimestampNanosecondType>();
        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
        let binary_col = batch.column(6).as_binary::<i32>();
        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();

        for (i, x) in bool_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i % 2 == 0);
        }
        for (i, x) in time_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as i32);
        }
        for (i, list_item) in list_col.iter().enumerate() {
            let list_item = list_item.unwrap();
            let list_item = list_item.as_primitive::<types::Int64Type>();
            assert_eq!(list_item.len(), 2);
            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
        }
        for x in timestamp_col.iter() {
            assert!(x.is_some());
        }
        for (i, x) in f32_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
        }
        for (i, x) in f64_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
        }
        // Binary column alternates value / null; non-null values start with "parquet".
        for (i, x) in binary_col.iter().enumerate() {
            assert_eq!(x.is_some(), i % 2 == 0);
            if let Some(x) = x {
                assert_eq!(&x[0..7], b"parquet");
            }
        }
        for (i, x) in fixed_size_binary_col.iter().enumerate() {
            assert_eq!(x.unwrap(), &[i as u8; 10]);
        }
    }

    assert_eq!(row_count, file_metadata.num_rows() as usize);
}

#[tokio::test]
#[cfg(feature = "encryption")]
async fn test_non_uniform_encryption_plaintext_footer() {
Expand All @@ -2600,7 +2514,7 @@ mod tests {
.build()
.unwrap();

verify_encryption_test_file_read(&mut file, decryption_properties).await;
verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
}

#[tokio::test]
Expand Down Expand Up @@ -2680,7 +2594,7 @@ mod tests {
.build()
.unwrap();

verify_encryption_test_file_read(&mut file, decryption_properties).await;
verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
}

#[tokio::test]
Expand All @@ -2695,7 +2609,7 @@ mod tests {
.build()
.unwrap();

verify_encryption_test_file_read(&mut file, decryption_properties).await;
verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ impl ParquetMetaDataReader {
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
#[cfg(feature = "encryption")]
pub fn decrypt_metadata(
pub(crate) fn decrypt_metadata(
buf: &[u8],
encrypted_footer: bool,
file_decryption_properties: Option<&FileDecryptionProperties>,
Expand Down
138 changes: 138 additions & 0 deletions parquet/src/util/test_common/encryption_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder
};
use crate::arrow::ParquetRecordBatchStreamBuilder;
use crate::encryption::decryption::FileDecryptionProperties;
use crate::file::metadata::FileMetaData;
use arrow_array::cast::AsArray;
use arrow_array::{types, RecordBatch};
use futures::TryStreamExt;
use std::fs::File;

/// Synchronously reads an encrypted Parquet test file with the supplied
/// decryption properties and verifies its contents.
///
/// Loads the reader metadata, decodes every record batch (panicking on any
/// read error), and delegates the data assertions to
/// `verify_encryption_test_data`.
pub(crate) fn verify_encryption_test_file_read(
    file: File,
    decryption_properties: FileDecryptionProperties,
) {
    // `decryption_properties` is owned by this function, so it can be moved
    // into the options directly — no clone needed.
    let options =
        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
    let file_metadata = metadata.metadata.file_metadata();

    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let record_reader = builder.build().unwrap();
    // Reading must succeed for every batch in the test file.
    let record_batches = record_reader
        .map(|x| x.unwrap())
        .collect::<Vec<RecordBatch>>();

    verify_encryption_test_data(record_batches, file_metadata.clone(), metadata);
}

/// Asynchronously reads an encrypted Parquet test file with the supplied
/// decryption properties and verifies its contents.
///
/// Loads the reader metadata via `load_async`, streams all record batches
/// through `ParquetRecordBatchStreamBuilder`, and delegates the data
/// assertions to `verify_encryption_test_data`.
pub(crate) async fn verify_encryption_test_file_read_async(
    file: &mut tokio::fs::File,
    decryption_properties: FileDecryptionProperties,
) {
    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

    // Load the reader metadata once and reuse it; the previous version issued
    // two identical `load_async` calls, reading the file footer twice.
    let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options)
        .await
        .unwrap();
    let file_metadata = arrow_reader_metadata.metadata.file_metadata();

    let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
        file.try_clone().await.unwrap(),
        arrow_reader_metadata.clone(),
    )
    .build()
    .unwrap();
    let record_batches = record_reader.try_collect::<Vec<_>>().await.unwrap();

    verify_encryption_test_data(record_batches, file_metadata.clone(), arrow_reader_metadata);
}

/// Asserts that decoded record batches and metadata match the expected
/// contents of the encrypted test file from the parquet-testing repository:
/// 50 rows across 8 columns of known, index-derived values.
fn verify_encryption_test_data(
    record_batches: Vec<RecordBatch>,
    file_metadata: FileMetaData,
    metadata: ArrowReaderMetadata,
) {
    assert_eq!(file_metadata.num_rows(), 50);
    assert_eq!(file_metadata.schema_descr().num_columns(), 8);

    // Every row group in the fixture holds all 50 rows across all 8 columns.
    metadata.metadata.row_groups().iter().for_each(|rg| {
        assert_eq!(rg.num_columns(), 8);
        assert_eq!(rg.num_rows(), 50);
    });

    let mut row_count = 0;
    for batch in record_batches {
        row_count += batch.num_rows();

        // Columns in fixture order: bool, time32(ms), list<i64>,
        // timestamp(ns), f32, f64, binary, fixed-size binary(10).
        let bool_col = batch.column(0).as_boolean();
        let time_col = batch
            .column(1)
            .as_primitive::<types::Time32MillisecondType>();
        let list_col = batch.column(2).as_list::<i32>();
        let timestamp_col = batch
            .column(3)
            .as_primitive::<types::TimestampNanosecondType>();
        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
        let binary_col = batch.column(6).as_binary::<i32>();
        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();

        for (i, x) in bool_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i % 2 == 0);
        }
        for (i, x) in time_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as i32);
        }
        for (i, list_item) in list_col.iter().enumerate() {
            let list_item = list_item.unwrap();
            let list_item = list_item.as_primitive::<types::Int64Type>();
            assert_eq!(list_item.len(), 2);
            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
        }
        // Timestamps only need to be present; exact values are not checked.
        for x in timestamp_col.iter() {
            assert!(x.is_some());
        }
        for (i, x) in f32_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
        }
        for (i, x) in f64_col.iter().enumerate() {
            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
        }
        // Binary column alternates value / null; non-null values start with "parquet".
        for (i, x) in binary_col.iter().enumerate() {
            assert_eq!(x.is_some(), i % 2 == 0);
            if let Some(x) = x {
                assert_eq!(&x[0..7], b"parquet");
            }
        }
        for (i, x) in fixed_size_binary_col.iter().enumerate() {
            assert_eq!(x.unwrap(), &[i as u8; 10]);
        }
    }

    assert_eq!(row_count, file_metadata.num_rows() as usize);
}
3 changes: 3 additions & 0 deletions parquet/src/util/test_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ pub mod file_util;

#[cfg(test)]
pub mod rand_gen;

#[cfg(all(test, feature = "encryption", feature = "arrow"))]
pub mod encryption_util;

0 comments on commit 27a1071

Please sign in to comment.