Skip to content

Commit 1b895b6

Browse files
committed
Start encryption
1 parent a4ec1ab commit 1b895b6

File tree

7 files changed

+241
-4
lines changed

7 files changed

+241
-4
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,7 +1127,9 @@ mod tests {
11271127

11281128
use std::fs::File;
11291129

1130-
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1130+
use crate::arrow::arrow_reader::{
1131+
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
1132+
};
11311133
use crate::arrow::ARROW_SCHEMA_META_KEY;
11321134
use arrow::datatypes::ToByteSlice;
11331135
use arrow::datatypes::{DataType, Schema};
@@ -1141,6 +1143,8 @@ mod tests {
11411143

11421144
use crate::basic::Encoding;
11431145
use crate::data_type::AsBytes;
1146+
use crate::encryption::decryption::FileDecryptionProperties;
1147+
use crate::encryption::encryption::FileEncryptionProperties;
11441148
use crate::file::metadata::ParquetMetaData;
11451149
use crate::file::page_index::index::Index;
11461150
use crate::file::page_index::index_reader::read_offset_indexes;
@@ -3546,8 +3550,14 @@ mod tests {
35463550
let file = tempfile::tempfile().unwrap();
35473551

35483552
// todo: add encryption
3553+
let key_code: &[u8] = "0123456789012345".as_bytes();
3554+
let file_encryption_properties = FileEncryptionProperties::builder(key_code.to_vec())
3555+
.build()
3556+
.unwrap();
3557+
35493558
let props = WriterProperties::builder()
35503559
.set_max_row_group_size(200)
3560+
.with_file_encryption_properties(file_encryption_properties)
35513561
.build();
35523562

35533563
let mut writer =
@@ -3560,8 +3570,19 @@ mod tests {
35603570

35613571
writer.close().unwrap();
35623572

3563-
// todo: try_new_with_decryption
3564-
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3573+
let footer_key = "0123456789012345".as_bytes();
3574+
let column_key = "1234567890123450".as_bytes();
3575+
3576+
let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
3577+
.with_column_key("int".as_bytes().to_vec(), column_key.to_vec())
3578+
.build()
3579+
.unwrap();
3580+
3581+
let options =
3582+
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
3583+
3584+
// todo: remove with_file_decryption_properties from things that are not ArrowReaderOptions
3585+
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
35653586
assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
35663587

35673588
let batches = builder

parquet/src/column/writer/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,8 @@ mod tests {
15351535
page::PageReader,
15361536
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
15371537
};
1538+
#[cfg(feature = "encryption")]
1539+
use crate::encryption::encryption::FileEncryptionProperties;
15381540
use crate::file::writer::TrackedWrite;
15391541
use crate::file::{
15401542
properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
@@ -3379,6 +3381,31 @@ mod tests {
33793381
);
33803382
}
33813383

3384+
#[cfg(feature = "encryption")]
3385+
#[test]
3386+
fn test_encryption_writer() {
3387+
let message_type = "
3388+
message test_schema {
3389+
OPTIONAL BYTE_ARRAY a (UTF8);
3390+
}
3391+
";
3392+
let schema = Arc::new(parse_message_type(message_type).unwrap());
3393+
let file: File = tempfile::tempfile().unwrap();
3394+
3395+
let builder = WriterProperties::builder();
3396+
let key_code: &[u8] = "0123456789012345".as_bytes();
3397+
let file_encryption_properties = FileEncryptionProperties::builder(key_code.to_vec())
3398+
.build()
3399+
.unwrap();
3400+
3401+
let props = Arc::new(
3402+
builder
3403+
.with_file_encryption_properties(file_encryption_properties)
3404+
.build(),
3405+
);
3406+
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3407+
}
3408+
33823409
#[test]
33833410
fn test_increment_max_binary_chars() {
33843411
let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);

parquet/src/encryption/ciphers.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
// under the License.
1717

1818
use crate::errors::Result;
19-
use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM};
19+
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
20+
use ring::rand::{SecureRandom, SystemRandom};
2021
use std::fmt::Debug;
2122

23+
const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff;
2224
const NONCE_LEN: usize = 12;
2325
const TAG_LEN: usize = 16;
2426
const SIZE_LEN: usize = 4;
@@ -60,3 +62,79 @@ impl BlockDecryptor for RingGcmBlockDecryptor {
6062
Ok(result)
6163
}
6264
}
65+
66+
pub trait BlockEncryptor: Debug + Send + Sync {
67+
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8>;
68+
}
69+
70+
#[derive(Debug, Clone)]
71+
struct CounterNonce {
72+
start: u128,
73+
counter: u128,
74+
}
75+
76+
impl CounterNonce {
77+
pub fn new(rng: &SystemRandom) -> Self {
78+
let mut buf = [0; 16];
79+
rng.fill(&mut buf).unwrap();
80+
81+
// Since this is a random seed value, endianess doesn't matter at all,
82+
// and we can use whatever is platform-native.
83+
let start = u128::from_ne_bytes(buf) & RIGHT_TWELVE;
84+
let counter = start.wrapping_add(1);
85+
86+
Self { start, counter }
87+
}
88+
89+
/// One accessor for the nonce bytes to avoid potentially flipping endianess
90+
#[inline]
91+
pub fn get_bytes(&self) -> [u8; NONCE_LEN] {
92+
self.counter.to_le_bytes()[0..NONCE_LEN].try_into().unwrap()
93+
}
94+
}
95+
96+
impl NonceSequence for CounterNonce {
97+
fn advance(&mut self) -> Result<ring::aead::Nonce, ring::error::Unspecified> {
98+
// If we've wrapped around, we've exhausted this nonce sequence
99+
if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) {
100+
Err(ring::error::Unspecified)
101+
} else {
102+
// Otherwise, just advance and return the new value
103+
let buf: [u8; NONCE_LEN] = self.get_bytes();
104+
self.counter = self.counter.wrapping_add(1);
105+
Ok(ring::aead::Nonce::assume_unique_for_key(buf))
106+
}
107+
}
108+
}
109+
110+
#[derive(Debug, Clone)]
111+
pub(crate) struct RingGcmBlockEncryptor {
112+
key: LessSafeKey,
113+
nonce_sequence: CounterNonce,
114+
}
115+
116+
impl RingGcmBlockEncryptor {
117+
// todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor.
118+
// todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom
119+
/// Create a new `RingGcmBlockEncryptor` with a given key and random nonce.
120+
/// The nonce will advance appropriately with each block encryption and
121+
/// return an error if it wraps around.
122+
pub(crate) fn new(key_bytes: &[u8]) -> Self {
123+
let rng = SystemRandom::new();
124+
125+
// todo support other key sizes
126+
let key = UnboundKey::new(&AES_128_GCM, key_bytes.as_ref()).unwrap();
127+
let nonce = CounterNonce::new(&rng);
128+
129+
Self {
130+
key: LessSafeKey::new(key),
131+
nonce_sequence: nonce,
132+
}
133+
}
134+
}
135+
136+
impl BlockEncryptor for RingGcmBlockEncryptor {
137+
fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
138+
todo!()
139+
}
140+
}

