Skip to content

Commit

Permalink
Add TestReader::new, fewer pub functions, add test_non_uniform_encrypt…
Browse files Browse the repository at this point in the history
…ion_disabled_aad_storage
  • Loading branch information
rok committed Mar 10, 2025
1 parent 751b54e commit fb6d3e3
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 52 deletions.
58 changes: 58 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,64 @@ mod tests {
verify_encryption_test_file_read(file, decryption_properties);
}

#[test]
#[cfg(feature = "encryption")]
fn test_non_uniform_encryption_disabled_aad_storage() {
    // Exercises a file written with AAD-prefix storage disabled (per its
    // name), so the reader must be given the correct AAD prefix explicitly.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path =
        format!("{testdata}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted");
    // Borrow `path` at each open instead of cloning it.
    let file = File::open(&path).unwrap();

    let footer_key = "0123456789012345".as_bytes(); // 128bit/16
    let column_1_key = "1234567890123450".as_bytes();
    let column_2_key = "1234567890123451".as_bytes();

    // Provided AAD prefix overrides the one stored in the file
    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
        .with_column_key("double_field", column_1_key.to_vec())
        .with_column_key("float_field", column_2_key.to_vec())
        .with_aad_prefix("tester")
        .build()
        .unwrap();

    verify_encryption_test_file_read(file, decryption_properties);

    // Using wrong AAD prefix should fail
    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
        .with_column_key("double_field", column_1_key.to_vec())
        .with_column_key("float_field", column_2_key.to_vec())
        .with_aad_prefix("wrong_aad_prefix")
        .build()
        .unwrap();

    let file = File::open(&path).unwrap();
    // Neither the properties nor the options are used again below, so they
    // are moved rather than cloned.
    let options =
        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
    let result = ArrowReaderMetadata::load(&file, options);
    assert!(result.is_err());
    assert_eq!(
        result.unwrap_err().to_string(),
        "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer"
    );

    // Omitting the AAD prefix entirely should also fail: with AAD storage
    // disabled there is no prefix stored in the file to fall back on.
    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
        .with_column_key("double_field", column_1_key.to_vec())
        .with_column_key("float_field", column_2_key.to_vec())
        .build()
        .unwrap();

    let file = File::open(&path).unwrap();
    let options =
        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);
    let result = ArrowReaderMetadata::load(&file, options);
    assert!(result.is_err());
    assert_eq!(
        result.unwrap_err().to_string(),
        "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer"
    );
}

