Skip to content

Commit

Permalink
refactor(rust): Add pre-filtered decode to new parquet source (#18715)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Sep 13, 2024
1 parent 7232554 commit a8e18e6
Show file tree
Hide file tree
Showing 12 changed files with 345 additions and 75 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/legacy/kernels/ewm/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ mod test {
);
assert_allclose!(
ewm_var(YS.to_vec(), ALPHA, false, true, 0, false),
PrimitiveArray::from([None, Some(0.0), Some(1.0), None, None, Some(4.2), Some(3.1),]),
PrimitiveArray::from([None, Some(0.0), Some(1.0), None, None, Some(4.2), Some(3.1)]),
EPS
);
assert_allclose!(
Expand Down
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ pub use utils::materialize_empty_df;
pub mod _internal {
pub use super::mmap::to_deserializer;
pub use super::predicates::read_this_row_group;
pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting};
}
120 changes: 65 additions & 55 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,7 @@ fn rg_to_dfs_prefiltered(
debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns);
debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);

enum MaskSetting {
Auto,
Pre,
Post,
}

let mask_setting =
std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(MaskSetting::Auto, |v| match &v[..] {
"auto" => MaskSetting::Auto,
"pre" => MaskSetting::Pre,
"post" => MaskSetting::Post,
_ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
});
let mask_setting = PrefilterMaskSetting::init_from_env();

let dfs: Vec<Option<DataFrame>> = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
Expand Down Expand Up @@ -381,29 +369,8 @@ fn rg_to_dfs_prefiltered(
return Ok(Some(df));
}

let prefilter_cost = matches!(mask_setting, MaskSetting::Auto)
.then(|| {
let num_edges = filter_mask.num_edges() as f64;
let rg_len = filter_mask.len() as f64;

// @GB: I did quite some analysis on this.
//
// Pre-filtered and Post-filtered can both be faster in certain scenarios.
//
// - Pre-filtered is faster when there is some amount of clustering or
// sorting involved or if the number of values selected is small.
// - Post-filtering is faster when the predicate selects a somewhat random
// elements throughout the row group.
//
// The following is a heuristic value to try and estimate which one is
// faster. Essentially, it sees how many times it needs to switch between
// skipping items and collecting items and compares it against the number
// of values that it will collect.
//
// Closer to 0: pre-filtering is probably better.
// Closer to 1: post-filtering is probably better.
(num_edges / rg_len).clamp(0.0, 1.0)
})
let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto)
.then(|| calc_prefilter_cost(&filter_mask))
.unwrap_or_default();

let rg_columns = (0..num_dead_columns)
Expand Down Expand Up @@ -450,25 +417,13 @@ fn rg_to_dfs_prefiltered(
array.filter(&mask_arr)
};

let array = match mask_setting {
MaskSetting::Auto => {
// Prefiltering is more expensive for nested types so we make the cut-off
// higher.
let is_nested =
schema.get_at_index(col_idx).unwrap().1.dtype.is_nested();

// We empirically selected these numbers.
let do_prefilter = (is_nested && prefilter_cost <= 0.01)
|| (!is_nested && prefilter_cost <= 0.02);

if do_prefilter {
pre()?
} else {
post()?
}
},
MaskSetting::Pre => pre()?,
MaskSetting::Post => post()?,
let array = if mask_setting.should_prefilter(
prefilter_cost,
&schema.get_at_index(col_idx).unwrap().1.dtype,
) {
pre()?
} else {
post()?
};

debug_assert_eq!(array.len(), filter_mask.set_bits());
Expand Down Expand Up @@ -1247,3 +1202,58 @@ impl BatchedParquetIter {
}
}
}

/// Estimates how expensive a pre-filtered decode is for the given filter `mask`.
///
/// The result is `mask.num_edges() / mask.len()`, clamped to `0.0..=1.0`:
///
/// - Closer to 0: pre-filtering is probably better.
/// - Closer to 1: post-filtering is probably better.
///
/// An empty mask yields `1.0` rather than the `NaN` that `0.0 / 0.0` would produce.
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let rg_len = mask.len();

    // `0.0 / 0.0` is NaN and `clamp` propagates NaN. There is nothing to skip in an empty
    // mask, so report the post-filter end of the scale. This keeps every `cost <= threshold`
    // comparison evaluating to `false`, exactly as it did for the NaN value.
    if rg_len == 0 {
        return 1.0;
    }

    let num_edges = mask.num_edges() as f64;
    let rg_len = rg_len as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
    //
    // - Pre-filtering is faster when there is some amount of clustering or
    //   sorting involved or if the number of values selected is small.
    // - Post-filtering is faster when the predicate selects somewhat random
    //   elements throughout the row group.
    //
    // The following is a heuristic value to try and estimate which one is
    // faster. Essentially, it sees how many times it needs to switch between
    // skipping items and collecting items and compares it against the number
    // of values in the row group.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

