
Commit cdbfdc1
feat(python,rust!): Allow specifying Hive schema in `read/scan_parquet` (#15434)
stinodego authored Apr 2, 2024
1 parent 802d6c8 commit cdbfdc1
Showing 12 changed files with 410 additions and 168 deletions.
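
For context, here is a minimal sketch of how the new option might be used from the Rust lazy API after this change. The dataset path, column names, and dtypes are hypothetical, and the snippet assumes the `polars-lazy` crate with the `parquet` feature enabled.

use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::HiveOptions;
use polars_lazy::prelude::*;

fn main() -> PolarsResult<()> {
    // Pin the dtypes of the Hive partition columns instead of relying on
    // inference from the partition values in the first path.
    let mut hive_schema = Schema::new();
    hive_schema.with_column("year".into(), DataType::Int32);
    hive_schema.with_column("region".into(), DataType::String);

    let args = ScanArgsParquet {
        hive_options: HiveOptions {
            enabled: true,
            schema: Some(Arc::new(hive_schema)),
        },
        ..Default::default()
    };

    // Hypothetical layout: data/year=2024/region=eu/*.parquet
    let lf = LazyFrame::scan_parquet("data/**/*.parquet", args)?;
    let df = lf.limit(5).collect()?;
    println!("{df}");
    Ok(())
}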
18 changes: 18 additions & 0 deletions crates/polars-io/src/options.rs
@@ -1,3 +1,4 @@
use polars_core::schema::SchemaRef;
use polars_utils::IdxSize;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@@ -8,3 +9,20 @@ pub struct RowIndex {
pub name: String,
pub offset: IdxSize,
}

/// Options for Hive partitioning.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HiveOptions {
pub enabled: bool,
pub schema: Option<SchemaRef>,
}

impl Default for HiveOptions {
fn default() -> Self {
Self {
enabled: true,
schema: None,
}
}
}
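
The `Default` impl keeps Hive partitioning enabled with no user-supplied schema, so call sites that only want to pin the partition dtypes can rely on struct-update syntax. A small sketch, with a made-up column name and dtype:

use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::HiveOptions;

fn hive_options_with_schema() -> HiveOptions {
    let mut schema = Schema::new();
    schema.with_column("year".into(), DataType::Int32);

    HiveOptions {
        schema: Some(Arc::new(schema)),
        // `enabled: true` is taken from the Default impl above.
        ..Default::default()
    }
}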
131 changes: 86 additions & 45 deletions crates/polars-io/src/predicates.rs
@@ -38,24 +38,24 @@ pub fn apply_predicate(
Ok(())
}

/// The statistics for a column in a Parquet file
/// or Hive partition.
/// they typically hold
/// - max value
/// - min value
/// - null_count
/// Statistics of the values in a column.
///
/// The following statistics are tracked for each row group:
/// - Null count
/// - Minimum value
/// - Maximum value
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnStats {
field: Field,
// The array may hold the null count for every row group,
// or for a single row group.
// Each Series contains the stats for each row group.
null_count: Option<Series>,
min_value: Option<Series>,
max_value: Option<Series>,
}

impl ColumnStats {
/// Constructs a new [`ColumnStats`].
pub fn new(
field: Field,
null_count: Option<Series>,
@@ -70,6 +70,17 @@ impl ColumnStats {
}
}

/// Constructs a new [`ColumnStats`] with only the [`Field`] information and no statistics.
pub fn from_field(field: Field) -> Self {
Self {
field,
null_count: None,
min_value: None,
max_value: None,
}
}

/// Constructs a new [`ColumnStats`] from a single-value Series.
pub fn from_column_literal(s: Series) -> Self {
debug_assert_eq!(s.len(), 1);
Self {
@@ -80,16 +91,33 @@ impl ColumnStats {
}
}

/// Returns the [`DataType`] of the column.
pub fn dtype(&self) -> &DataType {
self.field.data_type()
}

/// Returns the null count of each row group of the column.
pub fn get_null_count_state(&self) -> Option<&Series> {
self.null_count.as_ref()
}

/// Returns the minimum value of each row group of the column.
pub fn get_min_state(&self) -> Option<&Series> {
self.min_value.as_ref()
}

/// Returns the maximum value of each row group of the column.
pub fn get_max_state(&self) -> Option<&Series> {
self.max_value.as_ref()
}

/// Returns the null count of the column.
pub fn null_count(&self) -> Option<usize> {
match self.field.data_type() {
match self.dtype() {
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => None,
_ => {
let s = self.null_count.as_ref()?;
let s = self.get_null_count_state()?;
// if all null, there are no statistics.
if s.null_count() != s.len() {
s.sum().ok()
@@ -100,34 +128,33 @@ impl ColumnStats {
}
}

/// Returns the minimum and maximum values of the column as a single [`Series`].
pub fn to_min_max(&self) -> Option<Series> {
let max_val = self.max_value.as_ref()?;
let min_val = self.min_value.as_ref()?;
let min_val = self.get_min_state()?;
let max_val = self.get_max_state()?;
let dtype = self.dtype();

let dtype = min_val.dtype();
if !use_min_max(dtype) {
return None;
}

if Self::use_min_max(dtype) {
let mut min_max_values = min_val.clone();
min_max_values.append(max_val).unwrap();
if min_max_values.null_count() > 0 {
None
} else {
Some(min_max_values)
}
} else {
let mut min_max_values = min_val.clone();
min_max_values.append(max_val).unwrap();
if min_max_values.null_count() > 0 {
None
} else {
Some(min_max_values)
}
}

pub fn get_min_state(&self) -> Option<&Series> {
self.min_value.as_ref()
}

/// Returns the minimum value of the column as a single-value [`Series`].
///
/// Returns `None` if no maximum value is available.
pub fn to_min(&self) -> Option<&Series> {
let min_val = self.min_value.as_ref()?;
let dtype = min_val.dtype();

if !Self::use_min_max(dtype) || min_val.len() != 1 {
if !use_min_max(dtype) || min_val.len() != 1 {
return None;
}

@@ -138,11 +165,14 @@ impl ColumnStats {
}
}

/// Returns the maximum value of the column as a single-value [`Series`].
///
/// Returns `None` if no maximum value is available.
pub fn to_max(&self) -> Option<&Series> {
let max_val = self.max_value.as_ref()?;
let dtype = max_val.dtype();

if !Self::use_min_max(dtype) || max_val.len() != 1 {
if !use_min_max(dtype) || max_val.len() != 1 {
return None;
}

@@ -152,14 +182,15 @@ impl ColumnStats {
Some(max_val)
}
}
}

fn use_min_max(dtype: &DataType) -> bool {
dtype.is_numeric()
|| matches!(
dtype,
DataType::String | DataType::Binary | DataType::Boolean
)
}
/// Returns whether the [`DataType`] supports minimum/maximum operations.
fn use_min_max(dtype: &DataType) -> bool {
dtype.is_numeric()
|| matches!(
dtype,
DataType::String | DataType::Binary | DataType::Boolean
)
}

/// A collection of column stats with a known schema.
@@ -168,12 +199,14 @@ impl ColumnStats {
pub struct BatchStats {
schema: SchemaRef,
stats: Vec<ColumnStats>,
// This might not be available,
// as when prunnign hive partitions.
// This might not be available, as when pruning hive partitions.
num_rows: Option<usize>,
}

impl BatchStats {
/// Constructs a new [`BatchStats`].
///
/// The `stats` should match the order of the `schema`.
pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
Self {
schema,
@@ -182,19 +215,27 @@ impl BatchStats {
}
}

pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> {
self.schema.try_index_of(column).map(|i| &self.stats[i])
}

pub fn num_rows(&self) -> Option<usize> {
self.num_rows
}

/// Returns the [`Schema`] of the batch.
pub fn schema(&self) -> &SchemaRef {
&self.schema
}

/// Returns the [`ColumnStats`] of all columns in the batch, if known.
pub fn column_stats(&self) -> &[ColumnStats] {
self.stats.as_ref()
}

/// Returns the [`ColumnStats`] of a single column in the batch.
///
/// Returns an `Err` if no statistics are available for the given column.
pub fn get_stats(&self, column: &str) -> PolarsResult<&ColumnStats> {
self.schema.try_index_of(column).map(|i| &self.stats[i])
}

/// Returns the number of rows in the batch.
///
/// Returns `None` if the number of rows is unknown.
pub fn num_rows(&self) -> Option<usize> {
self.num_rows
}
}
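
For a rough picture of how these statistics types fit together, here is a sketch that builds stats for a single Hive partition value. It assumes `BatchStats` and `ColumnStats` are reachable under `polars_io::predicates`; the column name and value are made up.

use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};

fn partition_stats() -> PolarsResult<()> {
    // A Hive partition column is constant within a file, so its statistics
    // can be derived from a single-value Series (min and max are the value).
    let year = Series::new("year", [2024i32]);
    let stats = vec![ColumnStats::from_column_literal(year)];

    let mut schema = Schema::new();
    schema.with_column("year".into(), DataType::Int32);

    // The row count may be unknown, e.g. when pruning purely on partition values.
    let batch = BatchStats::new(Arc::new(schema), stats, None);

    let year_stats = batch.get_stats("year")?;
    assert!(year_stats.to_min_max().is_some());
    Ok(())
}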
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -3,7 +3,7 @@ use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::ParallelStrategy;
use polars_io::RowIndex;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;

@@ -17,7 +17,7 @@ pub struct ScanArgsParquet {
pub low_memory: bool,
pub cloud_options: Option<CloudOptions>,
pub use_statistics: bool,
pub hive_partitioning: bool,
pub hive_options: HiveOptions,
}

impl Default for ScanArgsParquet {
@@ -31,7 +31,7 @@ impl Default for ScanArgsParquet {
low_memory: false,
cloud_options: None,
use_statistics: true,
hive_partitioning: false,
hive_options: Default::default(),
}
}
}
@@ -83,7 +83,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.low_memory,
self.args.cloud_options,
self.args.use_statistics,
self.args.hive_partitioning,
self.args.hive_options,
)?
.build()
.into();
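
Since `hive_partitioning: bool` on `ScanArgsParquet` is replaced by `hive_options: HiveOptions` (the `rust!` breaking change in the title), existing Rust callers need a small migration. A hedged sketch of what that might look like:

use polars_io::HiveOptions;
use polars_lazy::prelude::*;

fn parquet_scan_args() -> ScanArgsParquet {
    // Previously: ScanArgsParquet { hive_partitioning: true, ..Default::default() }
    // Now the flag lives on HiveOptions:
    ScanArgsParquet {
        hive_options: HiveOptions {
            enabled: true,
            // `None` keeps inferring the partition dtypes from the paths.
            schema: None,
        },
        ..Default::default()
    }
}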
30 changes: 20 additions & 10 deletions crates/polars-plan/src/logical_plan/builder.rs
@@ -10,6 +10,7 @@ use polars_io::parquet::ParquetAsyncReader;
use polars_io::parquet::ParquetReader;
#[cfg(all(feature = "cloud", feature = "parquet"))]
use polars_io::pl_async::get_runtime;
use polars_io::HiveOptions;
#[cfg(any(
feature = "parquet",
feature = "parquet_async",
@@ -116,7 +117,11 @@ impl LogicalPlanBuilder {
row_index: None,
rechunk: false,
file_counter: Default::default(),
hive_partitioning: false,
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
..Default::default()
},
};

Ok(LogicalPlan::Scan {
@@ -147,7 +152,7 @@ impl LogicalPlanBuilder {
low_memory: bool,
cloud_options: Option<CloudOptions>,
use_statistics: bool,
hive_partitioning: bool,
hive_options: HiveOptions,
) -> PolarsResult<Self> {
use polars_io::{is_cloud_url, SerReader as _};

@@ -197,10 +202,8 @@ impl LogicalPlanBuilder {
(num_rows, num_rows.unwrap_or(0)),
);

// We set the hive partitions of the first path to determine the schema.
// On iteration the partition values will be re-set per file.
if hive_partitioning {
file_info.init_hive_partitions(path.as_path())?;
if hive_options.enabled {
file_info.init_hive_partitions(path.as_path(), hive_options.schema.clone())?
}

let options = FileScanOptions {
Expand All @@ -210,7 +213,7 @@ impl LogicalPlanBuilder {
rechunk,
row_index,
file_counter: Default::default(),
hive_partitioning,
hive_options,
};
Ok(LogicalPlan::Scan {
paths,
@@ -285,7 +288,11 @@ impl LogicalPlanBuilder {
rechunk,
row_index,
file_counter: Default::default(),
hive_partitioning: false,
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
..Default::default()
},
},
predicate: None,
scan_type: FileScan::Ipc {
@@ -393,8 +400,11 @@ impl LogicalPlanBuilder {
rechunk,
row_index,
file_counter: Default::default(),
// TODO! add
hive_partitioning: false,
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
..Default::default()
},
};
Ok(LogicalPlan::Scan {
paths,
