Skip to content

Commit

Permalink
feat: add ParquetEncryptionMode, to permit unencrypted files in initial key rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Sep 11, 2024
1 parent 9d6173c commit b059926
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
23 changes: 18 additions & 5 deletions parquet/src/file/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,41 @@ pub struct ParquetEncryptionKeyInfo {
pub key: ParquetEncryptionKey,
}

/// Encryption mode of a Parquet file: the mode a file is to be written in, or a mode a file is
/// permitted to be in when it is read.
///
/// The encrypted mode also carries the key information (`ParquetEncryptionKeyInfo`) to use.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ParquetEncryptionMode {
/// The file is not encrypted.
Unencrypted,
/// The file is footer-encrypted, which in this implementation means fully encrypted: the
/// same key is used for the footer and for all of the columns.
FooterEncrypted(ParquetEncryptionKeyInfo),
}

/// Describes general parquet encryption configuration -- new files are encrypted with the
/// write_key(), but old files can be decrypted with any of the valid read keys.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ParquetEncryptionConfig {
// The last key is the write key, and all the keys are valid read keys.
keys: Vec<ParquetEncryptionKeyInfo>,
// The last mode is the write mode (i.e. it has the write key), and every mode, including the
// last, is a valid read mode (i.e. a valid read key, or Unencrypted mode, if a user turned on
// encryption but hasn't key-rotated unencrypted files away yet).
keys: Vec<ParquetEncryptionMode>,
}

