Skip to content

Commit 6f055f9

Browse files
committed
Use ParquetMetaDataReader
1 parent 9f663ba commit 6f055f9

File tree

5 files changed

+127
-324
lines changed

5 files changed

+127
-324
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ mod filter;
4242
mod selection;
4343
pub mod statistics;
4444

45-
use crate::file::footer;
46-
use crate::file::page_index::index_reader;
4745
use crate::encryption::ciphers::FileDecryptionProperties;
4846

4947
/// Builder for constructing parquet readers into arrow.
@@ -373,35 +371,6 @@ pub struct ArrowReaderMetadata {
373371
}
374372

375373
impl ArrowReaderMetadata {
376-
/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`]
377-
///
378-
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for how this can be used
379-
pub fn load2<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
380-
Self::load_with_decryption(reader, options, FileDecryptionProperties::builder().build())
381-
}
382-
383-
pub fn load_with_decryption<T: ChunkReader>(reader: &T, options: ArrowReaderOptions,
384-
file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
385-
let mut metadata = footer::parse_metadata_with_decryption(reader, file_decryption_properties)?;
386-
if options.page_index {
387-
let column_index = metadata
388-
.row_groups()
389-
.iter()
390-
.map(|rg| index_reader::read_columns_indexes(reader, rg.columns()))
391-
.collect::<Result<Vec<_>>>()?;
392-
metadata.set_column_index(Some(column_index));
393-
394-
let offset_index = metadata
395-
.row_groups()
396-
.iter()
397-
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
398-
.collect::<Result<Vec<_>>>()?;
399-
400-
metadata.set_offset_index(Some(offset_index))
401-
}
402-
Self::try_new(Arc::new(metadata), options)
403-
}
404-
405374
/// Loads [`ArrowReaderMetadata`] from the provided [`ChunkReader`], if necessary
406375
///
407376
/// See [`ParquetRecordBatchReaderBuilder::new_with_metadata`] for an
@@ -412,9 +381,14 @@ impl ArrowReaderMetadata {
412381
/// If `options` has [`ArrowReaderOptions::with_page_index`] true, but
413382
/// `Self::metadata` is missing the page index, this function will attempt
414383
/// to load the page index by making an object store request.
415-
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
384+
pub fn load<T: ChunkReader>(
385+
reader: &T,
386+
options: ArrowReaderOptions,
387+
file_decryption_properties: Option<FileDecryptionProperties>,
388+
) -> Result<Self> {
416389
let metadata = ParquetMetaDataReader::new()
417390
.with_page_indexes(options.page_index)
391+
.with_encryption_properties(file_decryption_properties)
418392
.parse_and_finish(reader)?;
419393
Self::try_new(Arc::new(metadata), options)
420394
}
@@ -561,12 +535,16 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
561535

562536
/// Create a new [`ParquetRecordBatchReaderBuilder`] with [`ArrowReaderOptions`]
563537
pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
564-
let metadata = ArrowReaderMetadata::load(&reader, options)?;
538+
let metadata = ArrowReaderMetadata::load(&reader, options, None)?;
565539
Ok(Self::new_with_metadata(reader, metadata))
566540
}
567541

568-
pub fn try_new_with_decryption(reader: T, options: ArrowReaderOptions, file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
569-
let metadata = ArrowReaderMetadata::load_with_decryption(&reader, options, file_decryption_properties)?;
542+
pub fn try_new_with_decryption(
543+
reader: T,
544+
options: ArrowReaderOptions,
545+
file_decryption_properties: Option<FileDecryptionProperties>,
546+
) -> Result<Self> {
547+
let metadata = ArrowReaderMetadata::load(&reader, options, file_decryption_properties)?;
570548
Ok(Self::new_with_metadata(reader, metadata))
571549
}
572550

@@ -826,11 +804,18 @@ impl ParquetRecordBatchReader {
826804
.build()
827805
}
828806

829-
pub fn try_new_with_decryption<T: ChunkReader + 'static>(reader: T, batch_size: usize,
830-
file_decryption_properties: FileDecryptionProperties) -> Result<Self> {
831-
ParquetRecordBatchReaderBuilder::try_new_with_decryption(reader, Default::default(), file_decryption_properties)?
832-
.with_batch_size(batch_size)
833-
.build()
807+
pub fn try_new_with_decryption<T: ChunkReader + 'static>(
808+
reader: T,
809+
batch_size: usize,
810+
file_decryption_properties: Option<FileDecryptionProperties>,
811+
) -> Result<Self> {
812+
ParquetRecordBatchReaderBuilder::try_new_with_decryption(
813+
reader,
814+
Default::default(),
815+
file_decryption_properties,
816+
)?
817+
.with_batch_size(batch_size)
818+
.build()
834819
}
835820

836821
/// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
@@ -1719,10 +1704,14 @@ mod tests {
17191704
// todo
17201705
let key_code: &[u8] = "0123456789012345".as_bytes();
17211706
// todo
1722-
let decryption_properties = ciphers::FileDecryptionProperties::builder()
1723-
.with_footer_key(key_code.to_vec())
1724-
.build();
1725-
let record_reader = ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties).unwrap();
1707+
let decryption_properties = Some(
1708+
ciphers::FileDecryptionProperties::builder()
1709+
.with_footer_key(key_code.to_vec())
1710+
.build(),
1711+
);
1712+
let record_reader =
1713+
ParquetRecordBatchReader::try_new_with_decryption(file, 128, decryption_properties)
1714+
.unwrap();
17261715
// TODO: check the contents read through `record_reader`
17271716
}
17281717

parquet/src/arrow/async_reader/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
205205
let mut buf = Vec::with_capacity(metadata_len);
206206
self.take(metadata_len as _).read_to_end(&mut buf).await?;
207207

208-
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
208+
// TODO: add `self.file_decryption_properties` and pass it here instead of `None`
209+
Ok(Arc::new(ParquetMetaDataReader::decode_metadata(
210+
&buf, None,
211+
)?))
209212
}
210213
.boxed()
211214
}

parquet/src/encryption/ciphers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ pub fn create_module_aad(file_aad: &[u8], module_type: ModuleType, row_group_ord
226226
Ok(aad)
227227
}
228228

229+
#[derive(Clone)]
229230
pub struct FileDecryptionProperties {
230231
footer_key: Option<Vec<u8>>
231232
}

0 commit comments

Comments
 (0)