
Fix writing of invalid Parquet ColumnIndex when row group contains null pages #6319


Merged · 5 commits · Aug 31, 2024
77 changes: 69 additions & 8 deletions parquet/src/file/metadata/writer.rs
@@ -450,19 +450,57 @@ mod tests {
true,
)]));

-let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
// build row groups / pages that exercise different combinations of nulls and values
Contributor:

I like these test additions, particularly that they build off of the tests added in #6197. They also seem much more thorough than what I proposed 😄.

// note that below we set the row group and page sizes to 4 and 2 respectively
// so that these "groupings" make sense
let a: ArrayRef = Arc::new(Int32Array::from(vec![
// a row group that has all values
Some(i32::MIN),
Some(-1),
Some(1),
Some(i32::MAX),
// a row group with a page of all nulls and a page of all values
None,
None,
Some(2),
Some(3),
// a row group that has all null pages
None,
None,
None,
None,
// a row group having 1 page with all values and 1 page with some nulls
Some(4),
Some(5),
None,
Some(6),
// a row group having 1 page with all nulls and 1 page with some nulls
None,
None,
Some(7),
None,
// a row group having all pages with some nulls
None,
Some(8),
Some(9),
None,
]));

let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

-let writer_props = match write_page_index {
-    true => WriterProperties::builder()
-        .set_statistics_enabled(EnabledStatistics::Page)
-        .build(),
-    false => WriterProperties::builder()
-        .set_statistics_enabled(EnabledStatistics::Chunk)
-        .build(),
+let writer_props_builder = match write_page_index {
+    true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
+    false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
};

// tune the size of pages to the data above
// to make sure we exercise code paths where all items in a page are null, etc.
let writer_props = writer_props_builder
.set_max_row_group_size(4)
.set_data_page_row_count_limit(2)
.set_write_batch_size(2)
Comment on lines +498 to +501
Contributor Author:

I recognize that this is forcing things a bit. I leaned towards this approach because (1) it felt like a small step from where we are now and (2) I'm not that familiar with the lower level APIs that would be required if we wanted to build a ParquetMetadata by hand and use that for tests instead of relying on file writing to build one for us. I think a reasonable piece of feedback to this PR would be that now is the time to refactor the tests to build the ParquetMetadata by hand since we now care about getting pages of a certain size, etc.

Contributor:

I think this is reasonable.

.build();

let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
@@ -610,6 +648,29 @@ mod tests {
decoded_metadata.num_row_groups()
);

// check that the mins and maxes are what we expect for each page
// this also indirectly checks that the pages were laid out as we expected
// (if they're not, or a future refactor breaks that assumption, this test may have to
// drop down to a lower level and create metadata directly instead of relying on
// writing an entire file)
let column_indexes = metadata.metadata.column_index().unwrap();
assert_eq!(column_indexes.len(), 6);
// make sure each row group has 2 pages by checking the first column
// page counts should be the same for each column in each row group, and there should be
// 12 pages in total across 6 row groups / 1 column
let mut page_counts = vec![];
for row_group in column_indexes {
for column in row_group {
match column {
Index::INT32(column_index) => {
page_counts.push(column_index.indexes.len());
}
_ => panic!("unexpected column index type"),
}
}
}
assert_eq!(page_counts, vec![2; 6]);
Comment on lines +658 to +672
Contributor Author:

Unless we refactor this test to create the metadata directly, I think this is a good idea. Previously there was no assertion that we were creating more than one page or anything of the sort; since this is testing code that evidently diverges into different code paths depending on how many pages there are, it's good to at least have assertions that the data is laid out as we expect.


metadata
.metadata
.row_groups()
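The folded portion of the diff contains the per-page min/max assertions that the comment above refers to. As a hedged illustration only (not the PR's actual code), a spot check of the first row group could look like the sketch below, continuing the test context above; the expected bounds follow from the first four values of the array, split into two pages of two, and it assumes PageIndex exposes public min/max fields:

// hypothetical spot check (illustrative, not from the PR): verify the first row group's
// two pages have the bounds implied by the data layout ([i32::MIN, -1] and [1, i32::MAX])
match &column_indexes[0][0] {
    Index::INT32(column_index) => {
        assert_eq!(column_index.indexes[0].min, Some(i32::MIN));
        assert_eq!(column_index.indexes[0].max, Some(-1));
        assert_eq!(column_index.indexes[1].min, Some(1));
        assert_eq!(column_index.indexes[1].max, Some(i32::MAX));
    }
    _ => panic!("unexpected column index type"),
}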
10 changes: 4 additions & 6 deletions parquet/src/file/page_index/index.rs
@@ -230,16 +230,14 @@ impl<T: ParquetValueType> NativeIndex<T> {
let min_values = self
.indexes
.iter()
-.map(|x| x.min_bytes().map(|x| x.to_vec()))
-.collect::<Option<Vec<_>>>()
-.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+.map(|x| x.min_bytes().unwrap_or(&[]).to_vec())
+.collect::<Vec<_>>();

let max_values = self
.indexes
.iter()
-.map(|x| x.max_bytes().map(|x| x.to_vec()))
-.collect::<Option<Vec<_>>>()
-.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+.map(|x| x.max_bytes().unwrap_or(&[]).to_vec())
+.collect::<Vec<_>>();

let null_counts = self
.indexes
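The two hunks above are the heart of the fix. Previously, a single all-null page (whose min_bytes()/max_bytes() return None) made collect::<Option<Vec<_>>>() yield None, so the bounds of every page in the row group were replaced with empty byte strings and the written ColumnIndex was invalid. After the change, only the null pages themselves fall back to an empty value. A minimal standalone sketch of the difference, using plain byte slices rather than the crate's types:

fn main() {
    // per-page min bytes as the ColumnIndex writer sees them;
    // the all-null page reports None
    let mins: Vec<Option<&[u8]>> = vec![Some(b"a".as_slice()), None, Some(b"c".as_slice())];

    // old behavior: collecting into Option<Vec<_>> yields None if ANY element is None...
    let old: Option<Vec<Vec<u8>>> = mins.iter().map(|x| x.map(|b| b.to_vec())).collect();
    // ...so every page's bounds degraded to empty byte strings
    let old = old.unwrap_or_else(|| vec![vec![]; mins.len()]);
    assert_eq!(old, vec![Vec::<u8>::new(); 3]);

    // fixed behavior: only the null page falls back to an empty value
    let fixed: Vec<Vec<u8>> = mins.iter().map(|x| x.unwrap_or(&[]).to_vec()).collect();
    assert_eq!(fixed, vec![b"a".to_vec(), vec![], b"c".to_vec()]);
}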