Skip to content

Commit f6d1155

Browse files
committed
Work
1 parent 1b895b6 commit f6d1155

File tree

6 files changed

+101
-24
lines changed

6 files changed

+101
-24
lines changed

parquet/src/arrow/arrow_writer/mod.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -1127,9 +1127,9 @@ mod tests {
11271127

11281128
use std::fs::File;
11291129

1130-
use crate::arrow::arrow_reader::{
1131-
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
1132-
};
1130+
#[cfg(feature = "encryption")]
1131+
use crate::arrow::arrow_reader::ArrowReaderOptions;
1132+
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
11331133
use crate::arrow::ARROW_SCHEMA_META_KEY;
11341134
use arrow::datatypes::ToByteSlice;
11351135
use arrow::datatypes::{DataType, Schema};
@@ -1143,8 +1143,10 @@ mod tests {
11431143

11441144
use crate::basic::Encoding;
11451145
use crate::data_type::AsBytes;
1146-
use crate::encryption::decryption::FileDecryptionProperties;
1147-
use crate::encryption::encryption::FileEncryptionProperties;
1146+
#[cfg(feature = "encryption")]
1147+
use crate::encryption::{
1148+
decryption::FileDecryptionProperties, encryption::FileEncryptionProperties,
1149+
};
11481150
use crate::file::metadata::ParquetMetaData;
11491151
use crate::file::page_index::index::Index;
11501152
use crate::file::page_index::index_reader::read_offset_indexes;

parquet/src/column/writer/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
419419
data_page_boundary_ascending: true,
420420
data_page_boundary_descending: true,
421421
last_non_null_data_page_min_max: None,
422+
// metadata_encryptor: metadata_encryptor,
423+
// data_encryptor: data_encryptor,
422424
}
423425
}
424426

@@ -3403,7 +3405,8 @@ mod tests {
34033405
.with_file_encryption_properties(file_encryption_properties)
34043406
.build(),
34053407
);
3406-
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3408+
let mut _writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3409+
todo!()
34073410
}
34083411

34093412
#[test]

parquet/src/encryption/ciphers.rs

+32-1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,37 @@ impl RingGcmBlockEncryptor {
135135

136136
impl BlockEncryptor for RingGcmBlockEncryptor {
137137
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
138-
todo!()
138+
let mut ciphertext = Vec::with_capacity(plaintext.len() + TAG_LEN);
139+
ciphertext.extend(plaintext);
140+
let nonce = self.nonce_sequence.advance().unwrap();
141+
let nonce_bytes = nonce.as_ref().clone();
142+
self.key.seal_in_place_append_tag(nonce, Aad::from(aad), &mut ciphertext).unwrap();
143+
144+
let mut out = Vec::with_capacity(ciphertext.len() + SIZE_LEN + NONCE_LEN);
145+
out.extend(ciphertext.len().to_be_bytes());
146+
out.extend(nonce_bytes);
147+
out.extend_from_slice(ciphertext.as_ref());
148+
out
149+
}
150+
}
151+
152+
153+
#[cfg(test)]
154+
mod tests {
155+
use super::*;
156+
157+
#[test]
158+
fn test_round_trip() {
159+
let key = [0u8; 16];
160+
let mut encryptor = RingGcmBlockEncryptor::new(&key);
161+
let decryptor = RingGcmBlockDecryptor::new(&key);
162+
163+
let plaintext = b"hello, world!";
164+
let aad = b"some aad";
165+
166+
let ciphertext = encryptor.encrypt(plaintext, aad);
167+
let decrypted = decryptor.decrypt(&ciphertext, aad).unwrap();
168+
169+
assert_eq!(plaintext, decrypted.as_slice());
139170
}
140171
}

parquet/src/encryption/encryption.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::collections::HashMap;
1919
use std::sync::Arc;
2020
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};
2121

