Skip to content

Commit 29d55eb

Browse files
committed
work
1 parent 9d17990 commit 29d55eb

File tree

6 files changed

+137
-79
lines changed

6 files changed

+137
-79
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::collections::VecDeque;
2121
use std::sync::Arc;
22-
22+
use num::ToPrimitive;
2323
use arrow_array::cast::AsArray;
2424
use arrow_array::Array;
2525
use arrow_array::{RecordBatch, RecordBatchReader};
@@ -42,7 +42,7 @@ mod filter;
4242
mod selection;
4343
pub mod statistics;
4444

45-
use crate::encryption::ciphers::FileDecryptionProperties;
45+
use crate::encryption::ciphers::{CryptoContext, FileDecryptionProperties};
4646

4747
/// Builder for constructing parquet readers into arrow.
4848
///
@@ -695,7 +695,18 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
695695
let total_rows = rg.num_rows() as usize;
696696
let reader = self.reader.clone();
697697

698-
let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
698+
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
699+
// let aad_file_unique = file_decryptor?.aad_file_unique();
700+
// let aad_prefix = file_decryptor?.aad_prefix();
701+
//
702+
// let file_decryptor = FileDecryptor::new(file_decryptor, aad_file_unique.clone(), aad_prefix.clone());
703+
704+
let crypto_context = CryptoContext::new(
705+
meta.dictionary_page_offset().is_some(), rg_idx.to_i16()?, self.column_idx.to_i16()?, file_decryptor.clone(), file_decryptor);
706+
let crypto_context = Arc::new(crypto_context);
707+
708+
let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations, Some(crypto_context));
709+
// let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
699710
Some(ret.map(|x| Box::new(x) as _))
700711
}
701712
}
@@ -1728,6 +1739,11 @@ mod tests {
17281739
});
17291740

17301741
// todo: decrypting data
1742+
let decryption_properties = Some(
1743+
ciphers::FileDecryptionProperties::builder()
1744+
.with_footer_key(key_code.to_vec())
1745+
.build(),
1746+
);
17311747
let record_reader =
17321748
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
17331749
.unwrap();

parquet/src/arrow/async_reader/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
842842
self.metadata.column(i),
843843
self.row_count,
844844
page_locations,
845+
None,
845846
)?);
846847

