Skip to content

Commit 8e6a4be

Browse files
committed
Work
1 parent 1b895b6 commit 8e6a4be

File tree

6 files changed

+64
-23
lines changed

6 files changed

+64
-23
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,9 +1127,9 @@ mod tests {
11271127

11281128
use std::fs::File;
11291129

1130-
use crate::arrow::arrow_reader::{
1131-
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
1132-
};
1130+
#[cfg(feature = "encryption")]
1131+
use crate::arrow::arrow_reader::ArrowReaderOptions;
1132+
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
11331133
use crate::arrow::ARROW_SCHEMA_META_KEY;
11341134
use arrow::datatypes::ToByteSlice;
11351135
use arrow::datatypes::{DataType, Schema};
@@ -1143,8 +1143,10 @@ mod tests {
11431143

11441144
use crate::basic::Encoding;
11451145
use crate::data_type::AsBytes;
1146-
use crate::encryption::decryption::FileDecryptionProperties;
1147-
use crate::encryption::encryption::FileEncryptionProperties;
1146+
#[cfg(feature = "encryption")]
1147+
use crate::encryption::{
1148+
decryption::FileDecryptionProperties, encryption::FileEncryptionProperties,
1149+
};
11481150
use crate::file::metadata::ParquetMetaData;
11491151
use crate::file::page_index::index::Index;
11501152
use crate::file::page_index::index_reader::read_offset_indexes;

parquet/src/column/writer/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
419419
data_page_boundary_ascending: true,
420420
data_page_boundary_descending: true,
421421
last_non_null_data_page_min_max: None,
422+
// metadata_encryptor: metadata_encryptor,
423+
// data_encryptor: data_encryptor,
422424
}
423425
}
424426

@@ -3403,7 +3405,8 @@ mod tests {
34033405
.with_file_encryption_properties(file_encryption_properties)
34043406
.build(),
34053407
);
3406-
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3408+
let mut _writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3409+
todo!()
34073410
}
34083411

34093412
#[test]

parquet/src/encryption/ciphers.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,6 @@ impl BlockEncryptor for RingGcmBlockEncryptor {
138138
todo!()
139139
}
140140
}
141+
142+
143+
// TODO: add a round-trip test that encrypts into a buffer and then decrypts from it

parquet/src/encryption/encryption.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::collections::HashMap;
1919
use std::sync::Arc;
2020
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};
2121

22-
#[derive(Debug, Clone)]
22+
#[derive(Debug, Clone, PartialEq)]
2323
pub struct FileEncryptionProperties {
2424
encrypt_footer: bool,
2525
footer_key: Vec<u8>,
@@ -62,20 +62,26 @@ impl EncryptionPropertiesBuilder {
6262
pub struct FileEncryptor {
6363
encryption_properties: FileEncryptionProperties,
6464
footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
65+
column_encryptors: Option<HashMap<Vec<u8>, Arc<dyn BlockEncryptor>>>,
6566
file_aad: Vec<u8>,
6667
}
6768

6869
impl FileEncryptor {
6970
pub(crate) fn new(
70-
encryption_properties: FileEncryptionProperties,
71-
aad_file_unique: Vec<u8>,
72-
aad_prefix: Vec<u8>,
71+
encryption_properties: FileEncryptionProperties, file_aad: Vec<u8>,
7372
) -> Self {
74-
let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
75-
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);
73+
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key.clone());
74+
let mut column_encryptors: HashMap<Vec<u8>, Arc<dyn BlockEncryptor>> = HashMap::new();
75+
if let Some(column_keys) = encryption_properties.column_keys.clone() {
76+
for (column_name, key) in column_keys.iter() {
77+
let column_encryptor = Arc::new(RingGcmBlockEncryptor::new(key));
78+
column_encryptors.insert(column_name.clone(), column_encryptor);
79+
}
80+
}
7681
Self {
7782
encryption_properties,
7883
footer_encryptor: Some(Arc::new(footer_encryptor)),
84+
column_encryptors: Some(column_encryptors),
7985
file_aad,
8086
}
8187
}

parquet/src/file/properties.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ pub struct WriterProperties {
172172
statistics_truncate_length: Option<usize>,
173173
coerce_types: bool,
174174
#[cfg(feature = "encryption")]
175-
file_encryption_properties: Option<FileEncryptionProperties>,
175+
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
176176
}
177177

178178
impl Default for WriterProperties {
@@ -374,11 +374,6 @@ impl WriterProperties {
374374
.and_then(|c| c.bloom_filter_properties())
375375
.or_else(|| self.default_column_properties.bloom_filter_properties())
376376
}
377-
378-
#[cfg(feature = "encryption")]
379-
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
380-
self.file_encryption_properties.as_ref()
381-
}
382377
}
383378

