Skip to content

Commit 9d17990

Browse files
committed
save progress
1 parent 3f9a143 commit 9d17990

File tree

8 files changed

+143
-37
lines changed

8 files changed

+143
-37
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ impl ArrowReaderMetadata {
384384
pub fn load<T: ChunkReader>(
385385
reader: &T,
386386
options: ArrowReaderOptions,
387-
file_decryption_properties: Option<FileDecryptionProperties>,
387+
file_decryption_properties: Option<&FileDecryptionProperties>,
388388
) -> Result<Self> {
389389
let metadata = ParquetMetaDataReader::new()
390390
.with_page_indexes(options.page_index)
@@ -543,7 +543,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
543543
pub fn try_new_with_decryption(
544544
reader: T,
545545
options: ArrowReaderOptions,
546-
file_decryption_properties: Option<FileDecryptionProperties>,
546+
file_decryption_properties: Option<&FileDecryptionProperties>,
547547
) -> Result<Self> {
548548
let metadata = ArrowReaderMetadata::load(&reader, options, file_decryption_properties)?;
549549
Ok(Self::new_with_metadata(reader, metadata))
@@ -809,10 +809,11 @@ impl ParquetRecordBatchReader {
809809
/// Create a new [`ParquetRecordBatchReader`] from the provided chunk reader and [`FileDecryptionProperties`]
810810
///
811811
/// Note: this is needed when the parquet file is encrypted
812+
// todo: add options or put file_decryption_properties into options
812813
pub fn try_new_with_decryption<T: ChunkReader + 'static>(
813814
reader: T,
814815
batch_size: usize,
815-
file_decryption_properties: Option<FileDecryptionProperties>,
816+
file_decryption_properties: Option<&FileDecryptionProperties>,
816817
) -> Result<Self> {
817818
ParquetRecordBatchReaderBuilder::try_new_with_decryption(
818819
reader,
@@ -1713,7 +1714,7 @@ mod tests {
17131714
.build(),
17141715
);
17151716

1716-
let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.clone()).unwrap();
1717+
let metadata = ArrowReaderMetadata::load(&file, Default::default(), decryption_properties.as_ref()).unwrap();
17171718
let file_metadata = metadata.metadata.file_metadata();
17181719

17191720
assert_eq!(file_metadata.num_rows(), 50);
@@ -1727,9 +1728,27 @@ mod tests {
17271728
});
17281729

17291730
// todo: decrypting data
1730-
// let record_reader =
1731-
// ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties)
1732-
// .unwrap();
1731+
let record_reader =
1732+
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties.as_ref())
1733+
.unwrap();
1734+
// todo check contents
1735+
let mut row_count = 0;
1736+
for batch in record_reader {
1737+
let batch = batch.unwrap();
1738+
row_count += batch.num_rows();
1739+
let f32_col = batch.column(0).as_primitive::<Float32Type>();
1740+
let f64_col = batch.column(1).as_primitive::<Float64Type>();
1741+
1742+
// This file contains floats from a standard normal distribution
1743+
for &x in f32_col.values() {
1744+
assert!(x > -10.0);
1745+
assert!(x < 10.0);
1746+
}
1747+
for &x in f64_col.values() {
1748+
assert!(x > -10.0);
1749+
assert!(x < 10.0);
1750+
}
1751+
}
17331752
}
17341753

17351754
#[test]

parquet/src/column/writer/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2027,6 +2027,7 @@ mod tests {
20272027
r.rows_written as usize,
20282028
None,
20292029
Arc::new(props),
2030+
None,
20302031
)
20312032
.unwrap();
20322033

@@ -2079,6 +2080,7 @@ mod tests {
20792080
r.rows_written as usize,
20802081
None,
20812082
Arc::new(props),
2083+
None,
20822084
)
20832085
.unwrap();
20842086

@@ -2214,6 +2216,7 @@ mod tests {
22142216
r.rows_written as usize,
22152217
None,
22162218
Arc::new(props),
2219+
None,
22172220
)
22182221
.unwrap(),
22192222
);
@@ -3484,6 +3487,7 @@ mod tests {
34843487
result.rows_written as usize,
34853488
None,
34863489
Arc::new(props),
3490+
None,
34873491
)
34883492
.unwrap(),
34893493
);

parquet/src/encryption/ciphers.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ impl BlockEncryptor for RingGcmBlockEncryptor {
119119
}
120120
}
121121