impl ParquetEncryptionConfig {
pub fn new(keys: Vec<ParquetEncryptionKeyInfo>) -> Option<ParquetEncryptionConfig> {
pub fn new(keys: Vec<ParquetEncryptionMode>) -> Option<ParquetEncryptionConfig> {
if keys.is_empty() {
None
} else {
Some(ParquetEncryptionConfig{ keys })
}
}

pub fn write_key(&self) -> &ParquetEncryptionKeyInfo {
pub fn write_key(&self) -> &ParquetEncryptionMode {
self.keys.last().unwrap()
}

pub fn read_keys(&self) -> &[ParquetEncryptionKeyInfo] {
pub fn read_keys(&self) -> &[ParquetEncryptionMode] {
self.keys.as_slice()
}
}
Expand Down
47 changes: 33 additions & 14 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use crate::file::{

use crate::schema::types::{self, SchemaDescriptor};

use crate::file::{encryption::{decrypt_module, parquet_magic, ParquetEncryptionConfig, PARQUET_KEY_HASH_LENGTH, ParquetEncryptionKey, ParquetEncryptionKeyInfo, RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE}, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE, PARQUET_MAGIC_UNSUPPORTED_PARE};
use crate::file::{encryption::{decrypt_module, ParquetEncryptionConfig, ParquetEncryptionMode,
ParquetEncryptionKey,
PARQUET_KEY_HASH_LENGTH, RandomFileIdentifier, AAD_FILE_UNIQUE_SIZE},
PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE, PARQUET_MAGIC_UNSUPPORTED_PARE};

fn select_key(encryption_config: &ParquetEncryptionConfig, key_metadata: &Option<Vec<u8>>) -> Result<ParquetEncryptionKey> {
if let Some(key_id) = key_metadata {
Expand All @@ -44,10 +47,15 @@ fn select_key(encryption_config: &ParquetEncryptionConfig, key_metadata: &Option
}
let mut key_id_arr = [0u8; PARQUET_KEY_HASH_LENGTH];
key_id_arr.copy_from_slice(&key_id);
let read_keys: &[ParquetEncryptionKeyInfo] = encryption_config.read_keys();
for key_info in read_keys {
if key_info.key.compute_key_hash() == key_id_arr {
return Ok(key_info.key)
let read_keys: &[ParquetEncryptionMode] = encryption_config.read_keys();
for mode in read_keys {
match mode {
ParquetEncryptionMode::Unencrypted => { },
ParquetEncryptionMode::FooterEncrypted(key_info) => {
if key_info.key.compute_key_hash() == key_id_arr {
return Ok(key_info.key)
}
}
}
}
return Err(general_err!("Parquet file is encrypted with an unknown or out-of-rotation key"));
Expand Down Expand Up @@ -81,18 +89,28 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R, encryption_config: &Opti
default_end_reader.read_exact(&mut default_len_end_buf)?;

// check this is indeed a parquet file
let encrypted_footer: bool;
{
// and check that its encryption setting conceivably matches our encryption_config (but without yet checking keys)
let trailing_magic: &[u8] = &default_len_end_buf[default_end_len - 4..];
if trailing_magic != parquet_magic(encryption_config.is_some()) {
if trailing_magic == PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file in encrypted mode. File (or at least the Parquet footer) is not encrypted"));
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE {
if trailing_magic == PARQUET_MAGIC {
if let Some(config) = encryption_config {
if !config.read_keys().iter().any(|m| matches!(m, ParquetEncryptionMode::Unencrypted)) {
return Err(general_err!("Invalid Parquet file in encrypted mode. File (or at least the Parquet footer) is not encrypted"));
}
}
encrypted_footer = false;
} else if trailing_magic == PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE {
let has_keys = encryption_config.as_ref().map_or(false,
|config| config.read_keys().iter().any(|m| matches!(m, ParquetEncryptionMode::FooterEncrypted(_))));
if !has_keys {
return Err(general_err!("Invalid Parquet file in unencrypted mode. File is encrypted"));
} else if trailing_magic == PARQUET_MAGIC_UNSUPPORTED_PARE {
return Err(general_err!("Unsupported Parquet file. File is encrypted with the standard PARE encryption format"));
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
encrypted_footer = true;
} else if trailing_magic == PARQUET_MAGIC_UNSUPPORTED_PARE {
return Err(general_err!("Unsupported Parquet file. File is encrypted with the standard PARE encryption format"));
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
}

Expand Down Expand Up @@ -135,7 +153,8 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R, encryption_config: &Opti
let returned_encryption_key: Option<ParquetEncryptionKey>;

let random_file_identifier: Option<RandomFileIdentifier>;
if let Some(encryption_config) = encryption_config {
if encrypted_footer {
let encryption_config: &ParquetEncryptionConfig = encryption_config.as_ref().unwrap();
let file_crypto_metadata = {
let mut prot = TCompactInputProtocol::new(&mut metadata_read);
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
Expand Down
8 changes: 5 additions & 3 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ mod tests {
use crate::basic::{Compression, Encoding, IntType, LogicalType, Repetition, Type};
use crate::column::page::PageReader;
use crate::compression::{create_codec, Codec};
use crate::file::encryption::{generate_random_file_identifier, ParquetEncryptionConfig, ParquetEncryptionKeyInfo};
use crate::file::encryption::{generate_random_file_identifier, ParquetEncryptionConfig, ParquetEncryptionKeyInfo, ParquetEncryptionMode};
use crate::file::reader::Length;
use crate::file::{PARQUET_MAGIC, PARQUET_MAGIC_ENCRYPTED_FOOTER_CUBE};
use crate::file::{
Expand Down Expand Up @@ -1209,7 +1209,8 @@ mod tests {

file_writer.close().unwrap();

let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap());
let encryption_config = encryption_info.map(|(key_info, _)|
ParquetEncryptionConfig::new(vec![ParquetEncryptionMode::FooterEncrypted(key_info)]).unwrap());
let reader = assert_send(SerializedFileReader::new_maybe_encrypted(file, &encryption_config).unwrap());
assert_eq!(reader.num_row_groups(), data.len());
assert_eq!(
Expand Down Expand Up @@ -1303,7 +1304,8 @@ mod tests {
let buffer = cursor.into_inner().unwrap();

let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer);
let encryption_config = encryption_info.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![key_info]).unwrap());
let encryption_config = encryption_info
.map(|(key_info, _)| ParquetEncryptionConfig::new(vec![ParquetEncryptionMode::FooterEncrypted(key_info)]).unwrap());
let reader = SerializedFileReader::new_maybe_encrypted(reading_cursor, &encryption_config).unwrap();

assert_eq!(reader.num_row_groups(), data.len());
Expand Down

0 comments on commit b059926

Please sign in to comment.