Skip to content

Commit

Permalink
perf: Use vertical parallelism if input is chunked for Filter,`Sele…
Browse files Browse the repository at this point in the history
…ct`,`WithColumns` (#15608)
  • Loading branch information
ritchie46 authored Apr 12, 2024
1 parent d7b8f99 commit 6d7ddcc
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 101 deletions.
22 changes: 22 additions & 0 deletions crates/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,25 @@ impl std::convert::TryFrom<(ArrowChunk, &[ArrowField])> for DataFrame {
DataFrame::new(columns?)
}
}

impl DataFrame {
pub fn split_chunks(mut self) -> impl Iterator<Item = DataFrame> {
self.align_chunks();

(0..self.n_chunks()).map(move |i| unsafe {
let columns = self
.get_columns()
.iter()
.map(|s| {
Series::from_chunks_and_dtype_unchecked(
s.name(),
vec![s.chunks()[i].clone()],
s.dtype(),
)
})
.collect::<Vec<_>>();

DataFrame::new_no_checks(columns)
})
}
}
60 changes: 44 additions & 16 deletions crates/polars-lazy/src/physical_plan/executors/filter.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,65 @@
use polars_core::utils::accumulate_dataframes_vertical_unchecked;

use super::*;

pub struct FilterExec {
pub(crate) predicate: Arc<dyn PhysicalExpr>,
pub(crate) input: Box<dyn Executor>,
// if the predicate contains a window function
has_window: bool,
streamable: bool,
}

fn series_to_mask(s: &Series) -> PolarsResult<&BooleanChunked> {
s.bool().map_err(|_| {
polars_err!(
ComputeError: "filter predicate must be of type `Boolean`, got `{}`", s.dtype()
)
})
}

impl FilterExec {
pub fn new(
predicate: Arc<dyn PhysicalExpr>,
input: Box<dyn Executor>,
has_window: bool,
streamable: bool,
) -> Self {
Self {
predicate,
input,
has_window,
streamable,
}
}

fn execute_impl(
&mut self,
df: DataFrame,
state: &mut ExecutionState,
) -> PolarsResult<DataFrame> {
// Vertical parallelism.
let df = if self.streamable && df.n_chunks() > 1 && df.height() > 0 {
let chunks = df.split_chunks().collect::<Vec<_>>();
let iter = chunks.into_par_iter().map(|df| {
let s = self.predicate.evaluate(&df, state)?;
df.filter(series_to_mask(&s)?)
});

let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
accumulate_dataframes_vertical_unchecked(df)
} else {
if self.has_window {
state.insert_has_window_function_flag()
}
let s = self.predicate.evaluate(&df, state)?;
if self.has_window {
state.clear_window_expr_cache()
}
df.filter(series_to_mask(&s)?)?
};
Ok(df)
}
}

