Skip to content

Commit

Permalink
refactor: sum_horizontal to expression architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Oct 11, 2023
1 parent 32e3652 commit 931fdfa
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 55 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) mod downcast;
pub(crate) mod explode;
mod explode_and_offsets;
mod extend;
mod fill_null;
pub mod fill_null;
mod filter;
mod for_each;
pub mod full;
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use polars_arrow::prelude::QuantileInterpolOptions;
use polars_core::frame::explode::MeltArgs;
use polars_core::prelude::*;
use polars_io::RowCount;
use polars_plan::dsl::all_horizontal;
pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
Expand Down
68 changes: 68 additions & 0 deletions crates/polars-ops/src/series/ops/horizontal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::ops::{BitAnd, BitOr};

use polars_core::prelude::*;
use polars_core::POOL;
use rayon::prelude::*;

/// Adds the given series together element-wise, counting nulls as zero.
///
/// Every operand is passed through `fill_null(FillNullStrategy::Zero)` before
/// it is added. The inputs are walked with a rayon parallel iterator inside
/// `POOL.install`: each partial sum starts from a one-element `UInt32` series
/// holding `0`, and the partial sums are then combined with the same
/// "fill nulls, then add" step.
///
/// The returned series is named `"sum"`.
///
/// # Errors
///
/// Propagates any error returned by `fill_null`.
pub fn sum_horizontal(s: &[Series]) -> PolarsResult<Series> {
    // Identity element shared by the fold and the reduce stage: a single `0u32`.
    fn zero() -> Series {
        UInt32Chunked::new("", &[0u32]).into_series()
    }

    // Replaces the nulls of both operands with zero, then adds them.
    fn add_zero_filled(lhs: &Series, rhs: &Series) -> PolarsResult<Series> {
        let lhs = lhs.fill_null(FillNullStrategy::Zero)?;
        let rhs = rhs.fill_null(FillNullStrategy::Zero)?;
        Ok(lhs + rhs)
    }

    let total = POOL.install(|| {
        s.par_iter()
            .try_fold(zero, |acc, column| add_zero_filled(&acc, column))
            .try_reduce(zero, |left, right| add_zero_filled(&left, &right))
    })?;
    Ok(total.with_name("sum"))
}

pub fn any_horizontal(s: &[Series]) -> PolarsResult<Series> {
let out = POOL
.install(|| {
s.par_iter()
.try_fold(
|| BooleanChunked::new("", &[false]),
|acc, b| {
let b = b.cast(&DataType::Boolean)?;
let b = b.bool()?;
PolarsResult::Ok((&acc).bitor(b))
},
)
.try_reduce(|| BooleanChunked::new("", [false]), |a, b| Ok(a.bitor(b)))
})?
.with_name("any");
Ok(out.into_series())
}

pub fn all_horizontal(s: &[Series]) -> PolarsResult<Series> {
let out = POOL
.install(|| {
s.par_iter()
.try_fold(
|| BooleanChunked::new("", &[true]),
|acc, b| {
let b = b.cast(&DataType::Boolean)?;
let b = b.bool()?;
PolarsResult::Ok((&acc).bitand(b))
},
)
.try_reduce(|| BooleanChunked::new("", [true]), |a, b| Ok(a.bitand(b)))
})?
.with_name("all");
Ok(out.into_series())
}
2 changes: 2 additions & 0 deletions crates/polars-ops/src/series/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod cut;
mod floor_divide;
#[cfg(feature = "fused")]
mod fused;
mod horizontal;
#[cfg(feature = "convert_index")]
mod index;
#[cfg(feature = "is_first_distinct")]
Expand Down Expand Up @@ -44,6 +45,7 @@ pub use cut::*;
pub use floor_divide::*;
#[cfg(feature = "fused")]
pub use fused::*;
pub use horizontal::*;
#[cfg(feature = "convert_index")]
pub use index::*;
#[cfg(feature = "is_first_distinct")]
Expand Down
49 changes: 10 additions & 39 deletions crates/polars-plan/src/dsl/function_expr/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::ops::{BitAnd, BitOr, Not};

use polars_core::POOL;
use rayon::prelude::*;
use std::ops::Not;

use super::*;
use crate::{map, wrap};
use crate::{map, map_as_slice, wrap};

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
Expand Down Expand Up @@ -99,8 +96,8 @@ impl From<BooleanFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
IsDuplicated => map!(is_duplicated),
#[cfg(feature = "is_in")]
IsIn => wrap!(is_in),
AllHorizontal => wrap!(all_horizontal),
AnyHorizontal => wrap!(any_horizontal),
AllHorizontal => map_as_slice!(all_horizontal),
AnyHorizontal => map_as_slice!(any_horizontal),
Not => map!(not_),
}
}
Expand Down Expand Up @@ -181,38 +178,12 @@ fn is_in(s: &mut [Series]) -> PolarsResult<Option<Series>> {
polars_ops::prelude::is_in(left, other).map(|ca| Some(ca.into_series()))
}

fn any_horizontal(s: &mut [Series]) -> PolarsResult<Option<Series>> {
let mut out = POOL.install(|| {
s.par_iter()
.try_fold(
|| BooleanChunked::new("", &[false]),
|acc, b| {
let b = b.cast(&DataType::Boolean)?;
let b = b.bool()?;
PolarsResult::Ok((&acc).bitor(b))
},
)
.try_reduce(|| BooleanChunked::new("", [false]), |a, b| Ok(a.bitor(b)))
})?;
out.rename("any");
Ok(Some(out.into_series()))
}

