From 4facac0e0e549370856c060df950ad812734044b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 16:13:18 +1000 Subject: [PATCH 01/11] c --- .../src/plans/conversion/dsl_to_ir.rs | 52 +++++++++++++++- crates/polars-plan/src/plans/hive.rs | 25 ++++++-- .../optimizer/projection_pushdown/mod.rs | 57 +++++++++++++++-- crates/polars-plan/src/plans/schema.rs | 24 ++++--- py-polars/tests/unit/io/test_hive.py | 62 +++++++++++++++---- 5 files changed, 182 insertions(+), 38 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 0d94c775c984..1f3d8bf9a249 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1,3 +1,4 @@ +use either::Either; use expr_expansion::{is_regex_projection, rewrite_projections}; use hive::hive_partitions_from_paths; @@ -86,7 +87,7 @@ pub fn to_alp_impl( paths, predicate, mut scan_type, - file_options, + mut file_options, } => { let mut file_info = if let Some(file_info) = file_info { file_info @@ -137,19 +138,66 @@ pub fn to_alp_impl( let hive_parts = if hive_parts.is_some() { hive_parts } else if file_options.hive_options.enabled.unwrap() { + #[allow(unused_assignments)] + let mut owned = None; + hive_partitions_from_paths( paths.as_ref(), file_options.hive_options.hive_start_idx, file_options.hive_options.schema.clone(), + match file_info.reader_schema.as_ref().unwrap() { + Either::Left(v) => { + owned = Some(Schema::from(v)); + owned.as_ref().unwrap() + }, + Either::Right(v) => v.as_ref(), + }, )? } else { None }; if let Some(ref hive_parts) = hive_parts { - file_info.update_schema_with_hive_schema(hive_parts[0].schema().clone())?; + let hive_schema = hive_parts[0].schema(); + file_info.update_schema_with_hive_schema(hive_schema.clone()); } + (|| { + // Update `with_columns` with a projection so that hive columns aren't loaded from the + // file + let Some(ref hive_parts) = hive_parts else { + return; + }; + + let hive_schema = hive_parts[0].schema(); + + let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { + return; + }; + + let names = match file_info.reader_schema.as_ref().unwrap() { + Either::Left(ref v) => { + let names = v.get_names(); + names.contains(&first_hive_name.as_str()).then_some(names) + }, + Either::Right(ref v) => { + v.contains(first_hive_name.as_str()).then(|| v.get_names()) + }, + }; + + let Some(names) = names else { + return; + }; + + file_options.with_columns = Some( + names + .iter() + .filter(|x| !hive_schema.contains(x)) + .map(ToString::to_string) + .collect::>(), + ); + })(); + if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); *schema = schema diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index e0c2b0282d71..497a128528ae 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -17,11 +17,10 @@ pub struct HivePartitions { } impl HivePartitions { - pub fn get_projection_schema_and_indices>( + pub fn get_projection_schema_and_indices( &self, - names: &[T], + names: &PlHashSet, ) -> (SchemaRef, Vec) { - let names = names.iter().map(T::as_ref).collect::>(); let mut out_schema = Schema::with_capacity(self.stats.schema().len()); let mut out_indices = Vec::with_capacity(self.stats.column_stats().len()); @@ -66,6 +65,7 @@ pub fn hive_partitions_from_paths( paths: &[PathBuf], hive_start_idx: usize, schema: Option, + reader_schema: &Schema, ) -> PolarsResult>> { let Some(path) = paths.first() else { return Ok(None); @@ -88,14 +88,29 @@ pub fn hive_partitions_from_paths( }}; } - let hive_schema = if let Some(v) = schema { - v + let hive_schema = if let Some(ref schema) = schema { + Arc::new(get_hive_parts_iter!(path_string).map(|(name, _)| { + let Some(dtype) = schema.get(name) else { + polars_bail!( + SchemaFieldNotFound: + "path contains column not present in the given Hive schema: {:?}, path = {:?}", + name, + path + ) + }; + Ok(Field::new(name, dtype.clone())) + }).collect::>()?) } else { let mut hive_schema = Schema::with_capacity(16); let mut schema_inference_map: PlHashMap<&str, PlHashSet> = PlHashMap::with_capacity(16); for (name, _) in get_hive_parts_iter!(path_string) { + if let Some(dtype) = reader_schema.get(name) { + hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?; + continue; + } + hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?; schema_inference_map.insert(name, PlHashSet::with_capacity(4)); } diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index e7a65b50ec79..36ba7bbcea78 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -9,6 +9,7 @@ mod rename; #[cfg(feature = "semi_anti_join")] mod semi_anti_join; +use either::Either; use polars_core::datatypes::PlHashSet; use polars_core::prelude::*; use polars_io::RowIndex; @@ -35,7 +36,7 @@ fn init_set() -> PlHashSet> { /// utility function to get names of the columns needed in projection at scan level fn get_scan_columns( - acc_projections: &mut Vec, + acc_projections: &Vec, expr_arena: &Arena, row_index: Option<&RowIndex>, ) -> Option> { @@ -378,7 +379,7 @@ impl ProjectionPushDown { mut options, predicate, } => { - options.with_columns = get_scan_columns(&mut acc_projections, expr_arena, None); + options.with_columns = get_scan_columns(&acc_projections, expr_arena, None); options.output_schema = if options.with_columns.is_none() { None @@ -417,7 +418,7 @@ impl ProjectionPushDown { if do_optimization { file_options.with_columns = get_scan_columns( - &mut acc_projections, + &acc_projections, expr_arena, file_options.row_index.as_ref(), ); @@ -432,7 +433,9 @@ impl ProjectionPushDown { hive_parts = if let Some(hive_parts) = hive_parts { let (new_schema, projected_indices) = hive_parts[0] - .get_projection_schema_and_indices(with_columns.as_ref()); + .get_projection_schema_and_indices( + &with_columns.iter().cloned().collect::>(), + ); Some( hive_parts @@ -448,15 +451,22 @@ impl ProjectionPushDown { .collect::>(), ) } else { - hive_parts + None }; // Hive partitions are created AFTER the projection, so the output // schema is incorrect. Here we ensure the columns that are projected and hive // parts are added at the proper place in the schema, which is at the end. - if let Some(ref mut hive_parts) = hive_parts { + if let Some(ref hive_parts) = hive_parts { let partition_schema = hive_parts.first().unwrap().schema(); + file_options.with_columns = file_options.with_columns.map(|x| { + x.iter() + .filter(|x| !partition_schema.contains(x)) + .cloned() + .collect::>() + }); + for (name, _) in partition_schema.iter() { if let Some(dt) = schema.shift_remove(name) { schema.with_column(name.clone(), dt); @@ -465,6 +475,41 @@ impl ProjectionPushDown { } Some(Arc::new(schema)) } else { + (|| { + // Update `with_columns` with a projection so that hive columns aren't loaded from the + // file + let Some(ref hive_parts) = hive_parts else { + return; + }; + + let hive_schema = hive_parts[0].schema(); + + let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { + return; + }; + + let names = match file_info.reader_schema.as_ref().unwrap() { + Either::Left(ref v) => { + let names = v.get_names(); + names.contains(&first_hive_name.as_str()).then_some(names) + }, + Either::Right(ref v) => { + v.contains(first_hive_name.as_str()).then(|| v.get_names()) + }, + }; + + let Some(names) = names else { + return; + }; + + file_options.with_columns = Some( + names + .iter() + .filter(|x| !hive_schema.contains(x)) + .map(ToString::to_string) + .collect::>(), + ); + })(); None }; } diff --git a/crates/polars-plan/src/plans/schema.rs b/crates/polars-plan/src/plans/schema.rs index 1f97e045eab4..82b3a17fd3f5 100644 --- a/crates/polars-plan/src/plans/schema.rs +++ b/crates/polars-plan/src/plans/schema.rs @@ -51,20 +51,18 @@ impl FileInfo { } /// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`]. - /// - /// Returns an `Err` if any of the columns in either schema overlap. - pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) -> PolarsResult<()> { - let expected_len = self.schema.len() + hive_schema.len(); + pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) { + let schema = Arc::make_mut(&mut self.schema); - let file_schema = Arc::make_mut(&mut self.schema); - file_schema.merge(Arc::unwrap_or_clone(hive_schema)); - - polars_ensure!( - file_schema.len() == expected_len, - Duplicate: "invalid Hive partition schema\n\n\ - Extending the schema with the Hive partition schema would create duplicate fields." - ); - Ok(()) + for field in hive_schema.iter_fields() { + if let Ok(existing) = schema.try_get_mut(&field.name) { + *existing = field.data_type().clone(); + } else { + schema + .insert_at_index(schema.len(), field.name, field.dtype.clone()) + .unwrap(); + } + } } } diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 78ac01336f18..7549a5ac475c 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -10,7 +10,7 @@ import pytest import polars as pl -from polars.exceptions import DuplicateError, SchemaFieldNotFoundError +from polars.exceptions import SchemaFieldNotFoundError from polars.testing import assert_frame_equal, assert_series_equal @@ -247,17 +247,6 @@ def test_hive_partitioned_projection_pushdown( assert_frame_equal(result, expected) -@pytest.mark.write_disk() -def test_hive_partitioned_err(io_files_path: Path, tmp_path: Path) -> None: - df = pl.read_ipc(io_files_path / "*.ipc") - root = tmp_path / "sugars_g=10" - root.mkdir() - df.write_parquet(root / "file.parquet") - - with pytest.raises(DuplicateError, match="invalid Hive partition schema"): - pl.scan_parquet(tmp_path, hive_partitioning=True).collect() - - @pytest.mark.write_disk() def test_hive_partitioned_projection_skip_files( io_files_path: Path, tmp_path: Path @@ -538,3 +527,52 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N assert_frame_equal( lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]}) ) + + +@pytest.mark.parametrize("projection_pushdown", [True, False]) +def test_hive_partition_columns_contained_in_file( + tmp_path: Path, projection_pushdown: bool +) -> None: + path = tmp_path / "a=1/b=2/data.bin" + path.parent.mkdir(exist_ok=True, parents=True) + df = pl.DataFrame( + {"x": 1, "y": 1, "a": 1, "b": 2}, + schema={"x": pl.Int32, "y": pl.Int32, "a": pl.Int8, "b": pl.Int16}, + ) + df.write_parquet(path) + + def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: + for projection in [ + ["a"], + ["b"], + ["x"], + ["y"], + ["a", "x"], + ["b", "x"], + ["a", "y"], + ["b", "y"], + ["x", "y"], + ["a", "b", "x"], + ["a", "b", "y"], + ]: + assert_frame_equal( + lf.select(projection).collect(projection_pushdown=projection_pushdown), + df.select(projection), + ) + + lf = pl.scan_parquet(path, hive_partitioning=True) + rhs = df + assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) + assert_with_projections(lf, rhs) + + lf = pl.scan_parquet( + path, + hive_schema={"a": pl.String, "b": pl.String}, + hive_partitioning=True, + ) + rhs = df.with_columns(pl.col("a", "b").cast(pl.String)) + assert_frame_equal( + lf.collect(projection_pushdown=projection_pushdown), + rhs, + ) + assert_with_projections(lf, rhs) From bbd7d7b746f3aa45b8f1c029def38c5934261c0a Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 16:24:13 +1000 Subject: [PATCH 02/11] c --- .../src/plans/conversion/dsl_to_ir.rs | 79 ++++++++++--------- .../optimizer/projection_pushdown/mod.rs | 40 +--------- 2 files changed, 47 insertions(+), 72 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 1f3d8bf9a249..ed71f7858b48 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -1,6 +1,7 @@ +use arrow::datatypes::ArrowSchemaRef; use either::Either; use expr_expansion::{is_regex_projection, rewrite_projections}; -use hive::hive_partitions_from_paths; +use hive::{hive_partitions_from_paths, HivePartitions}; use super::stack_opt::ConversionOptimizer; use super::*; @@ -162,41 +163,10 @@ pub fn to_alp_impl( file_info.update_schema_with_hive_schema(hive_schema.clone()); } - (|| { - // Update `with_columns` with a projection so that hive columns aren't loaded from the - // file - let Some(ref hive_parts) = hive_parts else { - return; - }; - - let hive_schema = hive_parts[0].schema(); - - let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { - return; - }; - - let names = match file_info.reader_schema.as_ref().unwrap() { - Either::Left(ref v) => { - let names = v.get_names(); - names.contains(&first_hive_name.as_str()).then_some(names) - }, - Either::Right(ref v) => { - v.contains(first_hive_name.as_str()).then(|| v.get_names()) - }, - }; - - let Some(names) = names else { - return; - }; - - file_options.with_columns = Some( - names - .iter() - .filter(|x| !hive_schema.contains(x)) - .map(ToString::to_string) - .collect::>(), - ); - })(); + file_options.with_columns = maybe_init_projection_excluding_hive( + file_info.reader_schema.as_ref().unwrap(), + hive_parts.as_ref(), + ); if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); @@ -850,3 +820,40 @@ where }) .collect() } + +pub(crate) fn maybe_init_projection_excluding_hive( + reader_schema: &Either, + hive_parts: Option<&Arc<[HivePartitions]>>, +) -> Option> { + // Update `with_columns` with a projection so that hive columns aren't loaded from the + // file + let Some(ref hive_parts) = hive_parts else { + return None; + }; + + let hive_schema = hive_parts[0].schema(); + + let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { + return None; + }; + + let names = match reader_schema { + Either::Left(ref v) => { + let names = v.get_names(); + names.contains(&first_hive_name.as_str()).then_some(names) + }, + Either::Right(ref v) => v.contains(first_hive_name.as_str()).then(|| v.get_names()), + }; + + let Some(names) = names else { + return None; + }; + + Some( + names + .iter() + .filter(|x| !hive_schema.contains(x)) + .map(ToString::to_string) + .collect::>(), + ) +} diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index 36ba7bbcea78..017333920de2 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -9,7 +9,6 @@ mod rename; #[cfg(feature = "semi_anti_join")] mod semi_anti_join; -use either::Either; use polars_core::datatypes::PlHashSet; use polars_core::prelude::*; use polars_io::RowIndex; @@ -475,41 +474,10 @@ impl ProjectionPushDown { } Some(Arc::new(schema)) } else { - (|| { - // Update `with_columns` with a projection so that hive columns aren't loaded from the - // file - let Some(ref hive_parts) = hive_parts else { - return; - }; - - let hive_schema = hive_parts[0].schema(); - - let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { - return; - }; - - let names = match file_info.reader_schema.as_ref().unwrap() { - Either::Left(ref v) => { - let names = v.get_names(); - names.contains(&first_hive_name.as_str()).then_some(names) - }, - Either::Right(ref v) => { - v.contains(first_hive_name.as_str()).then(|| v.get_names()) - }, - }; - - let Some(names) = names else { - return; - }; - - file_options.with_columns = Some( - names - .iter() - .filter(|x| !hive_schema.contains(x)) - .map(ToString::to_string) - .collect::>(), - ); - })(); + file_options.with_columns = maybe_init_projection_excluding_hive( + file_info.reader_schema.as_ref().unwrap(), + hive_parts.as_ref(), + ); None }; } From 958d1c6c2ca4b336d55413727b2ef7d3000f190c Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 16:31:26 +1000 Subject: [PATCH 03/11] c --- crates/polars-plan/src/plans/conversion/dsl_to_ir.rs | 8 ++++---- .../src/plans/optimizer/projection_pushdown/mod.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index ed71f7858b48..5c036b866496 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -165,7 +165,7 @@ pub fn to_alp_impl( file_options.with_columns = maybe_init_projection_excluding_hive( file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref(), + hive_parts.as_ref().map(|x| &x[0]), ); if let Some(row_index) = &file_options.row_index { @@ -823,15 +823,15 @@ where pub(crate) fn maybe_init_projection_excluding_hive( reader_schema: &Either, - hive_parts: Option<&Arc<[HivePartitions]>>, + hive_parts: Option<&HivePartitions>, ) -> Option> { // Update `with_columns` with a projection so that hive columns aren't loaded from the // file - let Some(ref hive_parts) = hive_parts else { + let Some(hive_parts) = hive_parts else { return None; }; - let hive_schema = hive_parts[0].schema(); + let hive_schema = hive_parts.schema(); let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { return None; diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index 017333920de2..173498519f8f 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -476,7 +476,7 @@ impl ProjectionPushDown { } else { file_options.with_columns = maybe_init_projection_excluding_hive( file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref(), + hive_parts.as_ref().map(|x| &x[0]), ); None }; From 03c7d90f38d67c60b66a2933a375837c349c194b Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 16:36:31 +1000 Subject: [PATCH 04/11] c --- .../src/plans/conversion/dsl_to_ir.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 5c036b866496..894f32afd49a 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -138,7 +138,9 @@ pub fn to_alp_impl( let hive_parts = if hive_parts.is_some() { hive_parts - } else if file_options.hive_options.enabled.unwrap() { + } else if file_options.hive_options.enabled.unwrap() + && file_info.reader_schema.is_some() + { #[allow(unused_assignments)] let mut owned = None; @@ -163,10 +165,14 @@ pub fn to_alp_impl( file_info.update_schema_with_hive_schema(hive_schema.clone()); } - file_options.with_columns = maybe_init_projection_excluding_hive( - file_info.reader_schema.as_ref().unwrap(), - hive_parts.as_ref().map(|x| &x[0]), - ); + file_options.with_columns = if file_info.reader_schema.is_some() { + maybe_init_projection_excluding_hive( + file_info.reader_schema.as_ref().unwrap(), + hive_parts.as_ref().map(|x| &x[0]), + ) + } else { + None + }; if let Some(row_index) = &file_options.row_index { let schema = Arc::make_mut(&mut file_info.schema); From 3a600301366457229ed053ac5a604a51cb6e7147 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 16:39:11 +1000 Subject: [PATCH 05/11] c --- crates/polars-plan/src/plans/conversion/dsl_to_ir.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 894f32afd49a..0836e0ca305f 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -833,15 +833,11 @@ pub(crate) fn maybe_init_projection_excluding_hive( ) -> Option> { // Update `with_columns` with a projection so that hive columns aren't loaded from the // file - let Some(hive_parts) = hive_parts else { - return None; - }; + let hive_parts = hive_parts?; let hive_schema = hive_parts.schema(); - let Some((first_hive_name, _)) = hive_schema.get_at_index(0) else { - return None; - }; + let (first_hive_name, _) = hive_schema.get_at_index(0)?; let names = match reader_schema { Either::Left(ref v) => { @@ -851,9 +847,7 @@ pub(crate) fn maybe_init_projection_excluding_hive( Either::Right(ref v) => v.contains(first_hive_name.as_str()).then(|| v.get_names()), }; - let Some(names) = names else { - return None; - }; + let names = names?; Some( names From cc03b5eda1d2e96da6c22f738b95a9f054881a1c Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 17:43:56 +1000 Subject: [PATCH 06/11] c --- crates/polars-io/src/options.rs | 3 +++ crates/polars-mem-engine/src/planner/lp.rs | 2 +- py-polars/src/lazyframe/mod.rs | 1 + py-polars/tests/unit/io/test_hive.py | 4 ++-- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs index 607e66052e29..3310953a46ad 100644 --- a/crates/polars-io/src/options.rs +++ b/crates/polars-io/src/options.rs @@ -22,6 +22,8 @@ pub struct HiveOptions { pub enabled: Option, pub hive_start_idx: usize, pub schema: Option, + /// This is false if the partitions columns are contained in the file + pub materialize: bool, } impl Default for HiveOptions { @@ -30,6 +32,7 @@ impl Default for HiveOptions { enabled: Some(true), hive_start_idx: 0, schema: None, + materialize: true, } } } diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 323e6078e0b1..fe044941a0fd 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -284,7 +284,7 @@ fn create_physical_plan_impl( } => Ok(Box::new(executors::ParquetExec::new( paths, file_info, - hive_parts, + hive_parts.filter(|_| file_options.hive_options.materialize), predicate, options, cloud_options, diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index e4b9794868d0..990c3a48467e 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -336,6 +336,7 @@ impl PyLazyFrame { enabled: hive_partitioning, hive_start_idx: 0, schema: hive_schema, + materialize: true, }; let args = ScanArgsParquet { diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 7549a5ac475c..f3307d84b463 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -536,8 +536,8 @@ def test_hive_partition_columns_contained_in_file( path = tmp_path / "a=1/b=2/data.bin" path.parent.mkdir(exist_ok=True, parents=True) df = pl.DataFrame( - {"x": 1, "y": 1, "a": 1, "b": 2}, - schema={"x": pl.Int32, "y": pl.Int32, "a": pl.Int8, "b": pl.Int16}, + {"x": 1, "a": 1, "b": 2, "y": 1}, + schema={"x": pl.Int32, "a": pl.Int8, "b": pl.Int16, "y": pl.Int32}, ) df.write_parquet(path) From a88ea73e77be652428c2d7cffc2ebec6e21a01bb Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 20:48:29 +1000 Subject: [PATCH 07/11] c --- crates/polars-core/src/frame/mod.rs | 2 +- .../polars-io/src/parquet/read/read_impl.rs | 37 +++++++++++++++++-- crates/polars-io/src/parquet/read/utils.rs | 2 +- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 0a8f130dfe51..36ab89ef03cd 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1121,7 +1121,7 @@ impl DataFrame { series: Series, ) -> PolarsResult<&mut Self> { polars_ensure!( - series.len() == self.height(), + self.height() == 0 || series.len() == self.height(), ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}", series.len(), self.height(), ); diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 26442980b05f..2d36ddd7b225 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -158,12 +158,31 @@ pub(super) fn array_iter_to_series( /// num_rows equals the height of the df when the df height is non-zero. pub(crate) fn materialize_hive_partitions( df: &mut DataFrame, + reader_schema: &ArrowSchema, hive_partition_columns: Option<&[Series]>, num_rows: usize, ) { if let Some(hive_columns) = hive_partition_columns { - for s in hive_columns { - unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) }; + let Some(first) = hive_columns.first() else { + return; + }; + + if reader_schema.index_of(first.name()).is_some() { + for s in hive_columns { + let i = match df.get_columns().binary_search_by_key( + &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), + |s| reader_schema.index_of(s.name()).unwrap_or(usize::MIN), + ) { + Ok(i) => i, + Err(i) => i, + }; + + df.insert_column(i, s.new_from_index(0, num_rows)).unwrap(); + } + } else { + for s in hive_columns { + unsafe { df.with_column_unchecked(s.new_from_index(0, num_rows)) }; + } } } } @@ -294,7 +313,12 @@ fn rg_to_dfs_optionally_par_over_columns( df.with_row_index_mut(&rc.name, Some(*previous_row_count + rc.offset)); } - materialize_hive_partitions(&mut df, hive_partition_columns, projection_height); + materialize_hive_partitions( + &mut df, + schema.as_ref(), + hive_partition_columns, + projection_height, + ); apply_predicate(&mut df, predicate, true)?; *previous_row_count += current_row_count; @@ -382,7 +406,12 @@ fn rg_to_dfs_par_over_rg( df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset)); } - materialize_hive_partitions(&mut df, hive_partition_columns, projection_height); + materialize_hive_partitions( + &mut df, + schema.as_ref(), + hive_partition_columns, + projection_height, + ); apply_predicate(&mut df, predicate, false)?; Ok(Some(df)) diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs index f3b79f3cd756..d7a2faa37a40 100644 --- a/crates/polars-io/src/parquet/read/utils.rs +++ b/crates/polars-io/src/parquet/read/utils.rs @@ -24,7 +24,7 @@ pub fn materialize_empty_df( .unwrap(); } - materialize_hive_partitions(&mut df, hive_partition_columns, 0); + materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns, 0); df } From cabd51752f43333a5628fe9677e52817fe037963 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 20:51:38 +1000 Subject: [PATCH 08/11] c --- crates/polars-core/src/frame/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 36ab89ef03cd..ab172e4ddc47 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1121,7 +1121,7 @@ impl DataFrame { series: Series, ) -> PolarsResult<&mut Self> { polars_ensure!( - self.height() == 0 || series.len() == self.height(), + self.width() == 0 || series.len() == self.height(), ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}", series.len(), self.height(), ); From ea934d1349205e8670207df85f58e7983ed9ebad Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Wed, 26 Jun 2024 22:07:26 +1000 Subject: [PATCH 09/11] c --- crates/polars-io/src/options.rs | 3 --- crates/polars-io/src/parquet/read/read_impl.rs | 1 + crates/polars-mem-engine/src/planner/lp.rs | 2 +- crates/polars-plan/src/plans/hive.rs | 1 + py-polars/src/lazyframe/mod.rs | 1 - 5 files changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/polars-io/src/options.rs b/crates/polars-io/src/options.rs index 3310953a46ad..607e66052e29 100644 --- a/crates/polars-io/src/options.rs +++ b/crates/polars-io/src/options.rs @@ -22,8 +22,6 @@ pub struct HiveOptions { pub enabled: Option, pub hive_start_idx: usize, pub schema: Option, - /// This is false if the partitions columns are contained in the file - pub materialize: bool, } impl Default for HiveOptions { @@ -32,7 +30,6 @@ impl Default for HiveOptions { enabled: Some(true), hive_start_idx: 0, schema: None, - materialize: true, } } } diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 2d36ddd7b225..cd869b96638c 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -168,6 +168,7 @@ pub(crate) fn materialize_hive_partitions( }; if reader_schema.index_of(first.name()).is_some() { + // Insert these hive columns in the order they are stored in the file. for s in hive_columns { let i = match df.get_columns().binary_search_by_key( &reader_schema.index_of(s.name()).unwrap_or(usize::MAX), diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index fe044941a0fd..323e6078e0b1 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -284,7 +284,7 @@ fn create_physical_plan_impl( } => Ok(Box::new(executors::ParquetExec::new( paths, file_info, - hive_parts.filter(|_| file_options.hive_options.materialize), + hive_parts, predicate, options, cloud_options, diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs index 497a128528ae..44937dd3ecef 100644 --- a/crates/polars-plan/src/plans/hive.rs +++ b/crates/polars-plan/src/plans/hive.rs @@ -106,6 +106,7 @@ pub fn hive_partitions_from_paths( PlHashMap::with_capacity(16); for (name, _) in get_hive_parts_iter!(path_string) { + // If the column is also in the file we can use the dtype stored there. if let Some(dtype) = reader_schema.get(name) { hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?; continue; diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 990c3a48467e..e4b9794868d0 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -336,7 +336,6 @@ impl PyLazyFrame { enabled: hive_partitioning, hive_start_idx: 0, schema: hive_schema, - materialize: true, }; let args = ScanArgsParquet { From bb9610510246fc4b05540a2fad6837282fd547e8 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 26 Jun 2024 15:13:17 +0200 Subject: [PATCH 10/11] Update py-polars/tests/unit/io/test_hive.py --- py-polars/tests/unit/io/test_hive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index f3307d84b463..7602f54344d6 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -528,7 +528,7 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]}) ) - +@pytest.mark.write_disk() @pytest.mark.parametrize("projection_pushdown", [True, False]) def test_hive_partition_columns_contained_in_file( tmp_path: Path, projection_pushdown: bool From 0d77247a0f842ffe252cfd99e288dd82d7511eb5 Mon Sep 17 00:00:00 2001 From: ritchie Date: Wed, 26 Jun 2024 15:38:35 +0200 Subject: [PATCH 11/11] fmt --- py-polars/tests/unit/io/test_hive.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 7602f54344d6..4563885c72ad 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -528,6 +528,7 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N lf.collect(), pl.DataFrame({k: [1, 2, 3] for k in ["x", "a", "b"]}) ) + @pytest.mark.write_disk() @pytest.mark.parametrize("projection_pushdown", [True, False]) def test_hive_partition_columns_contained_in_file(