Commit: features
ritchie46 committed Oct 13, 2023
1 parent 58631fb commit cee1f01
Showing 3 changed files with 26 additions and 19 deletions.
3 changes: 2 additions & 1 deletion crates/polars-io/Cargo.toml
@@ -38,7 +38,7 @@ rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true, optional = true }
ryu = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value"], optional = true }
simd-json = { workspace = true, optional = true }
simdutf8 = { workspace = true, optional = true }
@@ -65,6 +65,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
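Two things change in this Cargo.toml: the serde dependency gains its "rc" feature, which is what provides serde's Serialize/Deserialize impls for Rc and Arc, and a serde feature for polars-io itself is declared, wiring the optional dependency (dep:serde) to polars-core/serde-lazy. A minimal, self-contained sketch (not polars code; type and field names are made up) of the kind of derive that only compiles once "rc" is enabled, assuming a Cargo.toml with serde = { version = "1", features = ["derive", "rc"] } and serde_json:

```rust
use std::sync::Arc;

use serde::{Deserialize, Serialize};

// Hypothetical type, purely for illustration: this derive only compiles with
// serde's "rc" feature, because serde gates its Arc<T>/Rc<T> impls behind it.
#[derive(Serialize, Deserialize, Debug)]
struct SharedOptions {
    name: Arc<String>,
    row_groups: Arc<Vec<u64>>,
}

fn main() -> Result<(), serde_json::Error> {
    let opts = SharedOptions {
        name: Arc::new("predicate".to_string()),
        row_groups: Arc::new(vec![0, 1, 2]),
    };
    let json = serde_json::to_string(&opts)?;
    let back: SharedOptions = serde_json::from_str(&json)?;
    println!("{json} -> {back:?}");
    Ok(())
}
```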
10 changes: 8 additions & 2 deletions crates/polars-io/src/parquet/read.rs
@@ -76,7 +76,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection,
&schema,
Some(metadata),
predicate,
predicate.as_deref(),
self.parallel,
self.row_count,
self.use_statistics,
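The sync read path now passes predicate.as_deref(), matching the read_impl.rs signatures further down that take Option<&dyn PhysicalIoExpr> instead of an owned Option<Arc<dyn PhysicalIoExpr>>. A minimal sketch of that conversion, with a made-up Predicate trait standing in for PhysicalIoExpr:

```rust
use std::sync::Arc;

// Hypothetical trait standing in for PhysicalIoExpr, just to show the
// Option<Arc<dyn Trait>> -> Option<&dyn Trait> conversion that
// `predicate.as_deref()` performs above.
trait Predicate {
    fn keep(&self, value: i64) -> bool;
}

struct GreaterThan(i64);

impl Predicate for GreaterThan {
    fn keep(&self, value: i64) -> bool {
        value > self.0
    }
}

// The callee only needs to evaluate the predicate, so it borrows it.
fn filter(values: &[i64], predicate: Option<&dyn Predicate>) -> Vec<i64> {
    values
        .iter()
        .copied()
        .filter(|v| predicate.map_or(true, |p| p.keep(*v)))
        .collect()
}

fn main() {
    // The caller keeps ownership in an Arc (it may be shared across threads)
    let predicate: Option<Arc<dyn Predicate>> = Some(Arc::new(GreaterThan(2)));
    // and lends out a plain borrow at the call site, like as_deref() in the diff.
    assert_eq!(filter(&[1, 2, 3, 4], predicate.as_deref()), vec![3, 4]);
}
```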
@@ -180,6 +180,7 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
None,
self.row_count,
chunk_size,
self.use_statistics,
@@ -342,6 +343,7 @@ impl ParquetAsyncReader {
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.predicate.clone(),
self.row_count,
chunk_size,
self.use_statistics,
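The async batched path now forwards self.predicate.clone() into the batched reader. Since the predicate is held as an Option<Arc<dyn ...>>, the clone is a reference-count bump, not a copy of the expression; a small standalone illustration (plain Arc<String> standing in for the boxed expression):

```rust
use std::sync::Arc;

// Not polars code: cloning an Option<Arc<T>> clones the handle, not the value,
// so passing `self.predicate.clone()` into the batched reader is cheap.
fn main() {
    let predicate: Option<Arc<String>> = Some(Arc::new("a > 2".to_string()));

    let for_batched_reader = predicate.clone(); // reference-count bump only

    if let (Some(a), Some(b)) = (&predicate, &for_batched_reader) {
        // Both handles point at the same allocation.
        assert!(Arc::ptr_eq(a, b));
        assert_eq!(Arc::strong_count(a), 2);
    }
}
```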
@@ -353,8 +355,9 @@
self.reader.get_metadata().await
}

pub async fn finish(self) -> PolarsResult<DataFrame> {
pub async fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
let schema = self.schema().await?;

let predicate = self.predicate.clone();
// batched reader deals with slice pushdown
@@ -369,6 +372,9 @@
})?;
chunks.push(out)
}
if chunks.is_empty() {
return Ok(DataFrame::from(schema.as_ref()));
}
let mut df = concat_df(&chunks)?;

if rechunk {
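finish now resolves the schema up front and, when no chunks were produced (presumably when every row group is skipped or the file is empty), returns an empty DataFrame built from that schema rather than concatenating an empty list. The same guard pattern as a self-contained sketch, with Vec<i64> chunks standing in for DataFrames:

```rust
// A simplified sketch of the guard above: if nothing was read, return an
// empty result of the right shape instead of concatenating an empty list.
fn concat_chunks(chunks: Vec<Vec<i64>>) -> Vec<i64> {
    if chunks.is_empty() {
        // Mirrors `DataFrame::from(schema.as_ref())`: an empty container that
        // still carries its expected shape (here just the element type).
        return Vec::new();
    }
    chunks.into_iter().flatten().collect()
}

fn main() {
    assert_eq!(concat_chunks(vec![]), Vec::<i64>::new());
    assert_eq!(concat_chunks(vec![vec![1, 2], vec![3]]), vec![1, 2, 3]);
}
```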
32 changes: 16 additions & 16 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -115,7 +115,7 @@ fn rg_to_dfs(
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
schema: &SchemaRef,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
predicate: Option<&dyn PhysicalIoExpr>,
row_count: Option<RowCount>,
parallel: ParallelStrategy,
projection: &[usize],
@@ -166,7 +166,7 @@ fn rg_to_dfs_optionally_par_over_columns(
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
schema: &SchemaRef,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
predicate: Option<&dyn PhysicalIoExpr>,
row_count: Option<RowCount>,
parallel: ParallelStrategy,
projection: &[usize],
@@ -175,12 +175,12 @@ fn rg_to_dfs_optionally_par_over_columns(
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

for rg in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg];
for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate.as_deref(), &file_metadata.row_groups[rg], schema)?
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
*previous_row_count += current_row_count;
continue;
@@ -217,15 +217,16 @@ fn rg_to_dfs_optionally_par_over_columns(
.collect::<PolarsResult<Vec<_>>>()?
};

*remaining_rows = remaining_rows.saturating_sub(file_metadata.row_groups[rg].num_rows());
*remaining_rows =
remaining_rows.saturating_sub(file_metadata.row_groups[rg_idx].num_rows());

let mut df = DataFrame::new_no_checks(columns);
if let Some(rc) = &row_count {
df.with_row_count_mut(&rc.name, Some(*previous_row_count + rc.offset));
}
materialize_hive_partitions(&mut df, hive_partition_columns);

apply_predicate(&mut df, predicate.as_deref(), true)?;
apply_predicate(&mut df, predicate, true)?;

*previous_row_count += current_row_count;
dfs.push(df);
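Aside from the rg -> rg_idx rename, this hunk only reflows the limit bookkeeping; it still relies on usize::saturating_sub, which clamps at zero when a row group holds more rows than the remaining budget instead of underflowing. A quick standalone illustration:

```rust
// Why the bookkeeping above uses saturating_sub: usize cannot go negative,
// and a plain `-` would panic in debug builds (and wrap around in release)
// once a row group holds more rows than the remaining row budget.
fn main() {
    let remaining_rows: usize = 10;
    let rows_in_group: usize = 25;

    // Clamps at zero instead of underflowing.
    assert_eq!(remaining_rows.saturating_sub(rows_in_group), 0);

    // With budget to spare it behaves like ordinary subtraction.
    assert_eq!(100usize.saturating_sub(rows_in_group), 75);
}
```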
@@ -247,7 +248,7 @@ fn rg_to_dfs_par_over_rg(
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
schema: &SchemaRef,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
predicate: Option<&dyn PhysicalIoExpr>,
row_count: Option<RowCount>,
projection: &[usize],
use_statistics: bool,
@@ -276,11 +277,7 @@
.map(|(rg_idx, md, local_limit, row_count_start)| {
if local_limit == 0
|| use_statistics
&& !read_this_row_group(
predicate.as_deref(),
&file_metadata.row_groups[rg_idx],
schema,
)?
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
return Ok(None);
}
@@ -305,7 +302,7 @@
}
materialize_hive_partitions(&mut df, hive_partition_columns);

apply_predicate(&mut df, predicate.as_deref(), false)?;
apply_predicate(&mut df, predicate, false)?;

Ok(Some(df))
})
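In the parallel-over-row-groups path the borrowed Option<&dyn PhysicalIoExpr> is used directly inside the rayon closure, so no per-task Arc clone is needed; a shared borrow of a Sync trait object can be read from every worker. A minimal sketch with a closure standing in for the expression (rayon assumed as a dependency):

```rust
use rayon::prelude::*;

// A sketch of why Option<&dyn ...> works in the parallel row-group path: a
// shared borrow of a Sync trait object is usable from every rayon worker.
// The closure type here is only a stand-in for PhysicalIoExpr.
fn filter_groups(
    groups: &[Vec<i64>],
    predicate: Option<&(dyn Fn(i64) -> bool + Sync)>,
) -> Vec<Vec<i64>> {
    groups
        .par_iter() // one task per "row group"
        .map(|group| {
            group
                .iter()
                .copied()
                .filter(|v| predicate.map_or(true, |p| p(*v)))
                .collect()
        })
        .collect()
}

fn main() {
    let keep_positive = |v: i64| v > 0;
    let out = filter_groups(&[vec![-1, 2], vec![3, -4]], Some(&keep_positive));
    assert_eq!(out, vec![vec![2], vec![3]]);
}
```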
@@ -320,7 +317,7 @@ pub fn read_parquet<R: MmapBytesReader>(
projection: Option<&[usize]>,
schema: &SchemaRef,
metadata: Option<FileMetaDataRef>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
predicate: Option<&dyn PhysicalIoExpr>,
mut parallel: ParallelStrategy,
row_count: Option<RowCount>,
use_statistics: bool,
@@ -464,6 +461,7 @@ pub struct BatchedParquetReader {
projection: Vec<usize>,
schema: SchemaRef,
metadata: FileMetaDataRef,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_count: Option<RowCount>,
rows_read: IdxSize,
row_group_offset: usize,
@@ -483,6 +481,7 @@ impl BatchedParquetReader {
schema: SchemaRef,
limit: usize,
projection: Option<Vec<usize>>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_count: Option<RowCount>,
chunk_size: usize,
use_statistics: bool,
@@ -506,6 +505,7 @@ impl BatchedParquetReader {
metadata,
row_count,
rows_read: 0,
predicate,
row_group_offset: 0,
n_row_groups,
chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
@@ -534,7 +534,7 @@ impl BatchedParquetReader {
&mut self.limit,
&self.metadata,
&self.schema,
None,
self.predicate.as_deref(),
self.row_count.clone(),
self.parallel,
&self.projection,