384379
/// Builder for [`WriterProperties`] parquet writer configuration.

parquet/src/file/writer.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::column::{
3333
writer::{get_column_writer, ColumnWriter},
3434
};
3535
use crate::data_type::DataType;
36-
use crate::encryption::ciphers::RingGcmBlockEncryptor;
36+
#[cfg(feature = "encryption")]
3737
use crate::encryption::encryption::FileEncryptor;
3838
use crate::errors::{ParquetError, Result};
3939
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
@@ -155,6 +155,8 @@ pub struct SerializedFileWriter<W: Write> {
155155
// kv_metadatas will be appended to `props` when `write_metadata`
156156
kv_metadatas: Vec<KeyValue>,
157157
finished: bool,
158+
#[cfg(feature = "encryption")]
159+
file_encryptor: Option<FileEncryptor>,
158160
}
159161

160162
impl<W: Write> Debug for SerializedFileWriter<W> {
@@ -173,19 +175,41 @@ impl<W: Write + Send> SerializedFileWriter<W> {
173175
/// Creates new file writer.
174176
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
175177
let mut buf = TrackedWrite::new(buf);
178+
#[cfg(feature = "encryption")]
179+
let file_encryptor = if properties.file_encryption_properties.is_some() {
180+
Some(FileEncryptor::new(
181+
properties.file_encryption_properties.as_ref().unwrap().clone(),
182+
vec![],
183+
))
184+
} else {
185+
None
186+
};
187+
188+
#[cfg(feature = "encryption")]
189+
if properties.file_encryption_properties.is_some() {
190+
// todo: check if all columns in properties.file_encryption_properties.column_keys
191+
// are present in the schema
192+
let _fep = properties.file_encryption_properties.clone().unwrap();
193+
Self::start_encrypted_file(&mut buf)?;
194+
} else {
195+
Self::start_file(&mut buf)?;
196+
}
197+
#[cfg(not(feature = "encryption"))]
176198
Self::start_file(&mut buf)?;
177199
Ok(Self {
178200
buf,
179201
schema: schema.clone(),
180202
descr: Arc::new(SchemaDescriptor::new(schema)),
181-
props: properties,
203+
props: properties.clone(),
182204
row_groups: vec![],
183205
bloom_filters: vec![],
184206
column_indexes: Vec::new(),
185207
offset_indexes: Vec::new(),
186208
row_group_index: 0,
187209
kv_metadatas: Vec::new(),
188210
finished: false,
211+
#[cfg(feature = "encryption")]
212+
file_encryptor,
189213
})
190214
}
191215

@@ -274,6 +298,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
274298
Ok(())
275299
}
276300

301+
/// Writes the leading magic bytes of a file that is created with file
/// encryption properties set.
///
/// Writes `PARQUET_MAGIC` to `buf` and propagates any write error to the caller.
///
/// NOTE(review): the Parquet modular encryption format defines a separate magic
/// (`PARE`) for files whose footer is encrypted; confirm whether writing
/// `PARQUET_MAGIC` here is intended for every encryption mode.
/// NOTE(review): no `#[cfg(feature = "encryption")]` attribute is visible on this
/// function although its visible call site is feature-gated; confirm it does not
/// become dead code when the feature is disabled.
fn start_encrypted_file(buf: &mut TrackedWrite<W>) -> Result<()> {
302+
buf.write_all(&PARQUET_MAGIC)?;
303+
Ok(())
304+
}
305+
277306
/// Assembles and writes metadata at the end of the file.
278307
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
279308
self.finished = true;
@@ -525,9 +554,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
525554
) -> Result<C>,
526555
{
527556
self.assert_previous_writer_closed()?;
528-
let file_encryption_properties = self.props.file_encryption_properties();
529-
let file_encryptor =
530-
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
557+
#[cfg(feature = "encryption")]
558+
let file_encryptor = FileEncryptor::new(
559+
self.props.file_encryption_properties.as_ref().unwrap().clone(),
560+
vec![],
561+
);
562+
531563
Ok(match self.next_column_desc() {
532564
Some(column) => {
533565
let props = self.props.clone();

0 commit comments

Comments
 (0)