Skip to content

Commit

Permalink
perf: process parquet statistics before downloading row-group (#11709)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 13, 2023
1 parent 62baaa9 commit 2e83540
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 124 deletions.
41 changes: 19 additions & 22 deletions crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,34 +544,31 @@ fn push(
}
}

/// Deserializes the statistics in the column chunks from all `row_groups`
/// Deserializes the statistics in the column chunks from a single `row_group`
/// into [`Statistics`] associated from `field`'s name.
///
/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result<Statistics> {
pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> Result<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

// transpose
row_groups.iter().try_for_each(|group| {
let columns = get_field_columns(group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)
})?;
let columns = get_field_columns(row_group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)?;

Ok(statistics.into())
}
3 changes: 2 additions & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
Expand All @@ -65,6 +65,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
Expand Down
26 changes: 24 additions & 2 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Read parquet files in parallel from the Object Store without a third party crate.
use std::borrow::Cow;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -19,6 +20,8 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap;
use super::mmap::ColumnStore;
use crate::cloud::CloudOptions;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::predicates::read_this_row_group;

pub struct ParquetObjectStore {
store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -155,6 +158,8 @@ pub struct FetchRowGroupsFromObjectStore {
reader: Arc<ParquetObjectStore>,
row_groups_metadata: Vec<RowGroupMetaData>,
projected_fields: Vec<SmartString>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
schema: SchemaRef,
logging: bool,
}

Expand All @@ -164,6 +169,7 @@ impl FetchRowGroupsFromObjectStore {
metadata: &FileMetaData,
schema: SchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<Self> {
let logging = verbose();

Expand All @@ -180,6 +186,8 @@ impl FetchRowGroupsFromObjectStore {
reader: Arc::new(reader),
row_groups_metadata: metadata.row_groups.to_owned(),
projected_fields,
predicate,
schema,
logging,
})
}
Expand All @@ -189,7 +197,7 @@ impl FetchRowGroupsFromObjectStore {
row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
// Fetch the required row groups.
let row_groups = &self
let row_groups = self
.row_groups_metadata
.get(row_groups.clone())
.map_or_else(
Expand All @@ -199,9 +207,23 @@ impl FetchRowGroupsFromObjectStore {
Ok,
)?;

let row_groups = if let Some(pred) = self.predicate.as_deref() {
Cow::Owned(
row_groups
.iter()
.filter(|rg| {
matches!(read_this_row_group(Some(pred), rg, &self.schema), Ok(true))
})
.cloned()
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(row_groups)
};

// Package in the format required by ColumnStore.
let downloaded =
download_projection(&self.projected_fields, row_groups, &self.reader).await?;
download_projection(&self.projected_fields, &row_groups, &self.reader).await?;

if self.logging {
eprintln!(
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ mod read;
mod read_impl;
mod write;

use arrow::io::parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

pub type FileMetaDataRef = Arc<FileMetaData>;

use super::*;

#[cfg(test)]
Expand Down
30 changes: 11 additions & 19 deletions crates/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@ impl ColumnStats {

/// Collect the statistics in a column chunk.
pub(crate) fn collect_statistics(
md: &[RowGroupMetaData],
arrow_schema: &ArrowSchema,
rg: Option<usize>,
md: &RowGroupMetaData,
schema: SchemaRef,
) -> ArrowResult<Option<BatchStats>> {
let mut schema = Schema::with_capacity(arrow_schema.fields.len());
let mut stats = vec![];

for fld in &arrow_schema.fields {
// note that we only select a single row group.
let st = match rg {
None => deserialize(fld, md)?,
// we select a single row group and collect only those stats
Some(rg) => deserialize(fld, &md[rg..rg + 1])?,
};
schema.with_column((&fld.name).into(), (&fld.data_type).into());
stats.push(ColumnStats::from_arrow_stats(st, fld));
for (name, dt) in schema.iter() {
let fld = ArrowField::new(name.as_str(), dt.to_arrow(), true);
let st = deserialize(&fld, md)?;
stats.push(ColumnStats::from_arrow_stats(st, &fld));
}

Ok(if stats.is_empty() {
Expand All @@ -44,14 +37,13 @@ pub(crate) fn collect_statistics(
}

pub(super) fn read_this_row_group(
predicate: Option<&Arc<dyn PhysicalIoExpr>>,
file_metadata: &arrow::io::parquet::read::FileMetaData,
schema: &ArrowSchema,
rg: usize,
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &SchemaRef,
) -> PolarsResult<bool> {
if let Some(pred) = &predicate {
if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema, Some(rg))? {
if let Some(stats) = collect_statistics(md, schema.clone())? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
60 changes: 44 additions & 16 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct ParquetReader<R: Read + Seek> {
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
parallel: ParallelStrategy,
schema: Option<SchemaRef>,
row_count: Option<RowCount>,
low_memory: bool,
metadata: Option<Arc<FileMetaData>>,
Expand All @@ -65,8 +66,8 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection: Option<&[usize]>,
) -> PolarsResult<DataFrame> {
// this path takes predicates and parallelism into account
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let rechunk = self.rechunk;
read_parquet(
Expand All @@ -75,7 +76,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection,
&schema,
Some(metadata),
predicate,
predicate.as_deref(),
self.parallel,
self.row_count,
self.use_statistics,
Expand Down Expand Up @@ -129,9 +130,16 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

/// [`Schema`] of the file.
pub fn schema(&mut self) -> PolarsResult<Schema> {
let metadata = self.get_metadata()?;
Ok(Schema::from_iter(&read::infer_schema(metadata)?.fields))
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
match &self.schema {
Some(schema) => Ok(schema.clone()),
None => {
let metadata = self.get_metadata()?;
Ok(Arc::new(Schema::from_iter(
&read::infer_schema(metadata)?.fields,
)))
},
}
}

/// Use statistics in the parquet to determine if pages
Expand All @@ -152,7 +160,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

pub fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
pub fn get_metadata(&mut self) -> PolarsResult<&FileMetaDataRef> {
if self.metadata.is_none() {
self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
}
Expand All @@ -163,13 +171,16 @@ impl<R: MmapBytesReader> ParquetReader<R> {
impl<R: MmapBytesReader + 'static> ParquetReader<R> {
pub fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
None,
self.row_count,
chunk_size,
self.use_statistics,
Expand All @@ -191,6 +202,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
row_count: None,
low_memory: false,
metadata: None,
schema: None,
use_statistics: true,
hive_partition_columns: None,
}
Expand All @@ -202,11 +214,11 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
}

fn finish(mut self) -> PolarsResult<DataFrame> {
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let schema = self.schema()?;
let metadata = self.get_metadata()?.clone();

if let Some(cols) = &self.columns {
self.projection = Some(columns_to_projection(cols, &schema)?);
self.projection = Some(columns_to_projection_pl_schema(cols, schema.as_ref())?);
}

read_parquet(
Expand Down Expand Up @@ -238,6 +250,7 @@ pub struct ParquetAsyncReader {
n_rows: Option<usize>,
rechunk: bool,
projection: Option<Vec<usize>>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_count: Option<RowCount>,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
Expand All @@ -258,14 +271,18 @@ impl ParquetAsyncReader {
n_rows: None,
projection: None,
row_count: None,
predicate: None,
use_statistics: true,
hive_partition_columns: None,
schema,
})
}

pub async fn schema(&mut self) -> PolarsResult<Schema> {
self.reader.schema().await
pub async fn schema(&mut self) -> PolarsResult<SchemaRef> {
match &self.schema {
Some(schema) => Ok(schema.clone()),
None => self.reader.schema().await.map(Arc::new),
}
}
pub async fn num_rows(&mut self) -> PolarsResult<usize> {
self.reader.num_rows().await
Expand All @@ -291,6 +308,11 @@ impl ParquetAsyncReader {
self
}

pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
self.predicate = predicate;
self
}

/// Use statistics in the parquet to determine if pages
/// can be skipped from reading.
pub fn use_statistics(mut self, toggle: bool) -> Self {
Expand All @@ -305,19 +327,23 @@ impl ParquetAsyncReader {

pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = self.schema().await?;
// row group fetched deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
&metadata,
self.schema.unwrap(),
self.projection.as_deref(),
self.predicate.clone(),
)?
.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.predicate.clone(),
self.row_count,
chunk_size,
self.use_statistics,
Expand All @@ -329,12 +355,11 @@ impl ParquetAsyncReader {
self.reader.get_metadata().await
}

pub async fn finish(
self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
pub async fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let schema = self.schema().await?;

let predicate = self.predicate.clone();
// batched reader deals with slice pushdown
let reader = self.batched(usize::MAX).await?;
let mut iter = reader.iter(16);
Expand All @@ -347,6 +372,9 @@ impl ParquetAsyncReader {
})?;
chunks.push(out)
}
if chunks.is_empty() {
return Ok(DataFrame::from(schema.as_ref()));
}
let mut df = concat_df(&chunks)?;

if rechunk {
Expand Down
Loading

0 comments on commit 2e83540

Please sign in to comment.