122+
#[derive(Debug, Clone)]
122123
pub(crate) struct RingGcmBlockDecryptor {
123124
key: LessSafeKey,
124125
}
@@ -226,7 +227,7 @@ fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ordinal
226227
Ok(aad)
227228
}
228229

229-
#[derive(Clone)]
230+
#[derive(Debug, Clone, PartialEq)]
230231
pub struct FileDecryptionProperties {
231232
footer_key: Option<Vec<u8>>
232233
}
@@ -261,18 +262,25 @@ impl DecryptionPropertiesBuilder {
261262
}
262263
}
263264

265+
#[derive(Debug, Clone)]
264266
pub struct FileDecryptor {
265267
decryption_properties: FileDecryptionProperties,
266268
// todo decr: change to BlockDecryptor
267269
footer_decryptor: RingGcmBlockDecryptor
268270
}
269271

272+
impl PartialEq for FileDecryptor {
273+
fn eq(&self, other: &Self) -> bool {
274+
self.decryption_properties == other.decryption_properties
275+
}
276+
}
277+
270278
impl FileDecryptor {
271-
pub(crate) fn new(decryption_properties: FileDecryptionProperties) -> Self {
279+
pub(crate) fn new(decryption_properties: &FileDecryptionProperties) -> Self {
272280
Self {
273281
// todo decr: if no key available yet (not set in properties, will be retrieved from metadata)
274282
footer_decryptor: RingGcmBlockDecryptor::new(decryption_properties.footer_key.clone().unwrap().as_ref()),
275-
decryption_properties
283+
decryption_properties: decryption_properties.clone()
276284
}
277285
}
278286

@@ -281,3 +289,17 @@ impl FileDecryptor {
281289
self.footer_decryptor
282290
}
283291
}
292+
293+
pub struct CryptoContext {
294+
row_group_ordinal: i32,
295+
column_ordinal: i32,
296+
metadata_decryptor: FileDecryptor,
297+
data_decryptor: FileDecryptor,
298+
file_decryption_properties: FileDecryptionProperties,
299+
aad: Vec<u8>,
300+
}
301+
302+
impl CryptoContext {
303+
pub fn data_decryptor(self) -> FileDecryptor { self.data_decryptor }
304+
pub fn file_decryption_properties(&self) -> &FileDecryptionProperties { &self.file_decryption_properties }
305+
}

parquet/src/file/footer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
6060
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader::decode_metadata")]
6161
pub fn decode_metadata(
6262
buf: &[u8],
63-
file_decryption_properties: Option<FileDecryptionProperties>,
63+
file_decryption_properties: Option<&FileDecryptionProperties>,
6464
) -> Result<ParquetMetaData> {
6565
ParquetMetaDataReader::decode_metadata(buf, file_decryption_properties)
6666
}

parquet/src/file/metadata/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ use crate::schema::types::{
117117
pub use reader::ParquetMetaDataReader;
118118
pub use writer::ParquetMetaDataWriter;
119119
pub(crate) use writer::ThriftMetadataWriter;
120+
use crate::encryption::ciphers::FileDecryptor;
120121

121122
/// Page level statistics for each column chunk of each row group.
122123
///
@@ -174,15 +175,18 @@ pub struct ParquetMetaData {
174175
column_index: Option<ParquetColumnIndex>,
175176
/// Offset index for each page in each column chunk
176177
offset_index: Option<ParquetOffsetIndex>,
178+
/// Optional file decryptor
179+
file_decryptor: Option<FileDecryptor>,
177180
}
178181

179182
impl ParquetMetaData {
180183
/// Creates Parquet metadata from file metadata and a list of row
181184
/// group metadata
182-
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
185+
pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, file_decryptor: Option<FileDecryptor>) -> Self {
183186
ParquetMetaData {
184187
file_metadata,
185188
row_groups,
189+
file_decryptor,
186190
column_index: None,
187191
offset_index: None,
188192
}
@@ -337,7 +341,7 @@ pub struct ParquetMetaDataBuilder(ParquetMetaData);
337341
impl ParquetMetaDataBuilder {
338342
/// Create a new builder from a file metadata, with no row groups
339343
pub fn new(file_meta_data: FileMetaData) -> Self {
340-
Self(ParquetMetaData::new(file_meta_data, vec![]))
344+
Self(ParquetMetaData::new(file_meta_data, vec![], None))
341345
}
342346

343347
/// Create a new builder from an existing ParquetMetaData
@@ -540,6 +544,8 @@ pub struct RowGroupMetaData {
540544
ordinal: Option<i16>,
541545
}
542546

547+
// todo:rok
548+
543549
impl RowGroupMetaData {
544550
/// Returns builder for row group metadata.
545551
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
@@ -1861,7 +1867,7 @@ mod tests {
18611867
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
18621868
.set_row_groups(row_group_meta_with_stats)
18631869
.build();
1864-
let base_expected_size = 2312;
1870+
let base_expected_size = 2896;
18651871

18661872
assert_eq!(parquet_meta.memory_size(), base_expected_size);
18671873

@@ -1888,7 +1894,7 @@ mod tests {
18881894
]]))
18891895
.build();
18901896

1891-
let bigger_expected_size = 2816;
1897+
let bigger_expected_size = 3400;
18921898
// more set fields means more memory usage
18931899
assert!(bigger_expected_size > base_expected_size);
18941900
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);

parquet/src/file/metadata/reader.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ impl ParquetMetaDataReader {
138138
/// This is only necessary when the file is encrypted.
139139
pub fn with_encryption_properties(
140140
mut self,
141-
properties: Option<FileDecryptionProperties>,
141+
properties: Option<&FileDecryptionProperties>,
142142
) -> Self {
143-
self.file_decryption_properties = properties;
143+
self.file_decryption_properties = properties.cloned();
144144
self
145145
}
146146

@@ -364,7 +364,7 @@ impl ParquetMetaDataReader {
364364
&mut fetch,
365365
file_size,
366366
self.get_prefetch_size(),
367-
self.file_decryption_properties.clone(),
367+
self.file_decryption_properties.as_ref(),
368368
)
369369
.await?;
370370

@@ -532,7 +532,7 @@ impl ParquetMetaDataReader {
532532
let start = file_size - footer_metadata_len as u64;
533533
Self::decode_metadata(
534534
chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
535-
self.file_decryption_properties.clone(),
535+
self.file_decryption_properties.as_ref(),
536536
)
537537
}
538538

@@ -554,7 +554,7 @@ impl ParquetMetaDataReader {
554554
fetch: &mut F,
555555
file_size: usize,
556556
prefetch: usize,
557-
file_decryption_properties: Option<FileDecryptionProperties>,
557+
file_decryption_properties: Option<&FileDecryptionProperties>,
558558
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
559559
if file_size < FOOTER_SIZE {
560560
return Err(eof_err!("file size of {} is less than footer", file_size));
@@ -639,10 +639,11 @@ impl ParquetMetaDataReader {
639639
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
640640
pub fn decode_metadata(
641641
buf: &[u8],
642-
file_decryption_properties: Option<FileDecryptionProperties>,
642+
file_decryption_properties: Option<&FileDecryptionProperties>,
643643
) -> Result<ParquetMetaData> {
644644
let mut prot = TCompactSliceInputProtocol::new(buf);
645645

646+
let mut file_decryptor = None;
646647
let decrypted_fmd_buf;
647648
if let Some(file_decryption_properties) = file_decryption_properties {
648649
let t_file_crypto_metadata: TFileCryptoMetaData =
@@ -658,13 +659,13 @@ impl ParquetMetaDataReader {
658659
// todo decr: get key_metadata
659660

660661
// remaining buffer contains encrypted FileMetaData
661-
let file_decryptor = FileDecryptor::new(file_decryption_properties);
662-
let decryptor = file_decryptor.get_footer_decryptor();
662+
file_decryptor = Some(FileDecryptor::new(file_decryption_properties));
663+
let decryptor = file_decryptor.clone().unwrap().get_footer_decryptor();
663664
// todo decr: get aad_prefix
664665
// todo decr: set both aad_prefix and aad_file_unique in file_decryptor
665666
let fmd_aad = create_footer_aad(aes_gcm_algo.aad_file_unique.unwrap().as_ref());
666667
decrypted_fmd_buf =
667-
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad.unwrap().as_ref());
668+
decryptor.decrypt(prot.as_slice().as_ref(), fmd_aad?.as_ref());
668669
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
669670
}
670671

@@ -693,7 +694,7 @@ impl ParquetMetaDataReader {
693694
schema_descr,
694695
column_orders,
695696
);
696-
Ok(ParquetMetaData::new(file_metadata, row_groups))
697+
Ok(ParquetMetaData::new(file_metadata, row_groups, file_decryptor))
697698
}
698699

699700
/// Parses column orders from Thrift definition.

0 commit comments

Comments
 (0)