Skip to content

Commit

Permalink
refactor to schema
Browse files — browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 13, 2023
1 parent c754a4c commit 58631fb
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 80 deletions.
24 changes: 23 additions & 1 deletion crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Read parquet files in parallel from the Object Store without a third party crate.
use std::borrow::Cow;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -19,6 +20,8 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap;
use super::mmap::ColumnStore;
use crate::cloud::CloudOptions;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::predicates::read_this_row_group;

pub struct ParquetObjectStore {
store: Arc<dyn ObjectStore>,
Expand Down Expand Up @@ -155,6 +158,8 @@ pub struct FetchRowGroupsFromObjectStore {
reader: Arc<ParquetObjectStore>,
row_groups_metadata: Vec<RowGroupMetaData>,
projected_fields: Vec<SmartString>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
schema: SchemaRef,
logging: bool,
}

Expand All @@ -164,6 +169,7 @@ impl FetchRowGroupsFromObjectStore {
metadata: &FileMetaData,
schema: SchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<Self> {
let logging = verbose();

Expand All @@ -180,6 +186,8 @@ impl FetchRowGroupsFromObjectStore {
reader: Arc::new(reader),
row_groups_metadata: metadata.row_groups.to_owned(),
projected_fields,
predicate,
schema,
logging,
})
}
Expand All @@ -199,9 +207,23 @@ impl FetchRowGroupsFromObjectStore {
Ok,
)?;

let row_groups = if let Some(pred) = self.predicate.as_deref() {
Cow::Owned(
row_groups
.iter()
.filter(|rg| {
matches!(read_this_row_group(Some(pred), rg, &self.schema), Ok(true))
})
.cloned()
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(row_groups)
};

// Package in the format required by ColumnStore.
let downloaded =
download_projection(&self.projected_fields, row_groups, &self.reader).await?;
download_projection(&self.projected_fields, &row_groups, &self.reader).await?;

if self.logging {
eprintln!(
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ mod read;
mod read_impl;
mod write;

use arrow::io::parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

pub type FileMetaDataRef = Arc<FileMetaData>;

use super::*;

#[cfg(test)]
Expand Down
19 changes: 9 additions & 10 deletions crates/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ impl ColumnStats {
/// Collect the statistics in a column chunk.
pub(crate) fn collect_statistics(
md: &RowGroupMetaData,
arrow_schema: &ArrowSchema,
schema: SchemaRef,
) -> ArrowResult<Option<BatchStats>> {
let mut schema = Schema::with_capacity(arrow_schema.fields.len());
let mut stats = vec![];

for fld in &arrow_schema.fields {
let st = deserialize(fld, md)?;
schema.with_column((&fld.name).into(), (&fld.data_type).into());
stats.push(ColumnStats::from_arrow_stats(st, fld));
for (name, dt) in schema.iter() {
let fld = ArrowField::new(name.as_str(), dt.to_arrow(), true);
let st = deserialize(&fld, md)?;
stats.push(ColumnStats::from_arrow_stats(st, &fld));
}

Ok(if stats.is_empty() {
Expand All @@ -38,13 +37,13 @@ pub(crate) fn collect_statistics(
}

pub(super) fn read_this_row_group(
predicate: Option<&Arc<dyn PhysicalIoExpr>>,
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &ArrowSchema,
schema: &SchemaRef,
) -> PolarsResult<bool> {
if let Some(pred) = &predicate {
if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
if let Some(stats) = collect_statistics(md, schema.clone())? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
52 changes: 37 additions & 15 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct ParquetReader<R: Read + Seek> {
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
parallel: ParallelStrategy,
schema: Option<SchemaRef>,
row_count: Option<RowCount>,
low_memory: bool,
metadata: Option<Arc<FileMetaData>>,
Expand All @@ -65,8 +66,8 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projection: Option<&[usize]>,
) -> PolarsResult<DataFrame> {
// this path takes predicates and parallelism into account
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let rechunk = self.rechunk;
read_parquet(
Expand Down Expand Up @@ -129,9 +130,16 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

/// [`Schema`] of the file.
pub fn schema(&mut self) -> PolarsResult<Schema> {
let metadata = self.get_metadata()?;
Ok(Schema::from_iter(&read::infer_schema(metadata)?.fields))
/// Returns the [`Schema`] of the file.
///
/// A schema supplied at construction takes precedence; otherwise the
/// schema is inferred from the Parquet file metadata (fetching the
/// metadata from the underlying reader if not already cached).
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
    if let Some(schema) = &self.schema {
        return Ok(schema.clone());
    }
    let metadata = self.get_metadata()?;
    let inferred = read::infer_schema(metadata)?;
    Ok(Arc::new(Schema::from_iter(&inferred.fields)))
}

/// Use statistics in the parquet to determine if pages
Expand All @@ -152,7 +160,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

pub fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
pub fn get_metadata(&mut self) -> PolarsResult<&FileMetaDataRef> {
if self.metadata.is_none() {
self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
}
Expand All @@ -163,11 +171,13 @@ impl<R: MmapBytesReader> ParquetReader<R> {
impl<R: MmapBytesReader + 'static> ParquetReader<R> {
pub fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.row_count,
Expand All @@ -191,6 +201,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
row_count: None,
low_memory: false,
metadata: None,
schema: None,
use_statistics: true,
hive_partition_columns: None,
}
Expand All @@ -202,11 +213,11 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
}

fn finish(mut self) -> PolarsResult<DataFrame> {
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::infer_schema(&metadata)?;
let schema = self.schema()?;
let metadata = self.get_metadata()?.clone();

if let Some(cols) = &self.columns {
self.projection = Some(columns_to_projection(cols, &schema)?);
self.projection = Some(columns_to_projection_pl_schema(cols, schema.as_ref())?);
}

read_parquet(
Expand Down Expand Up @@ -238,6 +249,7 @@ pub struct ParquetAsyncReader {
n_rows: Option<usize>,
rechunk: bool,
projection: Option<Vec<usize>>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_count: Option<RowCount>,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
Expand All @@ -258,14 +270,18 @@ impl ParquetAsyncReader {
n_rows: None,
projection: None,
row_count: None,
predicate: None,
use_statistics: true,
hive_partition_columns: None,
schema,
})
}

pub async fn schema(&mut self) -> PolarsResult<Schema> {
self.reader.schema().await
/// Returns the [`Schema`] of the file, preferring a user-provided
/// schema over one inferred from the remote Parquet metadata.
pub async fn schema(&mut self) -> PolarsResult<SchemaRef> {
    if let Some(schema) = self.schema.as_ref() {
        Ok(schema.clone())
    } else {
        let inferred = self.reader.schema().await?;
        Ok(Arc::new(inferred))
    }
}
pub async fn num_rows(&mut self) -> PolarsResult<usize> {
self.reader.num_rows().await
Expand All @@ -291,6 +307,11 @@ impl ParquetAsyncReader {
self
}

pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
self.predicate = predicate;
self
}

/// Use statistics in the parquet to determine if pages
/// can be skipped from reading.
pub fn use_statistics(mut self, toggle: bool) -> Self {
Expand All @@ -305,17 +326,20 @@ impl ParquetAsyncReader {

pub async fn batched(mut self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
let metadata = self.reader.get_metadata().await?.clone();
let schema = self.schema().await?;
// row group fetched deals with projection
let row_group_fetcher = FetchRowGroupsFromObjectStore::new(
self.reader,
&metadata,
self.schema.unwrap(),
self.projection.as_deref(),
self.predicate.clone(),
)?
.into();
BatchedParquetReader::new(
row_group_fetcher,
metadata,
schema,
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.row_count,
Expand All @@ -329,12 +353,10 @@ impl ParquetAsyncReader {
self.reader.get_metadata().await
}

pub async fn finish(
self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
pub async fn finish(self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;

let predicate = self.predicate.clone();
// batched reader deals with slice pushdown
let reader = self.batched(usize::MAX).await?;
let mut iter = reader.iter(16);
Expand Down
Loading

0 comments on commit 58631fb

Please sign in to comment.