impl Executor for FilterExec {
Expand All @@ -32,32 +73,19 @@ impl Executor for FilterExec {
}
let df = self.input.execute(state)?;

if self.has_window {
state.insert_has_window_function_flag()
}
let s = self.predicate.evaluate(&df, state)?;
if self.has_window {
state.clear_window_expr_cache()
}
let mask = s.bool().map_err(|_| {
polars_err!(
ComputeError: "filter predicate must be of type `Boolean`, got `{}`", s.dtype()
)
})?;

let profile_name = if state.has_node_timer() {
Cow::Owned(format!(".filter({})", &self.predicate.as_ref()))
} else {
Cow::Borrowed("")
};

state.record(
state.clone().record(
|| {
let df = df.filter(mask)?;
let df = self.execute_impl(df, state);
if state.verbose() {
eprintln!("dataframe filtered");
}
Ok(df)
df
},
profile_name,
)
Expand Down
56 changes: 45 additions & 11 deletions crates/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use polars_core::utils::accumulate_dataframes_vertical_unchecked;

use super::*;

/// Take an input Executor (creates the input DataFrame)
Expand All @@ -11,6 +13,8 @@ pub struct ProjectionExec {
#[cfg(test)]
pub(crate) schema: SchemaRef,
pub(crate) options: ProjectionOptions,
// Can run all operations elementwise
pub(crate) streamable: bool,
}

impl ProjectionExec {
Expand All @@ -19,17 +23,47 @@ impl ProjectionExec {
state: &ExecutionState,
mut df: DataFrame,
) -> PolarsResult<DataFrame> {
#[allow(clippy::let_and_return)]
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
self.options.run_parallel,
)?;
#[allow(unused_mut)]
let mut df = check_expand_literals(selected_cols, df.height() == 0)?;
// Vertical and horizontal parallelism.
let df =
if self.streamable && df.n_chunks() > 1 && df.height() > 0 && self.options.run_parallel
{
let chunks = df.split_chunks().collect::<Vec<_>>();
let iter = chunks.into_par_iter().map(|mut df| {
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
self.options.run_parallel,
)?;
check_expand_literals(
selected_cols,
df.height() == 0,
self.options.duplicate_check,
)
});

let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
accumulate_dataframes_vertical_unchecked(df)
}
// Only horizontal parallelism.
else {
#[allow(clippy::let_and_return)]
let selected_cols = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.expr,
state,
self.has_windows,
self.options.run_parallel,
)?;
check_expand_literals(
selected_cols,
df.height() == 0,
self.options.duplicate_check,
)?
};

// this only runs during testing and check if the runtime type matches the predicted schema
#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ pub(super) fn evaluate_physical_expressions(
pub(super) fn check_expand_literals(
mut selected_columns: Vec<Series>,
zero_length: bool,
duplicate_check: bool,
) -> PolarsResult<DataFrame> {
let Some(first_len) = selected_columns.first().map(|s| s.len()) else {
return Ok(DataFrame::empty());
Expand All @@ -285,7 +286,7 @@ pub(super) fn check_expand_literals(
}
let name = s.name();

if !names.insert(name) {
if duplicate_check && !names.insert(name) {
let msg = format!(
"the name: '{}' is duplicate\n\n\
It's possible that multiple expressions are returning the same default column \
Expand Down
52 changes: 41 additions & 11 deletions crates/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use polars_core::utils::accumulate_dataframes_vertical_unchecked;

use super::*;

pub struct StackExec {
Expand All @@ -7,6 +9,8 @@ pub struct StackExec {
pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
pub(crate) options: ProjectionOptions,
// Can run all operations elementwise
pub(crate) streamable: bool,
}

impl StackExec {
Expand All @@ -15,18 +19,44 @@ impl StackExec {
state: &ExecutionState,
mut df: DataFrame,
) -> PolarsResult<DataFrame> {
let res = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
state.clear_window_expr_cache();

let schema = &*self.input_schema;
df._add_columns(res, schema)?;

// Vertical and horizontal parallelism.
let df =
if self.streamable && df.n_chunks() > 1 && df.height() > 0 && self.options.run_parallel
{
let chunks = df.split_chunks().collect::<Vec<_>>();
let iter = chunks.into_par_iter().map(|mut df| {
let res = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
df._add_columns(res, schema)?;
Ok(df)
});

let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
accumulate_dataframes_vertical_unchecked(df)
}
// Only horizontal parallelism
else {
let res = evaluate_physical_expressions(
&mut df,
&self.cse_exprs,
&self.exprs,
state,
self.has_windows,
self.options.run_parallel,
)?;
df._add_columns(res, schema)?;
df
};

state.clear_window_expr_cache();

Ok(df)
}
Expand Down
16 changes: 16 additions & 0 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub fn create_physical_plan(
Ok(Box::new(executors::SliceExec { input, offset, len }))
},
Filter { input, predicate } => {
let streamable = is_streamable(predicate.node(), expr_arena, Context::Default);
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;
let mut state = ExpressionConversionState::default();
Expand All @@ -186,6 +187,7 @@ pub fn create_physical_plan(
predicate,
input,
state.has_windows,
streamable,
)))
},
#[allow(unused_variables)]
Expand Down Expand Up @@ -276,6 +278,12 @@ pub fn create_physical_plan(
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;
let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());

let streamable = if expr.has_sub_exprs() {
false
} else {
all_streamable(&expr, expr_arena, Context::Default)
};
let phys_expr = create_physical_expressions_from_irs(
expr.default_exprs(),
Context::Default,
Expand All @@ -299,6 +307,7 @@ pub fn create_physical_plan(
#[cfg(test)]
schema: _schema,
options,
streamable,
}))
},
DataFrameScan {
Expand Down Expand Up @@ -522,6 +531,12 @@ pub fn create_physical_plan(
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let input = create_physical_plan(input, lp_arena, expr_arena)?;

let streamable = if exprs.has_sub_exprs() {
false
} else {
all_streamable(&exprs, expr_arena, Context::Default)
};

let mut state =
ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

Expand All @@ -547,6 +562,7 @@ pub fn create_physical_plan(
exprs: phys_exprs,
input_schema,
options,
streamable,
}))
},
MapFunction {
Expand Down
Loading

0 comments on commit 6d7ddcc

Please sign in to comment.