parquet/src/encryption/encryption.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::sync::Arc;
20+
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};
21+
22+
#[derive(Debug, Clone)]
23+
pub struct FileEncryptionProperties {
24+
encrypt_footer: bool,
25+
footer_key: Vec<u8>,
26+
column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
27+
aad_prefix: Option<Vec<u8>>,
28+
}
29+
30+
impl FileEncryptionProperties {
31+
pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
32+
EncryptionPropertiesBuilder::new(footer_key)
33+
}
34+
}
35+
36+
pub struct EncryptionPropertiesBuilder {
37+
footer_key: Vec<u8>,
38+
column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
39+
aad_prefix: Option<Vec<u8>>,
40+
}
41+
42+
impl EncryptionPropertiesBuilder {
43+
pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
44+
Self {
45+
footer_key,
46+
column_keys: None,
47+
aad_prefix: None,
48+
}
49+
}
50+
51+
pub fn build(self) -> crate::errors::Result<FileEncryptionProperties> {
52+
Ok(FileEncryptionProperties {
53+
encrypt_footer: true,
54+
footer_key: self.footer_key,
55+
column_keys: self.column_keys,
56+
aad_prefix: self.aad_prefix,
57+
})
58+
}
59+
}
60+
61+
#[derive(Clone, Debug)]
62+
pub struct FileEncryptor {
63+
encryption_properties: FileEncryptionProperties,
64+
footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
65+
file_aad: Vec<u8>,
66+
}
67+
68+
impl FileEncryptor {
69+
pub(crate) fn new(
70+
encryption_properties: FileEncryptionProperties,
71+
aad_file_unique: Vec<u8>,
72+
aad_prefix: Vec<u8>,
73+
) -> Self {
74+
let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
75+
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);
76+
Self {
77+
encryption_properties,
78+
footer_encryptor: Some(Arc::new(footer_encryptor)),
79+
file_aad,
80+
}
81+
}
82+
}

