Skip to content

Commit

Permalink
Handle non-encrypted columns
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Mar 6, 2025
1 parent cc58a3a commit 2cf343b
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 36 deletions.
11 changes: 7 additions & 4 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,10 +876,13 @@ impl ArrowColumnWriterFactory {
column_descriptor: &ColumnDescPtr,
column_index: usize,
) -> Box<ArrowPageWriter> {
let page_encryptor = self.file_encryptor.as_ref().map(|fe| {
let column_path = column_descriptor.path().string();
PageEncryptor::new(fe.clone(), self.row_group_index, column_index, column_path)
});
let column_path = column_descriptor.path().string();
let page_encryptor = PageEncryptor::create_if_column_encrypted(
&self.file_encryptor,
self.row_group_index,
column_index,
column_path,
);
Box::new(ArrowPageWriter::default().with_encryptor(page_encryptor))
}

Expand Down
18 changes: 15 additions & 3 deletions parquet/src/encryption/encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
use crate::schema::types::ColumnDescPtr;
use crate::thrift::TSerializable;
Expand Down Expand Up @@ -168,12 +168,24 @@ impl FileEncryptor {
&self.aad_file_unique
}

/// Returns whether data for the specified column is encrypted
pub fn is_column_encrypted(&self, column_path: &str) -> bool {
if self.properties.column_keys.is_empty() {
// Uniform encryption
true
} else {
self.properties.column_keys.contains_key(column_path)
}
}

pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
Ok(Box::new(RingGcmBlockEncryptor::new(
&self.properties.footer_key.key,
)?))
}

/// Get the encryptor for a column.
/// Will return an error if the column is not an encrypted column.
pub(crate) fn get_column_encryptor(
&self,
column_path: &str,
Expand All @@ -182,7 +194,7 @@ impl FileEncryptor {
return self.get_footer_encryptor();
}
match self.properties.column_keys.get(column_path) {
None => todo!("Handle unencrypted columns"),
None => Err(general_err!("Column '{}' is not encrypted", column_path)),
Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
}
}
Expand Down Expand Up @@ -223,7 +235,7 @@ pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
}

/// Get the crypto metadata for a column from the file encryption properties
pub fn get_column_crypto_metadata(
pub(crate) fn get_column_crypto_metadata(
properties: &FileEncryptionProperties,
column: &ColumnDescPtr,
) -> Option<ColumnCryptoMetaData> {
Expand Down
23 changes: 14 additions & 9 deletions parquet/src/encryption/page_encryptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,23 @@ pub struct PageEncryptor {
}

impl PageEncryptor {
pub fn new(
file_encryptor: Arc<FileEncryptor>,
pub fn create_if_column_encrypted(
file_encryptor: &Option<Arc<FileEncryptor>>,
row_group_index: usize,
column_index: usize,
column_path: String,
) -> Self {
Self {
file_encryptor,
row_group_index,
column_index,
column_path,
page_index: 0,
) -> Option<Self> {
match file_encryptor {
Some(file_encryptor) if file_encryptor.is_column_encrypted(&column_path) => {
Some(Self {
file_encryptor: file_encryptor.clone(),
row_group_index,
column_index,
column_path,
page_index: 0,
})
}
_ => None,
}
}

Expand Down
30 changes: 18 additions & 12 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
match self.file_encryptor.as_ref() {
#[cfg(feature = "encryption")]
Some(file_encryptor) => {
write_encrypted_column_object(
write_column_object_with_encryption(
offset_index,
&mut self.buf,
file_encryptor,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
match self.file_encryptor.as_ref() {
#[cfg(feature = "encryption")]
Some(file_encryptor) => {
write_encrypted_column_object(
write_column_object_with_encryption(
column_index,
&mut self.buf,
file_encryptor,
Expand Down Expand Up @@ -546,7 +546,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}

#[cfg(feature = "encryption")]
fn write_encrypted_column_object<T, W>(
fn write_column_object_with_encryption<T, W>(
object: &T,
sink: &mut W,
file_encryptor: &FileEncryptor,
Expand All @@ -559,13 +559,6 @@ where
T: TSerializable,
W: Write,
{
let aad = create_module_aad(
file_encryptor.file_aad(),
module_type,
row_group_index,
column_index,
None,
)?;
let column_path = column_metadata
.meta_data
.as_ref()
Expand All @@ -577,6 +570,19 @@ where
})?
.path_in_schema
.join(".");
let mut encryptor = file_encryptor.get_column_encryptor(&column_path)?;
encrypt_object(object, &mut encryptor, sink, &aad)
if file_encryptor.is_column_encrypted(&column_path) {
let aad = create_module_aad(
file_encryptor.file_aad(),
module_type,
row_group_index,
column_index,
None,
)?;
let mut encryptor = file_encryptor.get_column_encryptor(&column_path)?;
encrypt_object(object, &mut encryptor, sink, &aad)
} else {
let mut protocol = TCompactOutputProtocol::new(sink);
object.write_to_out_protocol(&mut protocol)?;
Ok(())
}
}
14 changes: 6 additions & 8 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,14 +593,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
let (buf, on_close) = self.get_on_close();

#[cfg(feature = "encryption")]
let page_encryptor = file_encryptor.map(|file_encryptor| {
PageEncryptor::new(
file_encryptor,
row_group_index,
column_index,
column.path().string(),
)
});
let page_encryptor = PageEncryptor::create_if_column_encrypted(
&file_encryptor,
row_group_index,
column_index,
column.path().string(),
);

#[cfg(feature = "encryption")]
let page_writer =
Expand Down

0 comments on commit 2cf343b

Please sign in to comment.