#[test]
fn test_non_uniform_encryption_plaintext_footer_without_decryption() {
let testdata = arrow::util::test_util::parquet_test_data();
Expand Down
57 changes: 21 additions & 36 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,21 @@ mod tests {
data: Bytes,
metadata: Arc<ParquetMetaData>,
requests: Arc<Mutex<Vec<Range<usize>>>>,
#[cfg(feature = "encryption")]
file_decryption_properties: Option<FileDecryptionProperties>,
}

#[cfg(feature = "encryption")]
impl TestReader {
    /// Builds a `TestReader` over in-memory parquet `data` with pre-parsed
    /// `metadata`. `requests` is a shared list, presumably used by tests to
    /// observe which byte ranges the reader fetches — confirm against the
    /// `AsyncFileReader` impl.
    ///
    /// NOTE(review): the body performs no awaiting; `async` seems kept only
    /// so call sites can uniformly `.await` construction — confirm before
    /// removing. Likewise confirm whether the `encryption` feature gate is
    /// still needed now that the struct has no encryption-specific field.
    async fn new(
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    ) -> Self {
        TestReader { metadata, requests, data }
    }
}

impl AsyncFileReader for TestReader {
Expand Down Expand Up @@ -1238,8 +1251,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let requests = async_reader.requests.clone();
Expand Down Expand Up @@ -1297,8 +1308,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let requests = async_reader.requests.clone();
Expand Down Expand Up @@ -1364,8 +1373,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let options = ArrowReaderOptions::new().with_page_index(true);
Expand Down Expand Up @@ -1434,8 +1441,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
Expand Down Expand Up @@ -1482,8 +1487,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let options = ArrowReaderOptions::new().with_page_index(true);
Expand Down Expand Up @@ -1567,8 +1570,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let options = ArrowReaderOptions::new().with_page_index(true);
Expand Down Expand Up @@ -1640,8 +1641,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let options = ArrowReaderOptions::new().with_page_index(true);
Expand Down Expand Up @@ -1692,8 +1691,6 @@ mod tests {
data,
metadata: Arc::new(metadata),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};
let requests = test.requests.clone();

Expand Down Expand Up @@ -1771,8 +1768,6 @@ mod tests {
data,
metadata: Arc::new(metadata),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
Expand Down Expand Up @@ -1865,8 +1860,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let a_filter =
Expand Down Expand Up @@ -1935,8 +1928,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let requests = async_reader.requests.clone();
Expand Down Expand Up @@ -2013,8 +2004,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
Expand Down Expand Up @@ -2160,8 +2149,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
Expand Down Expand Up @@ -2199,8 +2186,6 @@ mod tests {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};

let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
Expand Down Expand Up @@ -2338,8 +2323,6 @@ mod tests {
data,
metadata: Arc::new(metadata),
requests: Default::default(),
#[cfg(feature = "encryption")]
file_decryption_properties: None,
};
let requests = test.requests.clone();

Expand Down Expand Up @@ -2514,7 +2497,9 @@ mod tests {
.build()
.unwrap();

verify_encryption_test_file_read_async(&mut file, decryption_properties).await.unwrap();
verify_encryption_test_file_read_async(&mut file, decryption_properties)
.await
.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -2575,7 +2560,7 @@ mod tests {

// Wrong footer key
check_for_error(
"Parquet error: Provided footer key was unable to decrypt parquet footer",
"Parquet error: Provided footer key and AAD were unable to decrypt parquet footer",
&path,
"1123456789012345".as_bytes(),
column_1_key,
Expand Down Expand Up @@ -2687,7 +2672,7 @@ mod tests {
.build()
.unwrap();

let _ = verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
}

#[tokio::test]
Expand All @@ -2702,7 +2687,7 @@ mod tests {
.build()
.unwrap();

let _ = verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
verify_encryption_test_file_read_async(&mut file, decryption_properties).await;
}

#[tokio::test]
Expand Down
9 changes: 3 additions & 6 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,9 @@ impl AsyncFileReader for ParquetObjectReader {
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);
#[cfg(feature = "encryption")]
let metadata = metadata
.with_decryption_properties(self.file_decryption_properties.clone().as_ref());

let metadata = metadata.load_and_finish(self, file_size).await?;
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(metadata))
})
}
Expand Down
10 changes: 5 additions & 5 deletions parquet/src/encryption/decryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub fn read_and_decrypt<T: Read>(
// CryptoContext is a data structure that holds the context required to
// decrypt parquet modules (data pages, dictionary pages, etc.).
#[derive(Debug, Clone)]
pub struct CryptoContext {
pub(crate) struct CryptoContext {
pub(crate) row_group_ordinal: usize,
pub(crate) column_ordinal: usize,
pub(crate) page_ordinal: Option<usize>,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl CryptoContext {
/// Decryption configuration supplied by a caller when reading an encrypted
/// parquet file.
///
/// (Review note: the scraped diff carried both the old and new declaration
/// of `aad_prefix`; the duplicate field is removed here, keeping the
/// post-commit `pub(crate)` visibility.)
pub struct FileDecryptionProperties {
    // Key used to decrypt the parquet footer.
    footer_key: Vec<u8>,
    // Per-column decryption keys, keyed by column name.
    column_keys: HashMap<String, Vec<u8>>,
    // Optional AAD prefix; when set it overrides any prefix stored in the
    // file. Crate-visible so the metadata reader can consult it.
    pub(crate) aad_prefix: Option<Vec<u8>>,
}

impl FileDecryptionProperties {
Expand Down Expand Up @@ -178,8 +178,8 @@ impl DecryptionPropertiesBuilder {
})
}

pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
self.aad_prefix = Some(value);
pub fn with_aad_prefix(mut self, value: &str) -> Self {
self.aad_prefix = Some(value.as_bytes().to_vec());
self
}

Expand All @@ -191,7 +191,7 @@ impl DecryptionPropertiesBuilder {
}

#[derive(Clone, Debug)]
pub struct FileDecryptor {
pub(crate) struct FileDecryptor {
decryption_properties: FileDecryptionProperties,
footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
file_aad: Vec<u8>,
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
//! Encryption implementation specific to Parquet, as described
//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
pub mod ciphers;
pub mod decryption;
pub mod modules;
mod ciphers;
pub(crate) mod decryption;
pub(crate) mod modules;
10 changes: 8 additions & 2 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,9 @@ impl ParquetMetaDataReader {
decrypted_fmd_buf = footer_decryptor?
.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
.map_err(|_| {
general_err!("Provided footer key was unable to decrypt parquet footer")
general_err!(
"Provided footer key and AAD were unable to decrypt parquet footer"
)
})?;
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

Expand Down Expand Up @@ -847,7 +849,11 @@ fn get_file_decryptor(
let aad_file_unique = algo
.aad_file_unique
.ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
let aad_prefix: Vec<u8> = algo.aad_prefix.unwrap_or_default();
let aad_prefix = if file_decryption_properties.aad_prefix.is_some() {
file_decryption_properties.aad_prefix.clone().unwrap()
} else {
algo.aad_prefix.unwrap_or_default()
};

FileDecryptor::new(file_decryption_properties, aad_file_unique, aad_prefix)
}
Expand Down

0 comments on commit fb6d3e3

Please sign in to comment.