From 3e7646d8ae060adcb78c8fdfa94efe43956d7ff4 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 9 Jan 2025 22:59:19 +0100 Subject: [PATCH] work --- parquet/src/arrow/arrow_reader/mod.rs | 10 +++++----- parquet/src/encryption/ciphers.rs | 26 ++++++++++++++++---------- parquet/src/file/metadata/mod.rs | 4 ++++ parquet/src/file/serialized_reader.rs | 20 +++++++++----------- 4 files changed, 34 insertions(+), 26 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a2124b1fada2..d5133e243e36 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -710,18 +710,18 @@ impl Iterator for ReaderPageIterator { #[cfg(feature = "encryption")] let crypto_context = if self.metadata.file_decryptor().is_some() { - let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap()); - let metadata_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap()); let column_name = self .metadata .file_metadata() .schema_descr() .column(self.column_idx); - let data_decryptor = - Arc::new(file_decryptor.get_column_decryptor(column_name.name().as_bytes())); + + let file_decryptor = self.metadata.file_decryptor().clone().unwrap().get_column_decryptor(column_name.name().as_bytes()); + let data_decryptor = Arc::new(file_decryptor.clone()); + let metadata_decryptor = Arc::new(file_decryptor.clone()); let crypto_context = - CryptoContext::new(rg_idx, self.column_idx, metadata_decryptor, data_decryptor); + CryptoContext::new(rg_idx, self.column_idx, data_decryptor, metadata_decryptor); Some(Arc::new(crypto_context)) } else { None diff --git a/parquet/src/encryption/ciphers.rs b/parquet/src/encryption/ciphers.rs index 52680169eb55..ef5d122dc442 100644 --- a/parquet/src/encryption/ciphers.rs +++ b/parquet/src/encryption/ciphers.rs @@ -329,23 +329,29 @@ impl FileDecryptor { RingGcmBlockDecryptor::new(self.decryption_properties.footer_key.as_ref().unwrap()) } + pub(crate) fn has_column_key(&self, column_name: &[u8]) -> bool { + self.decryption_properties.column_keys.clone().unwrap().contains_key(column_name) + } + pub(crate) fn get_column_decryptor(&self, column_name: &[u8]) -> FileDecryptor { - if self.decryption_properties.column_keys.is_none() { + if self.decryption_properties.column_keys.is_none() || !self.has_column_key(column_name) { return self.clone(); } let column_keys = &self.decryption_properties.column_keys.clone().unwrap(); - let decryptor = if let Some(column_key) = column_keys.get(column_name) { - Some(RingGcmBlockDecryptor::new(&column_key)) + let decryption_properties = if let Some(column_key) = column_keys.get(column_name) { + DecryptionPropertiesBuilder::with_defaults() + .with_footer_key(column_key.clone()) + .with_aad_prefix(self.aad_prefix.clone()) + .build() } else { - None + self.decryption_properties.clone() }; - FileDecryptor { - decryption_properties: self.decryption_properties.clone(), - footer_decryptor: decryptor, - aad_file_unique: self.aad_file_unique.clone(), - aad_prefix: self.aad_prefix.clone(), - } + FileDecryptor::new( + &decryption_properties, + self.aad_file_unique.clone(), + self.aad_prefix.clone(), + ) } pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties { diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 8ac9a096f36c..530978050018 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -661,6 +661,10 @@ impl RowGroupMetaData { cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?; } else { let column_name = crypto_metadata.path_in_schema.join("."); + if !decryptor.unwrap().has_column_key(&column_name.as_bytes()) { + cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?; + break; + } let aad_file_unique = decryptor.unwrap().aad_file_unique(); let aad_prefix = decryptor .unwrap() diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 4131fcac2478..dae37d35ac0d 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -350,8 +350,15 @@ pub(crate) fn read_page_header( #[cfg(feature = "encryption")] if let Some(crypto_context) = crypto_context { let decryptor = &crypto_context.data_decryptor(); + // todo: in case of per-column key, decryptor should be column decryptor + if !decryptor.has_footer_key() || !decryptor.footer_decryptor().is_some() { + let mut prot = TCompactInputProtocol::new(input); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + return Ok(page_header) + }; - let file_decryptor = decryptor.column_decryptor(); + // let file_decryptor = decryptor.column_decryptor(); + let data_decryptor = &crypto_context.data_decryptor(); let aad_file_unique = decryptor.aad_file_unique(); let aad_prefix = decryptor.aad_prefix(); @@ -370,19 +377,10 @@ pub(crate) fn read_page_header( crypto_context.page_ordinal, )?; - // let mut len_bytes = [0; 4]; - // input.read_exact(&mut len_bytes)?; - // let ciphertext_len = u32::from_le_bytes(len_bytes) as usize; - // let mut ciphertext = vec![0; 4 + ciphertext_len]; - // input.read_exact(&mut ciphertext[4..])?; - // let mut ciphertext = Vec::new(); - // input.read_to_end(&mut ciphertext)?; - let mut ciphertext: Vec = vec![]; input.read_to_end(&mut ciphertext)?; - // let ciphertext = input.read_to_end(); - let buf = file_decryptor.decrypt(&ciphertext, aad.as_ref())?; + let buf = data_decryptor.footer_decryptor().unwrap().decrypt(&ciphertext, aad.as_ref())?; let mut prot = TCompactSliceInputProtocol::new(buf.as_slice()); let page_header = PageHeader::read_from_in_protocol(&mut prot)?;