parquet/src/encryption/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@
2020
2121
pub mod ciphers;
2222
pub mod decryption;
23+
pub mod encryption;
2324
pub mod modules;

parquet/src/file/properties.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
1919
use crate::basic::{Compression, Encoding};
2020
use crate::compression::{CodecOptions, CodecOptionsBuilder};
21+
#[cfg(feature = "encryption")]
22+
use crate::encryption::encryption::FileEncryptionProperties;
2123
use crate::file::metadata::KeyValue;
2224
use crate::format::SortingColumn;
2325
use crate::schema::types::ColumnPath;
@@ -372,6 +374,11 @@ impl WriterProperties {
372374
.and_then(|c| c.bloom_filter_properties())
373375
.or_else(|| self.default_column_properties.bloom_filter_properties())
374376
}
377+
378+
#[cfg(feature = "encryption")]
379+
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
380+
self.file_encryption_properties.as_ref()
381+
}
375382
}
376383

377384
/// Builder for [`WriterProperties`] parquet writer configuration.
@@ -394,6 +401,8 @@ pub struct WriterPropertiesBuilder {
394401
column_index_truncate_length: Option<usize>,
395402
statistics_truncate_length: Option<usize>,
396403
coerce_types: bool,
404+
#[cfg(feature = "encryption")]
405+
file_encryption_properties: Option<FileEncryptionProperties>,
397406
}
398407

399408
impl WriterPropertiesBuilder {
@@ -416,6 +425,8 @@ impl WriterPropertiesBuilder {
416425
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
417426
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
418427
coerce_types: DEFAULT_COERCE_TYPES,
428+
#[cfg(feature = "encryption")]
429+
file_encryption_properties: None,
419430
}
420431
}
421432

@@ -438,6 +449,8 @@ impl WriterPropertiesBuilder {
438449
column_index_truncate_length: self.column_index_truncate_length,
439450
statistics_truncate_length: self.statistics_truncate_length,
440451
coerce_types: self.coerce_types,
452+
#[cfg(feature = "encryption")]
453+
file_encryption_properties: self.file_encryption_properties,
441454
}
442455
}
443456

@@ -810,6 +823,16 @@ impl WriterPropertiesBuilder {
810823
self.coerce_types = coerce_types;
811824
self
812825
}
826+
827+
/// Sets FileEncryptionProperties.
828+
#[cfg(feature = "encryption")]
829+
pub fn with_file_encryption_properties(
830+
mut self,
831+
file_encryption_properties: FileEncryptionProperties,
832+
) -> Self {
833+
self.file_encryption_properties = Some(file_encryption_properties);
834+
self
835+
}
813836
}
814837

815838
/// Controls the level of statistics to be computed by the writer and stored in

parquet/src/file/writer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use crate::column::{
3333
writer::{get_column_writer, ColumnWriter},
3434
};
3535
use crate::data_type::DataType;
36+
use crate::encryption::ciphers::RingGcmBlockEncryptor;
37+
use crate::encryption::encryption::FileEncryptor;
3638
use crate::errors::{ParquetError, Result};
3739
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
3840
use crate::file::reader::ChunkReader;
@@ -523,6 +525,9 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
523525
) -> Result<C>,
524526
{
525527
self.assert_previous_writer_closed()?;
528+
let file_encryption_properties = self.props.file_encryption_properties();
529+
let file_encryptor =
530+
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
526531
Ok(match self.next_column_desc() {
527532
Some(column) => {
528533
let props = self.props.clone();

0 commit comments

Comments
 (0)