diff --git a/crates/polars-arrow/src/array/dictionary/value_map.rs b/crates/polars-arrow/src/array/dictionary/value_map.rs index d818b7e6b25c..0659e7565200 100644 --- a/crates/polars-arrow/src/array/dictionary/value_map.rs +++ b/crates/polars-arrow/src/array/dictionary/value_map.rs @@ -1,9 +1,9 @@ use std::borrow::Borrow; use std::fmt::{self, Debug}; -use std::hash::{BuildHasherDefault, Hash, Hasher}; +use std::hash::Hash; -use hashbrown::hash_map::RawEntryMut; -use hashbrown::HashMap; +use hashbrown::hash_table::Entry; +use hashbrown::HashTable; use polars_error::{polars_bail, polars_err, PolarsResult}; use polars_utils::aliases::PlRandomState; @@ -12,47 +12,10 @@ use crate::array::indexable::{AsIndexed, Indexable}; use crate::array::{Array, MutableArray}; use crate::datatypes::ArrowDataType; -/// Hasher for pre-hashed values; similar to `hash_hasher` but with native endianness. -/// -/// We know that we'll only use it for `u64` values, so we can avoid endian conversion. -/// -/// Invariant: hash of a u64 value is always equal to itself. -#[derive(Copy, Clone, Default)] -pub struct PassthroughHasher(u64); - -impl Hasher for PassthroughHasher { - #[inline] - fn write_u64(&mut self, value: u64) { - self.0 = value; - } - - fn write(&mut self, _: &[u8]) { - unreachable!(); - } - - #[inline] - fn finish(&self) -> u64 { - self.0 - } -} - -#[derive(Clone)] -pub struct Hashed { - hash: u64, - key: K, -} - -impl Hash for Hashed { - #[inline] - fn hash(&self, state: &mut H) { - self.hash.hash(state) - } -} - #[derive(Clone)] pub struct ValueMap { pub values: M, - pub map: HashMap, (), BuildHasherDefault>, // NB: *only* use insert_hashed_nocheck() and no other hashmap API + pub map: HashTable<(u64, K)>, random_state: PlRandomState, } @@ -63,7 +26,7 @@ impl ValueMap { } Ok(Self { values, - map: HashMap::default(), + map: HashTable::default(), random_state: PlRandomState::default(), }) } @@ -73,10 +36,7 @@ impl ValueMap { M: Indexable, M::Type: Eq + Hash, { - let mut map = HashMap::, _, _>::with_capacity_and_hasher( - values.len(), - BuildHasherDefault::::default(), - ); + let mut map: HashTable<(u64, K)> = HashTable::with_capacity(values.len()); let random_state = PlRandomState::default(); for index in 0..values.len() { let key = K::try_from(index).map_err(|_| polars_err!(ComputeError: "overflow"))?; @@ -84,18 +44,21 @@ impl ValueMap { let value = unsafe { values.value_unchecked_at(index) }; let hash = random_state.hash_one(value.borrow()); - let entry = map.raw_entry_mut().from_hash(hash, |item| { - // SAFETY: invariant of the struct, it's always in bounds since we maintain it - let stored_value = unsafe { values.value_unchecked_at(item.key.as_usize()) }; - stored_value.borrow() == value.borrow() - }); + let entry = map.entry( + hash, + |(_h, key)| { + // SAFETY: invariant of the struct, it's always in bounds. + let stored_value = unsafe { values.value_unchecked_at(key.as_usize()) }; + stored_value.borrow() == value.borrow() + }, + |(h, _key)| *h, + ); match entry { - RawEntryMut::Occupied(_) => { + Entry::Occupied(_) => { polars_bail!(InvalidOperation: "duplicate value in dictionary values array") }, - RawEntryMut::Vacant(entry) => { - // NB: don't use .insert() here! - entry.insert_hashed_nocheck(hash, Hashed { hash, key }, ()); + Entry::Vacant(entry) => { + entry.insert((hash, key)); }, } } @@ -137,19 +100,21 @@ impl ValueMap { M::Type: Eq + Hash, { let hash = self.random_state.hash_one(value.as_indexed()); - let entry = self.map.raw_entry_mut().from_hash(hash, |item| { - // SAFETY: we've already checked (the inverse) when we pushed it, so it should be ok? - let index = unsafe { item.key.as_usize() }; - // SAFETY: invariant of the struct, it's always in bounds since we maintain it - let stored_value = unsafe { self.values.value_unchecked_at(index) }; - stored_value.borrow() == value.as_indexed() - }); + let entry = self.map.entry( + hash, + |(_h, key)| { + // SAFETY: invariant of the struct, it's always in bounds. + let stored_value = unsafe { self.values.value_unchecked_at(key.as_usize()) }; + stored_value.borrow() == value.as_indexed() + }, + |(h, _key)| *h, + ); let out = match entry { - RawEntryMut::Occupied(entry) => entry.key().key, - RawEntryMut::Vacant(entry) => { + Entry::Occupied(entry) => entry.get().1, + Entry::Vacant(entry) => { let index = self.values.len(); let key = K::try_from(index).map_err(|_| polars_err!(ComputeError: "overflow"))?; - entry.insert_hashed_nocheck(hash, Hashed { hash, key }, ()); // NB: don't use .insert() here! + entry.insert((hash, key)); push(&mut self.values, value)?; debug_assert_eq!(self.values.len(), index + 1); key diff --git a/crates/polars-core/src/frame/group_by/hashing.rs b/crates/polars-core/src/frame/group_by/hashing.rs index 418471abc388..0af7b5159c8a 100644 --- a/crates/polars-core/src/frame/group_by/hashing.rs +++ b/crates/polars-core/src/frame/group_by/hashing.rs @@ -1,10 +1,9 @@ -use std::hash::{BuildHasher, Hash, Hasher}; - -use hashbrown::hash_map::RawEntryMut; +use hashbrown::hash_map::Entry; use polars_utils::hashing::{hash_to_partition, DirtyHash}; use polars_utils::idx_vec::IdxVec; +use polars_utils::itertools::Itertools; use polars_utils::sync::SyncPtr; -use polars_utils::total_ord::{ToTotalOrd, TotalHash}; +use polars_utils::total_ord::{ToTotalOrd, TotalHash, TotalOrdWrap}; use polars_utils::unitvec; use rayon::prelude::*; @@ -73,50 +72,42 @@ fn finish_group_order(mut out: Vec>, sorted: bool) -> GroupsProxy { } } -pub(crate) fn group_by(a: impl Iterator, sorted: bool) -> GroupsProxy +pub(crate) fn group_by(keys: impl Iterator, sorted: bool) -> GroupsProxy where - T: TotalHash + TotalEq, + K: TotalHash + TotalEq, { let init_size = get_init_size(); - let mut hash_tbl: PlHashMap = PlHashMap::with_capacity(init_size); - let hasher = hash_tbl.hasher().clone(); - let mut cnt = 0; - a.for_each(|k| { - let idx = cnt; - cnt += 1; - - let mut state = hasher.build_hasher(); - k.tot_hash(&mut state); - let h = state.finish(); - let entry = hash_tbl.raw_entry_mut().from_hash(h, |k_| k.tot_eq(k_)); - - match entry { - RawEntryMut::Vacant(entry) => { - let tuples = unitvec![idx]; - entry.insert_with_hasher(h, k, (idx, tuples), |k| { - let mut state = hasher.build_hasher(); - k.tot_hash(&mut state); - state.finish() - }); - }, - RawEntryMut::Occupied(mut entry) => { - let v = entry.get_mut(); - v.1.push(idx); - }, - } - }); + let (mut first, mut groups); if sorted { - let mut groups = hash_tbl - .into_iter() - .map(|(_k, v)| v) - .collect_trusted::>(); - groups.sort_unstable_by_key(|g| g.0); - let mut idx: GroupsIdx = groups.into_iter().collect(); - idx.sorted = true; - GroupsProxy::Idx(idx) + groups = Vec::with_capacity(get_init_size()); + first = Vec::with_capacity(get_init_size()); + let mut hash_tbl = PlHashMap::with_capacity(init_size); + for (idx, k) in keys.enumerate_idx() { + match hash_tbl.entry(TotalOrdWrap(k)) { + Entry::Vacant(entry) => { + let group_idx = groups.len() as IdxSize; + entry.insert(group_idx); + groups.push(unitvec![idx]); + first.push(idx); + }, + Entry::Occupied(entry) => unsafe { + groups.get_unchecked_mut(*entry.get() as usize).push(idx) + }, + } + } } else { - GroupsProxy::Idx(hash_tbl.into_values().collect()) + let mut hash_tbl = PlHashMap::with_capacity(init_size); + for (idx, k) in keys.enumerate_idx() { + match hash_tbl.entry(TotalOrdWrap(k)) { + Entry::Vacant(entry) => { + entry.insert((idx, unitvec![idx])); + }, + Entry::Occupied(mut entry) => entry.get_mut().1.push(idx), + } + } + (first, groups) = hash_tbl.into_values().unzip(); } + GroupsProxy::Idx(GroupsIdx::new(first, groups, sorted)) } // giving the slice info to the compiler is much @@ -128,8 +119,8 @@ pub(crate) fn group_by_threaded_slice( sorted: bool, ) -> GroupsProxy where - T: TotalHash + TotalEq + ToTotalOrd, - ::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash, + T: ToTotalOrd, + ::TotalOrdItem: Send + Sync + Copy + DirtyHash, IntoSlice: AsRef<[T]> + Send + Sync, { let init_size = get_init_size(); @@ -141,39 +132,28 @@ where (0..n_partitions) .into_par_iter() .map(|thread_no| { - let mut hash_tbl: PlHashMap = - PlHashMap::with_capacity(init_size); + let mut hash_tbl = PlHashMap::with_capacity(init_size); let mut offset = 0; for keys in &keys { let keys = keys.as_ref(); let len = keys.len() as IdxSize; - let hasher = hash_tbl.hasher().clone(); - let mut cnt = 0; - keys.iter().for_each(|k| { + for (key_idx, k) in keys.iter().enumerate_idx() { let k = k.to_total_ord(); - let idx = cnt + offset; - cnt += 1; + let idx = key_idx + offset; if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) { - let hash = hasher.hash_one(k); - let entry = hash_tbl.raw_entry_mut().from_key_hashed_nocheck(hash, &k); - - match entry { - RawEntryMut::Vacant(entry) => { - let tuples = unitvec![idx]; - entry.insert_with_hasher(hash, k, (idx, tuples), |k| { - hasher.hash_one(*k) - }); + match hash_tbl.entry(k) { + Entry::Vacant(entry) => { + entry.insert((idx, unitvec![idx])); }, - RawEntryMut::Occupied(mut entry) => { - let v = entry.get_mut(); - v.1.push(idx); + Entry::Occupied(mut entry) => { + entry.get_mut().1.push(idx); }, } } - }); + } offset += len; } hash_tbl @@ -194,8 +174,8 @@ pub(crate) fn group_by_threaded_iter( where I: IntoIterator + Send + Sync + Clone, I::IntoIter: ExactSizeIterator, - T: TotalHash + TotalEq + DirtyHash + ToTotalOrd, - ::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash, + T: ToTotalOrd, + ::TotalOrdItem: Send + Sync + Copy + DirtyHash, { let init_size = get_init_size(); @@ -206,39 +186,29 @@ where (0..n_partitions) .into_par_iter() .map(|thread_no| { - let mut hash_tbl: PlHashMap = + let mut hash_tbl: PlHashMap = PlHashMap::with_capacity(init_size); let mut offset = 0; for keys in keys { let keys = keys.clone().into_iter(); let len = keys.len() as IdxSize; - let hasher = hash_tbl.hasher().clone(); - let mut cnt = 0; - keys.for_each(|k| { + for (key_idx, k) in keys.into_iter().enumerate_idx() { let k = k.to_total_ord(); - let idx = cnt + offset; - cnt += 1; + let idx = key_idx + offset; if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) { - let hash = hasher.hash_one(k); - let entry = hash_tbl.raw_entry_mut().from_key_hashed_nocheck(hash, &k); - - match entry { - RawEntryMut::Vacant(entry) => { - let tuples = unitvec![idx]; - entry.insert_with_hasher(hash, k, (idx, tuples), |k| { - hasher.hash_one(*k) - }); + match hash_tbl.entry(k) { + Entry::Vacant(entry) => { + entry.insert(unitvec![idx]); }, - RawEntryMut::Occupied(mut entry) => { - let v = entry.get_mut(); - v.1.push(idx); + Entry::Occupied(mut entry) => { + entry.get_mut().push(idx); }, } } - }); + } offset += len; } // iterating the hash tables locally @@ -252,7 +222,7 @@ where // indirection hash_tbl .into_iter() - .map(|(_k, v)| v) + .map(|(_k, v)| (unsafe { *v.first().unwrap_unchecked() }, v)) .collect_trusted::>() }) .collect::>() diff --git a/crates/polars-core/src/frame/group_by/proxy.rs b/crates/polars-core/src/frame/group_by/proxy.rs index d9aedc261faf..d1c04162b7b9 100644 --- a/crates/polars-core/src/frame/group_by/proxy.rs +++ b/crates/polars-core/src/frame/group_by/proxy.rs @@ -3,7 +3,6 @@ use std::ops::Deref; use arrow::offset::OffsetsBuffer; use polars_utils::idx_vec::IdxVec; -use polars_utils::sync::SyncPtr; use rayon::iter::plumbing::UnindexedConsumer; use rayon::prelude::*; @@ -46,61 +45,6 @@ impl From> for GroupsIdx { } } -impl From, Vec)>> for GroupsIdx { - fn from(v: Vec<(Vec, Vec)>) -> Self { - // we have got the hash tables so we can determine the final - let cap = v.iter().map(|v| v.0.len()).sum::(); - let offsets = v - .iter() - .scan(0_usize, |acc, v| { - let out = *acc; - *acc += v.0.len(); - Some(out) - }) - .collect::>(); - let mut global_first = Vec::with_capacity(cap); - let global_first_ptr = unsafe { SyncPtr::new(global_first.as_mut_ptr()) }; - let mut global_all = Vec::with_capacity(cap); - let global_all_ptr = unsafe { SyncPtr::new(global_all.as_mut_ptr()) }; - - POOL.install(|| { - v.into_par_iter().zip(offsets).for_each( - |((local_first_vals, mut local_all_vals), offset)| unsafe { - let global_first: *mut IdxSize = global_first_ptr.get(); - let global_all: *mut IdxVec = global_all_ptr.get(); - let global_first = global_first.add(offset); - let global_all = global_all.add(offset); - - std::ptr::copy_nonoverlapping( - local_first_vals.as_ptr(), - global_first, - local_first_vals.len(), - ); - std::ptr::copy_nonoverlapping( - local_all_vals.as_ptr(), - global_all, - local_all_vals.len(), - ); - // local_all_vals: Vec> - // we just copied the contents: Vec to a new buffer - // now, we want to free the outer vec, without freeing - // the inner vecs as they are moved, so we set the len to 0 - local_all_vals.set_len(0); - }, - ); - }); - unsafe { - global_all.set_len(cap); - global_first.set_len(cap); - } - GroupsIdx { - sorted: false, - first: global_first, - all: global_all, - } - } -} - impl From>> for GroupsIdx { fn from(v: Vec>) -> Self { // single threaded flatten: 10% faster than `iter().flatten().collect() @@ -149,6 +93,9 @@ impl GroupsIdx { } pub fn sort(&mut self) { + if self.sorted { + return; + } let mut idx = 0; let first = std::mem::take(&mut self.first); // store index and values so that we can sort those @@ -542,7 +489,7 @@ impl GroupsProxy { } } - pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups { + pub fn slice(&self, offset: i64, len: usize) -> SlicedGroups<'_> { // SAFETY: // we create new `Vec`s from the sliced groups. But we wrap them in ManuallyDrop // so that we never call drop on them. diff --git a/crates/polars-ops/src/chunked_array/list/sets.rs b/crates/polars-ops/src/chunked_array/list/sets.rs index a26d76b5a768..e06234f4dfcc 100644 --- a/crates/polars-ops/src/chunked_array/list/sets.rs +++ b/crates/polars-ops/src/chunked_array/list/sets.rs @@ -102,7 +102,7 @@ where } } -fn copied_wrapper_opt( +fn copied_wrapper_opt( v: Option<&T>, ) -> as ToTotalOrd>::TotalOrdItem { v.copied().to_total_ord() diff --git a/crates/polars-utils/src/total_ord.rs b/crates/polars-utils/src/total_ord.rs index 80b21bec507a..cfaa05f0141d 100644 --- a/crates/polars-utils/src/total_ord.rs +++ b/crates/polars-utils/src/total_ord.rs @@ -472,7 +472,7 @@ impl<'a> TotalEq for BytesHash<'a> { /// This elides creating a [`TotalOrdWrap`] for types that don't need it. pub trait ToTotalOrd { - type TotalOrdItem; + type TotalOrdItem: Hash + Eq; type SourceItem; fn to_total_ord(&self) -> Self::TotalOrdItem; @@ -564,7 +564,7 @@ impl_to_total_ord_wrapped!(f64); /// `TotalOrdWrap>` implements `Eq + Hash`, iff: /// `Option` implements `TotalEq + TotalHash`, iff: /// `T` implements `TotalEq + TotalHash` -impl ToTotalOrd for Option { +impl ToTotalOrd for Option { type TotalOrdItem = TotalOrdWrap>; type SourceItem = Option;