Skip to content

Commit

Permalink
Use ParquetMetaData instead of RowGroupMetaData in InMemoryRowGroup. …
Browse files Browse the repository at this point in the history
…Change row_group_ordinal to row_group_idx.
  • Loading branch information
rok committed Mar 11, 2025
1 parent 90434d6 commit 5273244
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 29 deletions.
32 changes: 15 additions & 17 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::bloom_filter::{
};
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
Expand Down Expand Up @@ -636,15 +636,13 @@ where
.map(|x| x[row_group_idx].as_slice());

let mut row_group = InMemoryRowGroup {
metadata: meta,
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
offset_index,
#[cfg(feature = "encryption")]
row_group_ordinal: row_group_idx,
#[cfg(feature = "encryption")]
parquet_metadata: self.metadata.clone(),
row_group_idx,
metadata: self.metadata.as_ref(),
};

if let Some(filter) = self.filter.as_mut() {
Expand Down Expand Up @@ -926,14 +924,12 @@ where

/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
offset_index: Option<&'a [OffsetIndexMetaData]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
#[cfg(feature = "encryption")]
row_group_ordinal: usize,
#[cfg(feature = "encryption")]
parquet_metadata: Arc<ParquetMetaData>,
row_group_idx: usize,
metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
Expand All @@ -944,6 +940,7 @@ impl InMemoryRowGroup<'_> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
Expand All @@ -952,7 +949,7 @@ impl InMemoryRowGroup<'_> {
let fetch_ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.zip(metadata.columns())
.enumerate()
.filter(|&(idx, (chunk, _chunk_meta))| {
chunk.is_none() && projection.leaf_included(idx)
Expand Down Expand Up @@ -991,7 +988,7 @@ impl InMemoryRowGroup<'_> {
}

*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
length: metadata.column(idx).byte_range().1 as usize,
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
}))
}
Expand All @@ -1003,7 +1000,7 @@ impl InMemoryRowGroup<'_> {
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let column = metadata.column(idx);
let (start, length) = column.byte_range();
start as usize..(start + length) as usize
})
Expand All @@ -1018,7 +1015,7 @@ impl InMemoryRowGroup<'_> {

if let Some(data) = chunk_data.next() {
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: self.metadata.column(idx).byte_range().0 as usize,
offset: metadata.column(idx).byte_range().0 as usize,
data,
}));
}
Expand All @@ -1037,9 +1034,9 @@ impl RowGroups for InMemoryRowGroup<'_> {
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
#[cfg(feature = "encryption")]
let crypto_context =
if let Some(file_decryptor) = &self.parquet_metadata.clone().file_decryptor().clone() {
if let Some(file_decryptor) = self.metadata.clone().file_decryptor().clone() {
let column_name = &self
.parquet_metadata
.metadata
.clone()
.file_metadata()
.schema_descr()
Expand All @@ -1052,7 +1049,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
file_decryptor.get_column_metadata_decryptor(column_name.name())?;

let crypto_context = CryptoContext::new(
self.row_group_ordinal,
self.row_group_idx,
i,
data_decryptor,
metadata_decryptor,
Expand All @@ -1076,9 +1073,10 @@ impl RowGroups for InMemoryRowGroup<'_> {
// filter out empty offset indexes (old versions specified Some(vec![]) when not present)
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());
let metadata = self.metadata.row_group(self.row_group_idx);
let page_reader = SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
metadata.column(i),
self.row_count,
page_locations,
)?;
Expand Down
14 changes: 7 additions & 7 deletions parquet/src/encryption/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn read_and_decrypt<T: Read>(
// decrypt parquet modules (data pages, dictionary pages, etc.).
#[derive(Debug, Clone)]
pub(crate) struct CryptoContext {
pub(crate) row_group_ordinal: usize,
pub(crate) row_group_idx: usize,
pub(crate) column_ordinal: usize,
pub(crate) page_ordinal: Option<usize>,
pub(crate) dictionary_page: bool,
Expand All @@ -54,14 +54,14 @@ pub(crate) struct CryptoContext {

impl CryptoContext {
pub(crate) fn new(
row_group_ordinal: usize,
row_group_idx: usize,
column_ordinal: usize,
data_decryptor: Arc<dyn BlockDecryptor>,
metadata_decryptor: Arc<dyn BlockDecryptor>,
file_aad: Vec<u8>,
) -> Self {
Self {
row_group_ordinal,
row_group_idx,
column_ordinal,
page_ordinal: None,
dictionary_page: false,
Expand All @@ -73,7 +73,7 @@ impl CryptoContext {

pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
Self {
row_group_ordinal: self.row_group_ordinal,
row_group_idx: self.row_group_idx,
column_ordinal: self.column_ordinal,
page_ordinal: Some(page_ordinal),
dictionary_page: false,
Expand All @@ -93,7 +93,7 @@ impl CryptoContext {
create_module_aad(
self.file_aad(),
module_type,
self.row_group_ordinal,
self.row_group_idx,
self.column_ordinal,
self.page_ordinal,
)
Expand All @@ -109,15 +109,15 @@ impl CryptoContext {
create_module_aad(
self.file_aad(),
module_type,
self.row_group_ordinal,
self.row_group_idx,
self.column_ordinal,
self.page_ordinal,
)
}

pub(crate) fn for_dictionary_page(&self) -> Self {
Self {
row_group_ordinal: self.row_group_ordinal,
row_group_idx: self.row_group_idx,
column_ordinal: self.column_ordinal,
page_ordinal: self.page_ordinal,
dictionary_page: true,
Expand Down
10 changes: 5 additions & 5 deletions parquet/src/encryption/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn create_footer_aad(file_aad: &[u8]) -> crate::errors::Result<Vec<u8>> {
pub(crate) fn create_module_aad(
file_aad: &[u8],
module_type: ModuleType,
row_group_ordinal: usize,
row_group_idx: usize,
column_ordinal: usize,
page_ordinal: Option<usize>,
) -> crate::errors::Result<Vec<u8>> {
Expand All @@ -47,11 +47,11 @@ pub(crate) fn create_module_aad(
return Ok(aad);
}

if row_group_ordinal > i16::MAX as usize {
if row_group_idx > i16::MAX as usize {
return Err(general_err!(
"Encrypted parquet files can't have more than {} row groups: {}",
i16::MAX,
row_group_ordinal
row_group_idx
));
}
if column_ordinal > i16::MAX as usize {
Expand All @@ -68,7 +68,7 @@ pub(crate) fn create_module_aad(
let mut aad = Vec::with_capacity(file_aad.len() + 5);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
return Ok(aad);
}
Expand All @@ -87,7 +87,7 @@ pub(crate) fn create_module_aad(
let mut aad = Vec::with_capacity(file_aad.len() + 7);
aad.extend_from_slice(file_aad);
aad.extend_from_slice(module_buf.as_ref());
aad.extend_from_slice((row_group_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((row_group_idx as i16).to_le_bytes().as_ref());
aad.extend_from_slice((column_ordinal as i16).to_le_bytes().as_ref());
aad.extend_from_slice((page_ordinal as i16).to_le_bytes().as_ref());
Ok(aad)
Expand Down

0 comments on commit 5273244

Please sign in to comment.