22-
#[derive(Debug, Clone)]
22+
#[derive(Debug, Clone, PartialEq)]
2323
pub struct FileEncryptionProperties {
2424
encrypt_footer: bool,
2525
footer_key: Vec<u8>,
@@ -62,20 +62,26 @@ impl EncryptionPropertiesBuilder {
6262
pub struct FileEncryptor {
6363
encryption_properties: FileEncryptionProperties,
6464
footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
65+
column_encryptors: Option<HashMap<Vec<u8>, Arc<dyn BlockEncryptor>>>,
6566
file_aad: Vec<u8>,
6667
}
6768

6869
impl FileEncryptor {
6970
pub(crate) fn new(
70-
encryption_properties: FileEncryptionProperties,
71-
aad_file_unique: Vec<u8>,
72-
aad_prefix: Vec<u8>,
71+
encryption_properties: FileEncryptionProperties, file_aad: Vec<u8>,
7372
) -> Self {
74-
let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
75-
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);
73+
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key.clone());
74+
let mut column_encryptors: HashMap<Vec<u8>, Arc<dyn BlockEncryptor>> = HashMap::new();
75+
if let Some(column_keys) = encryption_properties.column_keys.clone() {
76+
for (column_name, key) in column_keys.iter() {
77+
let column_encryptor = Arc::new(RingGcmBlockEncryptor::new(key));
78+
column_encryptors.insert(column_name.clone(), column_encryptor);
79+
}
80+
}
7681
Self {
7782
encryption_properties,
7883
footer_encryptor: Some(Arc::new(footer_encryptor)),
84+
column_encryptors: Some(column_encryptors),
7985
file_aad,
8086
}
8187
}

parquet/src/file/properties.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ pub struct WriterProperties {
172172
statistics_truncate_length: Option<usize>,
173173
coerce_types: bool,
174174
#[cfg(feature = "encryption")]
175-
file_encryption_properties: Option<FileEncryptionProperties>,
175+
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
176176
}
177177

178178
impl Default for WriterProperties {
@@ -374,11 +374,6 @@ impl WriterProperties {
374374
.and_then(|c| c.bloom_filter_properties())
375375
.or_else(|| self.default_column_properties.bloom_filter_properties())
376376
}
377-
378-
#[cfg(feature = "encryption")]
379-
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
380-
self.file_encryption_properties.as_ref()
381-
}
382377
}
383378

384379
/// Builder for [`WriterProperties`] parquet writer configuration.

parquet/src/file/writer.rs

+45-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::column::{
3333
writer::{get_column_writer, ColumnWriter},
3434
};
3535
use crate::data_type::DataType;
36-
use crate::encryption::ciphers::RingGcmBlockEncryptor;
36+
#[cfg(feature = "encryption")]
3737
use crate::encryption::encryption::FileEncryptor;
3838
use crate::errors::{ParquetError, Result};
3939
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
@@ -155,6 +155,8 @@ pub struct SerializedFileWriter<W: Write> {
155155
// kv_metadatas will be appended to `props` when `write_metadata`
156156
kv_metadatas: Vec<KeyValue>,
157157
finished: bool,
158+
#[cfg(feature = "encryption")]
159+
file_encryptor: Option<FileEncryptor>,
158160
}
159161

160162
impl<W: Write> Debug for SerializedFileWriter<W> {
@@ -173,19 +175,45 @@ impl<W: Write + Send> SerializedFileWriter<W> {
173175
/// Creates new file writer.
174176
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
175177
let mut buf = TrackedWrite::new(buf);
178+
#[cfg(feature = "encryption")]
179+
let file_encryptor = if properties.file_encryption_properties.is_some() {
180+
Some(FileEncryptor::new(
181+
properties
182+
.file_encryption_properties
183+
.as_ref()
184+
.unwrap()
185+
.clone(),
186+
vec![],
187+
))
188+
} else {
189+
None
190+
};
191+
192+
#[cfg(feature = "encryption")]
193+
if properties.file_encryption_properties.is_some() {
194+
// todo: check if all columns in properties.file_encryption_properties.column_keys
195+
// are present in the schema
196+
let _fep = properties.file_encryption_properties.clone().unwrap();
197+
Self::start_encrypted_file(&mut buf)?;
198+
} else {
199+
Self::start_file(&mut buf)?;
200+
}
201+
#[cfg(not(feature = "encryption"))]
176202
Self::start_file(&mut buf)?;
177203
Ok(Self {
178204
buf,
179205
schema: schema.clone(),
180206
descr: Arc::new(SchemaDescriptor::new(schema)),
181-
props: properties,
207+
props: properties.clone(),
182208
row_groups: vec![],
183209
bloom_filters: vec![],
184210
column_indexes: Vec::new(),
185211
offset_indexes: Vec::new(),
186212
row_group_index: 0,
187213
kv_metadatas: Vec::new(),
188214
finished: false,
215+
#[cfg(feature = "encryption")]
216+
file_encryptor,
189217
})
190218
}
191219

@@ -274,6 +302,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
274302
Ok(())
275303
}
276304

305+
fn start_encrypted_file(buf: &mut TrackedWrite<W>) -> Result<()> {
306+
buf.write_all(&PARQUET_MAGIC)?;
307+
Ok(())
308+
}
309+
277310
/// Assembles and writes metadata at the end of the file.
278311
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
279312
self.finished = true;
@@ -525,9 +558,16 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
525558
) -> Result<C>,
526559
{
527560
self.assert_previous_writer_closed()?;
528-
let file_encryption_properties = self.props.file_encryption_properties();
529-
let file_encryptor =
530-
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
561+
#[cfg(feature = "encryption")]
562+
let file_encryptor = FileEncryptor::new(
563+
self.props
564+
.file_encryption_properties
565+
.as_ref()
566+
.unwrap()
567+
.clone(),
568+
vec![],
569+
);
570+
531571
Ok(match self.next_column_desc() {
532572
Some(column) => {
533573
let props = self.props.clone();

0 commit comments

Comments
 (0)