From c754a4c45e1f745f5c53364afab9bbdffb8132a0 Mon Sep 17 00:00:00 2001
From: ritchie
Date: Fri, 13 Oct 2023 09:01:04 +0200
Subject: [PATCH 1/3] perf: process parquet statistics before downloading row-group

---
 .../src/io/parquet/read/statistics/mod.rs  | 41 +++++++++----------
 crates/polars-io/src/parquet/async_impl.rs |  2 +-
 crates/polars-io/src/parquet/predicates.rs | 15 ++-----
 crates/polars-io/src/parquet/read_impl.rs  | 10 ++++-
 4 files changed, 32 insertions(+), 36 deletions(-)

diff --git a/crates/nano-arrow/src/io/parquet/read/statistics/mod.rs b/crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
index 3048952530a6..0d3ec5c00658 100644
--- a/crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
+++ b/crates/nano-arrow/src/io/parquet/read/statistics/mod.rs
@@ -544,34 +544,31 @@ fn push(
     }
 }
 
-/// Deserializes the statistics in the column chunks from all `row_groups`
+/// Deserializes the statistics in the column chunks from a single `row_group`
 /// into [`Statistics`] associated from `field`'s name.
 ///
 /// # Errors
 /// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
-pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result<Statistics> {
+pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> Result<Statistics> {
     let mut statistics = MutableStatistics::try_new(field)?;
 
-    // transpose
-    row_groups.iter().try_for_each(|group| {
-        let columns = get_field_columns(group.columns(), field.name.as_ref());
-        let mut stats = columns
-            .into_iter()
-            .map(|column| {
-                Ok((
-                    column.statistics().transpose()?,
-                    column.descriptor().descriptor.primitive_type.clone(),
-                ))
-            })
-            .collect::<Result<Vec<(Option<ParquetStatistics>, ParquetPrimitiveType)>>>()?;
-        push(
-            &mut stats,
-            statistics.min_value.as_mut(),
-            statistics.max_value.as_mut(),
-            statistics.distinct_count.as_mut(),
-            statistics.null_count.as_mut(),
-        )
-    })?;
+    let columns = get_field_columns(row_group.columns(), field.name.as_ref());
+    let mut stats = columns
+        .into_iter()
+        .map(|column| {
+            Ok((
+                column.statistics().transpose()?,
+                column.descriptor().descriptor.primitive_type.clone(),
+            ))
+        })
+        .collect::<Result<Vec<(Option<ParquetStatistics>, ParquetPrimitiveType)>>>()?;
+    push(
+        &mut stats,
+        statistics.min_value.as_mut(),
+        statistics.max_value.as_mut(),
+        statistics.distinct_count.as_mut(),
+        statistics.null_count.as_mut(),
+    )?;
 
     Ok(statistics.into())
 }
diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index ce5ecb7ce7f4..c41bfdf70113 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -189,7 +189,7 @@ impl FetchRowGroupsFromObjectStore {
         row_groups: Range<usize>,
     ) -> PolarsResult<ColumnStore> {
         // Fetch the required row groups.
-        let row_groups = &self
+        let row_groups = self
            .row_groups_metadata
            .get(row_groups.clone())
            .map_or_else(
diff --git a/crates/polars-io/src/parquet/predicates.rs b/crates/polars-io/src/parquet/predicates.rs
index 02262660e384..c2ebf1e9dec1 100644
--- a/crates/polars-io/src/parquet/predicates.rs
+++ b/crates/polars-io/src/parquet/predicates.rs
@@ -18,20 +18,14 @@ impl ColumnStats {
 
 /// Collect the statistics in a column chunk.
 pub(crate) fn collect_statistics(
-    md: &[RowGroupMetaData],
+    md: &RowGroupMetaData,
     arrow_schema: &ArrowSchema,
-    rg: Option<usize>,
 ) -> ArrowResult<Option<BatchStats>> {
     let mut schema = Schema::with_capacity(arrow_schema.fields.len());
     let mut stats = vec![];
 
     for fld in &arrow_schema.fields {
-        // note that we only select a single row group.
-        let st = match rg {
-            None => deserialize(fld, md)?,
-            // we select a single row group and collect only those stats
-            Some(rg) => deserialize(fld, &md[rg..rg + 1])?,
-        };
+        let st = deserialize(fld, md)?;
         schema.with_column((&fld.name).into(), (&fld.data_type).into());
         stats.push(ColumnStats::from_arrow_stats(st, fld));
     }
@@ -45,13 +39,12 @@ pub(crate) fn collect_statistics(
 
 pub(super) fn read_this_row_group(
     predicate: Option<&Arc<dyn PhysicalIoExpr>>,
-    file_metadata: &arrow::io::parquet::read::FileMetaData,
+    md: &RowGroupMetaData,
     schema: &ArrowSchema,
-    rg: usize,
 ) -> PolarsResult<bool> {
     if let Some(pred) = &predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema, Some(rg))? {
+            if let Some(stats) = collect_statistics(md, schema)? {
                 let should_read = pred.should_read(&stats);
                 // a parquet file may not have statistics of all columns
                 if matches!(should_read, Ok(false)) {
diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 5f99e770b343..aef3895aa28e 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -178,7 +178,9 @@ fn rg_to_dfs_optionally_par_over_columns(
         let md = &file_metadata.row_groups[rg];
         let current_row_count = md.num_rows() as IdxSize;
 
-        if use_statistics && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg)? {
+        if use_statistics
+            && !read_this_row_group(predicate.as_ref(), &file_metadata.row_groups[rg], schema)?
+        {
             *previous_row_count += current_row_count;
             continue;
         }
@@ -273,7 +275,11 @@ fn rg_to_dfs_par_over_rg(
         .map(|(rg_idx, md, local_limit, row_count_start)| {
             if local_limit == 0
                 || use_statistics
-                    && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg_idx)?
+                    && !read_this_row_group(
+                        predicate.as_ref(),
+                        &file_metadata.row_groups[rg_idx],
+                        schema,
+                    )?
             {
                 return Ok(None);
             }

From 58631fbf2dd42e05d3ce96f1d5f1394817b91b4c Mon Sep 17 00:00:00 2001
From: ritchie
Date: Fri, 13 Oct 2023 10:10:41 +0200
Subject: [PATCH 2/3] refactor to schema

---
 crates/polars-io/src/parquet/async_impl.rs    | 24 +++++++-
 crates/polars-io/src/parquet/mod.rs           |  3 +
 crates/polars-io/src/parquet/predicates.rs    | 19 +++---
 crates/polars-io/src/parquet/read.rs          | 52 +++++++++++-----
 crates/polars-io/src/parquet/read_impl.rs     | 59 +++++++++----------
 crates/polars-io/src/predicates.rs            | 16 +----
 crates/polars-io/src/utils.rs                 | 23 +++++++-
 .../physical_plan/executors/scan/parquet.rs   |  3 +-
 .../polars-plan/src/logical_plan/builder.rs   |  9 +--
 crates/polars-plan/src/logical_plan/hive.rs   |  2 +-
 10 files changed, 130 insertions(+), 80 deletions(-)

diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index c41bfdf70113..415ba0e83bbf 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -1,4 +1,5 @@
 //! Read parquet files in parallel from the Object Store without a third party crate.
+use std::borrow::Cow;
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -19,6 +20,8 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
 use super::mmap;
 use super::mmap::ColumnStore;
 use crate::cloud::CloudOptions;
+use crate::predicates::PhysicalIoExpr;
+use crate::prelude::predicates::read_this_row_group;
 
 pub struct ParquetObjectStore {
     store: Arc<dyn ObjectStore>,
@@ -155,6 +158,8 @@ pub struct FetchRowGroupsFromObjectStore {
     reader: Arc<ParquetObjectStore>,
     row_groups_metadata: Vec<RowGroupMetaData>,
     projected_fields: Vec<SmartString>,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    schema: SchemaRef,
     logging: bool,
 }
 
@@ -164,6 +169,7 @@ impl FetchRowGroupsFromObjectStore {
         metadata: &FileMetaData,
         schema: SchemaRef,
         projection: Option<&[usize]>,
+        predicate: Option<Arc<dyn PhysicalIoExpr>>,
     ) -> PolarsResult<Self> {
         let logging = verbose();
 
@@ -180,6 +186,8 @@ impl FetchRowGroupsFromObjectStore {
             reader: Arc::new(reader),
             row_groups_metadata: metadata.row_groups.to_owned(),
             projected_fields,
+            predicate,
+            schema,
             logging,
         })
     }
@@ -199,9 +207,23 @@ impl FetchRowGroupsFromObjectStore {
                 Ok,
             )?;
 
+        let row_groups = if let Some(pred) = self.predicate.as_deref() {
+            Cow::Owned(
+                row_groups
+                    .iter()
+                    .filter(|rg| {
+                        matches!(read_this_row_group(Some(pred), rg, &self.schema), Ok(true))
+                    })
+                    .cloned()
+                    .collect::<Vec<_>>(),
+            )
+        } else {
+            Cow::Borrowed(row_groups)
+        };
+
         // Package in the format required by ColumnStore.
         let downloaded =
-            download_projection(&self.projected_fields, row_groups, &self.reader).await?;
+            download_projection(&self.projected_fields, &row_groups, &self.reader).await?;
 
         if self.logging {
             eprintln!(
diff --git a/crates/polars-io/src/parquet/mod.rs b/crates/polars-io/src/parquet/mod.rs
index 78ac26ea78ba..1e9b4da863dc 100644
--- a/crates/polars-io/src/parquet/mod.rs
+++ b/crates/polars-io/src/parquet/mod.rs
@@ -22,9 +22,12 @@ mod read;
 mod read_impl;
 mod write;
 
+use arrow::io::parquet::write::FileMetaData;
 pub use read::*;
 pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};
 
+pub type FileMetaDataRef = Arc<FileMetaData>;
+
 use super::*;
 
 #[cfg(test)]
diff --git a/crates/polars-io/src/parquet/predicates.rs b/crates/polars-io/src/parquet/predicates.rs
index c2ebf1e9dec1..cd8a30349d0b 100644
--- a/crates/polars-io/src/parquet/predicates.rs
+++ b/crates/polars-io/src/parquet/predicates.rs
@@ -19,15 +19,14 @@ impl ColumnStats {
 /// Collect the statistics in a column chunk.
 pub(crate) fn collect_statistics(
     md: &RowGroupMetaData,
-    arrow_schema: &ArrowSchema,
+    schema: SchemaRef,
 ) -> ArrowResult<Option<BatchStats>> {
-    let mut schema = Schema::with_capacity(arrow_schema.fields.len());
     let mut stats = vec![];
 
-    for fld in &arrow_schema.fields {
-        let st = deserialize(fld, md)?;
-        schema.with_column((&fld.name).into(), (&fld.data_type).into());
-        stats.push(ColumnStats::from_arrow_stats(st, fld));
+    for (name, dt) in schema.iter() {
+        let fld = ArrowField::new(name.as_str(), dt.to_arrow(), true);
+        let st = deserialize(&fld, md)?;
+        stats.push(ColumnStats::from_arrow_stats(st, &fld));
     }
 
     Ok(if stats.is_empty() {
@@ -38,13 +37,13 @@ pub(crate) fn collect_statistics(
 }
 
 pub(super) fn read_this_row_group(
-    predicate: Option<&Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<&dyn PhysicalIoExpr>,
     md: &RowGroupMetaData,
-    schema: &ArrowSchema,
+    schema: &SchemaRef,
 ) -> PolarsResult<bool> {
-    if let Some(pred) = &predicate {
+    if let Some(pred) = predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(md, schema)? {
+            if let Some(stats) = collect_statistics(md, schema.clone())?
{ let should_read = pred.should_read(&stats); // a parquet file may not have statistics of all columns if matches!(should_read, Ok(false)) { diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs index 544c216975fd..7d5a7b8ac4bf 100644 --- a/crates/polars-io/src/parquet/read.rs +++ b/crates/polars-io/src/parquet/read.rs @@ -49,6 +49,7 @@ pub struct ParquetReader { columns: Option>, projection: Option>, parallel: ParallelStrategy, + schema: Option, row_count: Option, low_memory: bool, metadata: Option>, @@ -65,8 +66,8 @@ impl ParquetReader { projection: Option<&[usize]>, ) -> PolarsResult { // this path takes predicates and parallelism into account - let metadata = read::read_metadata(&mut self.reader)?; - let schema = read::schema::infer_schema(&metadata)?; + let metadata = self.get_metadata()?.clone(); + let schema = self.schema()?; let rechunk = self.rechunk; read_parquet( @@ -129,9 +130,16 @@ impl ParquetReader { } /// [`Schema`] of the file. - pub fn schema(&mut self) -> PolarsResult { - let metadata = self.get_metadata()?; - Ok(Schema::from_iter(&read::infer_schema(metadata)?.fields)) + pub fn schema(&mut self) -> PolarsResult { + match &self.schema { + Some(schema) => Ok(schema.clone()), + None => { + let metadata = self.get_metadata()?; + Ok(Arc::new(Schema::from_iter( + &read::infer_schema(metadata)?.fields, + ))) + }, + } } /// Use statistics in the parquet to determine if pages @@ -152,7 +160,7 @@ impl ParquetReader { self } - pub fn get_metadata(&mut self) -> PolarsResult<&Arc> { + pub fn get_metadata(&mut self) -> PolarsResult<&FileMetaDataRef> { if self.metadata.is_none() { self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?)); } @@ -163,11 +171,13 @@ impl ParquetReader { impl ParquetReader { pub fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.get_metadata()?.clone(); + let schema = self.schema()?; let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into(); BatchedParquetReader::new( row_group_fetcher, metadata, + schema, self.n_rows.unwrap_or(usize::MAX), self.projection, self.row_count, @@ -191,6 +201,7 @@ impl SerReader for ParquetReader { row_count: None, low_memory: false, metadata: None, + schema: None, use_statistics: true, hive_partition_columns: None, } @@ -202,11 +213,11 @@ impl SerReader for ParquetReader { } fn finish(mut self) -> PolarsResult { - let metadata = read::read_metadata(&mut self.reader)?; - let schema = read::schema::infer_schema(&metadata)?; + let schema = self.schema()?; + let metadata = self.get_metadata()?.clone(); if let Some(cols) = &self.columns { - self.projection = Some(columns_to_projection(cols, &schema)?); + self.projection = Some(columns_to_projection_pl_schema(cols, schema.as_ref())?); } read_parquet( @@ -238,6 +249,7 @@ pub struct ParquetAsyncReader { n_rows: Option, rechunk: bool, projection: Option>, + predicate: Option>, row_count: Option, use_statistics: bool, hive_partition_columns: Option>, @@ -258,14 +270,18 @@ impl ParquetAsyncReader { n_rows: None, projection: None, row_count: None, + predicate: None, use_statistics: true, hive_partition_columns: None, schema, }) } - pub async fn schema(&mut self) -> PolarsResult { - self.reader.schema().await + pub async fn schema(&mut self) -> PolarsResult { + match &self.schema { + Some(schema) => Ok(schema.clone()), + None => self.reader.schema().await.map(Arc::new), + } } pub async fn num_rows(&mut self) -> PolarsResult { self.reader.num_rows().await @@ -291,6 +307,11 @@ impl 
ParquetAsyncReader { self } + pub fn with_predicate(mut self, predicate: Option>) -> Self { + self.predicate = predicate; + self + } + /// Use statistics in the parquet to determine if pages /// can be skipped from reading. pub fn use_statistics(mut self, toggle: bool) -> Self { @@ -305,17 +326,20 @@ impl ParquetAsyncReader { pub async fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.reader.get_metadata().await?.clone(); + let schema = self.schema().await?; // row group fetched deals with projection let row_group_fetcher = FetchRowGroupsFromObjectStore::new( self.reader, &metadata, self.schema.unwrap(), self.projection.as_deref(), + self.predicate.clone(), )? .into(); BatchedParquetReader::new( row_group_fetcher, metadata, + schema, self.n_rows.unwrap_or(usize::MAX), self.projection, self.row_count, @@ -329,12 +353,10 @@ impl ParquetAsyncReader { self.reader.get_metadata().await } - pub async fn finish( - self, - predicate: Option>, - ) -> PolarsResult { + pub async fn finish(self) -> PolarsResult { let rechunk = self.rechunk; + let predicate = self.predicate.clone(); // batched reader deals with slice pushdown let reader = self.batched(usize::MAX).await?; let mut iter = reader.iter(16); diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index aef3895aa28e..cf3d1edce852 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -18,9 +18,9 @@ use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::async_impl::FetchRowGroupsFromObjectStore; use crate::parquet::mmap::mmap_columns; use crate::parquet::predicates::read_this_row_group; -use crate::parquet::{mmap, ParallelStrategy}; -use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr}; -use crate::utils::{apply_projection, get_reader_bytes}; +use crate::parquet::{mmap, FileMetaDataRef, ParallelStrategy}; +use crate::predicates::{apply_predicate, PhysicalIoExpr}; +use crate::utils::{apply_projection_pl_schema, get_reader_bytes}; use crate::RowCount; fn enlarge_data_type(mut data_type: ArrowDataType) -> ArrowDataType { @@ -44,11 +44,12 @@ fn column_idx_to_series( column_i: usize, md: &RowGroupMetaData, remaining_rows: usize, - schema: &ArrowSchema, + schema: &Schema, store: &mmap::ColumnStore, chunk_size: usize, ) -> PolarsResult { - let mut field = schema.fields[column_i].clone(); + let (name, dt) = schema.get_at_index(column_i).unwrap(); + let mut field = ArrowField::new(name.as_str(), dt.to_arrow(), true); field.data_type = enlarge_data_type(field.data_type); @@ -113,7 +114,7 @@ fn rg_to_dfs( row_group_end: usize, remaining_rows: &mut usize, file_metadata: &FileMetaData, - schema: &ArrowSchema, + schema: &SchemaRef, predicate: Option>, row_count: Option, parallel: ParallelStrategy, @@ -164,7 +165,7 @@ fn rg_to_dfs_optionally_par_over_columns( row_group_end: usize, remaining_rows: &mut usize, file_metadata: &FileMetaData, - schema: &ArrowSchema, + schema: &SchemaRef, predicate: Option>, row_count: Option, parallel: ParallelStrategy, @@ -179,7 +180,7 @@ fn rg_to_dfs_optionally_par_over_columns( let current_row_count = md.num_rows() as IdxSize; if use_statistics - && !read_this_row_group(predicate.as_ref(), &file_metadata.row_groups[rg], schema)? + && !read_this_row_group(predicate.as_deref(), &file_metadata.row_groups[rg], schema)? 
{ *previous_row_count += current_row_count; continue; @@ -245,7 +246,7 @@ fn rg_to_dfs_par_over_rg( previous_row_count: &mut IdxSize, remaining_rows: &mut usize, file_metadata: &FileMetaData, - schema: &ArrowSchema, + schema: &SchemaRef, predicate: Option>, row_count: Option, projection: &[usize], @@ -276,7 +277,7 @@ fn rg_to_dfs_par_over_rg( if local_limit == 0 || use_statistics && !read_this_row_group( - predicate.as_ref(), + predicate.as_deref(), &file_metadata.row_groups[rg_idx], schema, )? @@ -317,8 +318,8 @@ pub fn read_parquet( mut reader: R, mut limit: usize, projection: Option<&[usize]>, - schema: &ArrowSchema, - metadata: Option, + schema: &SchemaRef, + metadata: Option, predicate: Option>, mut parallel: ParallelStrategy, row_count: Option, @@ -327,7 +328,7 @@ pub fn read_parquet( ) -> PolarsResult { let file_metadata = metadata .map(Ok) - .unwrap_or_else(|| read::read_metadata(&mut reader))?; + .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?; let n_row_groups = file_metadata.row_groups.len(); // if there are multiple row groups and categorical data @@ -348,7 +349,7 @@ pub fn read_parquet( let projection = projection .map(Cow::Borrowed) - .unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::>())); + .unwrap_or_else(|| Cow::Owned((0usize..schema.len()).collect::>())); if let ParallelStrategy::Auto = parallel { if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() { @@ -384,11 +385,14 @@ pub fn read_parquet( if dfs.is_empty() { let schema = if let Cow::Borrowed(_) = projection { - Cow::Owned(apply_projection(schema, &projection)) + Cow::Owned(Arc::new(apply_projection_pl_schema( + schema.as_ref(), + &projection, + ))) } else { Cow::Borrowed(schema) }; - let mut df = arrow_schema_to_empty_df(&schema); + let mut df = DataFrame::from(schema.as_ref().as_ref()); if let Some(parts) = hive_partition_columns { for s in parts { // SAFETY: length is equal @@ -458,8 +462,8 @@ pub struct BatchedParquetReader { row_group_fetcher: RowGroupFetcher, limit: usize, projection: Vec, - schema: Arc, - metadata: Arc, + schema: SchemaRef, + metadata: FileMetaDataRef, row_count: Option, rows_read: IdxSize, row_group_offset: usize, @@ -475,7 +479,8 @@ impl BatchedParquetReader { #[allow(clippy::too_many_arguments)] pub fn new( row_group_fetcher: RowGroupFetcher, - metadata: Arc, + metadata: FileMetaDataRef, + schema: SchemaRef, limit: usize, projection: Option>, row_count: Option, @@ -483,10 +488,8 @@ impl BatchedParquetReader { use_statistics: bool, hive_partition_columns: Option>, ) -> PolarsResult { - let schema = Arc::new(read::schema::infer_schema(&metadata)?); let n_row_groups = metadata.row_groups.len(); - let projection = - projection.unwrap_or_else(|| (0usize..schema.fields.len()).collect::>()); + let projection = projection.unwrap_or_else(|| (0usize..schema.len()).collect::>()); let parallel = if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() { @@ -543,16 +546,8 @@ impl BatchedParquetReader { // case where there is no data in the file // the streaming engine needs at least a single chunk if self.rows_read == 0 && dfs.is_empty() { - let columns = self - .schema - .fields - .iter() - .map(|field| { - let dtype: DataType = (&field.data_type).into(); - Series::new_empty(&field.name, &dtype) - }) - .collect::>(); - return Ok(Some(vec![DataFrame::new_no_checks(columns)])); + let df = DataFrame::from(self.schema.as_ref()); + return Ok(Some(vec![df])); } // TODO! 
this is slower than it needs to be diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 6beac2a3af12..b4dfa24b412d 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -19,16 +19,6 @@ pub trait StatsEvaluator { fn should_read(&self, stats: &BatchStats) -> PolarsResult; } -#[cfg(feature = "parquet")] -pub(crate) fn arrow_schema_to_empty_df(schema: &ArrowSchema) -> DataFrame { - let columns = schema - .fields - .iter() - .map(|fld| Series::full_null(&fld.name, 0, &fld.data_type().into())) - .collect(); - DataFrame::new_no_checks(columns) -} - #[cfg(any(feature = "parquet", feature = "json",))] pub(crate) fn apply_predicate( df: &mut DataFrame, @@ -172,12 +162,12 @@ impl ColumnStats { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug)] pub struct BatchStats { - schema: Schema, + schema: SchemaRef, stats: Vec, } impl BatchStats { - pub fn new(schema: Schema, stats: Vec) -> Self { + pub fn new(schema: SchemaRef, stats: Vec) -> Self { Self { schema, stats } } @@ -186,7 +176,7 @@ impl BatchStats { } pub fn schema(&self) -> &Schema { - &self.schema + self.schema.as_ref() } pub fn column_stats(&self) -> &[ColumnStats] { diff --git a/crates/polars-io/src/utils.rs b/crates/polars-io/src/utils.rs index 2bf09380cd46..2f14628a1fd6 100644 --- a/crates/polars-io/src/utils.rs +++ b/crates/polars-io/src/utils.rs @@ -75,6 +75,25 @@ pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> Ar ArrowSchema::from(fields) } +#[cfg(feature = "parquet")] +pub(crate) fn apply_projection_pl_schema(schema: &Schema, projection: &[usize]) -> Schema { + Schema::from_iter(projection.iter().map(|idx| { + let (name, dt) = schema.get_at_index(*idx).unwrap(); + Field::new(name.as_str(), dt.clone()) + })) +} + +#[cfg(feature = "parquet")] +pub(crate) fn columns_to_projection_pl_schema( + columns: &[String], + schema: &Schema, +) -> PolarsResult> { + columns + .iter() + .map(|name| schema.try_get_full(name).map(|(idx, _, _)| idx)) + .collect() +} + #[cfg(any( feature = "ipc", feature = "ipc_streaming", @@ -85,11 +104,9 @@ pub(crate) fn columns_to_projection( columns: &[String], schema: &ArrowSchema, ) -> PolarsResult> { - use ahash::AHashMap; - let mut prj = Vec::with_capacity(columns.len()); if columns.len() > 100 { - let mut column_names = AHashMap::with_capacity(schema.fields.len()); + let mut column_names = PlHashMap::with_capacity(schema.fields.len()); schema.fields.iter().enumerate().for_each(|(i, c)| { column_names.insert(c.name.as_str(), i); }); diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 4bbae2254a87..143f3eeaaa43 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -80,9 +80,10 @@ impl ParquetExec { .with_row_count(mem::take(&mut self.file_options.row_count)) .with_projection(projection) .use_statistics(self.options.use_statistics) + .with_predicate(predicate) .with_hive_partition_columns(hive_partitions); - reader.finish(predicate).await + reader.finish().await }) } #[cfg(not(feature = "cloud"))] diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index ebe5fdff2f02..7bc50b97c076 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -85,11 +85,12 @@ macro_rules! 
try_delayed { } #[cfg(any(feature = "parquet", feature = "parquet_async",))] -fn prepare_schema(mut schema: Schema, row_count: Option<&RowCount>) -> SchemaRef { +fn prepare_schema(mut schema: SchemaRef, row_count: Option<&RowCount>) -> SchemaRef { + let schema_mut = Arc::make_mut(&mut schema); if let Some(rc) = row_count { - let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); + let _ = schema_mut.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); } - Arc::new(schema) + schema } impl LogicalPlanBuilder { @@ -167,7 +168,7 @@ impl LogicalPlanBuilder { let mut reader = ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), None, None) .await?; - let schema = Arc::new(reader.schema().await?); + let schema = reader.schema().await?; let num_rows = reader.num_rows().await?; let metadata = reader.get_metadata().await?.clone(); diff --git a/crates/polars-plan/src/logical_plan/hive.rs b/crates/polars-plan/src/logical_plan/hive.rs index 22745cf6247f..df820ac75d1e 100644 --- a/crates/polars-plan/src/logical_plan/hive.rs +++ b/crates/polars-plan/src/logical_plan/hive.rs @@ -76,7 +76,7 @@ impl HivePartitions { } else { let schema: Schema = partitions.as_slice().into(); let stats = BatchStats::new( - schema, + Arc::new(schema), partitions .into_iter() .map(ColumnStats::from_column_literal) From cee1f01b93dd52a52d271c2d4b0513b4c66a0b04 Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 13 Oct 2023 10:52:52 +0200 Subject: [PATCH 3/3] features --- crates/polars-io/Cargo.toml | 3 ++- crates/polars-io/src/parquet/read.rs | 10 +++++-- crates/polars-io/src/parquet/read_impl.rs | 32 +++++++++++------------ 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index e299003fab50..e273b475f4fa 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -38,7 +38,7 @@ rayon = { workspace = true } regex = { workspace = true } reqwest = { workspace = true, optional = true } ryu = { workspace = true, optional = true } -serde = { workspace = true, features = ["derive"], optional = true } +serde = { workspace = true, features = ["derive", "rc"], optional = true } serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true } simd-json = { workspace = true, optional = true } simdutf8 = { workspace = true, optional = true } @@ -65,6 +65,7 @@ json = [ "dtype-struct", "csv", ] +serde = ["dep:serde", "polars-core/serde-lazy"] # support for arrows ipc file parsing ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"] # support for arrows streaming ipc file parsing diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs index 7d5a7b8ac4bf..2586ce0b25d8 100644 --- a/crates/polars-io/src/parquet/read.rs +++ b/crates/polars-io/src/parquet/read.rs @@ -76,7 +76,7 @@ impl ParquetReader { projection, &schema, Some(metadata), - predicate, + predicate.as_deref(), self.parallel, self.row_count, self.use_statistics, @@ -180,6 +180,7 @@ impl ParquetReader { schema, self.n_rows.unwrap_or(usize::MAX), self.projection, + None, self.row_count, chunk_size, self.use_statistics, @@ -342,6 +343,7 @@ impl ParquetAsyncReader { schema, self.n_rows.unwrap_or(usize::MAX), self.projection, + self.predicate.clone(), self.row_count, chunk_size, self.use_statistics, @@ -353,8 +355,9 @@ impl ParquetAsyncReader { self.reader.get_metadata().await } - pub async fn finish(self) -> PolarsResult { + pub async fn finish(mut self) -> PolarsResult { let rechunk = self.rechunk; + 
let schema = self.schema().await?; let predicate = self.predicate.clone(); // batched reader deals with slice pushdown @@ -369,6 +372,9 @@ impl ParquetAsyncReader { })?; chunks.push(out) } + if chunks.is_empty() { + return Ok(DataFrame::from(schema.as_ref())); + } let mut df = concat_df(&chunks)?; if rechunk { diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index cf3d1edce852..85fbb544ef25 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -115,7 +115,7 @@ fn rg_to_dfs( remaining_rows: &mut usize, file_metadata: &FileMetaData, schema: &SchemaRef, - predicate: Option>, + predicate: Option<&dyn PhysicalIoExpr>, row_count: Option, parallel: ParallelStrategy, projection: &[usize], @@ -166,7 +166,7 @@ fn rg_to_dfs_optionally_par_over_columns( remaining_rows: &mut usize, file_metadata: &FileMetaData, schema: &SchemaRef, - predicate: Option>, + predicate: Option<&dyn PhysicalIoExpr>, row_count: Option, parallel: ParallelStrategy, projection: &[usize], @@ -175,12 +175,12 @@ fn rg_to_dfs_optionally_par_over_columns( ) -> PolarsResult> { let mut dfs = Vec::with_capacity(row_group_end - row_group_start); - for rg in row_group_start..row_group_end { - let md = &file_metadata.row_groups[rg]; + for rg_idx in row_group_start..row_group_end { + let md = &file_metadata.row_groups[rg_idx]; let current_row_count = md.num_rows() as IdxSize; if use_statistics - && !read_this_row_group(predicate.as_deref(), &file_metadata.row_groups[rg], schema)? + && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)? { *previous_row_count += current_row_count; continue; @@ -217,7 +217,8 @@ fn rg_to_dfs_optionally_par_over_columns( .collect::>>()? }; - *remaining_rows = remaining_rows.saturating_sub(file_metadata.row_groups[rg].num_rows()); + *remaining_rows = + remaining_rows.saturating_sub(file_metadata.row_groups[rg_idx].num_rows()); let mut df = DataFrame::new_no_checks(columns); if let Some(rc) = &row_count { @@ -225,7 +226,7 @@ fn rg_to_dfs_optionally_par_over_columns( } materialize_hive_partitions(&mut df, hive_partition_columns); - apply_predicate(&mut df, predicate.as_deref(), true)?; + apply_predicate(&mut df, predicate, true)?; *previous_row_count += current_row_count; dfs.push(df); @@ -247,7 +248,7 @@ fn rg_to_dfs_par_over_rg( remaining_rows: &mut usize, file_metadata: &FileMetaData, schema: &SchemaRef, - predicate: Option>, + predicate: Option<&dyn PhysicalIoExpr>, row_count: Option, projection: &[usize], use_statistics: bool, @@ -276,11 +277,7 @@ fn rg_to_dfs_par_over_rg( .map(|(rg_idx, md, local_limit, row_count_start)| { if local_limit == 0 || use_statistics - && !read_this_row_group( - predicate.as_deref(), - &file_metadata.row_groups[rg_idx], - schema, - )? + && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)? 
{ return Ok(None); } @@ -305,7 +302,7 @@ fn rg_to_dfs_par_over_rg( } materialize_hive_partitions(&mut df, hive_partition_columns); - apply_predicate(&mut df, predicate.as_deref(), false)?; + apply_predicate(&mut df, predicate, false)?; Ok(Some(df)) }) @@ -320,7 +317,7 @@ pub fn read_parquet( projection: Option<&[usize]>, schema: &SchemaRef, metadata: Option, - predicate: Option>, + predicate: Option<&dyn PhysicalIoExpr>, mut parallel: ParallelStrategy, row_count: Option, use_statistics: bool, @@ -464,6 +461,7 @@ pub struct BatchedParquetReader { projection: Vec, schema: SchemaRef, metadata: FileMetaDataRef, + predicate: Option>, row_count: Option, rows_read: IdxSize, row_group_offset: usize, @@ -483,6 +481,7 @@ impl BatchedParquetReader { schema: SchemaRef, limit: usize, projection: Option>, + predicate: Option>, row_count: Option, chunk_size: usize, use_statistics: bool, @@ -506,6 +505,7 @@ impl BatchedParquetReader { metadata, row_count, rows_read: 0, + predicate, row_group_offset: 0, n_row_groups, chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()), @@ -534,7 +534,7 @@ impl BatchedParquetReader { &mut self.limit, &self.metadata, &self.schema, - None, + self.predicate.as_deref(), self.row_count.clone(), self.parallel, &self.projection,
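
Illustration (not part of the patch series): the three patches hinge on one idea — evaluate the pushed-down predicate against each row group's footer statistics (`read_this_row_group` calling the predicate's stats evaluator) and skip the row group whenever the statistics prove no row can match; after patch 2 this happens in the object-store path before the row group is even downloaded, and callers pass the predicate up front via `with_predicate` instead of handing it to `finish`. Below is a minimal, self-contained Rust sketch of that decision. `ColumnMinMax` and `RangePredicate` are simplified stand-ins invented for this sketch, not polars' actual `RowGroupMetaData`, `PhysicalIoExpr`, or `BatchStats` types.

/// Min/max statistics for one column of one row group
/// (stand-in for what the parquet footer metadata provides).
struct ColumnMinMax {
    min: i64,
    max: i64,
}

/// A predicate of the form `lo <= column <= hi`
/// (stand-in for a predicate that exposes a stats evaluator).
struct RangePredicate {
    lo: i64,
    hi: i64,
}

impl RangePredicate {
    /// Counterpart of `should_read`: return `false` only when the
    /// statistics prove that no row in the group can match.
    fn should_read(&self, stats: &ColumnMinMax) -> bool {
        // The group may contain matches unless its whole [min, max]
        // range lies outside [lo, hi].
        !(stats.max < self.lo || stats.min > self.hi)
    }
}

fn main() {
    let pred = RangePredicate { lo: 100, hi: 200 };
    // One entry per row group, taken from the file footer; no row-group
    // data has been downloaded at this point.
    let row_groups = [
        ColumnMinMax { min: 0, max: 50 },    // provably empty -> skip the download
        ColumnMinMax { min: 150, max: 300 }, // may match      -> fetch and decode
    ];
    for (idx, rg) in row_groups.iter().enumerate() {
        if pred.should_read(rg) {
            println!("row group {idx}: fetch and decode");
        } else {
            println!("row group {idx}: skipped before download");
        }
    }
}

Note that skipping is only sound in the "provably no match" direction: when statistics are missing or inconclusive the row group must still be read, which is why the patched code only skips on an explicit `Ok(false)` from `should_read`.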