847848
Ok(Box::new(ColumnChunkIterator {

parquet/src/encryption/ciphers.rs

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Encryption implementation specific to Parquet, as described
1919
//! in the [spec](https://github.com/apache/parquet-format/blob/master/Encryption.md).
2020
21+
use std::sync::Arc;
2122
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
2223
use ring::rand::{SecureRandom, SystemRandom};
2324
use crate::errors::{ParquetError, Result};
@@ -172,8 +173,12 @@ pub fn create_footer_aad(file_aad: &[u8]) -> Result<Vec<u8>> {
172173
create_module_aad(file_aad, ModuleType::Footer, -1, -1, -1)
173174
}
174175

175-
fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i32,
176-
column_ordinal: i32, page_ordinal: i32) -> Result<Vec<u8>> {
176+
pub fn create_page_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16, column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
177+
create_module_aad(file_aad, module_type, row_group_ordinal, column_ordinal, page_ordinal)
178+
}
179+
180+
fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal: i16,
181+
column_ordinal: i16, page_ordinal: i32) -> Result<Vec<u8>> {
177182

178183
let module_buf = [module_type as u8];
179184

@@ -187,15 +192,15 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
187192
if row_group_ordinal < 0 {
188193
return Err(general_err!("Wrong row group ordinal: {}", row_group_ordinal));
189194
}
190-
if row_group_ordinal > u16::MAX as i32 {
195+
if row_group_ordinal > i16::MAX {
191196
return Err(general_err!("Encrypted parquet files can't have more than {} row groups: {}",
192197
u16::MAX, row_group_ordinal));
193198
}
194199

195200
if column_ordinal < 0 {
196201
return Err(general_err!("Wrong column ordinal: {}", column_ordinal));
197202
}
198-
if column_ordinal > u16::MAX as i32 {
203+
if column_ordinal > i16::MAX {
199204
return Err(general_err!("Encrypted parquet files can't have more than {} columns: {}",
200205
u16::MAX, column_ordinal));
201206
}
@@ -205,25 +210,25 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
205210
let mut aad = Vec::with_capacity(file_aad.len() + 5);
206211
aad.extend_from_slice(file_aad);
207212
aad.extend_from_slice(module_buf.as_ref());
208-
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
209-
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
213+
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
214+
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
210215
return Ok(aad)
211216
}
212217

213218
if page_ordinal < 0 {
214-
return Err(general_err!("Wrong column ordinal: {}", page_ordinal));
219+
return Err(general_err!("Wrong page ordinal: {}", page_ordinal));
215220
}
216-
if page_ordinal > u16::MAX as i32 {
221+
if page_ordinal > i32::MAX {
217222
return Err(general_err!("Encrypted parquet files can't have more than {} pages in a chunk: {}",
218223
u16::MAX, page_ordinal));
219224
}
220225

221226
let mut aad = Vec::with_capacity(file_aad.len() + 7);
222227
aad.extend_from_slice(file_aad);
223228
aad.extend_from_slice(module_buf.as_ref());
224-
aad.extend_from_slice((row_group_ordinal as u16).to_le_bytes().as_ref());
225-
aad.extend_from_slice((column_ordinal as u16).to_le_bytes().as_ref());
226-
aad.extend_from_slice((page_ordinal as u16).to_le_bytes().as_ref());
229+
aad.extend_from_slice(row_group_ordinal.to_le_bytes().as_ref());
230+
aad.extend_from_slice(column_ordinal.to_le_bytes().as_ref());
231+
aad.extend_from_slice(page_ordinal.to_le_bytes().as_ref());
227232
Ok(aad)
228233
}
229234

@@ -266,7 +271,9 @@ impl DecryptionPropertiesBuilder {
266271
pub struct FileDecryptor {
267272
decryption_properties: FileDecryptionProperties,
268273
// todo decr: change to BlockDecryptor
269-
footer_decryptor: RingGcmBlockDecryptor
274+
footer_decryptor: RingGcmBlockDecryptor,
275+
aad_file_unique: Vec<u8>,
276+
aad_prefix: Vec<u8>,
270277
}
271278

272279
impl PartialEq for FileDecryptor {
@@ -276,30 +283,63 @@ impl PartialEq for FileDecryptor {
276283
}
277284

278285
impl FileDecryptor {
279-
pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
286+
pub(crate) fn new(decryption_properties: &FileDecryptionProperties, aad_file_unique: Vec<u8>, aad_prefix: Vec<u8>) -> Self {
280287
Self {
281288
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
282289
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
283-
decryption_properties: decryption_properties.clone()
290+
decryption_properties: decryption_properties.clone(),
291+
aad_file_unique,
292+
aad_prefix,
284293
}
285294
}
286295

287296
// todo decr: change to BlockDecryptor
288297
pub(crate) fn get_footer_decryptor(self) -> RingGcmBlockDecryptor {
289298
self.footer_decryptor
290299
}
300+
301+
pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
302+
&self.decryption_properties
303+
}
304+
305+
pub(crate) fn footer_decryptor(&self) -> RingGcmBlockDecryptor {
306+
self.footer_decryptor.clone()
307+
}
308+
309+
pub(crate) fn aad_file_unique(&self) -> &Vec<u8> {
310+
&self.aad_file_unique
311+
}
312+
313+
pub(crate) fn aad_prefix(&self) -> &Vec<u8> {
314+
&self.aad_prefix
315+
}
291316
}
292317

318+
#[derive(Debug, Clone)]
293319
pub struct CryptoContext {
294-
row_group_ordinal: i32,
295-
column_ordinal: i32,
296-
metadata_decryptor: FileDecryptor,
297-
data_decryptor: FileDecryptor,
298-
file_decryption_properties: FileDecryptionProperties,
299-
aad: Vec<u8>,
320+
pub(crate) start_decrypt_with_dictionary_page: bool,
321+
pub(crate) row_group_ordinal: i16,
322+
pub(crate) column_ordinal: i16,
323+
pub(crate) data_decryptor: Arc<FileDecryptor>,
324+
pub(crate) metadata_decryptor: Arc<FileDecryptor>,
325+
300326
}
301327

302328
impl CryptoContext {
303-
pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor }
304-
pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties }
329+
pub fn new(start_decrypt_with_dictionary_page: bool, row_group_ordinal: i16,
330+
column_ordinal: i16, data_decryptor: Arc<FileDecryptor>,
331+
metadata_decryptor: Arc<FileDecryptor>) -> Self {
332+
Self {
333+
start_decrypt_with_dictionary_page,
334+
row_group_ordinal,
335+
column_ordinal,
336+
data_decryptor,
337+
metadata_decryptor,
338+
}
339+
}
340+
pub fn start_decrypt_with_dictionary_page(&self) -> &bool { &self.start_decrypt_with_dictionary_page }
341+
pub fn row_group_ordinal(&self) -> &i16 { &self.row_group_ordinal }
342+
pub fn column_ordinal(&self) -> &i16 { &self.column_ordinal }
343+
pub fn data_decryptor(&self) -> Arc<FileDecryptor> { self.data_decryptor.clone()}
344+
pub fn metadata_decryptor(&self) -> Arc<FileDecryptor> { self.metadata_decryptor.clone() }
305345
}

parquet/src/file/metadata/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,13 @@ impl ParquetMetaData {
218218
&self.file_metadata
219219
}
220220

221+
/// Returns file decryptor as reference.
222+
pub fn file_decryptor(&self) -> &Option<FileDecryptor> {
223+
&self.file_decryptor
224+
}
225+
226+
227+
221228
/// Returns number of row groups in this file.
222229
pub fn num_row_groups(&self) -> usize {
223230
self.row_groups.len()

parquet/src/file/metadata/reader.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -659,13 +659,19 @@ impl ParquetMetaDataReader {
659659
// todo decr: get key_metadata
660660

661661
// remaining buffer contains encrypted FileMetaData
662-
file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
663-
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
662+
664663
// todo decr: get aad_prefix
665664
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
666-
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
665+
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
666+
let aad_footer = create_footer_aad(aad_file_unique.as_ref())?;
667+
let aad_prefix : Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();
668+
669+
file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad_file_unique.clone(), aad_prefix.clone()));
670+
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
671+
// file_decryptor = Some(FileDecryptor::new(file_decryption_properties, aad, aad_prefix));
672+
667673
decrypted_fmd_buf =
668-
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
674+
decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref());
669675
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
670676
}
671677

@@ -694,7 +700,7 @@ impl ParquetMetaDataReader {
694700
schema_descr,
695701
column_orders,
696702
);
697-
Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
703+
Ok(ParquetMetaData::new(file_metadata, row_groups, Some(file_decryptor.unwrap())))
698704
}
699705

700706
/// Parses column orders from Thrift definition.

0 commit comments

Comments
 (0)