/// Selects how the predicate mask is applied when decoding columns that are not part of the
/// predicate.
///
/// The value is normally read from the `POLARS_PQ_PREFILTERED_MASK` environment variable
/// (`auto`, `pre` or `post`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrefilterMaskSetting {
    /// Decide per column, based on the estimated pre-filter cost and on whether the column's
    /// data type is nested.
    Auto,
    /// Always take the pre-filter path.
    Pre,
    /// Always take the post-filter path.
    Post,
}

impl PrefilterMaskSetting {
    /// Reads the setting from the `POLARS_PQ_PREFILTERED_MASK` environment variable.
    ///
    /// Recognized values are `auto`, `pre` and `post`. If the variable cannot be read
    /// (for example because it is unset), [`Self::Auto`] is returned.
    ///
    /// # Panics
    ///
    /// Panics if the variable is readable but holds any other value.
    pub fn init_from_env() -> Self {
        match std::env::var("POLARS_PQ_PREFILTERED_MASK").as_deref() {
            Err(_) | Ok("auto") => Self::Auto,
            Ok("pre") => Self::Pre,
            Ok("post") => Self::Post,
            Ok(v) => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        }
    }

    /// Returns `true` if a column of type `dtype` should be decoded with pre-filtering.
    ///
    /// `Pre` and `Post` force the answer. `Auto` compares `prefilter_cost` against a
    /// cut-off that depends on whether `dtype` is nested.
    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Pre => true,
            Self::Post => false,
            Self::Auto => {
                // Prefiltering is more expensive for nested types, so they have to clear a
                // stricter (numerically lower) cost cut-off. Both cut-offs were selected
                // empirically.
                let max_cost = if dtype.is_nested() { 0.01 } else { 0.02 };

                prefilter_cost <= max_cost
            },
        }
    }
}
1 change: 1 addition & 0 deletions crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub trait PhysicalIoExpr: Send + Sync {
fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series>;

/// Get the variables that are used in the expression i.e. live variables.
/// This can contain duplicates.
fn live_variables(&self) -> Option<Vec<PlSmallStr>>;

/// Can take &dyn Statistics and determine if a file should be
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ fn test_categorical_addition() -> PolarsResult<()> {
#[test]
fn test_error_duplicate_names() {
let df = fruits_cars();
assert!(df.lazy().select([col("*"), col("*"),]).collect().is_err());
assert!(df.lazy().select([col("*"), col("*")]).collect().is_err());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-row/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ mod test {
let c = Utf8ViewArray::from_slice([Some("a"), Some(""), Some("meep")]);

let encoded = convert_columns_no_order(&[Box::new(a), Box::new(b), Box::new(c)]);
assert_eq!(encoded.offsets, &[0, 44, 55, 99,]);
assert_eq!(encoded.offsets, &[0, 44, 55, 99]);
assert_eq!(encoded.values.len(), 99);
assert!(encoded.values.ends_with(&[0, 0, 0, 4]));
assert!(encoded.values.starts_with(&[1, 128, 0, 0, 1, 1, 128]));
Expand Down
90 changes: 89 additions & 1 deletion crates/polars-stream/src/nodes/parquet_source/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::StreamExt;
use polars_core::frame::DataFrame;
use polars_error::PolarsResult;
use polars_io::prelude::ParallelStrategy;
use polars_io::prelude::_internal::PrefilterMaskSetting;

use super::row_group_data_fetch::RowGroupDataFetcher;
use super::row_group_decode::RowGroupDecoder;
Expand Down Expand Up @@ -287,12 +288,69 @@ impl ParquetSourceNode {
let ideal_morsel_size = get_ideal_morsel_size();
let min_values_per_thread = self.config.min_values_per_thread;

let use_prefiltered = physical_predicate.is_some()
let mut use_prefiltered = physical_predicate.is_some()
&& matches!(
self.options.parallel,
ParallelStrategy::Auto | ParallelStrategy::Prefiltered
);

let predicate_arrow_field_indices = if use_prefiltered {
let v = physical_predicate
.as_ref()
.unwrap()
.live_variables()
.and_then(|x| {
let mut out = x
.iter()
// Can be `None` - if the column is e.g. a hive column, or the row index column.
.filter_map(|x| projected_arrow_schema.index_of(x))
.collect::<Vec<_>>();

out.sort_unstable();
out.dedup();
// There is at least one non-predicate column, or pre-filtering was
// explicitly requested (only useful for testing).
(out.len() < projected_arrow_schema.len()
|| matches!(self.options.parallel, ParallelStrategy::Prefiltered))
.then_some(out)
});

use_prefiltered &= v.is_some();

v.unwrap_or_default()
} else {
vec![]
};

let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);

let non_predicate_arrow_field_indices = if use_prefiltered.is_some() {
filtered_range(
predicate_arrow_field_indices.as_slice(),
projected_arrow_schema.len(),
)
} else {
vec![]
};

if use_prefiltered.is_some() && self.verbose {
eprintln!(
"[ParquetSource]: Pre-filtered decode enabled ({} live, {} non-live)",
predicate_arrow_field_indices.len(),
non_predicate_arrow_field_indices.len()
)
}

let predicate_arrow_field_mask = if use_prefiltered.is_some() {
let mut out = vec![false; projected_arrow_schema.len()];
for i in predicate_arrow_field_indices.iter() {
out[*i] = true;
}
out
} else {
vec![]
};

RowGroupDecoder {
scan_sources,
hive_partitions,
Expand All @@ -302,6 +360,9 @@ impl ParquetSourceNode {
row_index,
physical_predicate,
use_prefiltered,
predicate_arrow_field_indices,
non_predicate_arrow_field_indices,
predicate_arrow_field_mask,
ideal_morsel_size,
min_values_per_thread,
}
Expand Down Expand Up @@ -341,3 +402,30 @@ impl ParquetSourceNode {
}
}
}

/// Collects every index in `0..len` that does not appear in `exclude`.
///
/// `exclude` must be sorted in ascending order and must not contain duplicates.
fn filtered_range(exclude: &[usize], len: usize) -> Vec<usize> {
    // Both sequences ascend, so only the next pending exclusion can match the current index.
    let mut pending = exclude.iter().copied().peekable();

    (0..len)
        .filter(|idx| pending.next_if_eq(idx).is_none())
        .collect()
}

// Only compile the test module for test builds.
#[cfg(test)]
mod tests {
    use super::filtered_range;

    /// Exclusions in the interior and at the end of the range.
    #[test]
    fn test_filtered_range() {
        assert_eq!(filtered_range(&[1, 3], 7).as_slice(), &[0, 2, 4, 5, 6]);
        assert_eq!(filtered_range(&[1, 6], 7).as_slice(), &[0, 2, 3, 4, 5]);
    }

    /// Nothing excluded, everything excluded, and an empty range.
    #[test]
    fn test_filtered_range_edge_cases() {
        assert_eq!(filtered_range(&[], 3).as_slice(), &[0, 1, 2]);
        assert!(filtered_range(&[0, 1, 2], 3).is_empty());
        assert!(filtered_range(&[], 0).is_empty());
    }
}
6 changes: 0 additions & 6 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use polars_io::utils::byte_source::DynByteSourceBuilder;
use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::{FileInfo, ScanSources};
use polars_plan::prelude::FileScanOptions;
use row_group_decode::RowGroupDecoder;

use super::compute_node_prelude::*;
use super::{MorselSeq, TaskPriority};
Expand Down Expand Up @@ -55,7 +54,6 @@ pub struct ParquetSourceNode {
morsel_stream_starter: Option<tokio::sync::oneshot::Sender<()>>,
// This is behind a Mutex so that we can call `shutdown()` asynchronously.
async_task_data: Arc<tokio::sync::Mutex<AsyncTaskData>>,
row_group_decoder: Option<Arc<RowGroupDecoder>>,
is_finished: Arc<AtomicBool>,
}

Expand Down Expand Up @@ -120,7 +118,6 @@ impl ParquetSourceNode {

morsel_stream_starter: None,
async_task_data: Arc::new(tokio::sync::Mutex::new(None)),
row_group_decoder: None,
is_finished: Arc::new(AtomicBool::new(false)),
}
}
Expand Down Expand Up @@ -166,9 +163,6 @@ impl ComputeNode for ParquetSourceNode {
.try_lock()
.unwrap()
.replace((raw_morsel_receivers, morsel_stream_task_handle));

let row_group_decoder = self.init_row_group_decoder();
self.row_group_decoder = Some(Arc::new(row_group_decoder));
}

fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
Expand Down
Loading

0 comments on commit a8e18e6

Please sign in to comment.