diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 41531432742b..1ee3a464f8cd 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -384,7 +384,7 @@ impl ArrowReaderMetadata { pub fn load( reader: &T, options: ArrowReaderOptions, - file_decryption_properties: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { let metadata = ParquetMetaDataReader::new() .with_page_indexes(options.page_index) @@ -543,7 +543,7 @@ impl ParquetRecordBatchReaderBuilder { pub fn try_new_with_decryption( reader: T, options: ArrowReaderOptions, - file_decryption_properties: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { let metadata = ArrowReaderMetadata::load(&reader, options, file_decryption_properties)?; Ok(Self::new_with_metadata(reader, metadata)) @@ -809,10 +809,11 @@ impl ParquetRecordBatchReader { /// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader and [`FileDecryptionProperties`] /// /// Note: this is needed when the parquet file is encrypted + // todo: add options or put file_decryption_properties into options pub fn try_new_with_decryption( reader: T, batch_size: usize, - file_decryption_properties: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { ParquetRecordBatchReaderBuilder::try_new_with_decryption( reader, @@ -1713,7 +1714,7 @@ mod tests { .build(), ); - let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.clone()).unwrap(); + let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()).unwrap(); let file_metadata = metadata.metadata.file_metadata(); assert_eq!(file_metadata.num_rows(), 50); @@ -1727,9 +1728,27 @@ mod tests { }); // todo: decrypting data - // let record_reader = - // ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties) - // .unwrap(); + let record_reader = + ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref()) + .unwrap(); + // todo check contents + let mut row_count = 0; + for batch in record_reader { + let batch = batch.unwrap(); + row_count += batch.num_rows(); + let f32_col = batch.column(0).as_primitive::(); + let f64_col = batch.column(1).as_primitive::(); + + // This file contains floats from a standard normal distribution + for &x in f32_col.values() { + assert!(x > -10.0); + assert!(x < 10.0); + } + for &x in f64_col.values() { + assert!(x > -10.0); + assert!(x < 10.0); + } + } } #[test] diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 9bd79840f760..2c7976f3fb3f 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -2027,6 +2027,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + None, ) .unwrap(); @@ -2079,6 +2080,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + None, ) .unwrap(); @@ -2214,6 +2216,7 @@ mod tests { r.rows_written as usize, None, Arc::new(props), + None, ) .unwrap(), ); @@ -3484,6 +3487,7 @@ mod tests { result.rows_written as usize, None, Arc::new(props), + None, ) .unwrap(), ); diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 2b7ffb933281..b4b7f47df5b0 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -119,6 +119,7 @@ impl BlockEncryptor for RingGcmBlockEncryptor { } } +#[derive(Debug, Clone)] pub(crate) struct RingGcmBlockDecryptor { key: LessSafeKey, } @@ -226,7 +227,7 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal Ok(aad) } -#[derive(Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct FileDecryptionProperties { footer_key: Option> } @@ -261,18 +262,25 @@ impl DecryptionPropertiesBuilder { } } +#[derive(Debug, Clone)] pub struct FileDecryptor { decryption_properties: FileDecryptionProperties, // todo decr: change to BlockDecryptor footer_decryptor: RingGcmBlockDecryptor } +impl PartialEq for FileDecryptor { + fn eq(&self, other: &Self) -> bool { + self.decryption_properties == other.decryption_properties + } +} + impl FileDecryptor { - pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self { + pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self { Self { // todo decr: if no key available yet (not set in properties, will be retrieved from metadata) footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()), - decryption_properties + decryption_properties: decryption_properties.clone() } } @@ -281,3 +289,17 @@ impl FileDecryptor { self.footer_decryptor } } + +pub struct CryptoContext { + row_group_ordinal: i32, + column_ordinal: i32, + metadata_decryptor: FileDecryptor, + data_decryptor: FileDecryptor, + file_decryption_properties: FileDecryptionProperties, + aad: Vec, +} + +impl CryptoContext { + pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor } + pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties } +} diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index 3192eac4cde0..af34fafb2e81 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -60,7 +60,7 @@ pub fn parse_metadata(chunk_reader: &R) -> Result, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { ParquetMetaDataReader::decode_metadata(buf, file_decryption_properties) } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 32b985710023..b762548eb202 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -117,6 +117,7 @@ use crate::schema::types::{ pub use reader::ParquetMetaDataReader; pub use writer::ParquetMetaDataWriter; pub(crate) use writer::ThriftMetadataWriter; +use crate::encryption::ciphers::FileDecryptor; /// Page level statistics for each column chunk of each row group. /// @@ -174,15 +175,18 @@ pub struct ParquetMetaData { column_index: Option, /// Offset index for each page in each column chunk offset_index: Option, + /// Optional file decryptor + file_decryptor: Option, } impl ParquetMetaData { /// Creates Parquet metadata from file metadata and a list of row /// group metadata - pub fn new(file_metadata: FileMetaData, row_groups: Vec) -> Self { + pub fn new(file_metadata: FileMetaData, row_groups: Vec, file_decryptor: Option) -> Self { ParquetMetaData { file_metadata, row_groups, + file_decryptor, column_index: None, offset_index: None, } @@ -337,7 +341,7 @@ pub struct ParquetMetaDataBuilder(ParquetMetaData); impl ParquetMetaDataBuilder { /// Create a new builder from a file metadata, with no row groups pub fn new(file_meta_data: FileMetaData) -> Self { - Self(ParquetMetaData::new(file_meta_data, vec![])) + Self(ParquetMetaData::new(file_meta_data, vec![], None)) } /// Create a new builder from an existing ParquetMetaData @@ -540,6 +544,8 @@ pub struct RowGroupMetaData { ordinal: Option, } +// todo:rok + impl RowGroupMetaData { /// Returns builder for row group metadata. pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder { @@ -1861,7 +1867,7 @@ mod tests { let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone()) .set_row_groups(row_group_meta_with_stats) .build(); - let base_expected_size = 2312; + let base_expected_size = 2896; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -1888,7 +1894,7 @@ mod tests { ]])) .build(); - let bigger_expected_size = 2816; + let bigger_expected_size = 3400; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 96d19fbfb83d..a8686e695adb 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -138,9 +138,9 @@ impl ParquetMetaDataReader { /// This is only necessary when the file is encrypted. pub fn with_encryption_properties( mut self, - properties: Option, + properties: Option<&FileDecryptionProperties>, ) -> Self { - self.file_decryption_properties = properties; + self.file_decryption_properties = properties.cloned(); self } @@ -364,7 +364,7 @@ impl ParquetMetaDataReader { &mut fetch, file_size, self.get_prefetch_size(), - self.file_decryption_properties.clone(), + self.file_decryption_properties.as_ref(), ) .await?; @@ -532,7 +532,7 @@ impl ParquetMetaDataReader { let start = file_size - footer_metadata_len as u64; Self::decode_metadata( chunk_reader.get_bytes(start, metadata_len)?.as_ref(), - self.file_decryption_properties.clone(), + self.file_decryption_properties.as_ref(), ) } @@ -554,7 +554,7 @@ impl ParquetMetaDataReader { fetch: &mut F, file_size: usize, prefetch: usize, - file_decryption_properties: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { if file_size < FOOTER_SIZE { return Err(eof_err!("file size of {} is less than footer", file_size)); @@ -639,10 +639,11 @@ impl ParquetMetaDataReader { /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata( buf: &[u8], - file_decryption_properties: Option, + file_decryption_properties: Option<&FileDecryptionProperties>, ) -> Result { let mut prot = TCompactSliceInputProtocol::new(buf); + let mut file_decryptor = None; let decrypted_fmd_buf; if let Some(file_decryption_properties) = file_decryption_properties { let t_file_crypto_metadata: TFileCryptoMetaData = @@ -658,13 +659,13 @@ impl ParquetMetaDataReader { // todo decr: get key_metadata // remaining buffer contains encrypted FileMetaData - let file_decryptor = FileDecryptor::new(file_decryption_properties); - let decryptor = file_decryptor.get_footer_decryptor(); + file_decryptor = Some(FileDecryptor::new(file_decryption_properties)); + let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor(); // todo decr: get aad_prefix // todo decr: set both aad_prefix and aad_file_unique in file_decryptor let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); decrypted_fmd_buf = - decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref()); + decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref()); prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref()); } @@ -693,7 +694,7 @@ impl ParquetMetaDataReader { schema_descr, column_orders, ); - Ok(ParquetMetaData::new(file_metadata, row_groups)) + Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor)) } /// Parses column orders from Thrift definition. diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 3262d1fba704..57411264773e 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -21,7 +21,6 @@ use std::collections::VecDeque; use std::iter; use std::{fs::File, io::Read, path::Path, sync::Arc}; - use crate::basic::{Encoding, Type}; use crate::bloom_filter::Sbbf; use crate::column::page::{Page, PageMetadata, PageReader}; @@ -34,13 +33,14 @@ use crate::file::{ reader::*, statistics, }; -use crate::format::{PageHeader, PageLocation, PageType}; +use crate::format::{PageHeader, PageLocation, PageType, FileCryptoMetaData as TFileCryptoMetaData, EncryptionAlgorithm}; use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::Type as SchemaType; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use thrift::protocol::TCompactInputProtocol; +use crate::encryption::ciphers::{create_footer_aad, BlockDecryptor, CryptoContext, FileDecryptionProperties, FileDecryptor, RingGcmBlockDecryptor}; impl TryFrom for SerializedFileReader { type Error = ParquetError; @@ -324,6 +324,7 @@ impl RowGroupReader for SerializedRowGroupReader<'_, R self.metadata.num_rows() as usize, page_locations, props, + None, )?)) } @@ -338,14 +339,37 @@ impl RowGroupReader for SerializedRowGroupReader<'_, R } /// Reads a [`PageHeader`] from the provided [`Read`] -pub(crate) fn read_page_header(input: &mut T) -> Result { - let mut prot = TCompactInputProtocol::new(input); +pub(crate) fn read_page_header(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result { + let buf = &mut []; + let size = input.read(buf)?; + + // todo: decrypt buffer + let mut prot = TCompactSliceInputProtocol::new(buf.as_slice()); + let t_file_crypto_metadata: TFileCryptoMetaData = + TFileCryptoMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?; + + let file_decryption_properties = crypto_context.unwrap().file_decryption_properties(); + let file_decryptor = FileDecryptor::new(file_decryption_properties); + + // let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); + let algo = t_file_crypto_metadata.encryption_algorithm; + let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo { + a + } else { + unreachable!() + }; // todo decr: add support for GCMCTRV1 + let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref()); + let buf2 = file_decryptor.get_footer_decryptor().decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref()); + + let mut prot = TCompactInputProtocol::new(buf2.reader()); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; Ok(page_header) } /// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read -fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { +fn read_page_header_len(input: &mut T, crypto_context: Option<&CryptoContext>) -> Result<(usize, PageHeader)> { /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read struct TrackedRead { inner: R, @@ -364,7 +388,7 @@ fn read_page_header_len(input: &mut T) -> Result<(usize, PageHeader)> { inner: input, bytes_read: 0, }; - let header = read_page_header(&mut tracked)?; + let header = read_page_header(&mut tracked, crypto_context)?; Ok((tracked.bytes_read, header)) } @@ -512,6 +536,9 @@ pub struct SerializedPageReader { physical_type: Type, state: SerializedPageReaderState, + + /// Crypto context + crypto_context: Option<&'static CryptoContext>, } impl SerializedPageReader { @@ -523,7 +550,7 @@ impl SerializedPageReader { page_locations: Option>, ) -> Result { let props = Arc::new(ReaderProperties::builder().build()); - SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props) + SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props, None) } /// Creates a new serialized page with custom options. @@ -533,6 +560,7 @@ impl SerializedPageReader { total_rows: usize, page_locations: Option>, props: ReaderPropertiesPtr, + crypto_context: Option<&'static CryptoContext>, ) -> Result { let decompressor = create_codec(meta.compression(), props.codec_options())?; let (start, len) = meta.byte_range(); @@ -560,12 +588,21 @@ impl SerializedPageReader { next_page_header: None, }, }; - + if crypto_context.is_some() { + return Ok(Self { + reader, + decompressor, + state, + physical_type: meta.column_type(), + crypto_context, + }) + } Ok(Self { reader, decompressor, state, physical_type: meta.column_type(), + crypto_context: None, }) } } @@ -592,10 +629,26 @@ impl PageReader for SerializedPageReader { } let mut read = self.reader.get_read(*offset as u64)?; + // let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref()); + + // let decrypted_fmd_buf = + // decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref()); + + // if let Some(z) = self.crypto_context.as_ref() { + // let c = read.take(1); + // // read = z.get_data_decryptor().decrypt(&read, b"aaaaa"); + // // let (header_len, header) = read_page_header_len(&mut read)?; + // // header + // // let dec = z.get_data_decryptor().decrypt(header_len, header); + // } + // let file_decryptor = self.crypto_context.unwrap().get_data_decryptor().unwrap(); + let file_decryption_properties = + FileDecryptionProperties::builder().with_footer_key("0123456789012345".into()).build(); + // let file_decryptor = FileDecryptor::new(&file_decryption_properties); let header = if let Some(header) = next_page_header.take() { *header } else { - let (header_len, header) = read_page_header_len(&mut read)?; + let (header_len, header) = read_page_header_len(&mut read, self.crypto_context)?; *offset += header_len; *remaining -= header_len; header @@ -682,7 +735,7 @@ impl PageReader for SerializedPageReader { } } else { let mut read = self.reader.get_read(*offset as u64)?; - let (header_len, header) = read_page_header_len(&mut read)?; + let (header_len, header) = read_page_header_len(&mut read, None)?; *offset += header_len; *remaining_bytes -= header_len; let page_meta = if let Ok(page_meta) = (&header).try_into() { @@ -738,7 +791,7 @@ impl PageReader for SerializedPageReader { *remaining_bytes -= buffered_header.compressed_page_size as usize; } else { let mut read = self.reader.get_read(*offset as u64)?; - let (header_len, header) = read_page_header_len(&mut read)?; + let (header_len, header) = read_page_header_len(&mut read, None)?; let data_page_size = header.compressed_page_size as usize; *offset += header_len + data_page_size; *remaining_bytes -= header_len + data_page_size; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index b84c57a60e19..6011795a93be 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1314,6 +1314,7 @@ mod tests { total_num_values as usize, None, Arc::new(props), + None, ) .unwrap();