diff --git a/crates/polars-arrow/src/array/iterator.rs b/crates/polars-arrow/src/array/iterator.rs index 46b585ef2a36..59e71968fde7 100644 --- a/crates/polars-arrow/src/array/iterator.rs +++ b/crates/polars-arrow/src/array/iterator.rs @@ -117,3 +117,12 @@ impl<'a, A: ArrayAccessor<'a> + ?Sized> Iterator for NonNullValuesIter<'a, A> { } unsafe impl<'a, A: ArrayAccessor<'a> + ?Sized> TrustedLen for NonNullValuesIter<'a, A> {} + +impl<'a, A: ?Sized> Clone for NonNullValuesIter<'a, A> { + fn clone(&self) -> Self { + Self { + accessor: self.accessor, + idxs: self.idxs.clone(), + } + } +} diff --git a/crates/polars-arrow/src/bitmap/iterator.rs b/crates/polars-arrow/src/bitmap/iterator.rs index 63c61afd83f7..bd48fb706c0a 100644 --- a/crates/polars-arrow/src/bitmap/iterator.rs +++ b/crates/polars-arrow/src/bitmap/iterator.rs @@ -25,6 +25,7 @@ fn calc_iters_remaining(length: usize, min_length_for_iter: usize, consume: usiz 1 + obvious_iters // Thus always exactly 1 more iter. } +#[derive(Clone)] pub struct TrueIdxIter<'a> { mask: BitMask<'a>, first_unknown: usize, diff --git a/crates/polars-compute/src/lib.rs b/crates/polars-compute/src/lib.rs index 30efdd59adc7..a0957daeafcc 100644 --- a/crates/polars-compute/src/lib.rs +++ b/crates/polars-compute/src/lib.rs @@ -1,4 +1,6 @@ #![cfg_attr(feature = "simd", feature(portable_simd))] +#![cfg_attr(feature = "simd", feature(core_intrinsics))] // For fadd_algebraic. +#![cfg_attr(feature = "simd", allow(internal_features))] #![cfg_attr(feature = "simd", feature(avx512_target_feature))] #![cfg_attr( all(feature = "simd", target_arch = "x86_64"), @@ -21,6 +23,7 @@ pub mod if_then_else; pub mod min_max; pub mod size; pub mod unique; +pub mod var_cov; // Trait to enable the scalar blanket implementation. pub trait NotSimdPrimitive: NativeType {} diff --git a/crates/polars-compute/src/var_cov.rs b/crates/polars-compute/src/var_cov.rs new file mode 100644 index 000000000000..6f932b679800 --- /dev/null +++ b/crates/polars-compute/src/var_cov.rs @@ -0,0 +1,319 @@ +// Some formulae: +// mean_x = sum(weight[i] * x[i]) / sum(weight) +// dp_xy = weighted sum of deviation products of variables x, y, written in +// the paper as simply XY. +// dp_xy = sum(weight[i] * (x[i] - mean_x) * (y[i] - mean_y)) +// +// cov(x, y) = dp_xy / sum(weight) +// var(x) = cov(x, x) +// +// Algorithms from: +// Numerically stable parallel computation of (co-)variance. +// Schubert, E., & Gertz, M. (2018). +// +// Key equations from the paper: +// (17) for mean update, (23) for dp update (and also Table 1). + +use arrow::array::{Array, PrimitiveArray}; +use arrow::types::NativeType; +use num_traits::AsPrimitive; + +const CHUNK_SIZE: usize = 128; + +#[inline(always)] +fn alg_add(a: f64, b: f64) -> f64 { + #[cfg(feature = "simd")] + { + std::intrinsics::fadd_algebraic(a, b) + } + #[cfg(not(feature = "simd"))] + { + a + b + } +} + +fn alg_sum(it: impl IntoIterator) -> f64 { + it.into_iter().fold(0.0, alg_add) +} + +#[derive(Default)] +pub struct VarState { + weight: f64, + mean: f64, + dp: f64, +} + +#[derive(Default)] +pub struct CovState { + weight: f64, + mean_x: f64, + mean_y: f64, + dp_xy: f64, +} + +#[derive(Default)] +pub struct PearsonState { + weight: f64, + mean_x: f64, + mean_y: f64, + dp_xx: f64, + dp_xy: f64, + dp_yy: f64, +} + +impl VarState { + fn new(x: &[f64]) -> Self { + if x.is_empty() { + return Self::default(); + } + + let weight = x.len() as f64; + let mean = alg_sum(x.iter().copied()) / weight; + Self { + weight, + mean, + dp: alg_sum(x.iter().map(|&xi| (xi - mean) * (xi - mean))), + } + } + + pub fn combine(&mut self, other: &Self) { + if other.weight == 0.0 { + return; + } + + let new_weight = self.weight + other.weight; + let inv_weight = 1.0 / new_weight; + let other_weight_frac = other.weight * inv_weight; + let delta_mean = self.mean - other.mean; + let new_mean = self.mean - delta_mean * other_weight_frac; + self.dp += other.dp + other.weight * (new_mean - other.mean) * delta_mean; + self.weight = new_weight; + self.mean = new_mean; + } + + pub fn finalize(&mut self, ddof: u8) -> Option { + if self.weight <= ddof as f64 { + None + } else { + Some(self.dp / (self.weight - ddof as f64)) + } + } +} + +impl CovState { + fn new(x: &[f64], y: &[f64]) -> Self { + assert!(x.len() == y.len()); + if x.is_empty() { + return Self::default(); + } + + let weight = x.len() as f64; + let inv_weight = 1.0 / weight; + let mean_x = alg_sum(x.iter().copied()) * inv_weight; + let mean_y = alg_sum(y.iter().copied()) * inv_weight; + Self { + weight, + mean_x, + mean_y, + dp_xy: alg_sum( + x.iter() + .zip(y) + .map(|(&xi, &yi)| (xi - mean_x) * (yi - mean_y)), + ), + } + } + + pub fn combine(&mut self, other: &Self) { + if other.weight == 0.0 { + return; + } + + let new_weight = self.weight + other.weight; + let inv_weight = 1.0 / new_weight; + let other_weight_frac = other.weight * inv_weight; + let delta_mean_x = self.mean_x - other.mean_x; + let delta_mean_y = self.mean_y - other.mean_y; + let new_mean_x = self.mean_x - delta_mean_x * other_weight_frac; + let new_mean_y = self.mean_y - delta_mean_y * other_weight_frac; + self.dp_xy += other.dp_xy + other.weight * (new_mean_x - other.mean_x) * delta_mean_y; + self.weight = new_weight; + self.mean_x = new_mean_x; + self.mean_y = new_mean_y; + } + + pub fn finalize(&mut self, ddof: u8) -> Option { + if self.weight <= ddof as f64 { + None + } else { + Some(self.dp_xy / (self.weight - ddof as f64)) + } + } +} + +impl PearsonState { + fn new(x: &[f64], y: &[f64]) -> Self { + assert!(x.len() == y.len()); + if x.is_empty() { + return Self::default(); + } + + let weight = x.len() as f64; + let inv_weight = 1.0 / weight; + let mean_x = alg_sum(x.iter().copied()) * inv_weight; + let mean_y = alg_sum(y.iter().copied()) * inv_weight; + let mut dp_xx = 0.0; + let mut dp_xy = 0.0; + let mut dp_yy = 0.0; + for (xi, yi) in x.iter().zip(y.iter()) { + dp_xx = alg_add(dp_xx, (xi - mean_x) * (xi - mean_x)); + dp_xy = alg_add(dp_xy, (xi - mean_x) * (yi - mean_y)); + dp_yy = alg_add(dp_yy, (yi - mean_y) * (yi - mean_y)); + } + Self { + weight, + mean_x, + mean_y, + dp_xx, + dp_xy, + dp_yy, + } + } + + pub fn combine(&mut self, other: &Self) { + if other.weight == 0.0 { + return; + } + + let new_weight = self.weight + other.weight; + let inv_weight = 1.0 / new_weight; + let other_weight_frac = other.weight * inv_weight; + let delta_mean_x = self.mean_x - other.mean_x; + let delta_mean_y = self.mean_y - other.mean_y; + let new_mean_x = self.mean_x - delta_mean_x * other_weight_frac; + let new_mean_y = self.mean_y - delta_mean_y * other_weight_frac; + self.dp_xx += other.dp_xx + other.weight * (new_mean_x - other.mean_x) * delta_mean_x; + self.dp_xy += other.dp_xy + other.weight * (new_mean_x - other.mean_x) * delta_mean_y; + self.dp_yy += other.dp_yy + other.weight * (new_mean_y - other.mean_y) * delta_mean_y; + self.weight = new_weight; + self.mean_x = new_mean_x; + self.mean_y = new_mean_y; + } + + pub fn finalize(&mut self, _ddof: u8) -> f64 { + // The division by sample_weight - ddof on both sides cancels out. + let denom = (self.dp_xx * self.dp_yy).sqrt(); + if denom == 0.0 { + f64::NAN + } else { + self.dp_xy / denom + } + } +} + +fn chunk_as_float(it: I, mut f: F) +where + T: NativeType + AsPrimitive, + I: IntoIterator, + F: FnMut(&[f64]), +{ + let mut chunk = [0.0; CHUNK_SIZE]; + let mut i = 0; + for val in it { + if i >= CHUNK_SIZE { + f(&chunk); + i = 0; + } + chunk[i] = val.as_(); + i += 1; + } + if i > 0 { + f(&chunk[..i]); + } +} + +fn chunk_as_float_binary(it: I, mut f: F) +where + T: NativeType + AsPrimitive, + U: NativeType + AsPrimitive, + I: IntoIterator, + F: FnMut(&[f64], &[f64]), +{ + let mut left_chunk = [0.0; CHUNK_SIZE]; + let mut right_chunk = [0.0; CHUNK_SIZE]; + let mut i = 0; + for (l, r) in it { + if i >= CHUNK_SIZE { + f(&left_chunk, &right_chunk); + i = 0; + } + left_chunk[i] = l.as_(); + right_chunk[i] = r.as_(); + i += 1; + } + if i > 0 { + f(&left_chunk[..i], &right_chunk[..i]); + } +} + +pub fn var(arr: &PrimitiveArray) -> VarState +where + T: NativeType + AsPrimitive, +{ + let mut out = VarState::default(); + if arr.has_nulls() { + chunk_as_float(arr.non_null_values_iter(), |chunk| { + out.combine(&VarState::new(chunk)) + }); + } else { + chunk_as_float(arr.values().iter().copied(), |chunk| { + out.combine(&VarState::new(chunk)) + }); + } + out +} + +pub fn cov(x: &PrimitiveArray, y: &PrimitiveArray) -> CovState +where + T: NativeType + AsPrimitive, + U: NativeType + AsPrimitive, +{ + assert!(x.len() == y.len()); + let mut out = CovState::default(); + if x.has_nulls() || y.has_nulls() { + chunk_as_float_binary( + x.iter() + .zip(y.iter()) + .filter_map(|(l, r)| l.copied().zip(r.copied())), + |l, r| out.combine(&CovState::new(l, r)), + ); + } else { + chunk_as_float_binary( + x.values().iter().copied().zip(y.values().iter().copied()), + |l, r| out.combine(&CovState::new(l, r)), + ); + } + out +} + +pub fn pearson_corr(x: &PrimitiveArray, y: &PrimitiveArray) -> PearsonState +where + T: NativeType + AsPrimitive, + U: NativeType + AsPrimitive, +{ + assert!(x.len() == y.len()); + let mut out = PearsonState::default(); + if x.has_nulls() || y.has_nulls() { + chunk_as_float_binary( + x.iter() + .zip(y.iter()) + .filter_map(|(l, r)| l.copied().zip(r.copied())), + |l, r| out.combine(&PearsonState::new(l, r)), + ); + } else { + chunk_as_float_binary( + x.values().iter().copied().zip(y.values().iter().copied()), + |l, r| out.combine(&PearsonState::new(l, r)), + ); + } + out +} diff --git a/crates/polars-core/src/chunked_array/ops/aggregate/var.rs b/crates/polars-core/src/chunked_array/ops/aggregate/var.rs index 1ca04cc2b30e..ea332f0cc432 100644 --- a/crates/polars-core/src/chunked_array/ops/aggregate/var.rs +++ b/crates/polars-core/src/chunked_array/ops/aggregate/var.rs @@ -1,4 +1,4 @@ -use arity::unary_elementwise_values; +use polars_compute::var_cov::VarState; use super::*; @@ -15,20 +15,11 @@ where ChunkedArray: ChunkAgg, { fn var(&self, ddof: u8) -> Option { - let n_values = self.len() - self.null_count(); - if n_values <= ddof as usize { - return None; + let mut out = VarState::default(); + for arr in self.downcast_iter() { + out.combine(&polars_compute::var_cov::var(arr)) } - - let mean = self.mean()?; - let squared: Float64Chunked = unary_elementwise_values(self, |value| { - let tmp = value.to_f64().unwrap() - mean; - tmp * tmp - }); - - squared - .sum() - .map(|sum| sum / (n_values as f64 - ddof as f64)) + out.finalize(ddof) } fn std(&self, ddof: u8) -> Option { diff --git a/crates/polars-ops/src/chunked_array/cov.rs b/crates/polars-ops/src/chunked_array/cov.rs index 5a9b952097b8..dbfa6b48f4fb 100644 --- a/crates/polars-ops/src/chunked_array/cov.rs +++ b/crates/polars-ops/src/chunked_array/cov.rs @@ -1,196 +1,34 @@ -use num_traits::{ToPrimitive, Zero}; -use polars_compute::float_sum::FloatSum; +use num_traits::AsPrimitive; +use polars_compute::var_cov::{CovState, PearsonState}; use polars_core::prelude::*; use polars_core::utils::align_chunks_binary; -const COV_BUF_SIZE: usize = 64; - -/// Calculates the sum of x[i] * y[i] from 0..k. -fn multiply_sum(x: &[f64; COV_BUF_SIZE], y: &[f64; COV_BUF_SIZE], k: usize) -> f64 { - assert!(k <= COV_BUF_SIZE); - let tmp: [f64; COV_BUF_SIZE] = std::array::from_fn(|i| x[i] * y[i]); - FloatSum::sum(&tmp[..k]) -} - /// Compute the covariance between two columns. pub fn cov(a: &ChunkedArray, b: &ChunkedArray, ddof: u8) -> Option where T: PolarsNumericType, - T::Native: ToPrimitive, -{ - if a.len() != b.len() { - None - } else { - let (a, b) = align_chunks_binary(a, b); - - let out = if a.null_count() > 0 || b.null_count() > 0 { - let iters = a.downcast_iter().zip(b.downcast_iter()).map(|(a, b)| { - a.into_iter().zip(b).filter_map(|(a, b)| match (a, b) { - (Some(a), Some(b)) => Some((*a, *b)), - _ => None, - }) - }); - online_cov(iters, ddof) - } else { - let iters = a - .downcast_iter() - .zip(b.downcast_iter()) - .map(|(a, b)| a.values_iter().copied().zip(b.values_iter().copied())); - online_cov(iters, ddof) - }; - Some(out) - } -} - -/// # Arguments -/// `iter` - Iterator over `T` tuple where any `Option` would skip the tuple. -fn online_cov(iters: I, ddof: u8) -> f64 -where - I: Iterator, - J: IntoIterator + Clone, - T: ToPrimitive, + T::Native: AsPrimitive, + ChunkedArray: ChunkVar, { - // The algorithm is derived from - // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Weighted_batched_version - // We simply set the weights to 1.0. This allows us to simplify the expressions - // a lot, and move out subtractions out of sums. - let mut mean_x = 0.0; - let mut mean_y = 0.0; - let mut cxy = 0.0; - let mut n = 0.0; - - let mut x_tmp = [0.0; COV_BUF_SIZE]; - let mut y_tmp = [0.0; COV_BUF_SIZE]; - - for iter in iters { - let mut iter = iter.clone().into_iter(); - - loop { - let mut k = 0; - for (x, y) in iter.by_ref().take(COV_BUF_SIZE) { - let x = x.to_f64().unwrap(); - let y = y.to_f64().unwrap(); - - x_tmp[k] = x; - y_tmp[k] = y; - k += 1; - } - if k == 0 { - break; - } - - // TODO: combine these all in one SIMD'ized pass. - let xsum: f64 = FloatSum::sum(&x_tmp[..k]); - let ysum: f64 = FloatSum::sum(&y_tmp[..k]); - let xysum = multiply_sum(&x_tmp, &y_tmp, k); - - let old_mean_x = mean_x; - let old_mean_y = mean_y; - n += k as f64; - mean_x += (xsum - k as f64 * old_mean_x) / n; - mean_y += (ysum - k as f64 * old_mean_y) / n; - - cxy += xysum - xsum * old_mean_y - ysum * mean_x + mean_x * old_mean_y * (k as f64); - } + let (a, b) = align_chunks_binary(a, b); + let mut out = CovState::default(); + for (a, b) in a.downcast_iter().zip(b.downcast_iter()) { + out.combine(&polars_compute::var_cov::cov(a, b)) } - - cxy / (n - ddof as f64) + out.finalize(ddof) } /// Compute the pearson correlation between two columns. pub fn pearson_corr(a: &ChunkedArray, b: &ChunkedArray, ddof: u8) -> Option where T: PolarsNumericType, - T::Native: ToPrimitive, + T::Native: AsPrimitive, ChunkedArray: ChunkVar, { let (a, b) = align_chunks_binary(a, b); - - let out = if a.null_count() > 0 || b.null_count() > 0 { - let iters = a.downcast_iter().zip(b.downcast_iter()).map(|(a, b)| { - a.into_iter().zip(b).filter_map(|(a, b)| match (a, b) { - (Some(a), Some(b)) => Some((*a, *b)), - _ => None, - }) - }); - online_pearson_corr(iters, ddof) - } else { - let iters = a - .downcast_iter() - .zip(b.downcast_iter()) - .map(|(a, b)| a.values_iter().copied().zip(b.values_iter().copied())); - online_pearson_corr(iters, ddof) - }; - Some(out) -} - -/// # Arguments -/// `iter` - Iterator over `T` tuple where any `Option` would skip the tuple. -fn online_pearson_corr(iters: I, ddof: u8) -> f64 -where - I: Iterator, - J: IntoIterator + Clone, - T: ToPrimitive, -{ - // Algorithm is same as cov, we just maintain cov(X, X), cov(X, Y), and - // cov(Y, Y), noting that var(X) = cov(X, X). - // Then corr(X, Y) = cov(X, Y)/(std(X) * std(Y)). - let mut mean_x = 0.0; - let mut mean_y = 0.0; - let mut cxy = 0.0; - let mut cxx = 0.0; - let mut cyy = 0.0; - let mut n = 0.0; - - let mut x_tmp = [0.0; COV_BUF_SIZE]; - let mut y_tmp = [0.0; COV_BUF_SIZE]; - - for iter in iters { - let mut iter = iter.clone().into_iter(); - - loop { - let mut k = 0; - for (x, y) in iter.by_ref().take(COV_BUF_SIZE) { - let x = x.to_f64().unwrap(); - let y = y.to_f64().unwrap(); - - x_tmp[k] = x; - y_tmp[k] = y; - k += 1; - } - if k == 0 { - break; - } - - // TODO: combine these all in one SIMD'ized pass. - let xsum: f64 = FloatSum::sum(&x_tmp[..k]); - let ysum: f64 = FloatSum::sum(&y_tmp[..k]); - let xxsum = multiply_sum(&x_tmp, &x_tmp, k); - let xysum = multiply_sum(&x_tmp, &y_tmp, k); - let yysum = multiply_sum(&y_tmp, &y_tmp, k); - - let old_mean_x = mean_x; - let old_mean_y = mean_y; - n += k as f64; - mean_x += (xsum - k as f64 * old_mean_x) / n; - mean_y += (ysum - k as f64 * old_mean_y) / n; - - cxx += xxsum - xsum * old_mean_x - xsum * mean_x + mean_x * old_mean_x * (k as f64); - cxy += xysum - xsum * old_mean_y - ysum * mean_x + mean_x * old_mean_y * (k as f64); - cyy += yysum - ysum * old_mean_y - ysum * mean_y + mean_y * old_mean_y * (k as f64); - } - } - - let sample_n = n - ddof as f64; - let sample_cov = cxy / sample_n; - let sample_std_x = (cxx / sample_n).sqrt(); - let sample_std_y = (cyy / sample_n).sqrt(); - - let denom = sample_std_x * sample_std_y; - let result = sample_cov / denom; - if denom.is_zero() { - f64::NAN - } else { - result + let mut out = PearsonState::default(); + for (a, b) in a.downcast_iter().zip(b.downcast_iter()) { + out.combine(&polars_compute::var_cov::pearson_corr(a, b)) } + Some(out.finalize(ddof)) }