From d464aeca774dc60e04c2be458eeedc06ce583dd6 Mon Sep 17 00:00:00 2001 From: ritchie Date: Sun, 9 Jun 2024 19:56:30 +0200 Subject: [PATCH] refactor(rust): Remove unneeded code --- crates/polars-core/src/frame/mod.rs | 37 +------ crates/polars-core/src/series/mod.rs | 97 +------------------ crates/polars-core/src/utils/mod.rs | 37 +------ crates/polars-core/src/utils/series.rs | 2 +- crates/polars-ops/src/chunked_array/top_k.rs | 8 +- .../src/frame/join/hash_join/mod.rs | 2 +- .../join/hash_join/single_keys_dispatch.rs | 17 ++-- 7 files changed, 17 insertions(+), 183 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 93f965867110..2d264fe50115 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -9,7 +9,9 @@ use rayon::prelude::*; #[cfg(feature = "algorithm_group_by")] use crate::chunked_array::ops::unique::is_unique_helper; use crate::prelude::*; -use crate::utils::{slice_offsets, split_ca, split_df, try_get_supertype, NoNull}; +#[cfg(feature = "row_hash")] +use crate::utils::split_df; +use crate::utils::{slice_offsets, try_get_supertype, NoNull}; #[cfg(feature = "dataframe_arithmetic")] mod arithmetic; @@ -1623,36 +1625,6 @@ impl DataFrame { opt_idx.and_then(|idx| self.select_at_idx_mut(idx)) } - /// Does a filter but splits thread chunks vertically instead of horizontally - /// This yields a DataFrame with `n_chunks == n_threads`. - fn filter_vertical(&mut self, mask: &BooleanChunked) -> PolarsResult { - let n_threads = POOL.current_num_threads(); - - let masks = split_ca(mask, n_threads).unwrap(); - let dfs = split_df(self, n_threads, false); - let dfs: PolarsResult> = POOL.install(|| { - masks - .par_iter() - .zip(dfs) - .map(|(mask, df)| { - let cols = df - .columns - .iter() - .map(|s| s.filter(mask)) - .collect::>()?; - Ok(unsafe { DataFrame::new_no_checks(cols) }) - }) - .collect() - }); - - let mut iter = dfs?.into_iter(); - let first = iter.next().unwrap(); - Ok(iter.fold(first, |mut acc, df| { - acc.vstack_mut(&df).unwrap(); - acc - })) - } - /// Take the [`DataFrame`] rows by a boolean mask. /// /// # Example @@ -1665,9 +1637,6 @@ impl DataFrame { /// } /// ``` pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult { - if std::env::var("POLARS_VERT_PAR").is_ok() { - return self.clone().filter_vertical(mask); - } let new_col = self.try_apply_columns_par(&|s| s.filter(mask))?; Ok(unsafe { DataFrame::new_no_checks(new_col) }) } diff --git a/crates/polars-core/src/series/mod.rs b/crates/polars-core/src/series/mod.rs index c02862cc7458..54bf60f395c5 100644 --- a/crates/polars-core/src/series/mod.rs +++ b/crates/polars-core/src/series/mod.rs @@ -23,16 +23,13 @@ use arrow::offset::Offsets; pub use from::*; pub use iterator::{SeriesIter, SeriesPhysIter}; use num_traits::NumCast; -use rayon::prelude::*; pub use series_trait::{IsSorted, *}; use crate::chunked_array::cast::CastOptions; use crate::chunked_array::metadata::{Metadata, MetadataFlags}; #[cfg(feature = "zip_with")] use crate::series::arithmetic::coerce_lhs_rhs; -use crate::utils::{ - _split_offsets, handle_casting_failures, materialize_dyn_int, split_ca, split_series, Wrap, -}; +use crate::utils::{handle_casting_failures, materialize_dyn_int, Wrap}; use crate::POOL; /// # Series @@ -618,41 +615,6 @@ impl Series { } } - fn finish_take_threaded(&self, s: Vec, rechunk: bool) -> Series { - let s = s - .into_iter() - .reduce(|mut s, s1| { - s.append(&s1).unwrap(); - s - }) - .unwrap(); - if rechunk { - s.rechunk() - } else { - s - } - } - - // Take a function pointer to reduce bloat. - fn threaded_op( - &self, - rechunk: bool, - len: usize, - func: &(dyn Fn(usize, usize) -> PolarsResult + Send + Sync), - ) -> PolarsResult { - let n_threads = POOL.current_num_threads(); - let offsets = _split_offsets(len, n_threads); - - let series: PolarsResult> = POOL.install(|| { - offsets - .into_par_iter() - .map(|(offset, len)| func(offset, len)) - .collect() - }); - - Ok(self.finish_take_threaded(series?, rechunk)) - } - /// Take by index if ChunkedArray contains a single chunk. /// /// # Safety @@ -661,40 +623,6 @@ impl Series { self.take_slice_unchecked(idx) } - /// Take by index if ChunkedArray contains a single chunk. - /// - /// # Safety - /// This doesn't check any bounds. Null validity is checked. - pub unsafe fn take_unchecked_threaded(&self, idx: &IdxCa, rechunk: bool) -> Series { - self.threaded_op(rechunk, idx.len(), &|offset, len| { - let idx = idx.slice(offset as i64, len); - Ok(self.take_unchecked(&idx)) - }) - .unwrap() - } - - /// Take by index if ChunkedArray contains a single chunk. - /// - /// # Safety - /// This doesn't check any bounds. Null validity is checked. - pub unsafe fn take_slice_unchecked_threaded(&self, idx: &[IdxSize], rechunk: bool) -> Series { - self.threaded_op(rechunk, idx.len(), &|offset, len| { - Ok(self.take_slice_unchecked(&idx[offset..offset + len])) - }) - .unwrap() - } - - /// Take by index. This operation is clone. - /// - /// # Notes - /// Out of bounds access doesn't Error but will return a Null value - pub fn take_threaded(&self, idx: &IdxCa, rechunk: bool) -> PolarsResult { - self.threaded_op(rechunk, idx.len(), &|offset, len| { - let idx = idx.slice(offset as i64, len); - self.take(&idx) - }) - } - /// Traverse and collect every nth element in a new array. pub fn gather_every(&self, n: usize, offset: usize) -> Series { let idx = ((offset as IdxSize)..self.len() as IdxSize) @@ -704,29 +632,6 @@ impl Series { unsafe { self.take_unchecked(&idx) } } - /// Filter by boolean mask. This operation clones data. - pub fn filter_threaded(&self, filter: &BooleanChunked, rechunk: bool) -> PolarsResult { - // This would fail if there is a broadcasting filter, because we cannot - // split that filter over threads besides they are a no-op, so we do the - // standard filter. - if filter.len() == 1 { - return self.filter(filter); - } - let n_threads = POOL.current_num_threads(); - let filters = split_ca(filter, n_threads).unwrap(); - let series = split_series(self, n_threads).unwrap(); - - let series: PolarsResult> = POOL.install(|| { - filters - .par_iter() - .zip(series) - .map(|(filter, s)| s.filter(filter)) - .collect() - }); - - Ok(self.finish_take_threaded(series?, rechunk)) - } - #[cfg(feature = "dot_product")] pub fn dot(&self, other: &Series) -> PolarsResult { (self * other).sum::() diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs index 53f1f6231126..c187c81d7131 100644 --- a/crates/polars-core/src/utils/mod.rs +++ b/crates/polars-core/src/utils/mod.rs @@ -81,37 +81,6 @@ pub(crate) fn get_iter_capacity>(iter: &I) -> usize { } } -macro_rules! split_array { - ($ca: expr, $n: expr, $ty : ty) => {{ - if $n == 1 { - return Ok(vec![$ca.clone()]); - } - let total_len = $ca.len(); - let chunk_size = total_len / $n; - - let v = (0..$n) - .map(|i| { - let offset = i * chunk_size; - let len = if i == ($n - 1) { - total_len - offset - } else { - chunk_size - }; - $ca.slice((i * chunk_size) as $ty, len) - }) - .collect(); - Ok(v) - }}; -} - -// This one splits, but doesn't flatten chunks; -pub fn split_ca(ca: &ChunkedArray, n: usize) -> PolarsResult>> -where - T: PolarsDataType, -{ - split_array!(ca, n, i64) -} - // prefer this one over split_ca, as this can push the null_count into the thread pool // returns an `(offset, length)` tuple #[doc(hidden)] @@ -135,11 +104,6 @@ pub fn _split_offsets(len: usize, n: usize) -> Vec<(usize, usize)> { } } -#[doc(hidden)] -pub fn split_series(s: &Series, n: usize) -> PolarsResult> { - split_array!(s, n, i64) -} - #[allow(clippy::len_without_is_empty)] pub trait Container: Clone { fn slice(&self, offset: i64, len: usize) -> Self; @@ -237,6 +201,7 @@ fn split_impl(container: &C, target: usize, chunk_size: usize) -> out } +/// Splits, but doesn't flatten chunks. E.g. a container can still have multiple chunks. pub fn split(container: &C, target: usize) -> Vec { let total_len = container.len(); if total_len == 0 { diff --git a/crates/polars-core/src/utils/series.rs b/crates/polars-core/src/utils/series.rs index 1595101b2a43..adbc563ccd2d 100644 --- a/crates/polars-core/src/utils/series.rs +++ b/crates/polars-core/src/utils/series.rs @@ -17,7 +17,7 @@ where pub fn handle_casting_failures(input: &Series, output: &Series) -> PolarsResult<()> { let failure_mask = !input.is_null() & output.is_null(); - let failures = input.filter_threaded(&failure_mask, false)?; + let failures = input.filter(&failure_mask)?; let additional_info = match (input.dtype(), output.dtype()) { (DataType::String, DataType::Date | DataType::Datetime(_, _)) => { diff --git a/crates/polars-ops/src/chunked_array/top_k.rs b/crates/polars-ops/src/chunked_array/top_k.rs index c5edaaebd64f..9cc10bb903e6 100644 --- a/crates/polars-ops/src/chunked_array/top_k.rs +++ b/crates/polars-ops/src/chunked_array/top_k.rs @@ -287,12 +287,6 @@ fn top_k_by_impl( let idx = _arg_bottom_k(k, by, &mut sort_options)?; - let result = unsafe { - if multithreaded { - src.take_unchecked_threaded(&idx.into_inner(), false) - } else { - src.take_unchecked(&idx.into_inner()) - } - }; + let result = unsafe { src.take_unchecked(&idx.into_inner()) }; Ok(result) } diff --git a/crates/polars-ops/src/frame/join/hash_join/mod.rs b/crates/polars-ops/src/frame/join/hash_join/mod.rs index f9291fdf2da1..4e939e56c0c4 100644 --- a/crates/polars-ops/src/frame/join/hash_join/mod.rs +++ b/crates/polars-ops/src/frame/join/hash_join/mod.rs @@ -7,7 +7,7 @@ mod single_keys_outer; mod single_keys_semi_anti; pub(super) mod sort_merge; use arrow::array::ArrayRef; -use polars_core::utils::{_set_partition_size, split_ca}; +use polars_core::utils::_set_partition_size; use polars_core::POOL; use polars_utils::index::ChunkId; pub(super) use single_keys::*; diff --git a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs index 1e7180de7d70..bf483c194d72 100644 --- a/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs +++ b/crates/polars-ops/src/frame/join/hash_join/single_keys_dispatch.rs @@ -1,4 +1,5 @@ use arrow::array::PrimitiveArray; +use polars_core::utils::split; use polars_core::with_match_physical_float_polars_type; use polars_utils::hashing::DirtyHash; use polars_utils::nulls::IsNull; @@ -261,8 +262,8 @@ where { let n_threads = POOL.current_num_threads(); let (a, b, swapped) = det_hash_prone_order!(left, right); - let splitted_a = split_ca(a, n_threads).unwrap(); - let splitted_b = split_ca(b, n_threads).unwrap(); + let splitted_a = split(a, n_threads); + let splitted_b = split(b, n_threads); let splitted_a = get_arrays(&splitted_a); let splitted_b = get_arrays(&splitted_b); @@ -346,8 +347,8 @@ where as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash, { let n_threads = POOL.current_num_threads(); - let splitted_a = split_ca(left, n_threads).unwrap(); - let splitted_b = split_ca(right, n_threads).unwrap(); + let splitted_a = split(left, n_threads); + let splitted_b = split(right, n_threads); match ( left.null_count(), right.null_count(), @@ -405,8 +406,8 @@ where let (a, b, swapped) = det_hash_prone_order!(ca_in, other); let n_partitions = _set_partition_size(); - let splitted_a = split_ca(a, n_partitions).unwrap(); - let splitted_b = split_ca(b, n_partitions).unwrap(); + let splitted_a = split(a, n_partitions); + let splitted_b = split(b, n_partitions); match (a.null_count(), b.null_count()) { (0, 0) => { @@ -496,8 +497,8 @@ where as ToTotalOrd>::TotalOrdItem: Send + Sync + DirtyHash + IsNull, { let n_threads = POOL.current_num_threads(); - let splitted_a = split_ca(left, n_threads).unwrap(); - let splitted_b = split_ca(right, n_threads).unwrap(); + let splitted_a = split(left, n_threads); + let splitted_b = split(right, n_threads); match ( left.null_count(), right.null_count(),