diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index e299003fab50..e273b475f4fa 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -38,7 +38,7 @@ rayon = { workspace = true }
 regex = { workspace = true }
 reqwest = { workspace = true, optional = true }
 ryu = { workspace = true, optional = true }
-serde = { workspace = true, features = ["derive"], optional = true }
+serde = { workspace = true, features = ["derive", "rc"], optional = true }
 serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
 simd-json = { workspace = true, optional = true }
 simdutf8 = { workspace = true, optional = true }
@@ -65,6 +65,7 @@ json = [
   "dtype-struct",
   "csv",
 ]
+serde = ["dep:serde", "polars-core/serde-lazy"]
 # support for arrows ipc file parsing
 ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
 # support for arrows streaming ipc file parsing
diff --git a/crates/polars-io/src/parquet/read.rs b/crates/polars-io/src/parquet/read.rs
index 7d5a7b8ac4bf..2586ce0b25d8 100644
--- a/crates/polars-io/src/parquet/read.rs
+++ b/crates/polars-io/src/parquet/read.rs
@@ -76,7 +76,7 @@ impl ParquetReader {
             projection,
             &schema,
             Some(metadata),
-            predicate,
+            predicate.as_deref(),
             self.parallel,
             self.row_count,
             self.use_statistics,
@@ -180,6 +180,7 @@ impl ParquetReader {
             schema,
             self.n_rows.unwrap_or(usize::MAX),
             self.projection,
+            None,
             self.row_count,
             chunk_size,
             self.use_statistics,
@@ -342,6 +343,7 @@ impl ParquetAsyncReader {
             schema,
             self.n_rows.unwrap_or(usize::MAX),
             self.projection,
+            self.predicate.clone(),
             self.row_count,
             chunk_size,
             self.use_statistics,
@@ -353,8 +355,9 @@ impl ParquetAsyncReader {
         self.reader.get_metadata().await
     }
 
-    pub async fn finish(self) -> PolarsResult<DataFrame> {
+    pub async fn finish(mut self) -> PolarsResult<DataFrame> {
         let rechunk = self.rechunk;
+        let schema = self.schema().await?;
         let predicate = self.predicate.clone();
 
         // batched reader deals with slice pushdown
@@ -369,6 +372,9 @@ impl ParquetAsyncReader {
             })?;
             chunks.push(out)
         }
+        if chunks.is_empty() {
+            return Ok(DataFrame::from(schema.as_ref()));
+        }
         let mut df = concat_df(&chunks)?;
 
         if rechunk {
diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index cf3d1edce852..85fbb544ef25 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -115,7 +115,7 @@ fn rg_to_dfs(
     remaining_rows: &mut usize,
     file_metadata: &FileMetaData,
     schema: &SchemaRef,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<&dyn PhysicalIoExpr>,
     row_count: Option<RowCount>,
     parallel: ParallelStrategy,
     projection: &[usize],
@@ -166,7 +166,7 @@ fn rg_to_dfs_optionally_par_over_columns(
     remaining_rows: &mut usize,
     file_metadata: &FileMetaData,
     schema: &SchemaRef,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<&dyn PhysicalIoExpr>,
     row_count: Option<RowCount>,
     parallel: ParallelStrategy,
     projection: &[usize],
@@ -175,12 +175,12 @@ fn rg_to_dfs_optionally_par_over_columns(
 ) -> PolarsResult<Vec<DataFrame>> {
     let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
 
-    for rg in row_group_start..row_group_end {
-        let md = &file_metadata.row_groups[rg];
+    for rg_idx in row_group_start..row_group_end {
+        let md = &file_metadata.row_groups[rg_idx];
         let current_row_count = md.num_rows() as IdxSize;
 
         if use_statistics
-            && !read_this_row_group(predicate.as_deref(), &file_metadata.row_groups[rg], schema)?
+            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
         {
             *previous_row_count += current_row_count;
             continue;
@@ -217,7 +217,8 @@ fn rg_to_dfs_optionally_par_over_columns(
                 .collect::<PolarsResult<Vec<_>>>()?
         };
 
-        *remaining_rows = remaining_rows.saturating_sub(file_metadata.row_groups[rg].num_rows());
+        *remaining_rows =
+            remaining_rows.saturating_sub(file_metadata.row_groups[rg_idx].num_rows());
 
         let mut df = DataFrame::new_no_checks(columns);
         if let Some(rc) = &row_count {
@@ -225,7 +226,7 @@ fn rg_to_dfs_optionally_par_over_columns(
         }
 
         materialize_hive_partitions(&mut df, hive_partition_columns);
-        apply_predicate(&mut df, predicate.as_deref(), true)?;
+        apply_predicate(&mut df, predicate, true)?;
 
         *previous_row_count += current_row_count;
         dfs.push(df);
@@ -247,7 +248,7 @@ fn rg_to_dfs_par_over_rg(
     remaining_rows: &mut usize,
     file_metadata: &FileMetaData,
     schema: &SchemaRef,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<&dyn PhysicalIoExpr>,
     row_count: Option<RowCount>,
     projection: &[usize],
     use_statistics: bool,
@@ -276,11 +277,7 @@ fn rg_to_dfs_par_over_rg(
         .map(|(rg_idx, md, local_limit, row_count_start)| {
             if local_limit == 0
                 || use_statistics
-                    && !read_this_row_group(
-                        predicate.as_deref(),
-                        &file_metadata.row_groups[rg_idx],
-                        schema,
-                    )?
+                    && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
             {
                 return Ok(None);
             }
@@ -305,7 +302,7 @@ fn rg_to_dfs_par_over_rg(
             }
 
             materialize_hive_partitions(&mut df, hive_partition_columns);
-            apply_predicate(&mut df, predicate.as_deref(), false)?;
+            apply_predicate(&mut df, predicate, false)?;
 
             Ok(Some(df))
         })
@@ -320,7 +317,7 @@ pub fn read_parquet(
     projection: Option<&[usize]>,
     schema: &SchemaRef,
     metadata: Option<FileMetaDataRef>,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<&dyn PhysicalIoExpr>,
     mut parallel: ParallelStrategy,
     row_count: Option<RowCount>,
     use_statistics: bool,
@@ -464,6 +461,7 @@ pub struct BatchedParquetReader {
     projection: Vec<usize>,
     schema: SchemaRef,
     metadata: FileMetaDataRef,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
     row_count: Option<RowCount>,
     rows_read: IdxSize,
     row_group_offset: usize,
@@ -483,6 +481,7 @@ impl BatchedParquetReader {
         schema: SchemaRef,
         limit: usize,
         projection: Option<Vec<usize>>,
+        predicate: Option<Arc<dyn PhysicalIoExpr>>,
         row_count: Option<RowCount>,
         chunk_size: usize,
         use_statistics: bool,
@@ -506,6 +505,7 @@ impl BatchedParquetReader {
             metadata,
             row_count,
             rows_read: 0,
+            predicate,
             row_group_offset: 0,
             n_row_groups,
             chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
@@ -534,7 +534,7 @@ impl BatchedParquetReader {
             &mut self.limit,
             &self.metadata,
             &self.schema,
-            None,
+            self.predicate.as_deref(),
             self.row_count.clone(),
             self.parallel,
             &self.projection,
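
The recurring change in this diff is an ownership split: the readers keep the predicate alive as Option<Arc<dyn PhysicalIoExpr>>, while the row-group helpers only borrow it as Option<&dyn PhysicalIoExpr> via Option::as_deref. Below is a minimal sketch of that pattern; PhysicalIoExpr is reduced to a one-method stand-in trait, and KeepAll, Reader, and read_row_group are hypothetical names for illustration, not the polars API.

use std::sync::Arc;

// Stand-in for polars' PhysicalIoExpr; the real trait has a different
// shape. Only the ownership pattern is illustrated here.
trait PhysicalIoExpr {
    fn keep_row_group(&self) -> bool;
}

struct KeepAll;

impl PhysicalIoExpr for KeepAll {
    fn keep_row_group(&self) -> bool {
        true
    }
}

// The reader owns the predicate, like the `predicate` field added to
// BatchedParquetReader in this diff.
struct Reader {
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
}

// The per-row-group helpers only borrow it, like rg_to_dfs and
// read_this_row_group after this change.
fn read_row_group(predicate: Option<&dyn PhysicalIoExpr>) -> bool {
    predicate.map_or(true, |p| p.keep_row_group())
}

fn main() {
    let reader = Reader {
        predicate: Some(Arc::new(KeepAll)),
    };
    // Option::as_deref turns Option<Arc<dyn T>> into Option<&dyn T>
    // without cloning the Arc; the same call appears at every borrow
    // site in the diff above.
    assert!(read_row_group(reader.predicate.as_deref()));
}

Borrowing at the call boundary keeps the Arc clone (and the refcount traffic) out of the per-row-group hot path, and lets the helpers accept any predicate regardless of how the caller owns it.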