diff --git a/crates/polars-arrow/src/legacy/kernels/ewm/variance.rs b/crates/polars-arrow/src/legacy/kernels/ewm/variance.rs index ab4112697839..42a1d0c95e21 100644 --- a/crates/polars-arrow/src/legacy/kernels/ewm/variance.rs +++ b/crates/polars-arrow/src/legacy/kernels/ewm/variance.rs @@ -408,7 +408,7 @@ mod test { ); assert_allclose!( ewm_var(YS.to_vec(), ALPHA, false, true, 0, false), - PrimitiveArray::from([None, Some(0.0), Some(1.0), None, None, Some(4.2), Some(3.1),]), + PrimitiveArray::from([None, Some(0.0), Some(1.0), None, None, Some(4.2), Some(3.1)]), EPS ); assert_allclose!( diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs index 14c24bce12ac..5c722c5b77cd 100644 --- a/crates/polars-io/src/parquet/read/mod.rs +++ b/crates/polars-io/src/parquet/read/mod.rs @@ -41,4 +41,5 @@ pub use utils::materialize_empty_df; pub mod _internal { pub use super::mmap::to_deserializer; pub use super::predicates::read_this_row_group; + pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting}; } diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index cefa05a40072..63fb51464038 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -291,19 +291,7 @@ fn rg_to_dfs_prefiltered( debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns); debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns); - enum MaskSetting { - Auto, - Pre, - Post, - } - - let mask_setting = - std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(MaskSetting::Auto, |v| match &v[..] { - "auto" => MaskSetting::Auto, - "pre" => MaskSetting::Pre, - "post" => MaskSetting::Post, - _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."), - }); + let mask_setting = PrefilterMaskSetting::init_from_env(); let dfs: Vec> = POOL.install(|| { // Set partitioned fields to prevent quadratic behavior. @@ -381,29 +369,8 @@ fn rg_to_dfs_prefiltered( return Ok(Some(df)); } - let prefilter_cost = matches!(mask_setting, MaskSetting::Auto) - .then(|| { - let num_edges = filter_mask.num_edges() as f64; - let rg_len = filter_mask.len() as f64; - - // @GB: I did quite some analysis on this. - // - // Pre-filtered and Post-filtered can both be faster in certain scenarios. - // - // - Pre-filtered is faster when there is some amount of clustering or - // sorting involved or if the number of values selected is small. - // - Post-filtering is faster when the predicate selects a somewhat random - // elements throughout the row group. - // - // The following is a heuristic value to try and estimate which one is - // faster. Essentially, it sees how many times it needs to switch between - // skipping items and collecting items and compares it against the number - // of values that it will collect. - // - // Closer to 0: pre-filtering is probably better. - // Closer to 1: post-filtering is probably better. - (num_edges / rg_len).clamp(0.0, 1.0) - }) + let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto) + .then(|| calc_prefilter_cost(&filter_mask)) .unwrap_or_default(); let rg_columns = (0..num_dead_columns) @@ -450,25 +417,13 @@ fn rg_to_dfs_prefiltered( array.filter(&mask_arr) }; - let array = match mask_setting { - MaskSetting::Auto => { - // Prefiltering is more expensive for nested types so we make the cut-off - // higher. - let is_nested = - schema.get_at_index(col_idx).unwrap().1.dtype.is_nested(); - - // We empirically selected these numbers. - let do_prefilter = (is_nested && prefilter_cost <= 0.01) - || (!is_nested && prefilter_cost <= 0.02); - - if do_prefilter { - pre()? - } else { - post()? - } - }, - MaskSetting::Pre => pre()?, - MaskSetting::Post => post()?, + let array = if mask_setting.should_prefilter( + prefilter_cost, + &schema.get_at_index(col_idx).unwrap().1.dtype, + ) { + pre()? + } else { + post()? }; debug_assert_eq!(array.len(), filter_mask.set_bits()); @@ -1247,3 +1202,58 @@ impl BatchedParquetIter { } } } + +pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 { + let num_edges = mask.num_edges() as f64; + let rg_len = mask.len() as f64; + + // @GB: I did quite some analysis on this. + // + // Pre-filtered and Post-filtered can both be faster in certain scenarios. + // + // - Pre-filtered is faster when there is some amount of clustering or + // sorting involved or if the number of values selected is small. + // - Post-filtering is faster when the predicate selects a somewhat random + // elements throughout the row group. + // + // The following is a heuristic value to try and estimate which one is + // faster. Essentially, it sees how many times it needs to switch between + // skipping items and collecting items and compares it against the number + // of values that it will collect. + // + // Closer to 0: pre-filtering is probably better. + // Closer to 1: post-filtering is probably better. + (num_edges / rg_len).clamp(0.0, 1.0) +} + +pub enum PrefilterMaskSetting { + Auto, + Pre, + Post, +} + +impl PrefilterMaskSetting { + pub fn init_from_env() -> Self { + std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] { + "auto" => Self::Auto, + "pre" => Self::Pre, + "post" => Self::Post, + _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."), + }) + } + + pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool { + match self { + Self::Auto => { + // Prefiltering is more expensive for nested types so we make the cut-off + // higher. + let is_nested = dtype.is_nested(); + + // We empirically selected these numbers. + (is_nested && prefilter_cost <= 0.01) || (!is_nested && prefilter_cost <= 0.02) + }, + Self::Pre => true, + Self::Post => false, + } + } +} diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index b46600666c44..317bd420d99a 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -8,6 +8,7 @@ pub trait PhysicalIoExpr: Send + Sync { fn evaluate_io(&self, df: &DataFrame) -> PolarsResult; /// Get the variables that are used in the expression i.e. live variables. + /// This can contain duplicates. fn live_variables(&self) -> Option>; /// Can take &dyn Statistics and determine of a file should be diff --git a/crates/polars-lazy/src/tests/queries.rs b/crates/polars-lazy/src/tests/queries.rs index 49d7aa120ea4..d1566cbb8680 100644 --- a/crates/polars-lazy/src/tests/queries.rs +++ b/crates/polars-lazy/src/tests/queries.rs @@ -1403,7 +1403,7 @@ fn test_categorical_addition() -> PolarsResult<()> { #[test] fn test_error_duplicate_names() { let df = fruits_cars(); - assert!(df.lazy().select([col("*"), col("*"),]).collect().is_err()); + assert!(df.lazy().select([col("*"), col("*")]).collect().is_err()); } #[test] diff --git a/crates/polars-row/src/encode.rs b/crates/polars-row/src/encode.rs index a415e2fe1915..363a43b6a9e8 100644 --- a/crates/polars-row/src/encode.rs +++ b/crates/polars-row/src/encode.rs @@ -528,7 +528,7 @@ mod test { let c = Utf8ViewArray::from_slice([Some("a"), Some(""), Some("meep")]); let encoded = convert_columns_no_order(&[Box::new(a), Box::new(b), Box::new(c)]); - assert_eq!(encoded.offsets, &[0, 44, 55, 99,]); + assert_eq!(encoded.offsets, &[0, 44, 55, 99]); assert_eq!(encoded.values.len(), 99); assert!(encoded.values.ends_with(&[0, 0, 0, 4])); assert!(encoded.values.starts_with(&[1, 128, 0, 0, 1, 1, 128])); diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 6703acaf47a0..18aebaf62312 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -6,6 +6,7 @@ use futures::StreamExt; use polars_core::frame::DataFrame; use polars_error::PolarsResult; use polars_io::prelude::ParallelStrategy; +use polars_io::prelude::_internal::PrefilterMaskSetting; use super::row_group_data_fetch::RowGroupDataFetcher; use super::row_group_decode::RowGroupDecoder; @@ -287,12 +288,69 @@ impl ParquetSourceNode { let ideal_morsel_size = get_ideal_morsel_size(); let min_values_per_thread = self.config.min_values_per_thread; - let use_prefiltered = physical_predicate.is_some() + let mut use_prefiltered = physical_predicate.is_some() && matches!( self.options.parallel, ParallelStrategy::Auto | ParallelStrategy::Prefiltered ); + let predicate_arrow_field_indices = if use_prefiltered { + let v = physical_predicate + .as_ref() + .unwrap() + .live_variables() + .and_then(|x| { + let mut out = x + .iter() + // Can be `None` - if the column is e.g. a hive column, or the row index column. + .filter_map(|x| projected_arrow_schema.index_of(x)) + .collect::>(); + + out.sort_unstable(); + out.dedup(); + // There is at least one non-predicate column, or pre-filtering was + // explicitly requested (only useful for testing). + (out.len() < projected_arrow_schema.len() + || matches!(self.options.parallel, ParallelStrategy::Prefiltered)) + .then_some(out) + }); + + use_prefiltered &= v.is_some(); + + v.unwrap_or_default() + } else { + vec![] + }; + + let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env); + + let non_predicate_arrow_field_indices = if use_prefiltered.is_some() { + filtered_range( + predicate_arrow_field_indices.as_slice(), + projected_arrow_schema.len(), + ) + } else { + vec![] + }; + + if use_prefiltered.is_some() && self.verbose { + eprintln!( + "[ParquetSource]: Pre-filtered decode enabled ({} live, {} non-live)", + predicate_arrow_field_indices.len(), + non_predicate_arrow_field_indices.len() + ) + } + + let predicate_arrow_field_mask = if use_prefiltered.is_some() { + let mut out = vec![false; projected_arrow_schema.len()]; + for i in predicate_arrow_field_indices.iter() { + out[*i] = true; + } + out + } else { + vec![] + }; + RowGroupDecoder { scan_sources, hive_partitions, @@ -302,6 +360,9 @@ impl ParquetSourceNode { row_index, physical_predicate, use_prefiltered, + predicate_arrow_field_indices, + non_predicate_arrow_field_indices, + predicate_arrow_field_mask, ideal_morsel_size, min_values_per_thread, } @@ -341,3 +402,30 @@ impl ParquetSourceNode { } } } + +/// Returns 0..len in a Vec, excluding indices in `exclude`. +/// `exclude` needs to be a sorted list of unique values. +fn filtered_range(exclude: &[usize], len: usize) -> Vec { + let mut j = 0; + + (0..len) + .filter(|&i| { + if j == exclude.len() || i != exclude[j] { + true + } else { + j += 1; + false + } + }) + .collect() +} + +mod tests { + + #[test] + fn test_filtered_range() { + use super::filtered_range; + assert_eq!(filtered_range(&[1, 3], 7).as_slice(), &[0, 2, 4, 5, 6]); + assert_eq!(filtered_range(&[1, 6], 7).as_slice(), &[0, 2, 3, 4, 5]); + } +} diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs index dfede52f13cd..0c88ff36ec2d 100644 --- a/crates/polars-stream/src/nodes/parquet_source/mod.rs +++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs @@ -14,7 +14,6 @@ use polars_io::utils::byte_source::DynByteSourceBuilder; use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::{FileInfo, ScanSources}; use polars_plan::prelude::FileScanOptions; -use row_group_decode::RowGroupDecoder; use super::compute_node_prelude::*; use super::{MorselSeq, TaskPriority}; @@ -55,7 +54,6 @@ pub struct ParquetSourceNode { morsel_stream_starter: Option>, // This is behind a Mutex so that we can call `shutdown()` asynchronously. async_task_data: Arc>, - row_group_decoder: Option>, is_finished: Arc, } @@ -120,7 +118,6 @@ impl ParquetSourceNode { morsel_stream_starter: None, async_task_data: Arc::new(tokio::sync::Mutex::new(None)), - row_group_decoder: None, is_finished: Arc::new(AtomicBool::new(false)), } } @@ -166,9 +163,6 @@ impl ComputeNode for ParquetSourceNode { .try_lock() .unwrap() .replace((raw_morsel_receivers, morsel_stream_task_handle)); - - let row_group_decoder = self.init_row_group_decoder(); - self.row_group_decoder = Some(Arc::new(row_group_decoder)); } fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs index 668bf7cee340..e0944203cfe6 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs @@ -5,8 +5,11 @@ use polars_core::prelude::{ ArrowField, ArrowSchema, BooleanChunked, ChunkFull, IdxCa, StringChunked, }; use polars_core::series::{IntoSeries, IsSorted, Series}; +use polars_core::utils::arrow::bitmap::{Bitmap, MutableBitmap}; use polars_error::{polars_bail, PolarsResult}; use polars_io::predicates::PhysicalIoExpr; +use polars_io::prelude::_internal::calc_prefilter_cost; +pub use polars_io::prelude::_internal::PrefilterMaskSetting; use polars_io::RowIndex; use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::ScanSources; @@ -26,7 +29,13 @@ pub(super) struct RowGroupDecoder { pub(super) projected_arrow_schema: Arc, pub(super) row_index: Option, pub(super) physical_predicate: Option>, - pub(super) use_prefiltered: bool, + pub(super) use_prefiltered: Option, + /// Indices into `projected_arrow_schema. This must be sorted. + pub(super) predicate_arrow_field_indices: Vec, + /// Indices into `projected_arrow_schema. This must be sorted. + pub(super) non_predicate_arrow_field_indices: Vec, + /// The nth bit is set to `true` if the field at that index is used in the predicate. + pub(super) predicate_arrow_field_mask: Vec, pub(super) ideal_morsel_size: usize, pub(super) min_values_per_thread: usize, } @@ -36,7 +45,7 @@ impl RowGroupDecoder { &self, row_group_data: RowGroupData, ) -> PolarsResult> { - if self.use_prefiltered { + if self.use_prefiltered.is_some() { self.row_group_data_to_df_prefiltered(row_group_data).await } else { self.row_group_data_to_df_impl(row_group_data).await @@ -421,8 +430,175 @@ impl RowGroupDecoder { &self, row_group_data: RowGroupData, ) -> PolarsResult> { - // TODO: actually prefilter - self.row_group_data_to_df_impl(row_group_data).await + debug_assert!(row_group_data.slice.is_none()); // Invariant of the optimizer. + assert!(self.predicate_arrow_field_indices.len() <= self.projected_arrow_schema.len()); + + let prefilter_setting = self.use_prefiltered.as_ref().unwrap(); + + let row_group_data = Arc::new(row_group_data); + + let mut live_columns = { + let capacity = self.row_index.is_some() as usize + + self.predicate_arrow_field_indices.len() + + self.hive_partitions_width + + self.include_file_paths.is_some() as usize; + + Vec::with_capacity(capacity) + }; + + if let Some(s) = self.materialize_row_index( + row_group_data.as_ref(), + 0..row_group_data.row_group_metadata.num_rows(), + )? { + live_columns.push(s); + } + + let shared_file_state = row_group_data + .shared_file_state + .get_or_init(|| self.shared_file_state_init_func(&row_group_data)) + .await; + + assert_eq!(shared_file_state.path_index, row_group_data.path_index); + + let projection_height = row_group_data.row_group_metadata.num_rows(); + + for s in &shared_file_state.hive_series { + debug_assert!(s.len() >= projection_height); + live_columns.push(s.slice(0, projection_height)); + } + + if let Some(file_path_series) = &shared_file_state.file_path_series { + debug_assert!(file_path_series.len() >= projection_height); + live_columns.push(file_path_series.slice(0, projection_height)); + } + + for s in self + .predicate_arrow_field_indices + .iter() + .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap()) + .map(|(_, arrow_field)| decode_column(arrow_field, &row_group_data, None)) + { + live_columns.push(s?); + } + + let live_df = unsafe { DataFrame::new_no_checks(live_columns) }; + let mask = self + .physical_predicate + .as_deref() + .unwrap() + .evaluate_io(&live_df)?; + let mask = mask.bool().unwrap(); + + let live_df_filtered = unsafe { + DataFrame::new_no_checks( + filter_cols(live_df.take_columns(), mask, self.min_values_per_thread).await?, + ) + }; + + let mask_bitmap = { + let mut mask_bitmap = MutableBitmap::with_capacity(mask.len()); + + for chunk in mask.downcast_iter() { + match chunk.validity() { + None => mask_bitmap.extend_from_bitmap(chunk.values()), + Some(validity) => mask_bitmap.extend_from_bitmap(&(validity & chunk.values())), + } + } + + mask_bitmap.freeze() + }; + + assert_eq!(mask_bitmap.len(), projection_height); + + let prefilter_cost = calc_prefilter_cost(&mask_bitmap); + + let dead_cols_filtered = self + .non_predicate_arrow_field_indices + .iter() + .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap()) + .map(|(_, arrow_field)| { + decode_column_prefiltered( + arrow_field, + &row_group_data, + prefilter_cost, + prefilter_setting, + mask, + &mask_bitmap, + ) + }) + .collect::>>()?; + + let mut out_columns = + Vec::with_capacity(live_df_filtered.width() + dead_cols_filtered.len()); + + let mut iters = [ + dead_cols_filtered.into_iter(), + live_df_filtered.take_columns().into_iter(), + ]; + + // dead_cols_filtered + // [ ..arrow_fields ] + // live_df_filtered + // [ row_index?, ..arrow_fields, ..hive_cols, file_path? ] + + if self.row_index.is_some() { + out_columns.push(iters[1].next().unwrap()); + } + + for is_live in self.predicate_arrow_field_mask.iter() { + out_columns.push(iters[*is_live as usize].next().unwrap()) + } + + let [dead_rem, live_rem] = iters; + + out_columns.extend(live_rem); // optional hive cols, file path col + assert_eq!(dead_rem.len(), 0); + + let df = unsafe { DataFrame::new_no_checks(out_columns) }; + Ok(self.split_to_morsels(df)) + } +} + +fn decode_column_prefiltered( + arrow_field: &ArrowField, + row_group_data: &RowGroupData, + prefilter_cost: f64, + prefilter_setting: &PrefilterMaskSetting, + mask: &BooleanChunked, + mask_bitmap: &Bitmap, +) -> PolarsResult { + let columns_to_deserialize = row_group_data + .row_group_metadata + .columns_under_root_iter(&arrow_field.name) + .map(|col_md| { + let byte_range = col_md.byte_range(); + + ( + col_md, + row_group_data + .fetched_bytes + .get_range(byte_range.start as usize..byte_range.end as usize), + ) + }) + .collect::>(); + + let prefilter = prefilter_setting.should_prefilter(prefilter_cost, &arrow_field.dtype); + + let deserialize_filter = + prefilter.then(|| polars_parquet::read::Filter::Mask(mask_bitmap.clone())); + + let array = polars_io::prelude::_internal::to_deserializer( + columns_to_deserialize, + arrow_field.clone(), + deserialize_filter, + )?; + + let series = Series::try_from((arrow_field, array))?; + + if !prefilter { + series.filter(mask) + } else { + Ok(series) } } diff --git a/crates/polars/tests/it/core/rolling_window.rs b/crates/polars/tests/it/core/rolling_window.rs index a58280e09345..609b101ce9f4 100644 --- a/crates/polars/tests/it/core/rolling_window.rs +++ b/crates/polars/tests/it/core/rolling_window.rs @@ -156,7 +156,7 @@ fn test_rolling_mean() { let out = out.f64().unwrap(); assert_eq!( Vec::from(out), - &[None, Some(4.5), Some(7.0), Some(4.0), Some(9.0), Some(13.0),] + &[None, Some(4.5), Some(7.0), Some(4.0), Some(9.0), Some(13.0)] ); } @@ -190,7 +190,7 @@ fn test_rolling_map() { assert_eq!( Vec::from(out), - &[None, None, Some(3.0), None, None, None, None,] + &[None, None, Some(3.0), None, None, None, None] ); } @@ -234,7 +234,7 @@ fn test_rolling_var() { let out = out.i32().unwrap(); assert_eq!( Vec::from(out), - &[None, None, Some(1), None, None, None, None,] + &[None, None, Some(1), None, None, None, None] ); let s = Float64Chunked::from_slice("".into(), &[0.0, 2.0, 8.0, 3.0, 12.0, 1.0]).into_series(); @@ -247,7 +247,7 @@ fn test_rolling_var() { assert_eq!( Vec::from(out), - &[None, None, Some(17), Some(10), Some(20), Some(34),] + &[None, None, Some(17), Some(10), Some(20), Some(34)] ); // check centered rolling window diff --git a/crates/polars/tests/it/io/csv.rs b/crates/polars/tests/it/io/csv.rs index 992754436c60..7c08998e69af 100644 --- a/crates/polars/tests/it/io/csv.rs +++ b/crates/polars/tests/it/io/csv.rs @@ -965,7 +965,7 @@ fn test_infer_schema_eol() -> PolarsResult<()> { let no_eol = "colx,coly\nabcdef,1234"; let file = Cursor::new(no_eol); let df = CsvReader::new(file).finish()?; - assert_eq!(df.dtypes(), &[DataType::String, DataType::Int64,]); + assert_eq!(df.dtypes(), &[DataType::String, DataType::Int64]); Ok(()) } diff --git a/crates/polars/tests/it/lazy/predicate_queries.rs b/crates/polars/tests/it/lazy/predicate_queries.rs index ac180917fab3..63cccd65aeaa 100644 --- a/crates/polars/tests/it/lazy/predicate_queries.rs +++ b/crates/polars/tests/it/lazy/predicate_queries.rs @@ -197,7 +197,7 @@ fn test_binaryexpr_pushdown_left_join_9506() -> PolarsResult<()> { fn test_count_blocked_at_union_3963() -> PolarsResult<()> { let lf1 = df![ "k" => ["x", "x", "y"], - "v" => [3, 2, 6,] + "v" => [3, 2, 6] ]? .lazy();