diff --git a/Cargo.toml b/Cargo.toml
index c883c99..dfe9460 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,10 +18,12 @@ criterion = "0.4.0"
 compiletest_rs = "0.10.0"
 
 [features]
-default = ["alloc"]
+default = ["alloc", "batched_extend"]
 # disable the alloc based ringbuffer, to make RingBuffers work in no_alloc environments
 alloc = []
+batched_extend = []
+
 [[bench]]
 name = "bench"
 harness = false
diff --git a/benches/bench.rs b/benches/bench.rs
index 6793a40..3289d0e 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -1,5 +1,6 @@
-#![no_coverage]
-use criterion::{black_box, criterion_group, criterion_main, Bencher, Criterion};
+#![cfg(not(tarpaulin_include))]
+
+use criterion::{black_box, criterion_group, criterion_main, BatchSize, Bencher, Criterion};
 use ringbuffer::{AllocRingBuffer, ConstGenericRingBuffer, RingBuffer};
 
 fn benchmark_push<T: RingBuffer<i32>, F: Fn() -> T>(b: &mut Bencher, new: F) {
@@ -180,6 +181,68 @@ fn criterion_benchmark(c: &mut Criterion) {
         8192, 8195
     ];
+
+    c.bench_function("extend too many", extend_too_many);
+    c.bench_function("extend many too many", extend_many_too_many);
+    c.bench_function("extend exact cap", extend_exact_cap);
+    c.bench_function("extend too few", extend_too_few);
+    c.bench_function("extend after one", extend_after_one);
+}
+
+fn extend_many_too_many(b: &mut Bencher) {
+    let rb = ConstGenericRingBuffer::new::<8192>();
+    let input = (0..16384).collect::<Vec<i32>>();
+
+    b.iter_batched(
+        &|| rb.clone(),
+        |mut r| black_box(r.extend(black_box(input.as_slice()))),
+        BatchSize::SmallInput,
+    );
+}
+
+fn extend_too_many(b: &mut Bencher) {
+    let rb = ConstGenericRingBuffer::new::<8192>();
+    let input = (0..10000).collect::<Vec<i32>>();
+
+    b.iter_batched(
+        &|| rb.clone(),
+        |mut r| black_box(r.extend(black_box(input.as_slice()))),
+        BatchSize::SmallInput,
+    );
+}
+
+fn extend_exact_cap(b: &mut Bencher) {
+    let rb = ConstGenericRingBuffer::new::<8192>();
+    let input = (0..8192).collect::<Vec<i32>>();
+
+    b.iter_batched(
+        &|| rb.clone(),
+        |mut r| black_box(r.extend(black_box(input.as_slice()))),
+        BatchSize::SmallInput,
+    );
+}
+
+fn extend_too_few(b: &mut Bencher) {
+    let rb = ConstGenericRingBuffer::new::<8192>();
+    let input = (0..4096).collect::<Vec<i32>>();
+
+    b.iter_batched(
+        &|| rb.clone(),
+        |mut r| black_box(r.extend(black_box(input.as_slice()))),
+        BatchSize::LargeInput,
+    );
+}
+
+fn extend_after_one(b: &mut Bencher) {
+    let mut rb = ConstGenericRingBuffer::new::<8192>();
+    rb.push(&0);
+    let input = (0..4096).collect::<Vec<i32>>();
+
+    b.iter_batched(
+        &|| rb.clone(),
+        |mut r| black_box(r.extend(black_box(input.as_slice()))),
+        BatchSize::LargeInput,
+    );
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/src/ringbuffer_trait.rs b/src/ringbuffer_trait.rs
index ae92f09..854be52 100644
--- a/src/ringbuffer_trait.rs
+++ b/src/ringbuffer_trait.rs
@@ -2,6 +2,7 @@ use core::ops::{Index, IndexMut};
 
 #[cfg(feature = "alloc")]
 extern crate alloc;
+
 #[cfg(feature = "alloc")]
 use alloc::vec::Vec;
 
@@ -83,6 +84,41 @@ pub unsafe trait RingBuffer<T>:
         self.push(value);
     }
 
+    /// alias for [`extend`](RingBuffer::extend).
+    #[inline]
+    fn enqueue_many<I: IntoIterator<Item = T>>(&mut self, items: I) {
+        self.extend(items);
+    }
+
+    /// Clones and appends all elements in a slice to the `RingBuffer`.
+    ///
+    /// Iterates over the slice `other`, clones each element, and then appends
+    /// it to this `RingBuffer`. The `other` slice is traversed in-order.
+    ///
+    /// Depending on the `RingBuffer` implementation, may be faster than inserting items in a loop.
+    /// `ConstGenericRingBuffer` is especially optimised in this regard.
+    /// See also: [`ConstGenericRingBuffer::custom_extend_batched`](crate::with_const_generics::ConstGenericRingBuffer::custom_extend_batched)
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use ringbuffer::{ConstGenericRingBuffer, RingBuffer};
+    ///
+    /// let mut rb = ConstGenericRingBuffer::<_, 6>::new();
+    /// rb.push(1);
+    ///
+    /// rb.extend_from_slice(&[2, 3, 4]);
+    /// assert_eq!(rb.to_vec(), vec![1, 2, 3, 4]);
+    /// ```
+    ///
+    /// [`extend`]: RingBuffer::extend
+    fn extend_from_slice(&mut self, other: &[T])
+    where
+        T: Clone,
+    {
+        self.extend(other.iter().cloned());
+    }
+
     /// dequeues the top item off the ringbuffer, and moves this item out.
     fn dequeue(&mut self) -> Option<T>;
 
@@ -119,6 +155,24 @@
         RingBufferDrainingIterator::new(self)
     }
 
+    /// Moves all the elements of `other` into `self`, leaving `other` empty.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use ringbuffer::{ConstGenericRingBuffer, RingBuffer};
+    ///
+    /// let mut vec = ConstGenericRingBuffer::<_, 6>::from(vec![1, 2, 3]);
+    /// let mut vec2 = ConstGenericRingBuffer::<_, 6>::from(vec![4, 5, 6]);
+    ///
+    /// vec.append(&mut vec2);
+    /// assert_eq!(vec.to_vec(), &[1, 2, 3, 4, 5, 6]);
+    /// assert_eq!(vec2.to_vec(), &[]);
+    /// ```
+    fn append(&mut self, other: &mut Self) {
+        self.extend(other.drain());
+    }
+
     /// Sets every element in the ringbuffer to the value returned by f.
     fn fill_with<F: FnMut() -> T>(&mut self, f: F);
 
diff --git a/src/with_const_generics.rs b/src/with_const_generics.rs
index e4b6570..b65860a 100644
--- a/src/with_const_generics.rs
+++ b/src/with_const_generics.rs
@@ -241,16 +241,283 @@ impl<'a, T, const CAP: usize> IntoIterator for &'a mut ConstGenericRingBuffer<T, CAP> {
 
-impl<T, const CAP: usize> Extend<T> for ConstGenericRingBuffer<T, CAP> {
-    fn extend<A: IntoIterator<Item = T>>(&mut self, iter: A) {
-        let iter = iter.into_iter();
+impl<T, const CAP: usize> ConstGenericRingBuffer<T, CAP> {
+    /// splits the ringbuffer into two slices. One from the old pointer to the end of the buffer,
+    /// and one from the start of the buffer to the new pointer
+    ///
+    /// # Safety
+    /// Only safe when old != new
+    #[inline]
+    #[cfg(feature = "batched_extend")]
+    unsafe fn split_pointer_move(
+        &mut self,
+        old: usize,
+        new: usize,
+    ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) {
+        let old_mod = crate::mask_modulo(CAP, old);
+        let new_mod = crate::mask_modulo(CAP, new);
+
+        if old_mod < new_mod {
+            // if there's no wrapping, nice! we can just return one slice
+            (&mut self.buf[old_mod..new_mod], &mut [])
+        } else {
+            // the first part is from old_mod to CAP
+            let (start, p1) = self.buf.split_at_mut(old_mod);
-        for i in iter {
-            self.push(i);
+            // and the second part from 0 to new_mod
+            let (p2, _) = start.split_at_mut(new_mod);
+
+            (p1, p2)
+        }
+    }
+
+    /// # Safety
+    /// Only safe when `CAP` >= `BATCH_SIZE`
+    #[inline]
+    #[cfg(feature = "batched_extend")]
+    unsafe fn extend_from_arr_batch<const BATCH_SIZE: usize>(&mut self, data: [T; BATCH_SIZE]) {
+        debug_assert!(CAP >= BATCH_SIZE);
+
+        // algorithm to push 1 item:
+        //
+        // if self.is_full() {
+        //     let previous_value = mem::replace(
+        //         &mut self.buf[crate::mask_modulo(CAP, self.readptr)],
+        //         MaybeUninit::uninit(),
+        //     );
+        //     // make sure we drop whatever is being overwritten
+        //     // SAFETY: the buffer is full, so this must be initialized
+        //     //       : also, index has been masked
+        //     // make sure we drop because it won't happen automatically
+        //     unsafe {
+        //         drop(previous_value.assume_init());
+        //     }
+        //     self.readptr += 1;
+        // }
+        // let index = crate::mask_modulo(CAP, self.writeptr);
+        // self.buf[index] = MaybeUninit::new(value);
+        // self.writeptr += 1;
+
+        let old_len = self.len();
+
+        let old_writeptr = self.writeptr;
+        let old_readptr = self.readptr;
+
+        // so essentially, we need to update the write pointer by Self::BATCH_SIZE
+        self.writeptr += BATCH_SIZE;
+
+        // but maybe we also need to update the readptr
+        // first we calculate if we will be full. if not, no need to update the readptr
+        let num_items_until_full = self.capacity() - old_len;
+        if num_items_until_full < BATCH_SIZE {
+            // the difference is how much the read ptr needs to move
+            self.readptr += BATCH_SIZE - num_items_until_full;
+
+            debug_assert_ne!(old_readptr, self.readptr);
+
+            // if readptr moves, we also need to free some items.
+            // Safety: same safety guarantees as this function and old != new by the assertion above
+            let (p1, p2) = unsafe { self.split_pointer_move(old_readptr, self.readptr) };
+            // assertion: we can never be in a situation where we have to drop more than a batch size of items
+            debug_assert!(p1.len() + p2.len() <= BATCH_SIZE);
+
+            for i in p1 {
+                i.assume_init_drop();
+            }
+            for i in p2 {
+                i.assume_init_drop();
+            }
+        }
+
+        debug_assert_ne!(old_writeptr, self.writeptr);
+        // now we need to write some items between old_writeptr and self.writeptr
+        // Safety: same safety guarantees as this function and old != new by the assertion above
+        let (p1, p2) = unsafe { self.split_pointer_move(old_writeptr, self.writeptr) };
+        // assertion: we can never be in a situation where we have to write more than a batch size of items
+        debug_assert!(
+            p1.len() + p2.len() <= BATCH_SIZE,
+            "p1: {}; p2: {}; batch: {}",
+            p1.len(),
+            p2.len(),
+            BATCH_SIZE
+        );
+
+        // if we are lucky, we're not on the boundary so either p1 or p2 has a length of Self::BATCH_SIZE
+        if p1.len() == BATCH_SIZE {
+            for (index, i) in data.into_iter().enumerate() {
+                p1[index] = MaybeUninit::new(i);
+            }
+        } else if p2.len() == BATCH_SIZE {
+            for (index, i) in data.into_iter().enumerate() {
+                p2[index] = MaybeUninit::new(i);
+            }
+        } else {
+            // oof, unfortunately we're on a boundary
+
+            // iterate over the data
+            let mut data_iter = data.into_iter();
+
+            // put p1.len() in p1
+            for i in p1 {
+                let next_item = data_iter.next();
+                // Safety: p1.len() + p2.len() <= Self::BATCH_SIZE so the two loops here
+                // together cannot run for more than Self::BATCH_SIZE iterations
+                *i = MaybeUninit::new(unsafe { next_item.unwrap_unchecked() });
+            }
+
+            // put p2.len() in p2
+            for i in p2 {
+                let next_item = data_iter.next();
+                // Safety: p1.len() + p2.len() <= Self::BATCH_SIZE so the two loops here
+                // together cannot run for more than Self::BATCH_SIZE iterations
+                *i = MaybeUninit::new(unsafe { next_item.unwrap_unchecked() });
+            }
+        }
+    }
+
+    #[inline]
+    #[cfg(feature = "batched_extend")]
+    fn fill_batch<const BATCH_SIZE: usize>(
+        batch: &mut [MaybeUninit<T>; BATCH_SIZE],
+        iter: &mut impl Iterator<Item = T>,
+    ) -> usize {
+        for (index, b) in batch.iter_mut().enumerate() {
+            if let Some(i) = iter.next() {
+                *b = MaybeUninit::new(i);
+            } else {
+                return index;
+            }
+        }
+
+        BATCH_SIZE
+    }
+
+    #[inline]
+    #[cfg(feature = "batched_extend")]
+    fn extend_batched_internal<const BATCH_SIZE: usize>(
+        &mut self,
+        mut other: impl Iterator<Item = T>,
+    ) {
+        // SAFETY: if CAP < Self::BATCH_SIZE we can't run extend_from_arr_batch so we catch that here
+        if CAP < BATCH_SIZE {
+            for i in other {
+                self.push(i);
+            }
+        } else {
+            // Safety: assume init to MaybeUninit slice is safe
+            let mut batch: [MaybeUninit<T>; BATCH_SIZE] =
+                unsafe { MaybeUninit::uninit().assume_init() };
+
+            // repeat until we find an empty batch
+            loop {
+                // fill up a batch
+                let how_full = Self::fill_batch(&mut batch, &mut other);
+
+                // if the batch isn't complete, individually add the items from that batch
+                if how_full < BATCH_SIZE {
+                    for b in batch.iter().take(how_full) {
+                        // Safety: fill_batch filled up at least `how_full` items so if we iterate
+                        // until there this is safe
+                        self.push(unsafe { b.assume_init_read() });
+                    }
+
+                    // then we're done!
+                    return;
+                }
+
+                // else the batch is full, and we can transmute it to an init slice
+                let batch = unsafe {
+                    mem::transmute_copy::<[MaybeUninit<T>; BATCH_SIZE], [T; BATCH_SIZE]>(&batch)
+                };
+
+                // SAFETY: if CAP < Self::BATCH_SIZE we wouldn't be here
+                unsafe { self.extend_from_arr_batch(batch) }
+            }
+        }
+    }
+
+    /// # Safety
+    /// ONLY USE WHEN WORKING ON A CLEARED RINGBUFFER
+    #[cfg(feature = "batched_extend")]
+    #[inline]
+    unsafe fn finish_iter<const BATCH_SIZE: usize>(&mut self, mut iter: impl Iterator<Item = T>) {
+        let mut index = 0;
+        for i in iter.by_ref() {
+            self.buf[index] = MaybeUninit::new(i);
+            index += 1;
+
+            if index > CAP - 1 {
+                break;
+            }
+        }
+
+        if index < CAP {
+            // we set writepointer to however many elements we managed to write (up to CAP-1)
+            // WARNING: ONLY WORKS WHEN WORKING ON A CLEARED RINGBUFFER
+            self.writeptr = index;
+        } else {
+            self.writeptr = CAP;
+            self.extend_batched_internal::<BATCH_SIZE>(iter);
+        }
+    }
+
+    /// Alias of [`Extend::extend`](ConstGenericRingBuffer::extend) but can take a custom batch size.
+    ///
+    /// We found that `30` works well for us, which is the batch size we use in `extend`,
+    /// but on different architectures this may not be true.
+    pub fn custom_extend_batched<const BATCH_SIZE: usize>(
+        &mut self,
+        iter: impl IntoIterator<Item = T>,
+    ) {
+        #[cfg(not(feature = "batched_extend"))]
+        {
+            for i in iter {
+                self.push(i);
+            }
+        }
+
+        #[cfg(feature = "batched_extend")]
+        {
+            let iter = iter.into_iter();
+
+            let (lower, _) = iter.size_hint();
+
+            if lower >= CAP {
+                // if there are more elements in our iterator than we have size in the ringbuffer
+                // drain the ringbuffer
+                self.clear();
+
+                // we need exactly CAP elements.
+                // so we need to drop until the number of elements in the iterator is exactly CAP
+                let num_we_can_drop = lower - CAP;
+
+                let iter = iter.skip(num_we_can_drop);
+
+                // Safety: clear above
+                unsafe { self.finish_iter::<BATCH_SIZE>(iter) };
+            } else if self.is_empty() {
+                self.clear();
+
+                // Safety: clear above
+                unsafe { self.finish_iter::<BATCH_SIZE>(iter) };
+            } else {
+                self.extend_batched_internal::<BATCH_SIZE>(iter);
+            }
         }
     }
 }
 
+impl<T, const CAP: usize> Extend<T> for ConstGenericRingBuffer<T, CAP> {
+    /// NOTE: correctness (but not soundness) of extend depends on `size_hint` on iter being correct.
+    #[inline]
+    fn extend<A: IntoIterator<Item = T>>(&mut self, iter: A) {
+        /// good number, found through benchmarking.
+        /// gives ~30% performance boost over not batching
+        const BATCH_SIZE: usize = 30;
+        self.custom_extend_batched::<BATCH_SIZE>(iter);
+    }
+}
+
 unsafe impl<T, const CAP: usize> RingBuffer<T> for ConstGenericRingBuffer<T, CAP> {
     #[inline]
     unsafe fn ptr_capacity(_: *const Self) -> usize {
@@ -355,6 +622,8 @@ impl<T, const CAP: usize> IndexMut<usize> for ConstGenericRingBuffer<T, CAP> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use core::hint::black_box;
+    use core::ops::Range;
 
     #[test]
     fn test_not_power_of_two() {
@@ -418,6 +687,125 @@ mod tests {
         }
     }
 
+    struct Weirderator<T: IntoIterator>(<T as IntoIterator>::IntoIter, SizeHint);
+
+    impl<T: IntoIterator> Iterator for Weirderator<T> {
+        type Item = <T as IntoIterator>::Item;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            self.0.next()
+        }
+
+        fn size_hint(&self) -> (usize, Option<usize>) {
+            let (lower, upper) = self.0.size_hint();
+
+            match self.1 {
+                SizeHint::TooHigh => (lower + 10, upper),
+                SizeHint::TooLow => (lower - 10, upper),
+                SizeHint::Good => (lower, upper),
+            }
+        }
+    }
+
+    #[derive(Debug, Copy, Clone)]
+    pub enum SizeHint {
+        TooHigh,
+        TooLow,
+        Good,
+    }
+
+    struct IntoWeirderator<T: IntoIterator>(pub T, SizeHint);
+
+    impl<T: IntoIterator> IntoIterator for IntoWeirderator<T>
+    where
+        <T as IntoIterator>::IntoIter: Sized,
+    {
+        type Item = <T as IntoIterator>::Item;
+        type IntoIter = Weirderator<T>;
+
+        fn into_iter(self) -> Self::IntoIter {
+            Weirderator(self.0.into_iter(), self.1)
+        }
+    }
+
+    #[test]
+    // tests whether we correctly drop items when the batch crosses the boundary
+    fn boundary_drop_extend() {
+        for n in 50..300 {
+            let mut a = ConstGenericRingBuffer::<_, 128>::new();
+
+            for i in 0..n {
+                a.push(i);
+            }
+
+            a.extend(0..n);
+
+            for _ in 0..128 {
+                let _ = black_box(a.dequeue());
+            }
+        }
+    }
+
+    #[test]
+    fn test_verify_extend() {
+        extern crate std;
+
+        macro_rules! for_cap {
+            ($cap: expr) => {{
+                const CAP: usize = $cap;
+
+                for start in 0..5 {
+                    for size in [SizeHint::TooLow, SizeHint::Good, SizeHint::TooHigh] {
+                        std::println!("{start} {size:?}");
+
+                        let mut rb = ConstGenericRingBuffer::<usize, CAP>::new();
+                        for i in 0..start {
+                            rb.push(i);
+                        }
+
+                        rb.extend(Weirderator::<Range<usize>>(0..CAP, size));
+                        rb.push(17);
+                        rb.push(18);
+                        rb.push(19);
+
+                        for _ in 0..CAP {
+                            let _ = rb.dequeue();
+                        }
+
+                        let mut rb = ConstGenericRingBuffer::<usize, CAP>::new();
+                        for i in 0..start {
+                            rb.push(i);
+                        }
+
+                        rb.extend(Weirderator::<Range<usize>>(0..(CAP + 1), size));
+                        rb.push(18);
+                        rb.push(19);
+
+                        for _ in 0..CAP {
+                            let _ = rb.dequeue();
+                        }
+
+                        let mut rb = ConstGenericRingBuffer::<usize, CAP>::new();
+                        for i in 0..start {
+                            rb.push(i);
+                        }
+
+                        rb.extend(Weirderator::<Range<usize>>(0..(CAP + 2), size));
+                        rb.push(19);
+
+                        for _ in 0..CAP {
+                            let _ = rb.dequeue();
+                        }
+                    }
+                }
+            };};
+        }
+
+        for_cap!(17);
+        for_cap!(70);
+        for_cap!(128);
+    }
+
 #[cfg(test)]
 mod tests {
     use crate::{AllocRingBuffer, ConstGenericRingBuffer, GrowableAllocRingBuffer, RingBuffer};
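For reference, a minimal usage sketch (not part of the patch) of the extension APIs added above. It assumes the crate-root re-exports used in the doc examples, the default `batched_extend` feature, `From<Vec<T>>` as shown in the `append` doc test, and a toolchain that accepts an explicit const-generic argument next to an `impl Trait` parameter, which the `custom_extend_batched` turbofish relies on; the batch size `8` below is an arbitrary example value.

use ringbuffer::{ConstGenericRingBuffer, RingBuffer};

fn main() {
    let mut rb = ConstGenericRingBuffer::<i32, 8>::new();

    // The buffer is still empty, so the Extend impl (BATCH_SIZE = 30) clears it
    // and takes the finish_iter fast path.
    rb.extend(0..4);

    // Trait-level convenience wrapper: clones each element out of the slice,
    // then forwards to extend.
    rb.extend_from_slice(&[4, 5]);

    // Same machinery with a caller-chosen batch size instead of the default 30.
    rb.custom_extend_batched::<8>(6..8);

    // append drains `other` into `self`; the buffer is already full,
    // so the two oldest items (0 and 1) are overwritten.
    let mut other = ConstGenericRingBuffer::<i32, 8>::from(vec![8, 9]);
    rb.append(&mut other);

    assert_eq!(rb.to_vec(), vec![2, 3, 4, 5, 6, 7, 8, 9]);
    assert!(other.is_empty());
}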