From 29173993bc9ad166e9ff97f2e3a888ac6b3358d7 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 21 Mar 2024 15:13:27 +0200 Subject: [PATCH] Rebased on apache/arrow-rs/pull/6637 --- .github/workflows/parquet.yml | 2 + parquet/Cargo.toml | 5 + parquet/README.md | 3 + parquet/examples/read_with_rowgroup.rs | 9 +- parquet/src/arrow/arrow_reader/mod.rs | 318 ++++++++++++- parquet/src/arrow/arrow_writer/mod.rs | 67 +++ parquet/src/arrow/async_reader/metadata.rs | 20 +- parquet/src/arrow/async_reader/mod.rs | 426 +++++++++++++++++- parquet/src/arrow/async_reader/store.rs | 77 +++- parquet/src/column/page.rs | 10 + parquet/src/column/writer/mod.rs | 8 + parquet/src/encryption/ciphers.rs | 63 +++ parquet/src/encryption/decrypt.rs | 260 +++++++++++ parquet/src/encryption/decryption.rs | 255 +++++++++++ parquet/src/encryption/mod.rs | 23 + parquet/src/encryption/modules.rs | 94 ++++ parquet/src/errors.rs | 7 + parquet/src/file/footer.rs | 16 +- parquet/src/file/metadata/mod.rs | 129 +++++- parquet/src/file/metadata/reader.rs | 253 ++++++++++- parquet/src/file/mod.rs | 1 + parquet/src/file/properties.rs | 2 + parquet/src/file/serialized_reader.rs | 141 +++++- parquet/src/file/writer.rs | 2 + parquet/src/lib.rs | 4 + .../src/util/test_common/encryption_util.rs | 135 ++++++ parquet/src/util/test_common/mod.rs | 3 + parquet/tests/arrow_writer_layout.rs | 2 + 28 files changed, 2255 insertions(+), 80 deletions(-) create mode 100644 parquet/src/encryption/ciphers.rs create mode 100644 parquet/src/encryption/decrypt.rs create mode 100644 parquet/src/encryption/decryption.rs create mode 100644 parquet/src/encryption/mod.rs create mode 100644 parquet/src/encryption/modules.rs create mode 100644 parquet/src/util/test_common/encryption_util.rs diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index 4c46fde198bd..96c7ab8f4e3a 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -111,6 +111,8 @@ jobs: run: cargo check -p parquet --all-targets --all-features - name: Check compilation --all-targets --no-default-features --features json run: cargo check -p parquet --all-targets --no-default-features --features json + - name: Check compilation --no-default-features --features encryption --features async + run: cargo check -p parquet --no-default-features --features encryption --features async # test the parquet crate builds against wasm32 in stable rust wasm32-build: diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 00d4c5b750f8..809b5cf6c210 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -30,6 +30,8 @@ rust-version = { workspace = true } [target.'cfg(target_arch = "wasm32")'.dependencies] ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] } +# See https://github.com/briansmith/ring/issues/918#issuecomment-2077788925 +ring = { version = "0.17", default-features = false, features = ["wasm32_unknown_unknown_js", "std"], optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } @@ -70,6 +72,7 @@ half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.33.0", optional = true, default-features = false, features = ["system"] } crc32fast = { version = "1.4.2", optional = true, default-features = false } simdutf8 = { version = "0.1.5", optional = true, default-features = false } +ring = { version = "0.17", default-features = false, features = ["std"], optional = 
true } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -125,6 +128,8 @@ sysinfo = ["dep:sysinfo"] crc = ["dep:crc32fast"] # Enable SIMD UTF-8 validation simdutf8 = ["dep:simdutf8"] +# Enable Parquet modular encryption support +encryption = ["dep:ring"] [[example]] diff --git a/parquet/README.md b/parquet/README.md index 1224e52f3f5a..9245664b4ef0 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -63,6 +63,7 @@ The `parquet` crate provides the following features which may be enabled in your - `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding - `experimental` - Experimental APIs which may change, even between minor releases - `simdutf8` (default) - Use the [`simdutf8`] crate for SIMD-accelerated UTF-8 validation +- `encryption` - support for reading / writing encrypted Parquet files [`arrow`]: https://crates.io/crates/arrow [`simdutf8`]: https://crates.io/crates/simdutf8 @@ -76,12 +77,14 @@ The `parquet` crate provides the following features which may be enabled in your - [x] Row record reader - [x] Arrow record reader - [x] Async support (to Arrow) + - [x] Encrypted files - [x] Statistics support - [x] Write support - [x] Primitive column value writers - [ ] Row record writer - [x] Arrow record writer - [x] Async support + - [ ] Encrypted files - [x] Predicate pushdown - [x] Parquet format 4.0.0 support diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..44d25596110e 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -35,7 +35,12 @@ async fn main() -> Result<()> { let mut file = File::open(&path).await.unwrap(); // The metadata could be cached in other places, this example only shows how to read - let metadata = file.get_metadata().await?; + let metadata = file + .get_metadata( + #[cfg(feature = "encryption")] + None, + ) + .await?; for rg in metadata.row_groups() { let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all()); @@ -121,6 +126,8 @@ impl RowGroups for InMemoryRowGroup { self.metadata.column(i), self.num_rows(), None, + #[cfg(feature = "encryption")] + None, )?); Ok(Box::new(ColumnChunkIterator { diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6eba04c86f91..2ee69dcf1068 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -17,9 +17,6 @@ //! 
Contains reader which reads parquet data into arrow [`RecordBatch`] -use std::collections::VecDeque; -use std::sync::Arc; - use arrow_array::cast::AsArray; use arrow_array::Array; use arrow_array::{RecordBatch, RecordBatchReader}; @@ -27,12 +24,16 @@ use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; use arrow_select::filter::prep_null_mask_filter; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; +use std::collections::VecDeque; +use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField}; use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use crate::column::page::{PageIterator, PageReader}; +#[cfg(feature = "encryption")] +use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::reader::{ChunkReader, SerializedPageReader}; @@ -252,6 +253,9 @@ pub struct ArrowReaderOptions { supplied_schema: Option, /// If true, attempt to read `OffsetIndex` and `ColumnIndex` pub(crate) page_index: bool, + /// If encryption is enabled, the file decryption properties can be provided + #[cfg(feature = "encryption")] + pub(crate) file_decryption_properties: Option, } impl ArrowReaderOptions { @@ -317,7 +321,7 @@ impl ArrowReaderOptions { /// /// // Create the reader and read the data using the supplied schema. /// let mut reader = builder.build().unwrap(); - /// let _batch = reader.next().unwrap().unwrap(); + /// let _batch = reader.next().unwrap().unwrap(); /// ``` pub fn with_schema(self, schema: SchemaRef) -> Self { Self { @@ -342,6 +346,20 @@ impl ArrowReaderOptions { pub fn with_page_index(self, page_index: bool) -> Self { Self { page_index, ..self } } + + /// Provide the file decryption properties to use when reading encrypted parquet files. + /// + /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided. + #[cfg(feature = "encryption")] + pub fn with_file_decryption_properties( + self, + file_decryption_properties: FileDecryptionProperties, + ) -> Self { + Self { + file_decryption_properties: Some(file_decryption_properties), + ..self + } + } } /// The metadata necessary to construct a [`ArrowReaderBuilder`] @@ -380,9 +398,11 @@ impl ArrowReaderMetadata { /// `Self::metadata` is missing the page index, this function will attempt /// to load the page index by making an object store request. 
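+    ///
+    /// When the `encryption` feature is enabled, decryption keys can be passed
+    /// through [`ArrowReaderOptions`]. A minimal sketch follows; the file name
+    /// and 16-byte key are hypothetical, and the import path assumes the
+    /// `encryption` module is exported by the crate:
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+    /// # use parquet::encryption::decrypt::FileDecryptionProperties;
+    /// let file = File::open("data.parquet.encrypted").unwrap();
+    /// // Uniform encryption: a single footer key decrypts the footer and all columns
+    /// let decryption_properties =
+    ///     FileDecryptionProperties::builder(b"0123456789012345".to_vec())
+    ///         .build()
+    ///         .unwrap();
+    /// let options =
+    ///     ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+    /// let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
+    /// ```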
pub fn load(reader: &T, options: ArrowReaderOptions) -> Result { - let metadata = ParquetMetaDataReader::new() - .with_page_indexes(options.page_index) - .parse_and_finish(reader)?; + let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index); + #[cfg(feature = "encryption")] + let metadata = + metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + let metadata = metadata.parse_and_finish(reader)?; Self::try_new(Arc::new(metadata), options) } @@ -553,6 +573,7 @@ impl ParquetRecordBatchReaderBuilder { /// # use arrow_schema::{DataType, Field, Schema}; /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; /// # use parquet::arrow::ArrowWriter; + /// # /// # let mut file: Vec = Vec::with_capacity(1024); /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); @@ -670,14 +691,55 @@ impl Iterator for ReaderPageIterator { let meta = rg.column(self.column_idx); let offset_index = self.metadata.offset_index(); // `offset_index` may not exist and `i[rg_idx]` will be empty. - // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out empty `i[rg_idx]`. + // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`. let page_locations = offset_index .filter(|i| !i[rg_idx].is_empty()) .map(|i| i[rg_idx][self.column_idx].page_locations.clone()); let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); + #[cfg(feature = "encryption")] + let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() { + let column_name = self + .metadata + .file_metadata() + .schema_descr() + .column(self.column_idx); + + if file_decryptor.is_column_encrypted(column_name.name()) { + let data_decryptor = file_decryptor.get_column_data_decryptor(column_name.name()); + let data_decryptor = match data_decryptor { + Ok(data_decryptor) => data_decryptor, + Err(err) => return Some(Err(err)), + }; + + let metadata_decryptor = + file_decryptor.get_column_metadata_decryptor(column_name.name()); + let metadata_decryptor = match metadata_decryptor { + Ok(metadata_decryptor) => metadata_decryptor, + Err(err) => return Some(Err(err)), + }; + + let crypto_context = CryptoContext::new( + rg_idx, + self.column_idx, + data_decryptor, + metadata_decryptor, + file_decryptor.file_aad().clone(), + ); + Some(Arc::new(crypto_context)) + } else { + None + } + } else { + None + }; + let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations); + + #[cfg(feature = "encryption")] + let ret = ret.map(|reader| reader.with_crypto_context(crypto_context)); + Some(ret.map(|x| Box::new(x) as _)) } } @@ -943,6 +1005,7 @@ mod tests { }; use arrow_select::concat::concat_batches; + use crate::arrow::arrow_reader::ArrowReaderMetadata; use crate::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, @@ -955,11 +1018,15 @@ mod tests { BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; + #[cfg(feature = "encryption")] + use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::Result; use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::file::writer::SerializedFileWriter; 
use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; + #[cfg(feature = "encryption")] + use crate::util::test_common::encryption_util::verify_encryption_test_file_read; use crate::util::test_common::rand_gen::RandGen; #[test] @@ -1788,6 +1855,241 @@ mod tests { assert!(col.value(2).is_nan()); } + #[test] + #[cfg(feature = "encryption")] + fn test_non_uniform_encryption_plaintext_footer() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let file = File::open(path).unwrap(); + + // There is always a footer key even with a plaintext footer, + // but this is used for signing the footer. + let footer_key = "0123456789012345".as_bytes(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); + } + + #[test] + #[cfg(feature = "encryption")] + fn test_non_uniform_encryption_disabled_aad_storage() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = + format!("{testdata}/encrypt_columns_and_footer_disable_aad_storage.parquet.encrypted"); + let file = File::open(path.clone()).unwrap(); + + let footer_key = "0123456789012345".as_bytes(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + // Can read successfully when providing the correct AAD prefix + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .with_aad_prefix("tester".as_bytes().to_vec()) + .build() + .unwrap(); + + verify_encryption_test_file_read(file, decryption_properties); + + // Using wrong AAD prefix should fail + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .with_aad_prefix("wrong_aad_prefix".as_bytes().to_vec()) + .build() + .unwrap(); + + let file = File::open(path.clone()).unwrap(); + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer" + ); + + // Not providing any AAD prefix should fail as it isn't stored in the file + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .build() + .unwrap(); + + let file = File::open(path).unwrap(); + let options = ArrowReaderOptions::default() + .with_file_decryption_properties(decryption_properties.clone()); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer" + ); + } + + #[test] + fn 
test_non_uniform_encryption_plaintext_footer_without_decryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
+        let file = File::open(&path).unwrap();
+
+        let metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
+        let file_metadata = metadata.metadata.file_metadata();
+
+        assert_eq!(file_metadata.num_rows(), 50);
+        assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+        assert_eq!(
+            file_metadata.created_by().unwrap(),
+            "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
+        );
+
+        metadata.metadata.row_groups().iter().for_each(|rg| {
+            assert_eq!(rg.num_columns(), 8);
+            assert_eq!(rg.num_rows(), 50);
+        });
+
+        // Should be able to read unencrypted columns. Test reading one column.
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [1]);
+        let record_reader = builder.with_projection(mask).build().unwrap();
+
+        let mut row_count = 0;
+        for batch in record_reader {
+            let batch = batch.unwrap();
+            row_count += batch.num_rows();
+
+            let time_col = batch
+                .column(0)
+                .as_primitive::<types::Int32Type>();
+            for (i, x) in time_col.iter().enumerate() {
+                assert_eq!(x.unwrap(), i as i32);
+            }
+        }
+
+        assert_eq!(row_count, file_metadata.num_rows() as usize);
+
+        // Reading an encrypted column should fail
+        let file = File::open(&path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [4]);
+        let mut record_reader = builder.with_projection(mask).build().unwrap();
+
+        match record_reader.next() {
+            Some(Err(ArrowError::ParquetError(s))) => {
+                assert!(s.contains("protocol error"));
+            }
+            _ => {
+                panic!("Expected ArrowError::ParquetError");
+            }
+        };
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_non_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let footer_key = "0123456789012345".as_bytes(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes();
+        let column_2_key = "1234567890123451".as_bytes();
+
+        let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+            .with_column_key("double_field", column_1_key.to_vec())
+            .with_column_key("float_field", column_2_key.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    #[test]
+    #[cfg(feature = "encryption")]
+    fn test_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let key_code: &[u8] = "0123456789012345".as_bytes();
+        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read(file, decryption_properties);
+    }
+
+    #[test]
+    #[cfg(not(feature = "encryption"))]
+    fn test_decrypting_without_encryption_flag_fails() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let file = File::open(path).unwrap();
+
+        let options = ArrowReaderOptions::default();
+        let result = ArrowReaderMetadata::load(&file, options.clone());
+        assert!(result.is_err());
+        assert_eq!(
+            result.unwrap_err().to_string(),
"Parquet error: Parquet file has an encrypted footer but the encryption feature is disabled" + ); + } + + #[test] + #[cfg(feature = "encryption")] + fn test_decrypting_without_decryption_properties_fails() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/uniform_encryption.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let options = ArrowReaderOptions::default(); + let result = ArrowReaderMetadata::load(&file, options.clone()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + ); + } + + #[test] + #[cfg(feature = "encryption")] + fn test_aes_ctr_encryption() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer_ctr.parquet.encrypted"); + let file = File::open(path).unwrap(); + + let footer_key = "0123456789012345".as_bytes(); + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()) + .with_column_key("double_field", column_1_key.to_vec()) + .with_column_key("float_field", column_2_key.to_vec()) + .build() + .unwrap(); + + let options = + ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties); + let metadata = ArrowReaderMetadata::load(&file, options); + + match metadata { + Err(crate::errors::ParquetError::NYI(s)) => { + assert!(s.contains("AES_GCM_CTR_V1")); + } + _ => { + panic!("Expected ParquetError::NYI"); + } + }; + } + #[test] fn test_read_float32_float64_byte_stream_split() { let path = format!( diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index f1081f1481cc..1cf274249638 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -3527,4 +3527,71 @@ mod tests { .unwrap(); assert_eq!(batches.len(), 0); } + + #[cfg(feature = "encryption")] + #[test] + fn test_uniform_encryption_roundtrip() { + let arrays = [ + Int32Array::from((0..100).collect::>()), + Int32Array::from((0..50).collect::>()), + Int32Array::from((200..500).collect::>()), + ]; + + let schema = Arc::new(Schema::new(vec![Field::new( + "int", + ArrowDataType::Int32, + false, + )])); + + let file = tempfile::tempfile().unwrap(); + + // todo: add encryption + let props = WriterProperties::builder() + .set_max_row_group_size(200) + .build(); + + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + + for array in arrays { + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + } + + writer.close().unwrap(); + + // todo: try_new_with_decryption + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]); + + let batches = builder + .with_batch_size(100) + .build() + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(batches.len(), 5); + assert!(batches.iter().all(|x| x.num_columns() == 1)); + + let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect(); + + assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]); + + let values: Vec<_> = batches + .iter() + .flat_map(|x| { + x.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .cloned() + }) + .collect(); + + let expected_values: Vec<_> = 
[0..100, 0..50, 200..500].into_iter().flatten().collect(); + assert_eq!(&values, &expected_values) + } } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 526818845b5c..4f41cd6ceebd 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -113,7 +113,8 @@ impl MetadataLoader { let mut footer = [0; FOOTER_SIZE]; footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]); - let length = ParquetMetaDataReader::decode_footer(&footer)?; + let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?; + let length = footer.metadata_length(); if file_size < length + FOOTER_SIZE { return Err(ParquetError::EOF(format!( @@ -127,13 +128,26 @@ impl MetadataLoader { let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?; - (ParquetMetaDataReader::decode_metadata(&meta)?, None) + ( + ParquetMetaDataReader::decode_metadata( + &meta, + footer.is_encrypted_footer(), + #[cfg(feature = "encryption")] + None, + )?, + None, + ) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE]; ( - ParquetMetaDataReader::decode_metadata(slice)?, + ParquetMetaDataReader::decode_metadata( + slice, + footer.is_encrypted_footer(), + #[cfg(feature = "encryption")] + None, + )?, Some((footer_start, suffix.slice(..metadata_start))), ) }; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 2c8a59399de1..9afd7d835528 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -52,7 +52,7 @@ use crate::bloom_filter::{ }; use crate::column::page::{PageIterator, PageReader}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; @@ -61,6 +61,9 @@ use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHas mod metadata; pub use metadata::*; +#[cfg(feature = "encryption")] +use crate::encryption::decrypt::CryptoContext; + #[cfg(feature = "object_store")] mod store; @@ -104,6 +107,19 @@ pub trait AsyncFileReader: Send { /// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... fn get_metadata(&mut self) -> BoxFuture<'_, Result>>; + + /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file, + /// allowing fine-grained control over how metadata is sourced, in particular allowing + /// for caching, pre-fetching, catalog metadata, decrypting, etc... 
+    ///
+    /// By default calls `get_metadata()`
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        let _ = options;
+        self.get_metadata()
+    }
 }

 /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
@@ -119,6 +135,13 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         self.as_mut().get_metadata()
     }
+
+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        self.as_mut().get_metadata_with_options(options)
+    }
 }

 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
@@ -138,6 +161,38 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
             .boxed()
     }

+    fn get_metadata_with_options<'a>(
+        &'a mut self,
+        options: &'a ArrowReaderOptions,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
+        async move {
+            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
+
+            let mut buf = [0_u8; FOOTER_SIZE];
+            self.read_exact(&mut buf).await?;
+
+            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
+            let metadata_len = footer.metadata_length();
+            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
+                .await?;
+
+            let mut buf = Vec::with_capacity(metadata_len);
+            self.take(metadata_len as _).read_to_end(&mut buf).await?;
+
+            let metadata_reader = ParquetMetaDataReader::new();
+
+            #[cfg(feature = "encryption")]
+            let metadata_reader = metadata_reader
+                .with_decryption_properties(options.file_decryption_properties.as_ref());
+
+            let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;
+
+            Ok(Arc::new(parquet_metadata))
+        }
+        .boxed()
+    }
+
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
         async move {
@@ -146,7 +201,15 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
             let mut buf = [0_u8; FOOTER_SIZE];
             self.read_exact(&mut buf).await?;

-            let metadata_len = ParquetMetaDataReader::decode_footer(&buf)?;
+            let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
+            let metadata_len = footer.metadata_length();
+
+            if footer.is_encrypted_footer() {
+                return Err(general_err!(
+                    "Parquet file has an encrypted footer but decryption properties were not provided"
+                ));
+            }
+
             self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                 .await?;

@@ -175,7 +238,7 @@ impl ArrowReaderMetadata {
     ) -> Result<Self> {
         // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
         // took an argument to fetch the page indexes.
-        let mut metadata = input.get_metadata().await?;
+        let mut metadata = input.get_metadata_with_options(&options).await?;

         if options.page_index
             && metadata.column_index().is_none()
             && metadata.offset_index().is_none()
         {
             let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
             let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+
+            #[cfg(feature = "encryption")]
+            {
+                reader =
+                    reader.with_decryption_properties(options.file_decryption_properties.as_ref());
+            }
+
             reader.load_page_index(input).await?;
             metadata = Arc::new(reader.finish()?)
         }
@@ -201,7 +271,7 @@ pub struct AsyncReader<T>(T);
 ///
 /// This builder handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
-/// they wish to be read by the resulting stream
+/// they wish to be read by the resulting stream.
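+///
+/// When the `encryption` feature is enabled, an encrypted file can be read by
+/// supplying [`FileDecryptionProperties`] through [`ArrowReaderOptions`]. A
+/// minimal sketch (hypothetical file name and key; the import path assumes the
+/// `encryption` module is exported by the crate):
+///
+/// ```no_run
+/// # use futures::TryStreamExt;
+/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
+/// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
+/// # use parquet::encryption::decrypt::FileDecryptionProperties;
+/// # async fn example() {
+/// let file = tokio::fs::File::open("data.parquet.encrypted").await.unwrap();
+/// let decryption_properties =
+///     FileDecryptionProperties::builder(b"0123456789012345".to_vec())
+///         .build()
+///         .unwrap();
+/// let options =
+///     ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+/// let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
+///     .await
+///     .unwrap();
+/// let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
+/// # }
+/// ```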
 ///
 /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
 ///
@@ -344,7 +414,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     }

     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
-    /// and [`ArrowReaderOptions`]
+    /// and [`ArrowReaderOptions`].
     pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
         let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
         Ok(Self::new_with_metadata(input, metadata))
     }
@@ -563,11 +633,12 @@ where
                 .map(|x| x[row_group_idx].as_slice());

             let mut row_group = InMemoryRowGroup {
-                metadata: meta,
                 // schema: meta.schema_descr_ptr(),
                 row_count: meta.num_rows() as usize,
                 column_chunks: vec![None; meta.columns().len()],
                 offset_index,
+                row_group_idx,
+                metadata: self.metadata.as_ref(),
             };

             if let Some(filter) = self.filter.as_mut() {
@@ -849,10 +920,11 @@ where

 /// An in-memory collection of column chunks
 struct InMemoryRowGroup<'a> {
-    metadata: &'a RowGroupMetaData,
     offset_index: Option<&'a [OffsetIndexMetaData]>,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
+    row_group_idx: usize,
+    metadata: &'a ParquetMetaData,
 }

 impl InMemoryRowGroup<'_> {
@@ -863,6 +935,7 @@ impl InMemoryRowGroup<'_> {
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
+        let metadata = self.metadata.row_group(self.row_group_idx);
         if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
@@ -871,7 +944,7 @@
             let fetch_ranges = self
                 .column_chunks
                 .iter()
-                .zip(self.metadata.columns())
+                .zip(metadata.columns())
                 .enumerate()
                 .filter(|&(idx, (chunk, _chunk_meta))| {
                     chunk.is_none() && projection.leaf_included(idx)
@@ -910,7 +983,7 @@
                 }

                 *chunk = Some(Arc::new(ColumnChunkData::Sparse {
-                    length: self.metadata.column(idx).byte_range().1 as usize,
+                    length: metadata.column(idx).byte_range().1 as usize,
                     data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                 }))
             }
@@ -922,7 +995,7 @@
                 .enumerate()
                 .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
                 .map(|(idx, _chunk)| {
-                    let column = self.metadata.column(idx);
+                    let column = metadata.column(idx);
                     let (start, length) = column.byte_range();
                     start as usize..(start + length) as usize
                 })
@@ -937,7 +1010,7 @@
                 if let Some(data) = chunk_data.next() {
                     *chunk = Some(Arc::new(ColumnChunkData::Dense {
-                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        offset: metadata.column(idx).byte_range().0 as usize,
                         data,
                     }));
                 }
@@ -954,6 +1027,36 @@ impl RowGroups for InMemoryRowGroup<'_> {
     }

     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        #[cfg(feature = "encryption")]
+        let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() {
+            let column_name = self
+                .metadata
+                .file_metadata()
+                .schema_descr()
+                .column(i);
+
+            if file_decryptor.is_column_encrypted(column_name.name()) {
+                let data_decryptor =
+                    file_decryptor.get_column_data_decryptor(column_name.name())?;
+                let metadata_decryptor =
+                    file_decryptor.get_column_metadata_decryptor(column_name.name())?;
+
+                let crypto_context = CryptoContext::new(
+                    self.row_group_idx,
+                    i,
+                    data_decryptor,
+                    metadata_decryptor,
+                    file_decryptor.file_aad().clone(),
+                );
+                Some(Arc::new(crypto_context))
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
         match &self.column_chunks[i] {
             None =>
Err(ParquetError::General(format!( "Invalid column index {i}, column was not fetched" @@ -964,12 +1067,18 @@ impl RowGroups for InMemoryRowGroup<'_> { // filter out empty offset indexes (old versions specified Some(vec![]) when no present) .filter(|index| !index.is_empty()) .map(|index| index[i].page_locations.clone()); - let page_reader: Box = Box::new(SerializedPageReader::new( + let metadata = self.metadata.row_group(self.row_group_idx); + let page_reader = SerializedPageReader::new( data.clone(), - self.metadata.column(i), + metadata.column(i), self.row_count, page_locations, - )?); + )?; + + #[cfg(feature = "encryption")] + let page_reader = page_reader.with_crypto_context(crypto_context); + + let page_reader: Box = Box::new(page_reader); Ok(Box::new(ColumnChunkIterator { reader: Some(Ok(page_reader)), @@ -1055,14 +1164,20 @@ mod tests { use crate::arrow::arrow_reader::{ ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector, }; + use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::ArrowWriter; + #[cfg(feature = "encryption")] + use crate::encryption::decrypt::FileDecryptionProperties; use crate::file::metadata::ParquetMetaDataReader; use crate::file::properties::WriterProperties; + #[cfg(feature = "encryption")] + use crate::util::test_common::encryption_util::verify_encryption_test_file_read_async; use arrow::compute::kernels::cmp::eq; use arrow::error::Result as ArrowResult; use arrow_array::builder::{ListBuilder, StringBuilder}; use arrow_array::cast::AsArray; + use arrow_array::types; use arrow_array::types::Int32Type; use arrow_array::{ Array, ArrayRef, Int32Array, Int8Array, RecordBatchReader, Scalar, StringArray, @@ -1074,6 +1189,7 @@ mod tests { use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tempfile::tempfile; + use tokio::fs::File; #[derive(Clone)] struct TestReader { @@ -1091,6 +1207,13 @@ mod tests { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { futures::future::ready(Ok(self.metadata.clone())).boxed() } + + fn get_metadata_with_options<'a>( + &'a mut self, + _options: &'a ArrowReaderOptions, + ) -> BoxFuture<'a, Result>> { + futures::future::ready(Ok(self.metadata.clone())).boxed() + } } #[tokio::test] @@ -2336,4 +2459,279 @@ mod tests { let result = reader.try_collect::>().await.unwrap(); assert_eq!(result.len(), 1); } + + #[tokio::test] + #[cfg(feature = "encryption")] + async fn test_non_uniform_encryption_plaintext_footer() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let mut file = File::open(&path).await.unwrap(); + + // There is always a footer key even with a plaintext footer, + // but this is used for signing the footer. 
+ let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes().to_vec(); + let column_2_key = "1234567890123451".as_bytes().to_vec(); + + let decryption_properties = FileDecryptionProperties::builder(footer_key) + .with_column_key("double_field", column_1_key) + .with_column_key("float_field", column_2_key) + .build() + .unwrap(); + + verify_encryption_test_file_read_async(&mut file, decryption_properties) + .await + .unwrap(); + } + + #[tokio::test] + #[cfg(feature = "encryption")] + async fn test_misspecified_encryption_keys() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + + // There is always a footer key even with a plaintext footer, + // but this is used for signing the footer. + let footer_key = "0123456789012345".as_bytes(); // 128bit/16 + let column_1_key = "1234567890123450".as_bytes(); + let column_2_key = "1234567890123451".as_bytes(); + + // read file with keys and check for expected error message + async fn check_for_error( + expected_message: &str, + path: &String, + footer_key: &[u8], + column_1_key: &[u8], + column_2_key: &[u8], + ) { + let mut file = File::open(&path).await.unwrap(); + + let mut decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec()); + + if !column_1_key.is_empty() { + decryption_properties = + decryption_properties.with_column_key("double_field", column_1_key.to_vec()); + } + + if !column_2_key.is_empty() { + decryption_properties = + decryption_properties.with_column_key("float_field", column_2_key.to_vec()); + } + + let decryption_properties = decryption_properties.build().unwrap(); + + match verify_encryption_test_file_read_async(&mut file, decryption_properties).await { + Ok(_) => { + panic!("did not get expected error") + } + Err(e) => { + assert_eq!(e.to_string(), expected_message); + } + } + } + + // Too short footer key + check_for_error( + "Parquet error: Invalid footer key. 
Failed to create AES key", + &path, + "bad_pwd".as_bytes(), + column_1_key, + column_2_key, + ) + .await; + + // Wrong footer key + check_for_error( + "Parquet error: Provided footer key and AAD were unable to decrypt parquet footer", + &path, + "1123456789012345".as_bytes(), + column_1_key, + column_2_key, + ) + .await; + + // Missing column key + check_for_error("Parquet error: Unable to decrypt column 'double_field', perhaps the column key is wrong or missing?", + &path, footer_key, "".as_bytes(), column_2_key).await; + + // Too short column key + check_for_error( + "Parquet error: Failed to create AES key", + &path, + footer_key, + "abc".as_bytes(), + column_2_key, + ) + .await; + + // Wrong column key + check_for_error("Parquet error: Unable to decrypt column 'double_field', perhaps the column key is wrong or missing?", + &path, footer_key, "1123456789012345".as_bytes(), column_2_key).await; + + // Mixed up keys + check_for_error("Parquet error: Unable to decrypt column 'float_field', perhaps the column key is wrong or missing?", + &path, footer_key, column_2_key, column_1_key).await; + } + + #[tokio::test] + async fn test_non_uniform_encryption_plaintext_footer_without_decryption() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted"); + let mut file = File::open(&path).await.unwrap(); + + let metadata = ArrowReaderMetadata::load_async(&mut file, Default::default()) + .await + .unwrap(); + let file_metadata = metadata.metadata.file_metadata(); + + assert_eq!(file_metadata.num_rows(), 50); + assert_eq!(file_metadata.schema_descr().num_columns(), 8); + assert_eq!( + file_metadata.created_by().unwrap(), + "parquet-cpp-arrow version 19.0.0-SNAPSHOT" + ); + + metadata.metadata.row_groups().iter().for_each(|rg| { + assert_eq!(rg.num_columns(), 8); + assert_eq!(rg.num_rows(), 50); + }); + + // Should be able to read unencrypted columns. Test reading one column. 
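+        // Column 1 is not covered by a column key in this test file, so
+        // projecting it succeeds even though no decryption properties were supplied.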
+        let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [1]);
+        let record_reader = builder.with_projection(mask).build().unwrap();
+        let record_batches = record_reader.try_collect::<Vec<_>>().await.unwrap();
+
+        let mut row_count = 0;
+        for batch in record_batches {
+            row_count += batch.num_rows();
+
+            let time_col = batch
+                .column(0)
+                .as_primitive::<types::Int32Type>();
+            for (i, x) in time_col.iter().enumerate() {
+                assert_eq!(x.unwrap(), i as i32);
+            }
+        }
+
+        assert_eq!(row_count, file_metadata.num_rows() as usize);
+
+        // Reading an encrypted column should fail
+        let file = File::open(&path).await.unwrap();
+        let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), [4]);
+        let mut record_reader = builder.with_projection(mask).build().unwrap();
+
+        match record_reader.next().await {
+            Some(Err(ParquetError::ArrowError(s))) => {
+                assert!(s.contains("protocol error"));
+            }
+            _ => {
+                panic!("Expected ParquetError::ArrowError");
+            }
+        };
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_non_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let footer_key = "0123456789012345".as_bytes().to_vec(); // 128bit/16
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = FileDecryptionProperties::builder(footer_key)
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_uniform_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/uniform_encryption.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let key_code: &[u8] = "0123456789012345".as_bytes();
+        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
+            .build()
+            .unwrap();
+
+        verify_encryption_test_file_read_async(&mut file, decryption_properties)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "encryption")]
+    async fn test_aes_ctr_encryption() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/encrypt_columns_and_footer_ctr.parquet.encrypted");
+        let mut file = File::open(&path).await.unwrap();
+
+        let footer_key = "0123456789012345".as_bytes().to_vec();
+        let column_1_key = "1234567890123450".as_bytes().to_vec();
+        let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+        let decryption_properties = FileDecryptionProperties::builder(footer_key)
+            .with_column_key("double_field", column_1_key)
+            .with_column_key("float_field", column_2_key)
+            .build()
+            .unwrap();
+
+        let options =
+            ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+        let metadata = ArrowReaderMetadata::load_async(&mut file, options).await;
+
+        match metadata {
+            Err(ParquetError::NYI(s)) => {
+                assert!(s.contains("AES_GCM_CTR_V1"));
+            }
+            _ => {
+                panic!("Expected ParquetError::NYI");
+            }
+        };
+    }
+
+    #[tokio::test]
+    #[cfg(not(feature = "encryption"))]
+    async fn
test_decrypting_without_encryption_flag_fails() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/uniform_encryption.parquet.encrypted"); + let mut file = File::open(&path).await.unwrap(); + + let options = ArrowReaderOptions::new(); + let result = ArrowReaderMetadata::load_async(&mut file, options).await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but the encryption feature is disabled" + ); + } + + #[tokio::test] + #[cfg(feature = "encryption")] + async fn test_decrypting_without_decryption_properties_fails() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/uniform_encryption.parquet.encrypted"); + let mut file = File::open(&path).await.unwrap(); + + let options = ArrowReaderOptions::new(); + let result = ArrowReaderMetadata::load_async(&mut file, options).await; + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Parquet error: Parquet file has an encrypted footer but no decryption properties were provided" + ); + } } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index fd0397b5e1fc..6922f3d1f7a3 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -17,15 +17,15 @@ use std::{ops::Range, sync::Arc}; +use crate::arrow::arrow_reader::ArrowReaderOptions; +use crate::arrow::async_reader::AsyncFileReader; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use object_store::{path::Path, ObjectMeta, ObjectStore}; use tokio::runtime::Handle; -use crate::arrow::async_reader::AsyncFileReader; -use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; - /// Reads Parquet files in object storage using [`ObjectStore`]. 
/// /// ```no_run @@ -175,6 +175,27 @@ impl AsyncFileReader for ParquetObjectReader { Ok(Arc::new(metadata)) }) } + + fn get_metadata_with_options<'a>( + &'a mut self, + options: &'a ArrowReaderOptions, + ) -> BoxFuture<'a, Result>> { + Box::pin(async move { + let file_size = self.meta.size; + let metadata = ParquetMetaDataReader::new() + .with_column_indexes(self.preload_column_index) + .with_offset_indexes(self.preload_offset_index) + .with_prefetch_hint(self.metadata_size_hint); + + #[cfg(feature = "encryption")] + let metadata = + metadata.with_decryption_properties(options.file_decryption_properties.as_ref()); + + let metadata = metadata.load_and_finish(self, file_size).await?; + + Ok(Arc::new(metadata)) + }) + } } #[cfg(test)] @@ -186,16 +207,17 @@ mod tests { use futures::TryStreamExt; + use crate::arrow::arrow_reader::ArrowReaderOptions; + use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; + use crate::arrow::ParquetRecordBatchStreamBuilder; + use crate::encryption::decrypt::FileDecryptionProperties; + use crate::errors::ParquetError; use arrow::util::test_util::parquet_test_data; use futures::FutureExt; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; - use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; - use crate::arrow::ParquetRecordBatchStreamBuilder; - use crate::errors::ParquetError; - async fn get_meta_store() -> (ObjectMeta, Arc) { let res = parquet_test_data(); let store = LocalFileSystem::new_with_prefix(res).unwrap(); @@ -208,6 +230,45 @@ mod tests { (meta, Arc::new(store) as Arc) } + #[cfg(feature = "encryption")] + async fn get_encrypted_meta_store() -> (ObjectMeta, Arc) { + let res = parquet_test_data(); + let store = LocalFileSystem::new_with_prefix(res).unwrap(); + + let meta = store + .head(&Path::from("uniform_encryption.parquet.encrypted")) + .await + .unwrap(); + + (meta, Arc::new(store) as Arc) + } + + #[tokio::test] + #[cfg(feature = "encryption")] + async fn test_encrypted() { + let (meta, store) = get_encrypted_meta_store().await; + + let key_code: &[u8] = "0123456789012345".as_bytes(); + let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec()) + .build() + .unwrap(); + let options = + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); + let mut reader = ParquetObjectReader::new(store.clone(), meta.clone()); + let metadata = reader.get_metadata_with_options(&options).await.unwrap(); + + assert_eq!(metadata.num_row_groups(), 1); + + let reader = ParquetObjectReader::new(store, meta); + let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options) + .await + .unwrap(); + let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 50); + } + #[tokio::test] async fn test_simple() { let (meta, store) = get_meta_store().await; diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs index 5c866318e185..b5afe6b93389 100644 --- a/parquet/src/column/page.rs +++ b/parquet/src/column/page.rs @@ -90,6 +90,16 @@ impl Page { } } + /// Returns whether this page is any version of a data page + pub fn is_data_page(&self) -> bool { + matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. }) + } + + /// Returns whether this page is a dictionary page + pub fn is_dictionary_page(&self) -> bool { + matches!(self, Page::DictionaryPage { .. 
})
+    }
+
     /// Returns internal byte buffer reference for this page.
     pub fn buffer(&self) -> &Bytes {
         match self {
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 5f34f34cbb7a..50156a26e276 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2105,6 +2105,8 @@ mod tests {
             r.rows_written as usize,
             None,
             Arc::new(props),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap();
@@ -2157,6 +2159,8 @@ mod tests {
             r.rows_written as usize,
             None,
             Arc::new(props),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap();
@@ -2292,6 +2296,8 @@ mod tests {
             r.rows_written as usize,
             None,
             Arc::new(props),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap(),
         );
@@ -3741,6 +3747,8 @@ mod tests {
             result.rows_written as usize,
             None,
             Arc::new(props),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap(),
         );
diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs
new file mode 100644
index 000000000000..9b5a04d622f0
--- /dev/null
+++ b/parquet/src/encryption/ciphers.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError::General;
+use crate::errors::Result;
+use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM};
+use std::fmt::Debug;
+
+const NONCE_LEN: usize = 12;
+const TAG_LEN: usize = 16;
+const SIZE_LEN: usize = 4;
+
+pub trait BlockDecryptor: Debug + Send + Sync {
+    fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct RingGcmBlockDecryptor {
+    key: LessSafeKey,
+}
+
+impl RingGcmBlockDecryptor {
+    pub(crate) fn new(key_bytes: &[u8]) -> Result<Self> {
+        // todo support other key sizes
+        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
+            .map_err(|_| General("Failed to create AES key".to_string()))?;
+
+        Ok(Self {
+            key: LessSafeKey::new(key),
+        })
+    }
+}
+
+impl BlockDecryptor for RingGcmBlockDecryptor {
+    fn decrypt(&self, length_and_ciphertext: &[u8], aad: &[u8]) -> Result<Vec<u8>> {
+        let mut result = Vec::with_capacity(length_and_ciphertext.len() - SIZE_LEN - NONCE_LEN);
+        result.extend_from_slice(&length_and_ciphertext[SIZE_LEN + NONCE_LEN..]);
+
+        let nonce = ring::aead::Nonce::try_assume_unique_for_key(
+            &length_and_ciphertext[SIZE_LEN..SIZE_LEN + NONCE_LEN],
+        )?;
+
+        self.key.open_in_place(nonce, Aad::from(aad), &mut result)?;
+
+        // Truncate result to remove the tag
+        result.resize(result.len() - TAG_LEN, 0u8);
+        Ok(result)
+    }
+}
diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs
new file mode 100644
index 000000000000..d5bfe3cfc076
--- /dev/null
+++ b/parquet/src/encryption/decrypt.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::{ParquetError, Result};
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+pub fn read_and_decrypt<T: Read>(
+    decryptor: &Arc<dyn BlockDecryptor>,
+    input: &mut T,
+    aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut len_bytes = [0; 4];
+    input.read_exact(&mut len_bytes)?;
+    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
+    let mut ciphertext = vec![0; 4 + ciphertext_len];
+    input.read_exact(&mut ciphertext[4..])?;
+
+    decryptor.decrypt(&ciphertext, aad.as_ref())
+}
+
+// CryptoContext is a data structure that holds the context required to
+// decrypt parquet modules (data pages, dictionary pages, etc.).
+#[derive(Debug, Clone)]
+pub(crate) struct CryptoContext {
+    pub(crate) row_group_idx: usize,
+    pub(crate) column_ordinal: usize,
+    pub(crate) page_ordinal: Option<usize>,
+    pub(crate) dictionary_page: bool,
+    // We have separate data and metadata decryptors because
+    // in GCM CTR mode, the metadata and data pages use
+    // different algorithms.
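+    // (AES_GCM_V1 encrypts every module with AES-GCM, while AES_GCM_CTR_V1
+    // keeps AES-GCM for metadata modules but encrypts page data with AES-CTR,
+    // which carries no integrity tag.)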
+    data_decryptor: Arc<dyn BlockDecryptor>,
+    metadata_decryptor: Arc<dyn BlockDecryptor>,
+    file_aad: Vec<u8>,
+}
+
+impl CryptoContext {
+    pub(crate) fn new(
+        row_group_idx: usize,
+        column_ordinal: usize,
+        data_decryptor: Arc<dyn BlockDecryptor>,
+        metadata_decryptor: Arc<dyn BlockDecryptor>,
+        file_aad: Vec<u8>,
+    ) -> Self {
+        Self {
+            row_group_idx,
+            column_ordinal,
+            page_ordinal: None,
+            dictionary_page: false,
+            data_decryptor,
+            metadata_decryptor,
+            file_aad,
+        }
+    }
+
+    pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
+        Self {
+            row_group_idx: self.row_group_idx,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: Some(page_ordinal),
+            dictionary_page: false,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPageHeader
+        } else {
+            ModuleType::DataPageHeader
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPage
+        } else {
+            ModuleType::DataPage
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn for_dictionary_page(&self) -> Self {
+        Self {
+            row_group_idx: self.row_group_idx,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: self.page_ordinal,
+            dictionary_page: true,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.data_decryptor
+    }
+
+    pub(crate) fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+}
+
+/// FileDecryptionProperties holds keys and AAD data required to decrypt a Parquet file.
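+///
+/// A minimal sketch of constructing properties (the 16-byte keys shown are
+/// hypothetical, and the doc-test import path assumes this module is public):
+///
+/// ```no_run
+/// # use parquet::encryption::decrypt::FileDecryptionProperties;
+/// let properties = FileDecryptionProperties::builder(b"0123456789012345".to_vec())
+///     .with_column_key("double_field", b"1234567890123450".to_vec())
+///     .with_aad_prefix(b"tester".to_vec())
+///     .build()
+///     .unwrap();
+/// ```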
+#[derive(Debug, Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    pub(crate) aad_prefix: Option<Vec<u8>>,
+}
+
+impl FileDecryptionProperties {
+    /// Returns a new FileDecryptionProperties builder
+    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        DecryptionPropertiesBuilder::new(footer_key)
+    }
+}
+
+pub struct DecryptionPropertiesBuilder {
+    footer_key: Vec<u8>,
+    column_keys: HashMap<String, Vec<u8>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl DecryptionPropertiesBuilder {
+    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        Self {
+            footer_key,
+            column_keys: HashMap::default(),
+            aad_prefix: None,
+        }
+    }
+
+    pub fn build(self) -> Result<FileDecryptionProperties> {
+        Ok(FileDecryptionProperties {
+            footer_key: self.footer_key,
+            column_keys: self.column_keys,
+            aad_prefix: self.aad_prefix,
+        })
+    }
+
+    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
+        self.aad_prefix = Some(value);
+        self
+    }
+
+    pub fn with_column_key(mut self, column_name: &str, decryption_key: Vec<u8>) -> Self {
+        self.column_keys
+            .insert(column_name.to_string(), decryption_key);
+        self
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct FileDecryptor {
+    decryption_properties: FileDecryptionProperties,
+    footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+    file_aad: Vec<u8>,
+}
+
+impl PartialEq for FileDecryptor {
+    fn eq(&self, other: &Self) -> bool {
+        self.decryption_properties == other.decryption_properties
+    }
+}
+
+impl FileDecryptor {
+    pub(crate) fn new(
+        decryption_properties: &FileDecryptionProperties,
+        aad_file_unique: Vec<u8>,
+        aad_prefix: Vec<u8>,
+    ) -> Result<Self> {
+        let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
+        // todo decr: if no key available yet (not set in properties, should be retrieved from metadata)
+        let footer_decryptor = RingGcmBlockDecryptor::new(&decryption_properties.footer_key)
+            .map_err(|e| {
+                general_err!(
+                    "Invalid footer key. {}",
+                    e.to_string().replace("Parquet error: ", "")
+                )
+            })?;
+        Ok(Self {
+            footer_decryptor: Some(Arc::new(footer_decryptor)),
+            decryption_properties: decryption_properties.clone(),
+            file_aad,
+        })
+    }
+
+    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        Ok(self.footer_decryptor.clone().unwrap())
+    }
+
+    pub(crate) fn get_column_data_decryptor(
+        &self,
+        column_name: &str,
+    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        match self.decryption_properties.column_keys.get(column_name) {
+            Some(column_key) => Ok(Arc::new(RingGcmBlockDecryptor::new(column_key)?)),
+            None => self.get_footer_decryptor(),
+        }
+    }
+
+    pub(crate) fn get_column_metadata_decryptor(
+        &self,
+        column_name: &str,
+    ) -> Result<Arc<dyn BlockDecryptor>, ParquetError> {
+        // Once GCM CTR mode is implemented, data and metadata decryptors may be different
+        self.get_column_data_decryptor(column_name)
+    }
+
+    pub(crate) fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+
+    pub(crate) fn is_column_encrypted(&self, column_name: &str) -> bool {
+        // Column is encrypted if either uniform encryption is used or an encryption key is set for the column
+        match self.decryption_properties.column_keys.is_empty() {
+            false => self
+                .decryption_properties
+                .column_keys
+                .contains_key(column_name),
+            true => true,
+        }
+    }
+}
diff --git a/parquet/src/encryption/decryption.rs b/parquet/src/encryption/decryption.rs
new file mode 100644
index 000000000000..a019ace580e3
--- /dev/null
+++ b/parquet/src/encryption/decryption.rs
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
diff --git a/parquet/src/encryption/decryption.rs b/parquet/src/encryption/decryption.rs
new file mode 100644
index 000000000000..a019ace580e3
--- /dev/null
+++ b/parquet/src/encryption/decryption.rs
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
+use crate::encryption::modules::{create_module_aad, ModuleType};
+use crate::errors::Result;
+use std::collections::HashMap;
+use std::io::Read;
+use std::sync::Arc;
+
+pub fn read_and_decrypt<T: Read>(
+    decryptor: &Arc<dyn BlockDecryptor>,
+    input: &mut T,
+    aad: &[u8],
+) -> Result<Vec<u8>> {
+    let mut len_bytes = [0; 4];
+    input.read_exact(&mut len_bytes)?;
+    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
+    let mut ciphertext = vec![0; 4 + ciphertext_len];
+    input.read_exact(&mut ciphertext[4..])?;
+
+    decryptor.decrypt(&ciphertext, aad.as_ref())
+}
+
+#[derive(Debug, Clone)]
+pub struct CryptoContext {
+    pub(crate) row_group_ordinal: usize,
+    pub(crate) column_ordinal: usize,
+    pub(crate) page_ordinal: Option<usize>,
+    pub(crate) dictionary_page: bool,
+    // We have separate data and metadata decryptors because
+    // in GCM CTR mode, the metadata and data pages use
+    // different algorithms.
+    data_decryptor: Arc<dyn BlockDecryptor>,
+    metadata_decryptor: Arc<dyn BlockDecryptor>,
+    file_aad: Vec<u8>,
+}
+
+impl CryptoContext {
+    pub fn new(
+        row_group_ordinal: usize,
+        column_ordinal: usize,
+        data_decryptor: Arc<dyn BlockDecryptor>,
+        metadata_decryptor: Arc<dyn BlockDecryptor>,
+        file_aad: Vec<u8>,
+    ) -> Self {
+        Self {
+            row_group_ordinal,
+            column_ordinal,
+            page_ordinal: None,
+            dictionary_page: false,
+            data_decryptor,
+            metadata_decryptor,
+            file_aad,
+        }
+    }
+
+    pub fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
+        Self {
+            row_group_ordinal: self.row_group_ordinal,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: Some(page_ordinal),
+            dictionary_page: false,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPageHeader
+        } else {
+            ModuleType::DataPageHeader
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_ordinal,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
+        let module_type = if self.dictionary_page {
+            ModuleType::DictionaryPage
+        } else {
+            ModuleType::DataPage
+        };
+
+        create_module_aad(
+            self.file_aad(),
+            module_type,
+            self.row_group_ordinal,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub fn for_dictionary_page(&self) -> Self {
+        Self {
+            row_group_ordinal: self.row_group_ordinal,
+            column_ordinal: self.column_ordinal,
+            page_ordinal: self.page_ordinal,
+            dictionary_page: true,
+            data_decryptor: self.data_decryptor.clone(),
+            metadata_decryptor: self.metadata_decryptor.clone(),
+            file_aad: self.file_aad.clone(),
+        }
+    }
+
+    pub fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.data_decryptor
+    }
+
+    pub fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.metadata_decryptor
+    }
+
+    pub fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+}
+
+/// FileDecryptionProperties holds keys and AAD data required to decrypt a Parquet file.
+#[derive(Debug, Clone, PartialEq)]
+pub struct FileDecryptionProperties {
+    footer_key: Vec<u8>,
+    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl FileDecryptionProperties {
+    /// Returns a new FileDecryptionProperties builder
+    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        DecryptionPropertiesBuilder::new(footer_key)
+    }
+}
+
+pub struct DecryptionPropertiesBuilder {
+    footer_key: Vec<u8>,
+    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
+    aad_prefix: Option<Vec<u8>>,
+}
+
+impl DecryptionPropertiesBuilder {
+    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
+        Self {
+            footer_key,
+            column_keys: None,
+            aad_prefix: None,
+        }
+    }
+
+    pub fn build(self) -> Result<FileDecryptionProperties> {
+        Ok(FileDecryptionProperties {
+            footer_key: self.footer_key,
+            column_keys: self.column_keys,
+            aad_prefix: self.aad_prefix,
+        })
+    }
+
+    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
+        self.aad_prefix = Some(value);
+        self
+    }
+
+    pub fn with_column_key(mut self, column_name: Vec<u8>, decryption_key: Vec<u8>) -> Self {
+        let mut column_keys = self.column_keys.unwrap_or_default();
+        column_keys.insert(column_name, decryption_key);
+        self.column_keys = Some(column_keys);
+        self
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FileDecryptor {
+    decryption_properties: FileDecryptionProperties,
+    footer_decryptor: Option<Arc<dyn BlockDecryptor>>,
+    file_aad: Vec<u8>,
+}
+
+impl PartialEq for FileDecryptor {
+    fn eq(&self, other: &Self) -> bool {
+        self.decryption_properties == other.decryption_properties
+    }
+}
+
+impl FileDecryptor {
+    pub(crate) fn new(
+        decryption_properties: &FileDecryptionProperties,
+        aad_file_unique: Vec<u8>,
+        aad_prefix: Vec<u8>,
+    ) -> Self {
+        let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
+        let footer_decryptor = RingGcmBlockDecryptor::new(&decryption_properties.footer_key);
+
+        Self {
+            // todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
+            footer_decryptor: Some(Arc::new(footer_decryptor)),
+            decryption_properties: decryption_properties.clone(),
+            file_aad,
+        }
+    }
+
+    pub(crate) fn get_footer_decryptor(&self) -> Arc<dyn BlockDecryptor> {
+        self.footer_decryptor.clone().unwrap()
+    }
+
+    pub(crate) fn get_column_data_decryptor(&self, column_name: &[u8]) -> Arc<dyn BlockDecryptor> {
+        match self.decryption_properties.column_keys.as_ref() {
+            None => self.get_footer_decryptor(),
+            Some(column_keys) => match column_keys.get(column_name) {
+                None => self.get_footer_decryptor(),
+                Some(column_key) => Arc::new(RingGcmBlockDecryptor::new(column_key)),
+            },
+        }
+    }
+
+    pub(crate) fn get_column_metadata_decryptor(
+        &self,
+        column_name: &[u8],
+    ) -> Arc<dyn BlockDecryptor> {
+        // Once GCM CTR mode is implemented, data and metadata decryptors may be different
+        self.get_column_data_decryptor(column_name)
+    }
+
+    pub(crate) fn file_aad(&self) -> &Vec<u8> {
+        &self.file_aad
+    }
+
+    pub(crate) fn is_column_encrypted(&self, column_name: &[u8]) -> bool {
+        // Column is encrypted if either uniform encryption is used or an encryption key is set for the column
+        match self.decryption_properties.column_keys.as_ref() {
+            None => true,
+            Some(keys) => keys.contains_key(column_name),
+        }
+    }
+}
diff --git a/parquet/src/encryption/mod.rs b/parquet/src/encryption/mod.rs
new file mode 100644
index 000000000000..6e9168edb9f2
--- /dev/null
+++ b/parquet/src/encryption/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption implementation specific to Parquet, as described +//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md). + +pub(crate) mod ciphers; +pub(crate) mod decrypt; +pub(crate) mod modules; diff --git a/parquet/src/encryption/modules.rs b/parquet/src/encryption/modules.rs new file mode 100644 index 000000000000..6bf9306b256d --- /dev/null +++ b/parquet/src/encryption/modules.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use crate::errors::ParquetError;
+
+#[derive(PartialEq)]
+pub(crate) enum ModuleType {
+    Footer = 0,
+    ColumnMetaData = 1,
+    DataPage = 2,
+    DictionaryPage = 3,
+    DataPageHeader = 4,
+    DictionaryPageHeader = 5,
+}
+
+pub fn create_footer_aad(file_aad: &[u8]) -> crate::errors::Result<Vec<u8>> {
+    create_module_aad(file_aad, ModuleType::Footer, 0, 0, None)
+}
+
+pub(crate) fn create_module_aad(
+    file_aad: &[u8],
+    module_type: ModuleType,
+    row_group_idx: usize,
+    column_ordinal: usize,
+    page_ordinal: Option<usize>,
+) -> crate::errors::Result<Vec<u8>> {
+    let module_buf = [module_type as u8];
+
+    if module_buf[0] == (ModuleType::Footer as u8) {
+        let mut aad = Vec::with_capacity(file_aad.len() + 1);
+        aad.extend_from_slice(file_aad);
+        aad.extend_from_slice(module_buf.as_ref());
+        return Ok(aad);
+    }
+
+    if row_group_idx > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} row groups: {}",
+            i16::MAX,
+            row_group_idx
+        ));
+    }
+    if column_ordinal > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} columns: {}",
+            i16::MAX,
+            column_ordinal
+        ));
+    }
+
+    if module_buf[0] != (ModuleType::DataPageHeader as u8)
+        && module_buf[0] != (ModuleType::DataPage as u8)
+    {
+        let mut aad = Vec::with_capacity(file_aad.len() + 5);
+        aad.extend_from_slice(file_aad);
+        aad.extend_from_slice(module_buf.as_ref());
+        aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
+        aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
+        return Ok(aad);
+    }
+
+    let page_ordinal =
+        page_ordinal.ok_or_else(|| general_err!("Page ordinal must be set for data pages"))?;
+
+    if page_ordinal > i16::MAX as usize {
+        return Err(general_err!(
+            "Encrypted parquet files can't have more than {} pages per column chunk: {}",
+            i16::MAX,
+            page_ordinal
+        ));
+    }
+
+    let mut aad = Vec::with_capacity(file_aad.len() + 7);
+    aad.extend_from_slice(file_aad);
+    aad.extend_from_slice(module_buf.as_ref());
+    aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
+    aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
+    aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref());
+    Ok(aad)
+}
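For reference, the AAD assembled above is a plain concatenation. A data page AAD, written out with the same names used in `create_module_aad`:

    // file_aad || module type (1 byte) || row group (i16 LE) || column (i16 LE) || page (i16 LE)
    let mut aad = Vec::with_capacity(file_aad.len() + 7);
    aad.extend_from_slice(file_aad);                                // AAD prefix + unique file id
    aad.push(ModuleType::DataPage as u8);                           // module type = 2
    aad.extend_from_slice(&(row_group_idx as i16).to_le_bytes());   // row group ordinal
    aad.extend_from_slice(&(column_ordinal as i16).to_le_bytes());  // column ordinal
    aad.extend_from_slice(&(page_ordinal as i16).to_le_bytes());    // page ordinal

Footer AADs stop after the module type, and column metadata AADs stop after the column ordinal; the i16 bounds checks above are what make the casts safe.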
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index d749287bba62..4cb1f99c3cf6 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -132,6 +132,13 @@ impl From<string::FromUtf8Error> for ParquetError {
     }
 }
 
+#[cfg(feature = "encryption")]
+impl From<ring::error::Unspecified> for ParquetError {
+    fn from(e: ring::error::Unspecified) -> ParquetError {
+        ParquetError::External(Box::new(e))
+    }
+}
+
 /// A specialized `Result` for Parquet errors.
 pub type Result<T, E = ParquetError> = result::Result<T, E>;
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index bd31c9142f56..5be084259e18 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -52,13 +52,18 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDa
 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
-    ParquetMetaDataReader::decode_metadata(buf)
+    ParquetMetaDataReader::decode_metadata(
+        buf,
+        false,
+        #[cfg(feature = "encryption")]
+        None,
+    )
 }
 
 /// Decodes the Parquet footer returning the metadata length in bytes
@@ -72,7 +77,10 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
 /// | len | 'PAR1' |
 /// +-----+--------+
 /// ```
-#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_footer")]
+#[deprecated(
+    since = "53.1.0",
+    note = "Use ParquetMetaDataReader::decode_footer_tail"
+)]
 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-    ParquetMetaDataReader::decode_footer(slice)
+    ParquetMetaDataReader::decode_footer_tail(slice).map(|f| f.metadata_length())
 }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 252cb99f3f36..217685049ea9 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -95,26 +95,33 @@ mod memory;
 pub(crate) mod reader;
 mod writer;
 
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::format::{
-    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
-    SizeStatistics, SortingColumn,
-};
-
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::FileDecryptor,
+    modules::{create_module_aad, ModuleType},
+};
 use crate::errors::{ParquetError, Result};
 pub(crate) use crate::file::metadata::memory::HeapSize;
 use crate::file::page_encoding_stats::{self, PageEncodingStats};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::statistics::{self, Statistics};
+#[cfg(feature = "encryption")]
+use crate::format::ColumnCryptoMetaData;
+use crate::format::{
+    BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
+    SizeStatistics, SortingColumn,
+};
 use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
     Type as SchemaType,
 };
+#[cfg(feature = "encryption")]
+use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 pub use reader::ParquetMetaDataReader;
+use std::ops::Range;
+use std::sync::Arc;
 pub use writer::ParquetMetaDataWriter;
 pub(crate) use writer::ThriftMetadataWriter;
@@ -174,6 +181,9 @@ pub struct ParquetMetaData {
     column_index: Option<ParquetColumnIndex>,
     /// Offset index for each page in each column chunk
     offset_index: Option<ParquetOffsetIndex>,
+    /// Optional file decryptor
+    #[cfg(feature = "encryption")]
+    file_decryptor: Option<FileDecryptor>,
 }
 
 impl ParquetMetaData {
@@ -183,11 +193,20 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
+            #[cfg(feature = "encryption")]
+            file_decryptor: None,
             column_index: None,
             offset_index: None,
         }
     }
 
+    /// Adds [`FileDecryptor`] to this metadata instance to enable decryption of
+    /// encrypted data.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
+        self.file_decryptor = file_decryptor;
+    }
+
     /// Creates Parquet metadata from file metadata, a list of row
     /// group metadata, and the column index structures.
     #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataBuilder")]
@@ -214,6 +233,12 @@ impl ParquetMetaData {
         &self.file_metadata
     }
 
+    /// Returns file decryptor as reference.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
+        self.file_decryptor.as_ref()
+    }
+
     /// Returns number of row groups in this file.
     pub fn num_row_groups(&self) -> usize {
         self.row_groups.len()
@@ -599,6 +624,81 @@ impl RowGroupMetaData {
         self.file_offset
     }
 
+    /// Method to convert from encrypted Thrift.
+    #[cfg(feature = "encryption")]
+    fn from_encrypted_thrift(
+        schema_descr: SchemaDescPtr,
+        mut rg: RowGroup,
+        decryptor: Option<&FileDecryptor>,
+    ) -> Result<RowGroupMetaData> {
+        if schema_descr.num_columns() != rg.columns.len() {
+            return Err(general_err!(
+                "Column count mismatch. Schema has {} columns while Row Group has {}",
+                schema_descr.num_columns(),
+                rg.columns.len()
+            ));
+        }
+        let total_byte_size = rg.total_byte_size;
+        let num_rows = rg.num_rows;
+        let mut columns = vec![];
+
+        for (i, (mut c, d)) in rg
+            .columns
+            .drain(0..)
+            .zip(schema_descr.columns())
+            .enumerate()
+        {
+            // Read encrypted metadata if it's present and we have a decryptor.
+            if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
+                let column_decryptor = match c.crypto_metadata.as_ref() {
+                    None => {
+                        return Err(general_err!(
+                            "No crypto_metadata is set for column '{}', which has encrypted metadata",
+                            d.path().string()
+                        ));
+                    }
+                    Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
+                        let column_name = crypto_metadata.path_in_schema.join(".");
+                        decryptor.get_column_metadata_decryptor(column_name.as_str())?
+                    }
+                    Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
+                        decryptor.get_footer_decryptor()?
+                    }
+                };
+
+                let column_aad = create_module_aad(
+                    decryptor.file_aad(),
+                    ModuleType::ColumnMetaData,
+                    rg.ordinal.unwrap() as usize,
+                    i,
+                    None,
+                )?;
+
+                let buf = c.encrypted_column_metadata.clone().unwrap();
+                let decrypted_cc_buf =
+                    column_decryptor.decrypt(buf.as_slice(), column_aad.as_ref()).map_err(|_| {
+                        general_err!("Unable to decrypt column '{}', perhaps the column key is wrong or missing?",
+                        d.path().string())
+                    })?;
+
+                let mut prot = TCompactSliceInputProtocol::new(decrypted_cc_buf.as_slice());
+                c.meta_data = Some(ColumnMetaData::read_from_in_protocol(&mut prot)?);
+            }
+            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
+        }
+
+        let sorting_columns = rg.sorting_columns;
+        Ok(RowGroupMetaData {
+            columns,
+            num_rows,
+            sorting_columns,
+            total_byte_size,
+            schema_descr,
+            file_offset: rg.file_offset,
+            ordinal: rg.ordinal,
+        })
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(schema_descr: SchemaDescPtr, mut rg: RowGroup) -> Result<RowGroupMetaData> {
         if schema_descr.num_columns() != rg.columns.len() {
@@ -611,10 +711,11 @@ impl RowGroupMetaData {
         let total_byte_size = rg.total_byte_size;
         let num_rows = rg.num_rows;
         let mut columns = vec![];
+
         for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
-            let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
-            columns.push(cc);
+            columns.push(ColumnChunkMetaData::from_thrift(d.clone(), c)?);
         }
+
         let sorting_columns = rg.sorting_columns;
         Ok(RowGroupMetaData {
             columns,
@@ -1849,7 +1950,11 @@ mod tests {
         let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
             .set_row_groups(row_group_meta_with_stats)
             .build();
+
+        #[cfg(not(feature = "encryption"))]
         let base_expected_size = 2312;
+        #[cfg(feature = "encryption")]
+        let base_expected_size = 2448;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1876,7 +1981,11 @@ mod tests {
             ]]))
             .build();
 
+        #[cfg(not(feature = "encryption"))]
         let bigger_expected_size = 2816;
+        #[cfg(feature = "encryption")]
+        let bigger_expected_size = 2952;
+
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index d465a49c3544..b80e76d7929a 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -20,13 +20,21 @@ use std::{io::Read, ops::Range, sync::Arc};
 use bytes::Bytes;
 
 use crate::basic::ColumnOrder;
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::{FileDecryptionProperties, FileDecryptor},
+    modules::create_footer_aad,
+};
+
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::reader::ChunkReader;
-use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
+use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
 use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
+#[cfg(feature = "encryption")]
+use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
 use crate::schema::types;
 use crate::schema::types::SchemaDescriptor;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
@@ -68,6 +76,28 @@ pub struct ParquetMetaDataReader {
     // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
     // `self.parse_metadata` is called.
     metadata_size: Option<usize>,
+    #[cfg(feature = "encryption")]
+    file_decryption_properties: Option<FileDecryptionProperties>,
+}
+
+/// Describes how the footer metadata is stored
+///
+/// This is parsed from the last 8 bytes of the Parquet file
+pub struct FooterTail {
+    metadata_length: usize,
+    encrypted_footer: bool,
+}
+
+impl FooterTail {
+    /// The length of the footer metadata in bytes
+    pub fn metadata_length(&self) -> usize {
+        self.metadata_length
+    }
+
+    /// Whether the footer metadata is encrypted
+    pub fn is_encrypted_footer(&self) -> bool {
+        self.encrypted_footer
+    }
 }
 
 impl ParquetMetaDataReader {
@@ -126,6 +156,18 @@ impl ParquetMetaDataReader {
         self
     }
 
+    /// Provide the FileDecryptionProperties to use when decrypting the file.
+    ///
+    /// This is only necessary when the file is encrypted.
+    #[cfg(feature = "encryption")]
+    pub fn with_decryption_properties(
+        mut self,
+        properties: Option<&FileDecryptionProperties>,
+    ) -> Self {
+        self.file_decryption_properties = properties.cloned();
+        self
+    }
+
     /// Indicates whether this reader has a [`ParquetMetaData`] internally.
     pub fn has_metadata(&self) -> bool {
         self.metadata.is_some()
     }
@@ -372,8 +414,9 @@ impl ParquetMetaDataReader {
         mut fetch: F,
         file_size: usize,
     ) -> Result<()> {
-        let (metadata, remainder) =
-            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
+        let (metadata, remainder) = self
+            .load_metadata(&mut fetch, file_size, self.get_prefetch_size())
+            .await?;
 
         self.metadata = Some(metadata);
 
@@ -510,7 +553,8 @@ impl ParquetMetaDataReader {
             .get_read(file_size - 8)?
            .read_exact(&mut footer)?;
 
-        let metadata_len = Self::decode_footer(&footer)?;
+        let footer = Self::decode_footer_tail(&footer)?;
+        let metadata_len = footer.metadata_length();
         let footer_metadata_len = FOOTER_SIZE + metadata_len;
         self.metadata_size = Some(footer_metadata_len);
 
@@ -519,7 +563,10 @@ impl ParquetMetaDataReader {
         }
 
         let start = file_size - footer_metadata_len as u64;
-        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
+        self.decode_footer_metadata(
+            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
+            &footer,
+        )
     }
 
     /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
@@ -537,6 +584,7 @@ impl ParquetMetaDataReader {
 
     #[cfg(all(feature = "async", feature = "arrow"))]
     async fn load_metadata<F: MetadataFetch>(
+        &self,
         fetch: &mut F,
         file_size: usize,
         prefetch: usize,
@@ -564,7 +612,8 @@ impl ParquetMetaDataReader {
         let mut footer = [0; FOOTER_SIZE];
         footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
 
-        let length = Self::decode_footer(&footer)?;
+        let footer = Self::decode_footer_tail(&footer)?;
+        let length = footer.metadata_length();
 
         if file_size < length + FOOTER_SIZE {
             return Err(eof_err!(
@@ -578,53 +627,188 @@ impl ParquetMetaDataReader {
         if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
             let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
-            Ok((Self::decode_metadata(&meta)?, None))
+            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
         } else {
             let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             Ok((
-                Self::decode_metadata(slice)?,
+                self.decode_footer_metadata(slice, &footer)?,
                 Some((footer_start, suffix.slice(..metadata_start))),
             ))
         }
     }
 
-    /// Decodes the Parquet footer returning the metadata length in bytes
+    /// Decodes the end of the Parquet footer
     ///
-    /// A parquet footer is 8 bytes long and has the following layout:
+    /// There are 8 bytes at the end of the Parquet footer with the following layout:
     /// * 4 bytes for the metadata length
-    /// * 4 bytes for the magic bytes 'PAR1'
+    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
     ///
     /// ```text
-    /// +-----+--------+
-    /// | len | 'PAR1' |
-    /// +-----+--------+
+    /// +-----+------------------+
+    /// | len | 'PAR1' or 'PARE' |
+    /// +-----+------------------+
    /// ```
-    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
-        // check this is indeed a parquet file
-        if slice[4..] != PARQUET_MAGIC {
+    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
+        let magic = &slice[4..];
+        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
+            true
+        } else if magic == PARQUET_MAGIC {
+            false
+        } else {
             return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-        }
-
+        };
         // get the metadata length from the footer
         let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
-        // u32 won't be larger than usize in most cases
-        Ok(metadata_len as usize)
+        Ok(FooterTail {
+            // u32 won't be larger than usize in most cases
+            metadata_length: metadata_len as usize,
+            encrypted_footer,
+        })
+    }
+
+    /// Decodes the Parquet footer, returning the metadata length in bytes
+    #[deprecated(note = "use decode_footer_tail instead")]
+    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
+        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
     }
 
     /// Decodes [`ParquetMetaData`] from the provided bytes.
     ///
+    /// Typically, this is used to decode the metadata from the end of a parquet
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
+    /// by the [Parquet Spec].
+    ///
+    /// This method handles using either `decode_metadata` or
+    /// `decode_metadata_with_encryption` depending on whether the encryption
+    /// feature is enabled.
+    ///
+    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+    pub(crate) fn decode_footer_metadata(
+        &self,
+        buf: &[u8],
+        footer_tail: &FooterTail,
+    ) -> Result<ParquetMetaData> {
+        #[cfg(feature = "encryption")]
+        let result = Self::decode_metadata_with_encryption(
+            buf,
+            footer_tail.is_encrypted_footer(),
+            self.file_decryption_properties.as_ref(),
+        );
+        #[cfg(not(feature = "encryption"))]
+        let result = {
+            if footer_tail.is_encrypted_footer() {
+                Err(general_err!(
+                    "Parquet file has an encrypted footer but the encryption feature is disabled"
+                ))
+            } else {
+                Self::decode_metadata(buf)
+            }
+        };
+        result
+    }
+
+    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
+    ///
     /// Typically this is used to decode the metadata from the end of a parquet
-    /// file. The format of `buf` is the Thift compact binary protocol, as specified
+    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
+    /// by the [Parquet Spec]. The buffer can be encrypted with AES GCM or AES CTR
+    /// ciphers as specified in the [Parquet Encryption Spec].
+    ///
+    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
+    #[cfg(feature = "encryption")]
+    fn decode_metadata_with_encryption(
+        buf: &[u8],
+        encrypted_footer: bool,
+        file_decryption_properties: Option<&FileDecryptionProperties>,
+    ) -> Result<ParquetMetaData> {
+        let mut prot = TCompactSliceInputProtocol::new(buf);
+        let mut file_decryptor = None;
+        let decrypted_fmd_buf;
+
+        if encrypted_footer {
+            if let Some(file_decryption_properties) = file_decryption_properties {
+                let t_file_crypto_metadata: TFileCryptoMetaData =
+                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
+                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
+                let decryptor = get_file_decryptor(
+                    t_file_crypto_metadata.encryption_algorithm,
+                    file_decryption_properties,
+                )?;
+                let footer_decryptor = decryptor.get_footer_decryptor();
+                let aad_footer = create_footer_aad(decryptor.file_aad())?;
+
+                decrypted_fmd_buf = footer_decryptor?
+ .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref()) + .map_err(|_| { + general_err!( + "Provided footer key and AAD were unable to decrypt parquet footer" + ) + })?; + prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref()); + + file_decryptor = Some(decryptor); + } else { + return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided")); + } + } + + let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| general_err!("Could not parse metadata: {}", e))?; + let schema = types::from_thrift(&t_file_metadata.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(schema)); + + if let (Some(algo), Some(file_decryption_properties)) = ( + t_file_metadata.encryption_algorithm, + file_decryption_properties, + ) { + // File has a plaintext footer but encryption algorithm is set + file_decryptor = Some(get_file_decryptor(algo, file_decryption_properties)?); + } + + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + let r = RowGroupMetaData::from_encrypted_thrift( + schema_descr.clone(), + rg, + file_decryptor.as_ref(), + )?; + row_groups.push(r); + } + let column_orders = + Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_descr, + column_orders, + ); + let mut metadata = ParquetMetaData::new(file_metadata, row_groups); + + metadata.with_file_decryptor(file_decryptor); + + Ok(metadata) + } + + /// Decodes [`ParquetMetaData`] from the provided bytes. + /// + /// Typically this is used to decode the metadata from the end of a parquet + /// file. The format of `buf` is the Thrift compact binary protocol, as specified /// by the [Parquet Spec]. 
     /// by the [Parquet Spec].
     ///
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
         let mut prot = TCompactSliceInputProtocol::new(buf);
+
         let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
             .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
         let schema = types::from_thrift(&t_file_metadata.schema)?;
         let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
         let mut row_groups = Vec::new();
         for rg in t_file_metadata.row_groups {
             row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
@@ -640,6 +824,7 @@ impl ParquetMetaDataReader {
             schema_descr,
             column_orders,
         );
+
         Ok(ParquetMetaData::new(file_metadata, row_groups))
     }
 
@@ -675,6 +860,30 @@ impl ParquetMetaDataReader {
     }
 }
 
+#[cfg(feature = "encryption")]
+fn get_file_decryptor(
+    encryption_algorithm: EncryptionAlgorithm,
+    file_decryption_properties: &FileDecryptionProperties,
+) -> Result<FileDecryptor> {
+    match encryption_algorithm {
+        EncryptionAlgorithm::AESGCMV1(algo) => {
+            let aad_file_unique = algo
+                .aad_file_unique
+                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
+            let aad_prefix = if file_decryption_properties.aad_prefix.is_some() {
+                file_decryption_properties.aad_prefix.clone().unwrap()
+            } else {
+                algo.aad_prefix.unwrap_or_default()
+            };
+
+            FileDecryptor::new(file_decryption_properties, aad_file_unique, aad_prefix)
+        }
+        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
+            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+        )),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index 12ff35b51646..b36ef752ae6f 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -110,3 +110,4 @@ pub mod writer;
 /// The length of the parquet footer in bytes
 pub const FOOTER_SIZE: usize = 8;
 const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
+const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E'];
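With the second magic value, the last 8 bytes of a file are enough to tell a plaintext footer from an encrypted one. A sketch against the API added above (the byte values are illustrative):

    // 4-byte little-endian metadata length, then b"PAR1" or b"PARE".
    let tail: [u8; 8] = *b"\x2a\x00\x00\x00PARE";
    let footer = ParquetMetaDataReader::decode_footer_tail(&tail)?;
    assert_eq!(footer.metadata_length(), 42);
    assert!(footer.is_encrypted_footer());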
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index dc918f6b5634..a4ffb162d09d 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -169,6 +169,8 @@ pub struct WriterProperties {
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
     coerce_types: bool,
+    #[cfg(feature = "encryption")]
+    file_encryption_properties: Option<FileEncryptionProperties>,
 }
 
 impl Default for WriterProperties {
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 81ba0a66463e..2673f4ac52a7 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,14 +18,12 @@
 //! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
 
-use std::collections::VecDeque;
-use std::iter;
-use std::{fs::File, io::Read, path::Path, sync::Arc};
-
 use crate::basic::{Encoding, Type};
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::{
@@ -40,6 +38,9 @@ use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 use bytes::Bytes;
+use std::collections::VecDeque;
+use std::iter;
+use std::{fs::File, io::Read, path::Path, sync::Arc};
 use thrift::protocol::TCompactInputProtocol;
 
 impl TryFrom<File> for SerializedFileReader<File> {
@@ -340,11 +341,53 @@ impl<R: ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R
 /// Reads a [`PageHeader`] from the provided [`Read`]
 pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
     let mut prot = TCompactInputProtocol::new(input);
-    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
-    Ok(page_header)
+    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+}
+
+#[cfg(feature = "encryption")]
+pub(crate) fn read_encrypted_page_header<T: Read>(
+    input: &mut T,
+    crypto_context: Arc<CryptoContext>,
+) -> Result<PageHeader> {
+    let data_decryptor = crypto_context.data_decryptor();
+    let aad = crypto_context.create_page_header_aad()?;
+
+    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref())?;
+
+    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
+    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
+}
+
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
+/// If the page header is encrypted [`CryptoContext`] must be provided.
+#[cfg(feature = "encryption")]
+fn read_encrypted_page_header_len<T: Read>(
+    input: &mut T,
+    crypto_context: Option<Arc<CryptoContext>>,
+) -> Result<(usize, PageHeader)> {
+    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
+    struct TrackedRead<R> {
+        inner: R,
+        bytes_read: usize,
+    }
+
+    impl<R: Read> Read for TrackedRead<R> {
+        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+            let v = self.inner.read(buf)?;
+            self.bytes_read += v;
+            Ok(v)
+        }
+    }
+
+    let mut tracked = TrackedRead {
+        inner: input,
+        bytes_read: 0,
+    };
+    let header = read_encrypted_page_header(&mut tracked, crypto_context.unwrap())?;
+    Ok((tracked.bytes_read, header))
 }
 
-/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
 fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
     /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
     struct TrackedRead<R> {
@@ -489,6 +532,12 @@ enum SerializedPageReaderState {
 
         // If the next page header has already been "peeked", we will cache it and it`s length here
         next_page_header: Option<Box<PageHeader>>,
+
+        /// The index of the data page within this column chunk
+        page_ordinal: usize,
+
+        /// Whether the next page is expected to be a dictionary page
+        require_dictionary: bool,
     },
     Pages {
         /// Remaining page locations
@@ -512,6 +561,10 @@ pub struct SerializedPageReader<R: ChunkReader> {
     physical_type: Type,
 
     state: SerializedPageReaderState,
+
+    /// Crypto context carrying objects required for decryption
+    #[cfg(feature = "encryption")]
+    crypto_context: Option<Arc<CryptoContext>>,
 }
 
 impl<R: ChunkReader> SerializedPageReader<R> {
@@ -526,6 +579,16 @@ impl<R: ChunkReader> SerializedPageReader<R> {
         SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
     }
 
+    /// Adds cryptographic information to the reader.
+    #[cfg(feature = "encryption")]
+    pub(crate) fn with_crypto_context(
+        mut self,
+        crypto_context: Option<Arc<CryptoContext>>,
+    ) -> Self {
+        self.crypto_context = crypto_context;
+        self
+    }
+
     /// Creates a new serialized page with custom options.
     pub fn new_with_properties(
         reader: Arc<R>,
@@ -558,14 +621,17 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                     offset: start as usize,
                     remaining_bytes: len as usize,
                     next_page_header: None,
+                    page_ordinal: 0,
+                    require_dictionary: meta.dictionary_page_offset().is_some(),
                 },
             };
-
         Ok(Self {
             reader,
             decompressor,
             state,
             physical_type: meta.column_type(),
+            #[cfg(feature = "encryption")]
+            crypto_context: None,
        })
     }
@@ -581,6 +647,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 loop {
                     if *remaining_bytes == 0 {
@@ -664,6 +731,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 offset,
                 remaining_bytes: remaining,
                 next_page_header,
+                page_ordinal,
+                require_dictionary,
             } => {
                 if *remaining == 0 {
                     return Ok(None);
@@ -673,7 +742,21 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 let header = if let Some(header) = next_page_header.take() {
                     *header
                 } else {
+                    #[cfg(feature = "encryption")]
+                    let (header_len, header) = if self.crypto_context.is_some() {
+                        let crypto_context = page_crypto_context(
+                            &self.crypto_context,
+                            *page_ordinal,
+                            *require_dictionary,
+                        )?;
+                        read_encrypted_page_header_len(&mut read, crypto_context)?
+                    } else {
+                        read_page_header_len(&mut read)?
+                    };
+
+                    #[cfg(not(feature = "encryption"))]
                     let (header_len, header) = read_page_header_len(&mut read)?;
+
                     verify_page_header_len(header_len, *remaining)?;
                     *offset += header_len;
                     *remaining -= header_len;
@@ -703,12 +786,33 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         ));
                     }
 
-                    decode_page(
+                    #[cfg(feature = "encryption")]
+                    let crypto_context = page_crypto_context(
+                        &self.crypto_context,
+                        *page_ordinal,
+                        *require_dictionary,
+                    )?;
+                    #[cfg(feature = "encryption")]
+                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
+                        let decryptor = crypto_context.data_decryptor();
+                        let aad = crypto_context.create_page_aad()?;
+                        decryptor.decrypt(buffer.as_ref(), &aad)?
+                    } else {
+                        buffer
+                    };
+
+                    let page = decode_page(
                         header,
                         Bytes::from(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
-                    )?
+                    )?;
+                    if page.is_data_page() {
+                        *page_ordinal += 1;
+                    } else if page.is_dictionary_page() {
+                        *require_dictionary = false;
+                    }
+                    page
                 }
                 SerializedPageReaderState::Pages {
                     page_locations,
@@ -751,6 +855,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 loop {
                     if *remaining_bytes == 0 {
@@ -816,6 +921,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                 offset,
                 remaining_bytes,
                 next_page_header,
+                ..
             } => {
                 if let Some(buffered_header) = next_page_header.take() {
                     verify_page_size(
@@ -857,6 +963,21 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     }
 }
 
+#[cfg(feature = "encryption")]
+fn page_crypto_context(
+    crypto_context: &Option<Arc<CryptoContext>>,
+    page_ordinal: usize,
+    dictionary_page: bool,
+) -> Result<Option<Arc<CryptoContext>>> {
+    Ok(crypto_context.as_ref().map(|c| {
+        Arc::new(if dictionary_page {
+            c.for_dictionary_page()
+        } else {
+            c.with_page_ordinal(page_ordinal)
+        })
+    }))
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
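Note how the reader drives page ordinals itself: a column chunk's dictionary page (when present) is decrypted under a dedicated AAD, and `page_ordinal` only advances as data pages are read. Sketched with the helper above, for some `ctx: Arc<CryptoContext>`:

    let dict_ctx = page_crypto_context(&Some(ctx.clone()), 0, true)?; // dictionary page context
    let page0_ctx = page_crypto_context(&Some(ctx), 0, false)?;       // first data page context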
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 6b7707f03cd9..2fa2d2dcf910 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1314,6 +1314,8 @@ mod tests {
             total_num_values as usize,
             None,
             Arc::new(props),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap();
 
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index 3ca0dbe98791..6a9f9947195b 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -140,6 +140,10 @@ pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
 pub mod bloom_filter;
+
+#[cfg(feature = "encryption")]
+experimental!(mod encryption);
+
 pub mod file;
 pub mod record;
 pub mod schema;
diff --git a/parquet/src/util/test_common/encryption_util.rs b/parquet/src/util/test_common/encryption_util.rs
new file mode 100644
index 000000000000..2f6e5bc45636
--- /dev/null
+++ b/parquet/src/util/test_common/encryption_util.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
+};
+use crate::arrow::ParquetRecordBatchStreamBuilder;
+use crate::encryption::decrypt::FileDecryptionProperties;
+use crate::errors::ParquetError;
+use crate::file::metadata::FileMetaData;
+use arrow_array::cast::AsArray;
+use arrow_array::{types, RecordBatch};
+use futures::TryStreamExt;
+use std::fs::File;
+
+pub(crate) fn verify_encryption_test_file_read(
+    file: File,
+    decryption_properties: FileDecryptionProperties,
+) {
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties.clone());
+    let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
+    let file_metadata = metadata.metadata.file_metadata();
+
+    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
+    let record_reader = builder.build().unwrap();
+    let record_batches = record_reader
+        .map(|x| x.unwrap())
+        .collect::<Vec<RecordBatch>>();
+
+    verify_encryption_test_data(record_batches, file_metadata.clone(), metadata);
+}
+
+pub(crate) async fn verify_encryption_test_file_read_async(
+    file: &mut tokio::fs::File,
+    decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+    let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
+
+    let metadata = ArrowReaderMetadata::load_async(file, options.clone()).await?;
+    let arrow_reader_metadata = ArrowReaderMetadata::load_async(file, options).await?;
+    let file_metadata = metadata.metadata.file_metadata();
+
+    let record_reader = ParquetRecordBatchStreamBuilder::new_with_metadata(
+        file.try_clone().await?,
+        arrow_reader_metadata.clone(),
+    )
+    .build()?;
+    let record_batches = record_reader.try_collect::<Vec<RecordBatch>>().await?;
+
+    verify_encryption_test_data(record_batches, file_metadata.clone(), metadata);
+    Ok(())
+}
+
+/// Tests reading an encrypted file from the parquet-testing repository
+fn verify_encryption_test_data(
+    record_batches: Vec<RecordBatch>,
+    file_metadata: FileMetaData,
+    metadata: ArrowReaderMetadata,
+) {
+    assert_eq!(file_metadata.num_rows(), 50);
+    assert_eq!(file_metadata.schema_descr().num_columns(), 8);
+
+    metadata.metadata.row_groups().iter().for_each(|rg| {
+        assert_eq!(rg.num_columns(), 8);
+        assert_eq!(rg.num_rows(), 50);
+    });
+
+    let mut row_count = 0;
+    for batch in record_batches {
+        row_count += batch.num_rows();
+
+        let bool_col = batch.column(0).as_boolean();
+        let time_col = batch
+            .column(1)
+            .as_primitive::<types::Time32MillisecondType>();
+        let list_col = batch.column(2).as_list::<i32>();
+        let timestamp_col = batch
+            .column(3)
+            .as_primitive::<types::TimestampNanosecondType>();
+        let f32_col = batch.column(4).as_primitive::<types::Float32Type>();
+        let f64_col = batch.column(5).as_primitive::<types::Float64Type>();
+        let binary_col = batch.column(6).as_binary::<i32>();
+        let fixed_size_binary_col = batch.column(7).as_fixed_size_binary();
+
+        for (i, x) in bool_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i % 2 == 0);
+        }
+        for (i, x) in time_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as i32);
+        }
+        for (i, list_item) in list_col.iter().enumerate() {
+            let list_item = list_item.unwrap();
+            let list_item = list_item.as_primitive::<types::Int64Type>();
+            assert_eq!(list_item.len(), 2);
+            assert_eq!(list_item.value(0), ((i * 2) * 1000000000000) as i64);
+            assert_eq!(list_item.value(1), ((i * 2 + 1) * 1000000000000) as i64);
+        }
+        for x in timestamp_col.iter() {
+            assert!(x.is_some());
+        }
+        for (i, x) in f32_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as f32 * 1.1f32);
+        }
+        for (i, x) in f64_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), i as f64 * 1.1111111f64);
+        }
+        for (i, x) in binary_col.iter().enumerate() {
+            assert_eq!(x.is_some(), i % 2 == 0);
+            if let Some(x) = x {
+                assert_eq!(&x[0..7], b"parquet");
+            }
+        }
+        for (i, x) in fixed_size_binary_col.iter().enumerate() {
+            assert_eq!(x.unwrap(), &[i as u8; 10]);
+        }
+    }
+
+    assert_eq!(row_count, file_metadata.num_rows() as usize);
+}
diff --git a/parquet/src/util/test_common/mod.rs b/parquet/src/util/test_common/mod.rs
index 8cfc1e6dd423..ac36118c3702 100644
--- a/parquet/src/util/test_common/mod.rs
+++ b/parquet/src/util/test_common/mod.rs
@@ -22,3 +22,6 @@ pub mod file_util;
 
 #[cfg(test)]
 pub mod rand_gen;
+
+#[cfg(all(test, feature = "encryption", feature = "arrow"))]
+pub mod encryption_util;
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index 9a66d13f84d7..9297b8d13f07 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -141,6 +141,8 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
             row_group.num_rows() as usize,
             None,
             Arc::new(properties),
+            #[cfg(feature = "encryption")]
+            None,
         )
         .unwrap();
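Taken together, the read path added here can be exercised roughly as follows (a sketch, not part of the patch: the file name and key are illustrative, and `parquet::encryption` is only publicly reachable when the crate's `experimental` feature exposes the module gated above):

    use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use std::fs::File;

    // Uniformly encrypted file: the footer key decrypts all columns.
    let footer_key = b"0123456789012345".to_vec(); // example 128-bit key
    let decryption_properties = FileDecryptionProperties::builder(footer_key).build()?;
    let options =
        ArrowReaderOptions::default().with_file_decryption_properties(decryption_properties);

    let file = File::open("uniform_encryption.parquet.encrypted")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    for batch in builder.build()? {
        println!("read {} rows", batch?.num_rows());
    }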