Skip to content

Commit 3bc9987

Browse files
authored
Deprecate read_page_locations() and simplify offset index in ParquetMetaData (#6095)
* deprecate read_page_locations()
* add to_thrift() to OffsetIndexMetaData
1 parent 81c34ac commit 3bc9987

File tree

13 files changed

+118
-113
lines changed

13 files changed

+118
-113
lines changed

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ impl ArrowReaderMetadata {
394394
let offset_index = metadata
395395
.row_groups()
396396
.iter()
397-
.map(|rg| index_reader::read_pages_locations(reader, rg.columns()))
397+
.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
398398
.collect::<Result<Vec<_>>>()?;
399399

400400
metadata.set_offset_index(Some(offset_index))
@@ -689,7 +689,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
689689
// To avoid `i[rg_idx][self.column_idx]` panic, we need to filter out empty `i[rg_idx]`.
690690
let page_locations = offset_index
691691
.filter(|i| !i[rg_idx].is_empty())
692-
.map(|i| i[rg_idx][self.column_idx].clone());
692+
.map(|i| i[rg_idx][self.column_idx].page_locations.clone());
693693
let total_rows = rg.num_rows() as usize;
694694
let reader = self.reader.clone();
695695

parquet/src/arrow/arrow_reader/statistics.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> {
13491349
let iter = row_group_indices.into_iter().map(|rg_index| {
13501350
let column_page_index_per_row_group_per_column =
13511351
&column_page_index[*rg_index][parquet_index];
1352-
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
1352+
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1353+
.page_locations()
1354+
.len();
13531355

13541356
(*num_data_pages, column_page_index_per_row_group_per_column)
13551357
});
@@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> {
13781380
let iter = row_group_indices.into_iter().map(|rg_index| {
13791381
let column_page_index_per_row_group_per_column =
13801382
&column_page_index[*rg_index][parquet_index];
1381-
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
1383+
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1384+
.page_locations()
1385+
.len();
13821386

13831387
(*num_data_pages, column_page_index_per_row_group_per_column)
13841388
});
@@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> {
14081412
let iter = row_group_indices.into_iter().map(|rg_index| {
14091413
let column_page_index_per_row_group_per_column =
14101414
&column_page_index[*rg_index][parquet_index];
1411-
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
1415+
let num_data_pages = &column_offset_index[*rg_index][parquet_index]
1416+
.page_locations()
1417+
.len();
14121418

14131419
(*num_data_pages, column_page_index_per_row_group_per_column)
14141420
});
@@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {
14501456

14511457
let mut row_count_total = Vec::new();
14521458
for rg_idx in row_group_indices {
1453-
let page_locations = &column_offset_index[*rg_idx][parquet_index];
1459+
let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
14541460

14551461
let row_count_per_page = page_locations
14561462
.windows(2)

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,7 @@ mod tests {
10961096
use crate::data_type::AsBytes;
10971097
use crate::file::metadata::ParquetMetaData;
10981098
use crate::file::page_index::index::Index;
1099-
use crate::file::page_index::index_reader::read_pages_locations;
1099+
use crate::file::page_index::index_reader::read_offset_indexes;
11001100
use crate::file::properties::{
11011101
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
11021102
};
@@ -1669,16 +1669,16 @@ mod tests {
16691669
"Expected a dictionary page"
16701670
);
16711671

1672-
let page_locations = read_pages_locations(&file, column).unwrap();
1672+
let offset_indexes = read_offset_indexes(&file, column).unwrap();
16731673

1674-
let offset_index = page_locations[0].clone();
1674+
let page_locations = offset_indexes[0].page_locations.clone();
16751675

16761676
// We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
16771677
// so we expect one dictionary encoded page and then a page per row thereafter.
16781678
assert_eq!(
1679-
offset_index.len(),
1679+
page_locations.len(),
16801680
10,
1681-
"Expected 9 pages but got {offset_index:#?}"
1681+
"Expected 9 pages but got {page_locations:#?}"
16821682
);
16831683
}
16841684

@@ -3020,8 +3020,8 @@ mod tests {
30203020

30213021
assert_eq!(index.len(), 1);
30223022
assert_eq!(index[0].len(), 2); // 2 columns
3023-
assert_eq!(index[0][0].len(), 1); // 1 page
3024-
assert_eq!(index[0][1].len(), 1); // 1 page
3023+
assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3024+
assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
30253025
}
30263026

30273027
#[test]

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result};
2020
use crate::file::footer::{decode_footer, decode_metadata};
2121
use crate::file::metadata::ParquetMetaData;
2222
use crate::file::page_index::index::Index;
23-
use crate::file::page_index::index_reader::{
24-
acc_range, decode_column_index, decode_page_locations,
25-
};
23+
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
2624
use bytes::Bytes;
2725
use futures::future::BoxFuture;
2826
use futures::FutureExt;
@@ -179,9 +177,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
179177
x.columns()
180178
.iter()
181179
.map(|c| match c.offset_index_range() {
182-
Some(r) => {
183-
decode_page_locations(&data[r.start - offset..r.end - offset])
184-
}
180+
Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
185181
None => Err(general_err!("missing offset index")),
186182
})
187183
.collect::<Result<Vec<_>>>()

parquet/src/arrow/async_reader/mod.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader};
106106
use crate::errors::{ParquetError, Result};
107107
use crate::file::footer::{decode_footer, decode_metadata};
108108
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
109+
use crate::file::page_index::offset_index::OffsetIndexMetaData;
109110
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
110111
use crate::file::FOOTER_SIZE;
111-
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation};
112+
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
112113

113114
mod metadata;
114115
pub use metadata::*;
@@ -489,7 +490,7 @@ where
489490
// TODO: calling build_array multiple times is wasteful
490491

491492
let meta = self.metadata.row_group(row_group_idx);
492-
let page_locations = self
493+
let offset_index = self
493494
.metadata
494495
.offset_index()
495496
.map(|x| x[row_group_idx].as_slice());
@@ -499,7 +500,7 @@ where
499500
// schema: meta.schema_descr_ptr(),
500501
row_count: meta.num_rows() as usize,
501502
column_chunks: vec![None; meta.columns().len()],
502-
page_locations,
503+
offset_index,
503504
};
504505

505506
if let Some(filter) = self.filter.as_mut() {
@@ -703,7 +704,7 @@ where
703704
/// An in-memory collection of column chunks
704705
struct InMemoryRowGroup<'a> {
705706
metadata: &'a RowGroupMetaData,
706-
page_locations: Option<&'a [Vec<PageLocation>]>,
707+
offset_index: Option<&'a [OffsetIndexMetaData]>,
707708
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
708709
row_count: usize,
709710
}
@@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> {
716717
projection: &ProjectionMask,
717718
selection: Option<&RowSelection>,
718719
) -> Result<()> {
719-
if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
720+
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
720721
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
721722
// `RowSelection`
722723
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
@@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> {
734735
// then we need to also fetch a dictionary page.
735736
let mut ranges = vec![];
736737
let (start, _len) = chunk_meta.byte_range();
737-
match page_locations[idx].first() {
738+
match offset_index[idx].page_locations.first() {
738739
Some(first) if first.offset as u64 != start => {
739740
ranges.push(start as usize..first.offset as usize);
740741
}
741742
_ => (),
742743
}
743744

744-
ranges.extend(selection.scan_ranges(&page_locations[idx]));
745+
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
745746
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
746747

747748
ranges
@@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
812813
"Invalid column index {i}, column was not fetched"
813814
))),
814815
Some(data) => {
815-
let page_locations = self.page_locations.map(|index| index[i].clone());
816+
let page_locations = self
817+
.offset_index
818+
.map(|index| index[i].page_locations.clone());
816819
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
817820
data.clone(),
818821
self.metadata.column(i),
@@ -1529,7 +1532,7 @@ mod tests {
15291532
let metadata = parse_metadata(&data).unwrap();
15301533

15311534
let offset_index =
1532-
index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
1535+
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
15331536
.expect("reading offset index");
15341537

15351538
let row_group_meta = metadata.row_group(0).clone();
@@ -1538,7 +1541,6 @@ mod tests {
15381541
vec![row_group_meta],
15391542
None,
15401543
Some(vec![offset_index.clone()]),
1541-
None,
15421544
);
15431545

15441546
let metadata = Arc::new(metadata);
@@ -1575,7 +1577,7 @@ mod tests {
15751577
};
15761578

15771579
let mut skip = true;
1578-
let mut pages = offset_index[0].iter().peekable();
1580+
let mut pages = offset_index[0].page_locations.iter().peekable();
15791581

15801582
// Setup `RowSelection` so that we can skip every other page, selecting the last page
15811583
let mut selectors = vec![];

parquet/src/bin/parquet-index.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
use clap::Parser;
3838
use parquet::errors::{ParquetError, Result};
3939
use parquet::file::page_index::index::{Index, PageIndex};
40+
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
4041
use parquet::file::reader::{FileReader, SerializedFileReader};
4142
use parquet::file::serialized_reader::ReadOptionsBuilder;
4243
use parquet::format::PageLocation;
@@ -93,7 +94,8 @@ impl Args {
9394
))
9495
})?;
9596

96-
let row_counts = compute_row_counts(offset_index, row_group.num_rows());
97+
let row_counts =
98+
compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
9799
match &column_indices[column_idx] {
98100
Index::NONE => println!("NO INDEX"),
99101
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
@@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
131133
/// Prints index information for a single column chunk
132134
fn print_index<T: std::fmt::Display>(
133135
column_index: &[PageIndex<T>],
134-
offset_index: &[PageLocation],
136+
offset_index: &OffsetIndexMetaData,
135137
row_counts: &[i64],
136138
) -> Result<()> {
137-
if column_index.len() != offset_index.len() {
139+
if column_index.len() != offset_index.page_locations.len() {
138140
return Err(ParquetError::General(format!(
139141
"Index length mismatch, got {} and {}",
140142
column_index.len(),
141-
offset_index.len()
143+
offset_index.page_locations.len()
142144
)));
143145
}
144146

145147
for (idx, ((c, o), row_count)) in column_index
146148
.iter()
147-
.zip(offset_index)
149+
.zip(offset_index.page_locations())
148150
.zip(row_counts)
149151
.enumerate()
150152
{

parquet/src/file/metadata/memory.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType;
2323
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
2424
use crate::file::page_encoding_stats::PageEncodingStats;
2525
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
26+
use crate::file::page_index::offset_index::OffsetIndexMetaData;
2627
use crate::file::statistics::{Statistics, ValueStatistics};
2728
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
2829
use std::sync::Arc;
@@ -144,6 +145,12 @@ impl HeapSize for Statistics {
144145
}
145146
}
146147

148+
impl HeapSize for OffsetIndexMetaData {
149+
fn heap_size(&self) -> usize {
150+
self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
151+
}
152+
}
153+
147154
impl HeapSize for Index {
148155
fn heap_size(&self) -> usize {
149156
match self {

0 commit comments

Comments (0)