Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: process parquet statistics before downloading row-group #11709

Merged
3 commits merged on Oct 13, 2023
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,34 +544,31 @@ fn push(
}
}

/// Deserializes the statistics in the column chunks from all `row_groups`
/// Deserializes the statistics in the column chunks from a single `row_group`
/// into [`Statistics`] associated from `field`'s name.
///
/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result<Statistics> {
pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> Result<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

// transpose
row_groups.iter().try_for_each(|group| {
let columns = get_field_columns(group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)
})?;
let columns = get_field_columns(row_group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
column.descriptor().descriptor.primitive_type.clone(),
))
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)?;

Ok(statistics.into())
}
3 changes: 2 additions & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
Expand All @@ -65,6 +65,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
Expand Down
26 changes: 24 additions & 2 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Read parquet files in parallel from the Object Store without a third party crate.
use std::borrow::Cow;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -19,6 +20,8 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap;
use super::mmap::ColumnStore;
use crate::cloud::CloudOptions;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::predicates::read_this_row_group;

pub struct ParquetObjectStore {
store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -155,6 +158,8 @@ pub struct FetchRowGroupsFromObjectStore {
reader: Arc<ParquetObjectStore>,
row_groups_metadata: Vec<RowGroupMetaData>,
projected_fields: Vec<SmartString>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
schema: SchemaRef,
logging: bool,
}

Expand All @@ -164,6 +169,7 @@ impl FetchRowGroupsFromObjectStore {
metadata: &FileMetaData,
schema: SchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<Self> {
let logging = verbose();

Expand All @@ -180,6 +186,8 @@ impl FetchRowGroupsFromObjectStore {
reader: Arc::new(reader),
row_groups_metadata: metadata.row_groups.to_owned(),
projected_fields,
predicate,
schema,
logging,
})
}
Expand All @@ -189,7 +197,7 @@ impl FetchRowGroupsFromObjectStore {
row_groups: Range<usize>,
) -> PolarsResult<ColumnStore> {
// Fetch the required row groups.
let row_groups = &self
let row_groups = self
.row_groups_metadata
.get(row_groups.clone())
.map_or_else(
Expand All @@ -199,9 +207,23 @@ impl FetchRowGroupsFromObjectStore {
Ok,
)?;

let row_groups = if let Some(pred) = self.predicate.as_deref() {
Cow::Owned(
row_groups
.iter()
.filter(|rg| {
matches!(read_this_row_group(Some(pred), rg, &self.schema), Ok(true))
})
.cloned()
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(row_groups)
};

// Package in the format required by ColumnStore.
let downloaded =
download_projection(&self.projected_fields, row_groups, &self.reader).await?;
download_projection(&self.projected_fields, &row_groups, &self.reader).await?;

if self.logging {
eprintln!(
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ mod read;
mod read_impl;
mod write;

use arrow::io::parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

pub type FileMetaDataRef = Arc<FileMetaData>;

use super::*;

#[cfg(test)]
Expand Down
30 changes: 11 additions & 19 deletions crates/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@ impl ColumnStats {

/// Collect the statistics in a column chunk.
pub(crate) fn collect_statistics(
md: &[RowGroupMetaData],
arrow_schema: &ArrowSchema,
rg: Option<usize>,
md: &RowGroupMetaData,
schema: SchemaRef,
) -> ArrowResult<Option<BatchStats>> {
let mut schema = Schema::with_capacity(arrow_schema.fields.len());
let mut stats = vec![];

for fld in &arrow_schema.fields {
// note that we only select a single row group.
let st = match rg {
None => deserialize(fld, md)?,
// we select a single row group and collect only those stats
Some(rg) => deserialize(fld, &md[rg..rg + 1])?,
};
schema.with_column((&fld.name).into(), (&fld.data_type).into());
stats.push(ColumnStats::from_arrow_stats(st, fld));
for (name, dt) in schema.iter() {
let fld = ArrowField::new(name.as_str(), dt.to_arrow(), true);
let st = deserialize(&fld, md)?;
stats.push(ColumnStats::from_arrow_stats(st, &fld));
}

Ok(if stats.is_empty() {
Expand All @@ -44,14 +37,13 @@ pub(crate) fn collect_statistics(
}

pub(super) fn read_this_row_group(
predicate: Option<&Arc<dyn PhysicalIoExpr>>,
file_metadata: &arrow::io::parquet::read::FileMetaData,
schema: &ArrowSchema,
rg: usize,
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &SchemaRef,
) -> PolarsResult<bool> {
if let Some(pred) = &predicate {
if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema, Some(rg))? {
if let Some(stats) = collect_statistics(md, schema.clone())? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
60 changes: 44 additions & 16 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct ParquetReader<R: Read + Seek> {
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
parallel: ParallelStrategy,
schema: Option<SchemaRef>,
row_count: Option<RowCount>,
low_memory: bool,
metadata: Option<Arc<FileMetaData>>,
Expand All @@ -65,8 +66,8 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection: Option<&[usize]>,
) -> PolarsResult<DataFrame> {
// this path takes predicates and parallelism into account
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let rechunk = self.rechunk;
read_parquet(
Expand All @@ -75,7 +76,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection,
&schema,
Some(metadata),
predicate,
predicate.as_deref(),
self.parallel,
self.row_count,
self.use_statistics,
Expand Down Expand Up @@ -129,9 +130,16 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

/// [`Schema`] of the file.
pub fn schema(&mut self) -> PolarsResult<Schema> {
let metadata = self.get_metadata()?;
Ok(Schema::from_iter(&read::infer_schema(metadata)?.fields))
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
match &self.schema {
Some(schema) => Ok(schema.clone()),
None => {
let metadata = self.get_metadata()?;
Ok(Arc::new(Schema::from_iter(
&read::infer_schema(metadata)?.fields,
)))
},
}
}

/// Use statistics in the parquet to determine if pages
Expand All @@ -152,7 +160,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

pub fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
pub fn get_metadata(&mut self) -> PolarsResult<&FileMetaDataRef> {
if self.metadata.is_none() {
self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
}
Expand All @@ -163,13 +171,16 @@ impl<R: MmapBytesReader> ParquetReader<R> {
impl<R: MmapBytesReader + 'static> ParquetReader<R> {
pub fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
None,
self.row_count,
chunk_size,
self.use_statistics,
Expand All @@ -191,6 +202,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
row_count: None,
low_memory: false,
metadata: None,
schema: None,
use_statistics: true,
hive_partition_columns: None,
}
Expand All @@ -202,11 +214,11 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
}

fn finish(mut self) -> PolarsResult<DataFrame> {
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let schema = self.schema()?;
let metadata = self.get_metadata()?.clone();

if let Some(cols) = &self.columns {
self.projection = Some(columns_to_projection(cols, &schema)?);
self.projection = Some(columns_to_projection_pl_schema(cols, schema.as_ref())?);
}

read_parquet(
Expand Down Expand Up @@ -238,6 +250,7 @@ pub struct ParquetAsyncReader {
n_rows: Option<usize>,
rechunk: bool,
projection: Option<Vec<usize>>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_count: Option<RowCount>,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
Expand All @@ -258,14 +271,18 @@ impl ParquetAsyncReader {
n_rows: None,
projection: None,
row_count: None,
predicate: None,
use_statistics: true,
hive_partition_columns: None,
schema,
})
}

pub async fn schema(&mut self) -> PolarsResult<Schema> {
self.reader.schema().await
pub async fn schema(&mut self) -> PolarsResult<SchemaRef> {
match &self.schema {
Some(schema) => Ok(schema.clone()),
None => self.reader.schema().await.map(Arc::new),
}
}
pub async fn num_rows(&mut self) -> PolarsResult<usize> {
self.reader.num_rows().await
Expand All @@ -291,6 +308,11 @@ impl ParquetAsyncReader {
self
}

pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
self.predicate = predicate;
self
}

/// Use statistics in the parquet to determine if pages
/// can be skipped from reading.
pub fn use_statistics(mut self, toggle: bool) -> Self {
Expand All @@ -305,19 +327,23 @@ impl ParquetAsyncReader {

pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = self.schema().await?;
// row group fetched deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
&metadata,
self.schema.unwrap(),
self.projection.as_deref(),
self.predicate.clone(),
)?
.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.predicate.clone(),
self.row_count,
chunk_size,
self.use_statistics,
Expand All @@ -329,12 +355,11 @@ impl ParquetAsyncReader {
self.reader.get_metadata().await
}

pub async fn finish(
self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
pub async fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let schema = self.schema().await?;

let predicate = self.predicate.clone();
// batched reader deals with slice pushdown
let reader = self.batched(usize::MAX).await?;
let mut iter = reader.iter(16);
Expand All @@ -347,6 +372,9 @@ impl ParquetAsyncReader {
})?;
chunks.push(out)
}
if chunks.is_empty() {
return Ok(DataFrame::from(schema.as_ref()));
}
let mut df = concat_df(&chunks)?;

if rechunk {
Expand Down
Loading