Skip to content

Commit 24e70d9

Browse files
committed
Get per-column encryption working and various tidy ups
1 parent 073550b commit 24e70d9

File tree

7 files changed

+195
-72
lines changed

7 files changed

+195
-72
lines changed

parquet/src/arrow/arrow_writer/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ impl PageWriter for ArrowPageWriter {
510510
match self.page_encryptor.as_ref() {
511511
#[cfg(feature = "encryption")]
512512
Some(encryptor) => {
513-
encryptor.encrypt_page_header(page_header, &mut header)?;
513+
encryptor.encrypt_page_header(&page_header, &mut header)?;
514514
}
515515
_ => {
516516
let mut protocol = TCompactOutputProtocol::new(&mut header);
@@ -3824,8 +3824,8 @@ mod tests {
38243824
let column_1_key = EncryptionKey::new(column_1_key.as_bytes().to_vec());
38253825
let column_2_key = EncryptionKey::new(column_2_key.as_bytes().to_vec());
38263826
let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
3827-
.with_column_key("double_field".as_bytes().to_vec(), column_1_key)
3828-
.with_column_key("float_field".as_bytes().to_vec(), column_2_key)
3827+
.with_column_key("double_field".into(), column_1_key)
3828+
.with_column_key("float_field".into(), column_2_key)
38293829
.build();
38303830

38313831
let temp_file =

parquet/src/column/writer/mod.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
3232
use crate::data_type::private::ParquetValueType;
3333
use crate::data_type::*;
3434
use crate::encodings::levels::LevelEncoder;
35+
#[cfg(feature = "encryption")]
36+
use crate::encryption::encrypt::get_column_crypto_metadata;
3537
use crate::errors::{ParquetError, Result};
3638
use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
3739
use crate::file::properties::EnabledStatistics;
@@ -1199,6 +1201,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11991201
);
12001202
}
12011203

1204+
#[cfg(feature = "encryption")]
1205+
if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1206+
builder = builder.set_column_crypto_metadata(get_column_crypto_metadata(
1207+
encryption_properties,
1208+
&self.descr,
1209+
));
1210+
}
1211+
12021212
let metadata = builder.build()?;
12031213
Ok(metadata)
12041214
}
@@ -3505,7 +3515,7 @@ mod tests {
35053515
let footer_key: &[u8] = "0123456789012345".as_bytes();
35063516
let column_key = EncryptionKey::new(b"1234567890123450".to_vec());
35073517
let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
3508-
.with_column_key(b"a".to_vec(), column_key.clone())
3518+
.with_column_key("a".into(), column_key.clone())
35093519
.build();
35103520

35113521
let props = Arc::new(

parquet/src/encryption/ciphers.rs

+23-15
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl BlockDecryptor for RingGcmBlockDecryptor {
8282
}
8383

8484
pub trait BlockEncryptor: Debug + Send + Sync {
85-
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8>;
85+
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>>;
8686
}
8787

8888
#[derive(Debug, Clone)]
@@ -151,20 +151,28 @@ impl RingGcmBlockEncryptor {
151151
}
152152

153153
impl BlockEncryptor for RingGcmBlockEncryptor {
154-
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
155-
let mut ciphertext = Vec::with_capacity(plaintext.len() + TAG_LEN);
154+
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Result<Vec<u8>> {
155+
// Create encrypted buffer.
156+
// Format is: [length of everything that follows (nonce + ciphertext + tag) as a little-endian u32, nonce, ciphertext, authentication tag]
157+
let ciphertext_length = NONCE_LEN + plaintext.len() + TAG_LEN;
158+
let mut ciphertext = Vec::with_capacity(SIZE_LEN + ciphertext_length);
159+
ciphertext.extend((ciphertext_length as u32).to_le_bytes());
160+
161+
let nonce = self.nonce_sequence.advance()?;
162+
ciphertext.extend(nonce.as_ref());
156163
ciphertext.extend(plaintext);
157-
let nonce = self.nonce_sequence.advance().unwrap();
158-
let nonce_bytes = *nonce.as_ref();
159-
self.key
160-
.seal_in_place_append_tag(nonce, Aad::from(aad), &mut ciphertext)
161-
.unwrap();
162-
163-
let mut out = Vec::with_capacity(ciphertext.len() + SIZE_LEN + NONCE_LEN);
164-
out.extend(((ciphertext.len() + NONCE_LEN) as u32).to_le_bytes());
165-
out.extend(nonce_bytes);
166-
out.extend_from_slice(ciphertext.as_ref());
167-
out
164+
165+
let tag = self.key.seal_in_place_separate_tag(
166+
nonce,
167+
Aad::from(aad),
168+
&mut ciphertext[SIZE_LEN + NONCE_LEN..],
169+
)?;
170+
171+
ciphertext.extend(tag.as_ref());
172+
173+
debug_assert_eq!(SIZE_LEN + ciphertext_length, ciphertext.len());
174+
175+
Ok(ciphertext)
168176
}
169177
}
170178

@@ -181,7 +189,7 @@ mod tests {
181189
let plaintext = b"hello, world!";
182190
let aad = b"some aad";
183191

184-
let ciphertext = encryptor.encrypt(plaintext, aad);
192+
let ciphertext = encryptor.encrypt(plaintext, aad).unwrap();
185193
let decrypted = decryptor.decrypt(&ciphertext, aad).unwrap();
186194

187195
assert_eq!(plaintext, decrypted.as_slice());

parquet/src/encryption/encrypt.rs

+48-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
1919
use crate::errors::Result;
20+
use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
21+
use crate::schema::types::ColumnDescPtr;
2022
use crate::thrift::TSerializable;
2123
use ring::rand::{SecureRandom, SystemRandom};
2224
use std::collections::HashMap;
@@ -51,7 +53,7 @@ impl EncryptionKey {
5153
pub struct FileEncryptionProperties {
5254
encrypt_footer: bool,
5355
footer_key: EncryptionKey,
54-
column_keys: HashMap<Vec<u8>, EncryptionKey>,
56+
column_keys: HashMap<String, EncryptionKey>,
5557
aad_prefix: Option<Vec<u8>>,
5658
store_aad_prefix: bool,
5759
}
@@ -80,7 +82,7 @@ impl FileEncryptionProperties {
8082

8183
pub struct EncryptionPropertiesBuilder {
8284
footer_key: EncryptionKey,
83-
column_keys: HashMap<Vec<u8>, EncryptionKey>,
85+
column_keys: HashMap<String, EncryptionKey>,
8486
aad_prefix: Option<Vec<u8>>,
8587
encrypt_footer: bool,
8688
store_aad_prefix: bool,
@@ -107,8 +109,8 @@ impl EncryptionPropertiesBuilder {
107109
self
108110
}
109111

110-
pub fn with_column_key(mut self, column_name: Vec<u8>, encryption_key: EncryptionKey) -> Self {
111-
self.column_keys.insert(column_name, encryption_key);
112+
pub fn with_column_key(mut self, column_path: String, encryption_key: EncryptionKey) -> Self {
113+
self.column_keys.insert(column_path, encryption_key);
112114
self
113115
}
114116

@@ -179,18 +181,17 @@ impl FileEncryptor {
179181
if self.properties.column_keys.is_empty() {
180182
return self.get_footer_encryptor();
181183
}
182-
// TODO: Column paths should be stored as String
183-
let column_path = column_path.as_bytes();
184184
match self.properties.column_keys.get(column_path) {
185185
None => todo!("Handle unencrypted columns"),
186186
Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
187187
}
188188
}
189189
}
190190

191+
/// Encrypt a Thrift serializable object and write the encrypted bytes to `sink`
191192
pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
192-
object: T,
193-
encryptor: &FileEncryptor,
193+
object: &T,
194+
encryptor: &mut Box<dyn BlockEncryptor>,
194195
sink: &mut W,
195196
module_aad: &[u8],
196197
) -> Result<()> {
@@ -200,11 +201,46 @@ pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
200201
object.write_to_out_protocol(&mut unencrypted_protocol)?;
201202
}
202203

203-
// TODO: Get correct encryptor (footer vs column, data vs metadata)
204-
let encrypted_buffer = encryptor
205-
.get_footer_encryptor()?
206-
.encrypt(buffer.as_ref(), module_aad);
204+
let encrypted_buffer = encryptor.encrypt(buffer.as_ref(), module_aad)?;
207205

208206
sink.write_all(&encrypted_buffer)?;
209207
Ok(())
210208
}
209+
210+
/// Encrypt a Thrift serializable object and return the encrypted bytes as a `Vec<u8>`
211+
pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
212+
object: &T,
213+
encryptor: &mut Box<dyn BlockEncryptor>,
214+
module_aad: &[u8],
215+
) -> Result<Vec<u8>> {
216+
let mut buffer: Vec<u8> = vec![];
217+
{
218+
let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
219+
object.write_to_out_protocol(&mut unencrypted_protocol)?;
220+
}
221+
222+
encryptor.encrypt(buffer.as_ref(), module_aad)
223+
}
224+
225+
/// Get the crypto metadata for a column from the file encryption properties
226+
pub fn get_column_crypto_metadata(
227+
properties: &FileEncryptionProperties,
228+
column: &ColumnDescPtr,
229+
) -> Option<ColumnCryptoMetaData> {
230+
if properties.column_keys.is_empty() {
231+
// No column keys configured: uniform encryption, so the column uses the footer key
232+
Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
233+
} else {
234+
match properties.column_keys.get(&column.path().string()) {
235+
// Column is not encrypted
236+
None => None,
237+
// Column is encrypted with a column specific key
238+
Some(encryption_key) => Some(ColumnCryptoMetaData::EncryptionWithColumnKey(
239+
EncryptionWithColumnKey {
240+
path_in_schema: column.path().parts().to_vec(),
241+
key_metadata: encryption_key.key_metadata.clone(),
242+
},
243+
)),
244+
}
245+
}
246+
}

parquet/src/encryption/page_encryptor.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,14 @@ impl PageEncryptor {
6868
let mut encryptor = self
6969
.file_encryptor
7070
.get_column_encryptor(&self.column_path)?;
71-
// todo: use column encryptor when needed
72-
// self.file_encryptor.get_column_encryptor(self.column_path.as_ref())
73-
let encrypted_buffer = encryptor.encrypt(page.data(), &aad);
71+
let encrypted_buffer = encryptor.encrypt(page.data(), &aad)?;
7472

7573
Ok(encrypted_buffer)
7674
}
7775

7876
pub fn encrypt_page_header<W: Write>(
7977
&self,
80-
page_header: PageHeader,
78+
page_header: &PageHeader,
8179
sink: &mut W,
8280
) -> crate::errors::Result<()> {
8381
let module_type = match page_header.type_ {
@@ -99,6 +97,10 @@ impl PageEncryptor {
9997
Some(self.page_index),
10098
)?;
10199

102-
encrypt_object(page_header, &self.file_encryptor, sink, &aad)
100+
let mut encryptor = self
101+
.file_encryptor
102+
.get_column_encryptor(&self.column_path)?;
103+
104+
encrypt_object(page_header, &mut encryptor, sink, &aad)
103105
}
104106
}

0 commit comments

Comments
 (0)