Skip to content

Fix: StatisticsConverter counts for missing columns #10946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
self.statistics_converter(column)
.and_then(|c| c.row_group_null_counts(self.metadata_iter()))
.ok()
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
Expand Down
43 changes: 21 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ where
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// The returned Array is a [`UInt64Array`]
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<UInt64Array>
where
I: Iterator<Item = (usize, &'a Index)>,
{
Expand All @@ -674,7 +674,7 @@ where
_ => unimplemented!(),
});

Ok(Arc::new(UInt64Array::from_iter(iter)))
Ok(UInt64Array::from_iter(iter))
}

/// Extracts Parquet statistics as Arrow arrays
Expand Down Expand Up @@ -868,21 +868,22 @@ impl<'a> StatisticsConverter<'a> {
/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
///
/// See docs on [`Self::row_group_mins`] for details
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<ArrayRef>
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, metadatas));
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
Ok(UInt64Array::from_iter(null_counts))
}

/// Extract the minimum values from Data Page statistics.
Expand Down Expand Up @@ -1001,14 +1002,15 @@ impl<'a> StatisticsConverter<'a> {
column_page_index: &ParquetColumnIndex,
column_offset_index: &ParquetOffsetIndex,
row_group_indices: I,
) -> Result<ArrayRef>
) -> Result<UInt64Array>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
let num_row_groups = row_group_indices.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};

let iter = row_group_indices.into_iter().map(|rg_index| {
Expand Down Expand Up @@ -1041,21 +1043,19 @@ impl<'a> StatisticsConverter<'a> {
pub fn data_page_row_counts<I>(
&self,
column_offset_index: &ParquetOffsetIndex,
row_group_metadatas: &[RowGroupMetaData],
row_group_metadatas: &'a [RowGroupMetaData],
row_group_indices: I,
) -> Result<ArrayRef>
) -> Result<Option<UInt64Array>>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, row_group_indices));
// no matching column found in parquet_index;
// thus we cannot extract page_locations in order to determine
// the row count on a per DataPage basis.
return Ok(None);
};

// `offset_index[row_group_number][column_number][page_number]` holds
// the [`PageLocation`] corresponding to page `page_number` of column
// `column_number` of row group `row_group_number`.
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
let page_locations = &column_offset_index[*rg_idx][parquet_index];
Expand All @@ -1064,9 +1064,8 @@ impl<'a> StatisticsConverter<'a> {
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
});

let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();

// append the last page row count
let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
let row_count_per_page = row_count_per_page
.chain(std::iter::once(Some(
*num_rows_in_row_group as u64
Expand All @@ -1077,7 +1076,7 @@ impl<'a> StatisticsConverter<'a> {
row_count_total.extend(row_count_per_page);
}

Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
Ok(Some(UInt64Array::from_iter(row_count_total)))
}

/// Returns a null array of data_type with one element per row group
Expand Down
Loading