fn all_horizontal(s: &mut [Series]) -> PolarsResult<Option<Series>> {
let mut out = POOL.install(|| {
s.par_iter()
.try_fold(
|| BooleanChunked::new("", &[true]),
|acc, b| {
let b = b.cast(&DataType::Boolean)?;
let b = b.bool()?;
PolarsResult::Ok((&acc).bitand(b))
},
)
.try_reduce(|| BooleanChunked::new("", [true]), |a, b| Ok(a.bitand(b)))
})?;
out.rename("all");
Ok(Some(out.into_series()))
/// Forwards to `polars_ops::prelude::any_horizontal`, passing the input
/// slice through unchanged and returning its result as-is.
fn any_horizontal(s: &[Series]) -> PolarsResult<Series> {
polars_ops::prelude::any_horizontal(s)
}

/// Forwards to `polars_ops::prelude::all_horizontal`, passing the input
/// slice through unchanged and returning its result as-is.
fn all_horizontal(s: &[Series]) -> PolarsResult<Series> {
polars_ops::prelude::all_horizontal(s)
}

fn not_(s: &Series) -> PolarsResult<Series> {
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-plan/src/dsl/function_expr/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ pub(super) fn backward_fill(s: &Series, limit: FillNullLimit) -> PolarsResult<Se
/// Fills the nulls of `s` by calling `fill_null` with
/// `FillNullStrategy::Forward(limit)` and returns that call's result.
pub(super) fn forward_fill(s: &Series, limit: FillNullLimit) -> PolarsResult<Series> {
s.fill_null(FillNullStrategy::Forward(limit))
}

/// Forwards to `polars_ops::prelude::sum_horizontal`, passing the input
/// slice through unchanged and returning its result as-is.
pub(super) fn sum_horizontal(s: &[Series]) -> PolarsResult<Series> {
polars_ops::prelude::sum_horizontal(s)
}
3 changes: 3 additions & 0 deletions crates/polars-plan/src/dsl/function_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ pub enum FunctionExpr {
ForwardFill {
limit: FillNullLimit,
},
SumHorizontal,
}

impl Hash for FunctionExpr {
Expand Down Expand Up @@ -427,6 +428,7 @@ impl Display for FunctionExpr {
FfiPlugin { lib, symbol, .. } => return write!(f, "{lib}:{symbol}"),
BackwardFill { .. } => "backward_fill",
ForwardFill { .. } => "forward_fill",
SumHorizontal => "sum_horizontal",
};
write!(f, "{s}")
}
Expand Down Expand Up @@ -746,6 +748,7 @@ impl From<FunctionExpr> for SpecialEq<Arc<dyn SeriesUdf>> {
},
BackwardFill { limit } => map!(dispatch::backward_fill, limit),
ForwardFill { limit } => map!(dispatch::forward_fill, limit),
SumHorizontal => map_as_slice!(dispatch::sum_horizontal),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/dsl/function_expr/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl FunctionExpr {
},
BackwardFill { .. } => mapper.with_same_dtype(),
ForwardFill { .. } => mapper.with_same_dtype(),
SumHorizontal => mapper.map_to_supertype(),
}
}
}
Expand Down
27 changes: 14 additions & 13 deletions crates/polars-plan/src/dsl/functions/horizontal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,19 +262,20 @@ pub fn min_horizontal<E: AsRef<[Expr]>>(exprs: E) -> Expr {
///
/// The name of the resulting column will be `"sum"`; use [`alias`](Expr::alias) to choose a different name.
pub fn sum_horizontal<E: AsRef<[Expr]>>(exprs: E) -> Expr {
let mut exprs = exprs.as_ref().to_vec();
let func = |s1: Series, s2: Series| {
Ok(Some(
&s1.fill_null(FillNullStrategy::Zero).unwrap()
+ &s2.fill_null(FillNullStrategy::Zero).unwrap(),
))
};
let init = match exprs.pop() {
Some(e) => e,
// use u32 as that is not cast to float as eagerly
_ => lit(0u32),
};
fold_exprs(init, func, exprs).alias("sum")
let exprs = exprs.as_ref().to_vec();

Expr::Function {
input: exprs,
function: FunctionExpr::SumHorizontal,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyFlat,
input_wildcard_expansion: true,
auto_explode: true,
cast_to_supertypes: false,
allow_rename: true,
..Default::default()
},
}
}

/// Folds the expressions from left to right keeping the first non-null values.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars/tests/it/lazy/folds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_fold_wildcard() -> PolarsResult<()> {
// test if we don't panic due to wildcard
let _out = df1
.lazy()
.select([all_horizontal([col("*").is_not_null()])])
.select([polars_lazy::dsl::all_horizontal([col("*").is_not_null()])])
.collect()?;
Ok(())
}
2 changes: 1 addition & 1 deletion py-polars/tests/unit/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def test_sort_by_err_9259() -> None:
def test_empty_inputs_error() -> None:
df = pl.DataFrame({"col1": [1]})
with pytest.raises(
pl.ComputeError, match="expression: 'fold' didn't get any inputs"
pl.ComputeError, match="expression: 'sum_horizontal' didn't get any inputs"
):
df.select(pl.sum_horizontal(pl.exclude("col1")))

Expand Down

0 comments on commit 931fdfa

Please sign in to comment.