Skip to content

Commit

Permalink
perf: process parquet statistics before downloading row-group
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 13, 2023
1 parent 57139eb commit c754a4c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 36 deletions.
41 changes: 19 additions & 22 deletions crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,34 +544,31 @@ fn push(
}
}

/// Deserializes the statistics in the column chunks from all `row_groups`
/// Deserializes the statistics in the column chunks from a single `row_group`
/// into [`Statistics`] associated with `field`'s name.
///
/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result<Statistics> {
pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> Result<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

// transpose
row_groups.iter().try_for_each(|group| {
let columns = get_field_columns(group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)
})?;
let columns = get_field_columns(row_group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)?;

Ok(statistics.into())
}
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl FetchRowGroupsFromObjectStore {
row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
// Fetch the required row groups.
let row_groups = &self
let row_groups = self
.row_groups_metadata
.get(row_groups.clone())
.map_or_else(
Expand Down
15 changes: 4 additions & 11 deletions crates/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@ impl ColumnStats {

/// Collect the statistics in a column chunk.
pub(crate) fn collect_statistics(
md: &[RowGroupMetaData],
md: &RowGroupMetaData,
arrow_schema: &ArrowSchema,
rg: Option<usize>,
) -> ArrowResult<Option<BatchStats>> {
let mut schema = Schema::with_capacity(arrow_schema.fields.len());
let mut stats = vec![];

for fld in &arrow_schema.fields {
// note that we only select a single row group.
let st = match rg {
None => deserialize(fld, md)?,
// we select a single row group and collect only those stats
Some(rg) => deserialize(fld, &md[rg..rg + 1])?,
};
let st = deserialize(fld, md)?;
schema.with_column((&fld.name).into(), (&fld.data_type).into());
stats.push(ColumnStats::from_arrow_stats(st, fld));
}
Expand All @@ -45,13 +39,12 @@ pub(crate) fn collect_statistics(

pub(super) fn read_this_row_group(
predicate: Option<&Arc<dyn PhysicalIoExpr>>,
file_metadata: &arrow::io::parquet::read::FileMetaData,
md: &RowGroupMetaData,
schema: &ArrowSchema,
rg: usize,
) -> PolarsResult<bool> {
if let Some(pred) = &predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema, Some(rg))? {
if let Some(stats) = collect_statistics(md, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
10 changes: 8 additions & 2 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ fn rg_to_dfs_optionally_par_over_columns(
let md = &file_metadata.row_groups[rg];
let current_row_count = md.num_rows() as IdxSize;

if use_statistics && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg)? {
if use_statistics
&& !read_this_row_group(predicate.as_ref(), &file_metadata.row_groups[rg], schema)?
{
*previous_row_count += current_row_count;
continue;
}
Expand Down Expand Up @@ -273,7 +275,11 @@ fn rg_to_dfs_par_over_rg(
.map(|(rg_idx, md, local_limit, row_count_start)| {
if local_limit == 0
|| use_statistics
&& !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg_idx)?
&& !read_this_row_group(
predicate.as_ref(),
&file_metadata.row_groups[rg_idx],
schema,
)?
{
return Ok(None);
}
Expand Down

0 comments on commit c754a4c

Please sign in to comment.