From cdbfdc109dd637a21a42a04cdccee5f00c7d2fdf Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Tue, 2 Apr 2024 23:17:00 +0200 Subject: [PATCH] feat(python,rust!): Allow specifying Hive schema in `read/scan_parquet` (#15434) --- crates/polars-io/src/options.rs | 18 ++ crates/polars-io/src/predicates.rs | 131 +++++++---- crates/polars-lazy/src/scan/parquet.rs | 8 +- .../polars-plan/src/logical_plan/builder.rs | 30 ++- crates/polars-plan/src/logical_plan/hive.rs | 205 ++++++++++++------ .../optimizer/predicate_pushdown/mod.rs | 7 +- .../polars-plan/src/logical_plan/options.rs | 6 +- crates/polars-plan/src/logical_plan/schema.rs | 68 ++++-- py-polars/polars/io/parquet/functions.py | 23 +- py-polars/polars/lazyframe/frame.py | 2 + py-polars/src/lazyframe/mod.rs | 19 +- py-polars/tests/unit/io/test_hive.py | 61 +++++- 12 files changed, 410 insertions(+), 168 deletions(-) diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs index 13a6b56bcdef..995bf5ec2904 100644 --- a/crates/polars-io/src/options.rs +++ b/crates/polars-io/src/options.rs @@ -1,3 +1,4 @@ +use polars_core::schema::SchemaRef; use polars_utils::IdxSize; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -8,3 +9,20 @@ pub struct RowIndex { pub name: String, pub offset: IdxSize, } + +/// Options for Hive partitioning. +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct HiveOptions { + pub enabled: bool, + pub schema: Option, +} + +impl Default for HiveOptions { + fn default() -> Self { + Self { + enabled: true, + schema: None, + } + } +} diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 7c3dca6b654e..86f36b867bdd 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -38,24 +38,24 @@ pub fn apply_predicate( Ok(()) } -/// The statistics for a column in a Parquet file -/// or Hive partition. -/// they typically hold -/// - max value -/// - min value -/// - null_count +/// Statistics of the values in a column. +/// +/// The following statistics are tracked for each row group: +/// - Null count +/// - Minimum value +/// - Maximum value #[derive(Debug)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ColumnStats { field: Field, - // The array may hold the null count for every row group, - // or for a single row group. + // Each Series contains the stats for each row group. null_count: Option, min_value: Option, max_value: Option, } impl ColumnStats { + /// Constructs a new [`ColumnStats`]. pub fn new( field: Field, null_count: Option, @@ -70,6 +70,17 @@ impl ColumnStats { } } + /// Constructs a new [`ColumnStats`] with only the [`Field`] information and no statistics. + pub fn from_field(field: Field) -> Self { + Self { + field, + null_count: None, + min_value: None, + max_value: None, + } + } + + /// Constructs a new [`ColumnStats`] from a single-value Series. pub fn from_column_literal(s: Series) -> Self { debug_assert_eq!(s.len(), 1); Self { @@ -80,16 +91,33 @@ impl ColumnStats { } } + /// Returns the [`DataType`] of the column. pub fn dtype(&self) -> &DataType { self.field.data_type() } + /// Returns the null count of each row group of the column. + pub fn get_null_count_state(&self) -> Option<&Series> { + self.null_count.as_ref() + } + + /// Returns the minimum value of each row group of the column. 
+ pub fn get_min_state(&self) -> Option<&Series> { + self.min_value.as_ref() + } + + /// Returns the maximum value of each row group of the column. + pub fn get_max_state(&self) -> Option<&Series> { + self.max_value.as_ref() + } + + /// Returns the null count of the column. pub fn null_count(&self) -> Option { - match self.field.data_type() { + match self.dtype() { #[cfg(feature = "dtype-struct")] DataType::Struct(_) => None, _ => { - let s = self.null_count.as_ref()?; + let s = self.get_null_count_state()?; // if all null, there are no statistics. if s.null_count() != s.len() { s.sum().ok() @@ -100,34 +128,33 @@ impl ColumnStats { } } + /// Returns the minimum and maximum values of the column as a single [`Series`]. pub fn to_min_max(&self) -> Option { - let max_val = self.max_value.as_ref()?; - let min_val = self.min_value.as_ref()?; + let min_val = self.get_min_state()?; + let max_val = self.get_max_state()?; + let dtype = self.dtype(); - let dtype = min_val.dtype(); + if !use_min_max(dtype) { + return None; + } - if Self::use_min_max(dtype) { - let mut min_max_values = min_val.clone(); - min_max_values.append(max_val).unwrap(); - if min_max_values.null_count() > 0 { - None - } else { - Some(min_max_values) - } - } else { + let mut min_max_values = min_val.clone(); + min_max_values.append(max_val).unwrap(); + if min_max_values.null_count() > 0 { None + } else { + Some(min_max_values) } } - pub fn get_min_state(&self) -> Option<&Series> { - self.min_value.as_ref() - } - + /// Returns the minimum value of the column as a single-value [`Series`]. + /// + /// Returns `None` if no maximum value is available. pub fn to_min(&self) -> Option<&Series> { let min_val = self.min_value.as_ref()?; let dtype = min_val.dtype(); - if !Self::use_min_max(dtype) || min_val.len() != 1 { + if !use_min_max(dtype) || min_val.len() != 1 { return None; } @@ -138,11 +165,14 @@ impl ColumnStats { } } + /// Returns the maximum value of the column as a single-value [`Series`]. + /// + /// Returns `None` if no maximum value is available. pub fn to_max(&self) -> Option<&Series> { let max_val = self.max_value.as_ref()?; let dtype = max_val.dtype(); - if !Self::use_min_max(dtype) || max_val.len() != 1 { + if !use_min_max(dtype) || max_val.len() != 1 { return None; } @@ -152,14 +182,15 @@ impl ColumnStats { Some(max_val) } } +} - fn use_min_max(dtype: &DataType) -> bool { - dtype.is_numeric() - || matches!( - dtype, - DataType::String | DataType::Binary | DataType::Boolean - ) - } +/// Returns whether the [`DataType`] supports minimum/maximum operations. +fn use_min_max(dtype: &DataType) -> bool { + dtype.is_numeric() + || matches!( + dtype, + DataType::String | DataType::Binary | DataType::Boolean + ) } /// A collection of column stats with a known schema. @@ -168,12 +199,14 @@ impl ColumnStats { pub struct BatchStats { schema: SchemaRef, stats: Vec, - // This might not be available, - // as when prunnign hive partitions. + // This might not be available, as when pruning hive partitions. num_rows: Option, } impl BatchStats { + /// Constructs a new [`BatchStats`]. + /// + /// The `stats` should match the order of the `schema`. pub fn new(schema: SchemaRef, stats: Vec, num_rows: Option) -> Self { Self { schema, @@ -182,19 +215,27 @@ impl BatchStats { } } - pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> { - self.schema.try_index_of(column).map(|i| &self.stats[i]) - } - - pub fn num_rows(&self) -> Option { - self.num_rows - } - + /// Returns the [`Schema`] of the batch. 
pub fn schema(&self) -> &SchemaRef { &self.schema } + /// Returns the [`ColumnStats`] of all columns in the batch, if known. pub fn column_stats(&self) -> &[ColumnStats] { self.stats.as_ref() } + + /// Returns the [`ColumnStats`] of a single column in the batch. + /// + /// Returns an `Err` if no statistics are available for the given column. + pub fn get_stats(&self, column: &str) -> PolarsResult<&ColumnStats> { + self.schema.try_index_of(column).map(|i| &self.stats[i]) + } + + /// Returns the number of rows in the batch. + /// + /// Returns `None` if the number of rows is unknown. + pub fn num_rows(&self) -> Option { + self.num_rows + } } diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index aa0dc47b9ca5..7c4f39dd4c54 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; use polars_io::parquet::ParallelStrategy; -use polars_io::RowIndex; +use polars_io::{HiveOptions, RowIndex}; use crate::prelude::*; @@ -17,7 +17,7 @@ pub struct ScanArgsParquet { pub low_memory: bool, pub cloud_options: Option, pub use_statistics: bool, - pub hive_partitioning: bool, + pub hive_options: HiveOptions, } impl Default for ScanArgsParquet { @@ -31,7 +31,7 @@ impl Default for ScanArgsParquet { low_memory: false, cloud_options: None, use_statistics: true, - hive_partitioning: false, + hive_options: Default::default(), } } } @@ -83,7 +83,7 @@ impl LazyFileListReader for LazyParquetReader { self.args.low_memory, self.args.cloud_options, self.args.use_statistics, - self.args.hive_partitioning, + self.args.hive_options, )? .build() .into(); diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 9a257eb95b9f..7075e278ce0b 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -10,6 +10,7 @@ use polars_io::parquet::ParquetAsyncReader; use polars_io::parquet::ParquetReader; #[cfg(all(feature = "cloud", feature = "parquet"))] use polars_io::pl_async::get_runtime; +use polars_io::HiveOptions; #[cfg(any( feature = "parquet", feature = "parquet_async", @@ -116,7 +117,11 @@ impl LogicalPlanBuilder { row_index: None, rechunk: false, file_counter: Default::default(), - hive_partitioning: false, + // TODO: Support Hive partitioning. + hive_options: HiveOptions { + enabled: false, + ..Default::default() + }, }; Ok(LogicalPlan::Scan { @@ -147,7 +152,7 @@ impl LogicalPlanBuilder { low_memory: bool, cloud_options: Option, use_statistics: bool, - hive_partitioning: bool, + hive_options: HiveOptions, ) -> PolarsResult { use polars_io::{is_cloud_url, SerReader as _}; @@ -197,10 +202,8 @@ impl LogicalPlanBuilder { (num_rows, num_rows.unwrap_or(0)), ); - // We set the hive partitions of the first path to determine the schema. - // On iteration the partition values will be re-set per file. - if hive_partitioning { - file_info.init_hive_partitions(path.as_path())?; + if hive_options.enabled { + file_info.init_hive_partitions(path.as_path(), hive_options.schema.clone())? 
} let options = FileScanOptions { @@ -210,7 +213,7 @@ impl LogicalPlanBuilder { rechunk, row_index, file_counter: Default::default(), - hive_partitioning, + hive_options, }; Ok(LogicalPlan::Scan { paths, @@ -285,7 +288,11 @@ impl LogicalPlanBuilder { rechunk, row_index, file_counter: Default::default(), - hive_partitioning: false, + // TODO: Support Hive partitioning. + hive_options: HiveOptions { + enabled: false, + ..Default::default() + }, }, predicate: None, scan_type: FileScan::Ipc { @@ -393,8 +400,11 @@ impl LogicalPlanBuilder { rechunk, row_index, file_counter: Default::default(), - // TODO! add - hive_partitioning: false, + // TODO: Support Hive partitioning. + hive_options: HiveOptions { + enabled: false, + ..Default::default() + }, }; Ok(LogicalPlan::Scan { paths, diff --git a/crates/polars-plan/src/logical_plan/hive.rs b/crates/polars-plan/src/logical_plan/hive.rs index 46f4d5a50722..d22625160db0 100644 --- a/crates/polars-plan/src/logical_plan/hive.rs +++ b/crates/polars-plan/src/logical_plan/hive.rs @@ -15,92 +15,68 @@ pub struct HivePartitions { stats: BatchStats, } -#[cfg(target_os = "windows")] -fn separator(url: &Path) -> char { - if polars_io::is_cloud_url(url) { - '/' - } else { - '\\' - } -} - -#[cfg(not(target_os = "windows"))] -fn separator(_url: &Path) -> char { - '/' -} - impl HivePartitions { - pub fn get_statistics(&self) -> &BatchStats { - &self.stats + /// Constructs a new [`HivePartitions`] from a schema reference. + pub fn from_schema_ref(schema: SchemaRef) -> Self { + let column_stats = schema.iter_fields().map(ColumnStats::from_field).collect(); + let stats = BatchStats::new(schema, column_stats, None); + Self { stats } } - /// Parse a url and optionally return HivePartitions - pub(crate) fn parse_url(url: &Path) -> Option { - let sep = separator(url); - - let url_string = url.display().to_string(); + /// Constructs a new [`HivePartitions`] from a path. + /// + /// Returns `None` if the path does not contain any Hive partitions. + /// Returns `Err` if the Hive partitions cannot be parsed correctly or do not match the given + /// [`Schema`]. + pub fn try_from_path(path: &Path, schema: Option) -> PolarsResult> { + let sep = separator(path); - let pre_filt = url_string.split(sep); + let path_string = path.display().to_string(); + let path_parts = path_string.split(sep); - let split_count_m1 = pre_filt.clone().count() - 1; + // Last part is the file, which should be skipped. + let file_index = path_parts.clone().count() - 1; - let partitions = pre_filt + let partitions = path_parts .enumerate() .filter_map(|(index, part)| { - let mut it = part.split('='); - let name = it.next()?; - let value = it.next()?; - - // Don't see files `foo=1.parquet` as hive partitions. - // So we return globs and paths with extensions. - if value.contains('*') { - return None; - } - - // Identify file by index location - if index == split_count_m1 { - return None; - } - - // Having multiple '=' doesn't seem like valid hive partition, - // continue as url. 
- if it.next().is_some() { + if index == file_index { return None; } - - let s = if INTEGER_RE.is_match(value) { - let value = value.parse::().ok()?; - Series::new(name, &[value]) - } else if BOOLEAN_RE.is_match(value) { - let value = value.parse::().ok()?; - Series::new(name, &[value]) - } else if FLOAT_RE.is_match(value) { - let value = value.parse::().ok()?; - Series::new(name, &[value]) - } else if value == "__HIVE_DEFAULT_PARTITION__" { - Series::new_null(name, 1) - } else { - Series::new(name, &[percent_decode_str(value).decode_utf8().ok()?]) - }; - Some(s) + parse_hive_string(part) }) - .collect::>(); + .map(|(name, value)| hive_info_to_series(name, value, schema.clone())) + .collect::>>()?; if partitions.is_empty() { - None - } else { - let schema: Schema = partitions.as_slice().into(); - let stats = BatchStats::new( - Arc::new(schema), - partitions - .into_iter() - .map(ColumnStats::from_column_literal) - .collect(), - None, - ); - - Some(HivePartitions { stats }) + return Ok(None); } + + let schema = match schema { + Some(s) => { + polars_ensure!( + s.len() == partitions.len(), + SchemaMismatch: "path does not match the provided Hive schema" + ); + s + }, + None => Arc::new(partitions.as_slice().into()), + }; + + let stats = BatchStats::new( + schema, + partitions + .into_iter() + .map(ColumnStats::from_column_literal) + .collect(), + None, + ); + + Ok(Some(HivePartitions { stats })) + } + + pub fn get_statistics(&self) -> &BatchStats { + &self.stats } pub(crate) fn schema(&self) -> &SchemaRef { @@ -115,3 +91,88 @@ impl HivePartitions { .collect() } } + +/// Determine the path separator for identifying Hive partitions. +#[cfg(target_os = "windows")] +fn separator(url: &Path) -> char { + if polars_io::is_cloud_url(url) { + '/' + } else { + '\\' + } +} + +/// Determine the path separator for identifying Hive partitions. +#[cfg(not(target_os = "windows"))] +fn separator(_url: &Path) -> char { + '/' +} + +/// Parse a Hive partition string (e.g. "column=1.5") into a name and value part. +/// +/// Returns `None` if the string is not a Hive partition string. +fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> { + let mut it = part.split('='); + let name = it.next()?; + let value = it.next()?; + + // Having multiple '=' doesn't seem like a valid Hive partition. + if it.next().is_some() { + return None; + } + + // Files are not Hive partitions, so globs are not valid. + if value.contains('*') { + return None; + } + + Some((name, value)) +} + +/// Convert Hive partition string information to a single-value [`Series`]. +fn hive_info_to_series(name: &str, value: &str, schema: Option) -> PolarsResult { + let dtype = match schema { + Some(ref s) => { + let dtype = s.try_get(name).map_err(|_| { + polars_err!( + SchemaFieldNotFound: + "path contains column not present in the given Hive schema: {:?}", name + ) + })?; + Some(dtype) + }, + None => None, + }; + + value_to_series(name, value, dtype) +} + +/// Parse a string value into a single-value [`Series`]. 
+fn value_to_series(name: &str, value: &str, dtype: Option<&DataType>) -> PolarsResult { + let fn_err = || polars_err!(ComputeError: "unable to parse Hive partition value: {:?}", value); + + let mut s = if INTEGER_RE.is_match(value) { + let value = value.parse::().map_err(|_| fn_err())?; + Series::new(name, &[value]) + } else if BOOLEAN_RE.is_match(value) { + let value = value.parse::().map_err(|_| fn_err())?; + Series::new(name, &[value]) + } else if FLOAT_RE.is_match(value) { + let value = value.parse::().map_err(|_| fn_err())?; + Series::new(name, &[value]) + } else if value == "__HIVE_DEFAULT_PARTITION__" { + Series::new_null(name, 1) + } else { + let value = percent_decode_str(value) + .decode_utf8() + .map_err(|_| fn_err())?; + Series::new(name, &[value]) + }; + + // TODO: Avoid expensive logic above when dtype is known + if let Some(dt) = dtype { + s = s.strict_cast(dt)?; + } + + Ok(s) +} diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index c3bf2d2a2ea3..6abfa45210ab 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -363,7 +363,12 @@ impl<'a> PredicatePushDown<'a> { for path in paths.as_ref().iter() { file_info.update_hive_partitions(path)?; - let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| polars_err!(ComputeError: "cannot combine hive partitioned directories with non-hive partitioned ones"))?; + let hive_part_stats = file_info.hive_parts.as_deref().ok_or_else(|| { + polars_err!( + ComputeError: + "cannot combine hive partitioned directories with non-hive partitioned ones" + ) + })?; if stats_evaluator.should_read(hive_part_stats.get_statistics())? { new_paths.push(path.clone()); diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 628aacc4e7b0..76e35467822c 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -11,7 +11,7 @@ use polars_io::csv::{CommentPrefix, CsvEncoding, NullValues}; use polars_io::ipc::IpcCompression; #[cfg(feature = "parquet")] use polars_io::parquet::ParquetCompression; -use polars_io::RowIndex; +use polars_io::{HiveOptions, RowIndex}; #[cfg(feature = "dynamic_group_by")] use polars_time::{DynamicGroupOptions, RollingGroupOptions}; #[cfg(feature = "serde")] @@ -117,7 +117,7 @@ pub struct IpcScanOptions { #[derive(Clone, Debug, PartialEq, Eq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -/// Generic options for all file types +/// Generic options for all file types. 
pub struct FileScanOptions { pub n_rows: Option, pub with_columns: Option>>, @@ -125,7 +125,7 @@ pub struct FileScanOptions { pub row_index: Option, pub rechunk: bool, pub file_counter: FileCount, - pub hive_partitioning: bool, + pub hive_options: HiveOptions, } #[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)] diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index cef56be58061..bf107a439d01 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -7,6 +7,7 @@ use polars_utils::format_smartstring; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use super::hive::HivePartitions; use crate::prelude::*; impl LogicalPlan { @@ -53,10 +54,11 @@ pub struct FileInfo { /// - known size /// - estimated size pub row_estimation: (Option, usize), - pub hive_parts: Option>, + pub hive_parts: Option>, } impl FileInfo { + /// Constructs a new [`FileInfo`]. pub fn new( schema: SchemaRef, reader_schema: Option, @@ -70,35 +72,57 @@ impl FileInfo { } } - /// Updates the statistics and merges the hive partitions schema with the file one. - pub fn init_hive_partitions(&mut self, url: &Path) -> PolarsResult<()> { - self.hive_parts = hive::HivePartitions::parse_url(url).map(|hive_parts| { - let hive_schema = hive_parts.get_statistics().schema().clone(); - let expected_len = self.schema.len() + hive_schema.len(); + /// Set the [`HivePartitions`] information for this [`FileInfo`] from a path and an + /// optional schema. + pub fn init_hive_partitions( + &mut self, + path: &Path, + schema: Option, + ) -> PolarsResult<()> { + let hp = HivePartitions::try_from_path(path, schema)?; + if let Some(hp) = hp { + let hive_schema = hp.schema().clone(); + self.update_schema_with_hive_schema(hive_schema)?; + self.hive_parts = Some(Arc::new(hp)); + } + Ok(()) + } - let schema = Arc::make_mut(&mut self.schema); - schema.merge((**hive_parts.get_statistics().schema()).clone()); + /// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`]. + /// + /// Returns an `Err` if any of the columns in either schema overlap. + fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> { + let expected_len = self.schema.len() + hive_schema.len(); - polars_ensure!(schema.len() == expected_len, ComputeError: "invalid hive partitions\n\n\ - Extending the schema with the hive partitioned columns creates duplicate fields."); + let file_schema = Arc::make_mut(&mut self.schema); + file_schema.merge(Arc::unwrap_or_clone(hive_schema)); - Ok(Arc::new(hive_parts)) - }).transpose()?; + polars_ensure!( + file_schema.len() == expected_len, + Duplicate: "invalid Hive partition schema\n\n\ + Extending the schema with the Hive partition schema would create duplicate fields." + ); Ok(()) } - /// Updates the statistics, but not the schema. - pub fn update_hive_partitions(&mut self, url: &Path) -> PolarsResult<()> { + /// Update the [`HivePartitions`] statistics for this [`FileInfo`]. + /// + /// If the Hive partitions were not yet initialized, this function has no effect. 
+ pub fn update_hive_partitions(&mut self, path: &Path) -> PolarsResult<()> { if let Some(current) = &mut self.hive_parts { - let new = hive::HivePartitions::parse_url(url).ok_or_else(|| polars_err!(ComputeError: "expected hive partitioned path, got {}\n\n\ - This error occurs if 'hive_partitioning=true' some paths are hive partitioned and some paths are not.", url.display()))?; + let schema = current.schema().clone(); + let hp = HivePartitions::try_from_path(path, Some(schema))?; + let Some(new) = hp else { + polars_bail!( + ComputeError: "expected Hive partitioned path, got {}\n\n\ + This error occurs if some paths are Hive partitioned and some paths are not.", + path.display() + ) + }; + match Arc::get_mut(current) { - Some(current) => { - *current = new; - }, - _ => { - *current = Arc::new(new); - }, + Some(hp) => *hp = new, + None => *current = Arc::new(new), } } Ok(()) diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index fcc94f8cc31a..ed7634a22fae 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: from polars import DataFrame, DataType, LazyFrame - from polars.type_aliases import ParallelStrategy + from polars.type_aliases import ParallelStrategy, SchemaDict @deprecate_renamed_parameter("row_count_name", "row_index_name", version="0.20.4") @@ -32,6 +32,7 @@ def read_parquet( parallel: ParallelStrategy = "auto", use_statistics: bool = True, hive_partitioning: bool = True, + hive_schema: SchemaDict | None = None, rechunk: bool = True, low_memory: bool = False, storage_options: dict[str, Any] | None = None, @@ -69,8 +70,11 @@ def read_parquet( Use statistics in the parquet to determine if pages can be skipped from reading. hive_partitioning - Infer statistics and schema from hive partitioned URL and use them + Infer statistics and schema from Hive partitioned URL and use them to prune reads. + hive_schema + The column names and data types of the columns by which the data is partitioned. + If set to `None` (default), the schema of the Hive partitions is inferred. rechunk Make sure that all columns are contiguous in memory by aggregating the chunks into a single array. @@ -113,9 +117,6 @@ def read_parquet( Notes ----- - * Partitioned files: - If you have a directory-nested (hive-style) partitioned dataset, you should - use the :func:`scan_pyarrow_dataset` method instead. * When benchmarking: This operation defaults to a `rechunk` operation at the end, meaning that all data will be stored continuously in memory. Set `rechunk=False` if you are @@ -132,6 +133,12 @@ def read_parquet( if n_rows is not None: msg = "`n_rows` cannot be used with `use_pyarrow=True`" raise ValueError(msg) + if hive_schema is not None: + msg = ( + "cannot use `hive_partitions` with `use_pyarrow=True`" + "\n\nHint: Pass `pyarrow_options` instead with a 'partitioning' entry." 
+ ) + raise TypeError(msg) import pyarrow as pa import pyarrow.parquet @@ -176,6 +183,7 @@ def read_parquet( parallel=parallel, use_statistics=use_statistics, hive_partitioning=hive_partitioning, + hive_schema=hive_schema, rechunk=rechunk, low_memory=low_memory, cache=False, @@ -224,6 +232,7 @@ def scan_parquet( parallel: ParallelStrategy = "auto", use_statistics: bool = True, hive_partitioning: bool = True, + hive_schema: SchemaDict | None = None, rechunk: bool = False, low_memory: bool = False, cache: bool = True, @@ -257,6 +266,9 @@ def scan_parquet( hive_partitioning Infer statistics and schema from hive partitioned URL and use them to prune reads. + hive_schema + The column names and data types of the columns by which the data is partitioned. + If set to `None` (default), the schema of the Hive partitions is inferred. rechunk In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks. @@ -320,5 +332,6 @@ def scan_parquet( low_memory=low_memory, use_statistics=use_statistics, hive_partitioning=hive_partitioning, + hive_schema=hive_schema, retries=retries, ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 4ec0a5632e46..f96adfd0bbc4 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -430,6 +430,7 @@ def _scan_parquet( low_memory: bool = False, use_statistics: bool = True, hive_partitioning: bool = True, + hive_schema: SchemaDict | None = None, retries: int = 0, ) -> Self: """ @@ -481,6 +482,7 @@ def _scan_parquet( cloud_options=storage_options, use_statistics=use_statistics, hive_partitioning=hive_partitioning, + hive_schema=hive_schema, retries=retries, ) return self diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index f6fc7679a680..3b0e9188a137 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -6,10 +6,10 @@ use std::num::NonZeroUsize; use std::path::PathBuf; pub use exitable::PyInProcessQuery; -use polars::io::RowIndex; +use polars::io::cloud::CloudOptions; +use polars::io::{HiveOptions, RowIndex}; use polars::time::*; use polars_core::prelude::*; -use polars_rs::io::cloud::CloudOptions; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict, PyList}; @@ -243,7 +243,7 @@ impl PyLazyFrame { #[cfg(feature = "parquet")] #[staticmethod] #[pyo3(signature = (path, paths, n_rows, cache, parallel, rechunk, row_index, - low_memory, cloud_options, use_statistics, hive_partitioning, retries) + low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, retries) )] fn new_from_parquet( path: Option, @@ -257,8 +257,12 @@ impl PyLazyFrame { cloud_options: Option>, use_statistics: bool, hive_partitioning: bool, + hive_schema: Option>, retries: usize, ) -> PyResult { + let parallel = parallel.0; + let hive_schema = hive_schema.map(|s| Arc::new(s.0)); + let first_path = if let Some(path) = &path { path } else { @@ -281,16 +285,21 @@ impl PyLazyFrame { }); } let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + let hive_options = HiveOptions { + enabled: hive_partitioning, + schema: hive_schema, + }; + let args = ScanArgsParquet { n_rows, cache, - parallel: parallel.0, + parallel, rechunk, row_index, low_memory, cloud_options, use_statistics, - hive_partitioning, + hive_options, }; let lf = if path.is_some() { diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 
d8f4a81beddb..535f1412eb7b 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -1,4 +1,5 @@ import warnings +from collections import OrderedDict from pathlib import Path from typing import Any @@ -180,7 +181,7 @@ def test_hive_partitioned_err(io_files_path: Path, tmp_path: Path) -> None: root.mkdir() df.write_parquet(root / "file.parquet") - with pytest.raises(pl.ComputeError, match="invalid hive partitions"): + with pytest.raises(pl.DuplicateError, match="invalid Hive partition schema"): pl.scan_parquet(root / "**/*.parquet", hive_partitioning=True) @@ -207,3 +208,61 @@ def test_hive_partitioned_projection_skip_files( .collect() ) assert_frame_equal(df, test_df) + + +@pytest.fixture() +def dataset_path(tmp_path: Path) -> Path: + tmp_path.mkdir(exist_ok=True) + + # Set up Hive partitioned Parquet file + root = tmp_path / "dataset" + part1 = root / "c=1" + part2 = root / "c=2" + root.mkdir() + part1.mkdir() + part2.mkdir() + df1 = pl.DataFrame({"a": [1, 2], "b": [11.0, 12.0]}) + df2 = pl.DataFrame({"a": [3, 4], "b": [13.0, 14.0]}) + df3 = pl.DataFrame({"a": [5, 6], "b": [15.0, 16.0]}) + df1.write_parquet(part1 / "one.parquet") + df2.write_parquet(part1 / "two.parquet") + df3.write_parquet(part2 / "three.parquet") + + return root + + +@pytest.mark.write_disk() +def test_scan_parquet_hive_schema(dataset_path: Path) -> None: + result = pl.scan_parquet(dataset_path / "**/*.parquet", hive_partitioning=True) + assert result.schema == OrderedDict({"a": pl.Int64, "b": pl.Float64, "c": pl.Int64}) + + result = pl.scan_parquet( + dataset_path / "**/*.parquet", + hive_partitioning=True, + hive_schema={"c": pl.Int32}, + ) + + expected_schema = OrderedDict({"a": pl.Int64, "b": pl.Float64, "c": pl.Int32}) + assert result.schema == expected_schema + assert result.collect().schema == expected_schema + + +@pytest.mark.write_disk() +def test_read_parquet_invalid_hive_schema(dataset_path: Path) -> None: + with pytest.raises( + pl.SchemaFieldNotFoundError, + match='path contains column not present in the given Hive schema: "c"', + ): + pl.read_parquet( + dataset_path / "**/*.parquet", + hive_partitioning=True, + hive_schema={"nonexistent": pl.Int32}, + ) + + +def test_read_parquet_hive_schema_with_pyarrow() -> None: + with pytest.raises( + TypeError, + match="cannot use `hive_partitions` with `use_pyarrow=True`", + ): + pl.read_parquet("test.parquet", hive_schema={"c": pl.Int32}, use_pyarrow=True)
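
Usage sketch for the new `hive_schema` parameter: a minimal example mirroring the behavior exercised in `test_scan_parquet_hive_schema` above. The dataset path and layout here are hypothetical; without `hive_schema`, the dtype of the partition column `c` would be inferred (as `Int64` for integer-like values), and passing `hive_schema={"c": pl.Int32}` overrides that inference.

    import polars as pl

    # Hypothetical Hive-partitioned layout: dataset/c=1/*.parquet, dataset/c=2/*.parquet
    lf = pl.scan_parquet(
        "dataset/**/*.parquet",
        hive_partitioning=True,
        hive_schema={"c": pl.Int32},  # override the inferred dtype of the partition column
    )

    # For a dataset like the fixture in test_hive.py, the resolved schema is
    # {'a': Int64, 'b': Float64, 'c': Int32} rather than the inferred Int64 for 'c'.
    print(lf.schema)

Note that, as documented in `read_parquet`, `hive_schema` is only supported on the native reader; combining it with `use_pyarrow=True` raises a `TypeError`.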