Skip to content

Commit

Permalink
Merge branch 'new_storage'
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Jan 20, 2024
2 parents 98599b9 + c6250bd commit 2345b39
Show file tree
Hide file tree
Showing 35 changed files with 398 additions and 254 deletions.
6 changes: 3 additions & 3 deletions async/src/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::{
};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use ringbuf::{storage::Array, SharedRb};
#[cfg(feature = "alloc")]
use ringbuf::{storage::Heap, HeapRb};
use ringbuf::{storage::Static, SharedRb};

#[cfg(feature = "alloc")]
pub type AsyncHeapRb<T> = AsyncRb<Heap<T>>;
Expand All @@ -22,11 +22,11 @@ impl<T> AsyncHeapRb<T> {
}
}

pub type AsyncStaticRb<T, const N: usize> = AsyncRb<Static<T, N>>;
pub type AsyncStaticRb<T, const N: usize> = AsyncRb<Array<T, N>>;
pub type AsyncStaticProd<'a, T, const N: usize> = AsyncProd<&'a AsyncStaticRb<T, N>>;
pub type AsyncStaticCons<'a, T, const N: usize> = AsyncCons<&'a AsyncStaticRb<T, N>>;

impl<T, const N: usize> Default for AsyncRb<Static<T, N>> {
impl<T, const N: usize> Default for AsyncRb<Array<T, N>> {
fn default() -> Self {
AsyncRb::from(SharedRb::default())
}
Expand Down
5 changes: 4 additions & 1 deletion async/src/rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ impl<S: Storage> Observer for AsyncRb<S> {
self.base.write_index()
}

unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
self.base.unsafe_slices(start, end)
}
unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
self.base.unsafe_slices_mut(start, end)
}

#[inline]
fn read_is_held(&self) -> bool {
Expand Down
6 changes: 3 additions & 3 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub trait AsyncConsumer: Consumer {
}
}

pub struct PopFuture<'a, A: AsyncConsumer> {
pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
owner: &'a mut A,
done: bool,
}
Expand Down Expand Up @@ -130,7 +130,7 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
}
}

pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer>
pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
where
A::Item: Copy,
{
Expand Down Expand Up @@ -177,7 +177,7 @@ where
}
}

pub struct WaitOccupiedFuture<'a, A: AsyncConsumer> {
pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
owner: &'a A,
count: usize,
done: bool,
Expand Down
8 changes: 4 additions & 4 deletions async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub trait AsyncProducer: Producer {
}
}

pub struct PushFuture<'a, A: AsyncProducer> {
pub struct PushFuture<'a, A: AsyncProducer + ?Sized> {
owner: &'a mut A,
item: Option<A::Item>,
}
Expand Down Expand Up @@ -144,7 +144,7 @@ impl<'a, A: AsyncProducer> Future for PushFuture<'a, A> {
}
}

pub struct PushSliceFuture<'a, 'b, A: AsyncProducer>
pub struct PushSliceFuture<'a, 'b, A: AsyncProducer + ?Sized>
where
A::Item: Copy,
{
Expand Down Expand Up @@ -190,7 +190,7 @@ where
}
}

pub struct PushIterFuture<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> {
pub struct PushIterFuture<'a, A: AsyncProducer + ?Sized, I: Iterator<Item = A::Item>> {
owner: &'a mut A,
iter: Option<Peekable<I>>,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ impl<'a, A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFutur
}
}

pub struct WaitVacantFuture<'a, A: AsyncProducer> {
pub struct WaitVacantFuture<'a, A: AsyncProducer + ?Sized> {
owner: &'a A,
count: usize,
done: bool,
Expand Down
8 changes: 4 additions & 4 deletions blocking/src/alias.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#[cfg(feature = "std")]
use crate::sync::StdSemaphore;
use crate::{rb::BlockingRb, sync::Semaphore};
use ringbuf::{storage::Array, SharedRb};
#[cfg(feature = "alloc")]
use ringbuf::{storage::Heap, HeapRb};
use ringbuf::{storage::Static, SharedRb};

#[cfg(feature = "std")]
pub type BlockingHeapRb<T, X = StdSemaphore> = BlockingRb<Heap<T>, X>;
Expand All @@ -18,11 +18,11 @@ impl<T, X: Semaphore> BlockingHeapRb<T, X> {
}

#[cfg(feature = "std")]
pub type BlockingStaticRb<T, const N: usize, X = StdSemaphore> = BlockingRb<Static<T, N>, X>;
pub type BlockingStaticRb<T, const N: usize, X = StdSemaphore> = BlockingRb<Array<T, N>, X>;
#[cfg(all(feature = "alloc", not(feature = "std")))]
pub type BlockingStaticRb<T, const N: usize, X> = BlockingRb<Static<T, N>, X>;
pub type BlockingStaticRb<T, const N: usize, X> = BlockingRb<Array<T, N>, X>;

impl<T, const N: usize, X: Semaphore> Default for BlockingRb<Static<T, N>, X> {
impl<T, const N: usize, X: Semaphore> Default for BlockingRb<Array<T, N>, X> {
fn default() -> Self {
BlockingRb::from(SharedRb::default())
}
Expand Down
5 changes: 4 additions & 1 deletion blocking/src/rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ impl<S: Storage, X: Semaphore> Observer for BlockingRb<S, X> {
self.base.write_index()
}

unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
self.base.unsafe_slices(start, end)
}
unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
self.base.unsafe_slices_mut(start, end)
}

#[inline]
fn read_is_held(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use super::storage::Heap;
use super::{
rb::SharedRb,
storage::Static,
storage::Array,
wrap::{CachingCons, CachingProd},
};
#[cfg(feature = "alloc")]
Expand All @@ -11,7 +11,7 @@ use alloc::sync::Arc;
/// Stack-allocated ring buffer with static capacity.
///
/// *Capacity (`N`) must be greater than zero.*
pub type StaticRb<T, const N: usize> = SharedRb<Static<T, N>>;
pub type StaticRb<T, const N: usize> = SharedRb<Array<T, N>>;

/// Alias for [`StaticRb`] producer.
pub type StaticProd<'a, T, const N: usize> = CachingProd<&'a StaticRb<T, N>>;
Expand Down
8 changes: 4 additions & 4 deletions src/benchmarks/base.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{storage::Static, traits::*, LocalRb, SharedRb};
use crate::{storage::Array, traits::*, LocalRb, SharedRb};
use test::{black_box, Bencher};

const RB_SIZE: usize = 256;
const BATCH_SIZE: usize = 100;

#[bench]
fn push_pop_shared(b: &mut Bencher) {
let buf = SharedRb::<Static<u64, RB_SIZE>>::default();
let buf = SharedRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, mut cons) = buf.split();
prod.push_slice(&[1; RB_SIZE / 2]);
b.iter(|| {
Expand All @@ -17,7 +17,7 @@ fn push_pop_shared(b: &mut Bencher) {

#[bench]
fn push_pop_local(b: &mut Bencher) {
let buf = LocalRb::<Static<u64, RB_SIZE>>::default();
let buf = LocalRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, mut cons) = buf.split();
prod.push_slice(&[1; RB_SIZE / 2]);
b.iter(|| {
Expand All @@ -28,7 +28,7 @@ fn push_pop_local(b: &mut Bencher) {

#[bench]
fn push_pop_x100(b: &mut Bencher) {
let buf = SharedRb::<Static<u64, RB_SIZE>>::default();
let buf = SharedRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, mut cons) = buf.split();
prod.push_slice(&[1; RB_SIZE / 2]);
b.iter(|| {
Expand Down
8 changes: 4 additions & 4 deletions src/benchmarks/parts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{storage::Static, traits::*, SharedRb};
use crate::{storage::Array, traits::*, SharedRb};
use test::{black_box, Bencher};

const RB_SIZE: usize = 256;

#[bench]
fn advance(b: &mut Bencher) {
let buf = SharedRb::<Static<u64, RB_SIZE>>::default();
let buf = SharedRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, cons) = buf.split();
prod.push_slice(&[1; RB_SIZE / 2]);
b.iter(|| {
Expand All @@ -16,7 +16,7 @@ fn advance(b: &mut Bencher) {

#[bench]
fn get_occupied_slices(b: &mut Bencher) {
let buf = SharedRb::<Static<u64, RB_SIZE>>::default();
let buf = SharedRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, mut cons) = buf.split();
prod.push_slice(&[0; 3 * RB_SIZE / 4]);
cons.skip(RB_SIZE);
Expand All @@ -29,7 +29,7 @@ fn get_occupied_slices(b: &mut Bencher) {

#[bench]
fn get_vacant_slices(b: &mut Bencher) {
let buf = SharedRb::<Static<u64, RB_SIZE>>::default();
let buf = SharedRb::<Array<u64, RB_SIZE>>::default();
let (mut prod, mut cons) = buf.split();
prod.push_slice(&[0; 1 * RB_SIZE / 4]);
cons.skip(RB_SIZE);
Expand Down
58 changes: 40 additions & 18 deletions src/rb/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{macros::rb_impl_init, utils::ranges};
#[cfg(feature = "alloc")]
use crate::traits::Split;
use crate::{
storage::{Shared, Static, Storage},
storage::Storage,
traits::{
consumer::{impl_consumer_traits, Consumer},
producer::{impl_producer_traits, Producer},
Expand All @@ -11,7 +11,7 @@ use crate::{
wrap::{Cons, Prod},
};
#[cfg(feature = "alloc")]
use alloc::rc::Rc;
use alloc::{boxed::Box, rc::Rc};
use core::{
cell::Cell,
mem::{ManuallyDrop, MaybeUninit},
Expand All @@ -36,10 +36,10 @@ impl End {
/// Ring buffer for single-threaded use only.
///
/// Slightly faster than multi-threaded version because it doesn't synchronize cache.
pub struct LocalRb<S: Storage> {
storage: Shared<S>,
pub struct LocalRb<S: Storage + ?Sized> {
read: End,
write: End,
storage: S,
}

impl<S: Storage> LocalRb<S> {
Expand All @@ -50,8 +50,9 @@ impl<S: Storage> LocalRb<S> {
/// The items in storage inside `read..write` range must be initialized, items outside this range must be uninitialized.
/// `read` and `write` positions must be valid (see implementation details).
pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
assert!(!storage.is_empty());
Self {
storage: Shared::new(storage),
storage,
read: End::new(read),
write: End::new(write),
}
Expand All @@ -63,16 +64,16 @@ impl<S: Storage> LocalRb<S> {
/// Initialized contents of the storage must be properly dropped.
pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
let this = ManuallyDrop::new(self);
(ptr::read(&this.storage).into_inner(), this.read_index(), this.write_index())
(ptr::read(&this.storage), this.read_index(), this.write_index())
}
}

impl<S: Storage> Observer for LocalRb<S> {
impl<S: Storage + ?Sized> Observer for LocalRb<S> {
type Item = S::Item;

#[inline]
fn capacity(&self) -> NonZeroUsize {
self.storage.len()
unsafe { NonZeroUsize::new_unchecked(self.storage.len()) }
}

#[inline]
Expand All @@ -84,10 +85,14 @@ impl<S: Storage> Observer for LocalRb<S> {
self.write.index.get()
}

unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&[MaybeUninit<S::Item>], &[MaybeUninit<S::Item>]) {
let (first, second) = ranges(self.capacity(), start, end);
(self.storage.slice(first), self.storage.slice(second))
}
unsafe fn unsafe_slices_mut(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
let (first, second) = ranges(self.capacity(), start, end);
(self.storage.slice_mut(first), self.storage.slice_mut(second))
}

#[inline]
fn read_is_held(&self) -> bool {
Expand All @@ -99,21 +104,21 @@ impl<S: Storage> Observer for LocalRb<S> {
}
}

impl<S: Storage> Producer for LocalRb<S> {
impl<S: Storage + ?Sized> Producer for LocalRb<S> {
#[inline]
unsafe fn set_write_index(&self, value: usize) {
self.write.index.set(value);
}
}

impl<S: Storage> Consumer for LocalRb<S> {
impl<S: Storage + ?Sized> Consumer for LocalRb<S> {
#[inline]
unsafe fn set_read_index(&self, value: usize) {
self.read.index.set(value);
}
}

impl<S: Storage> RingBuffer for LocalRb<S> {
impl<S: Storage + ?Sized> RingBuffer for LocalRb<S> {
#[inline]
unsafe fn hold_read(&self, flag: bool) -> bool {
self.read.held.replace(flag)
Expand All @@ -124,7 +129,7 @@ impl<S: Storage> RingBuffer for LocalRb<S> {
}
}

impl<S: Storage> Drop for LocalRb<S> {
impl<S: Storage + ?Sized> Drop for LocalRb<S> {
fn drop(&mut self) {
self.clear();
}
Expand All @@ -136,11 +141,28 @@ impl<S: Storage> Split for LocalRb<S> {
type Cons = Cons<Rc<Self>>;

fn split(self) -> (Self::Prod, Self::Cons) {
let rc = Rc::new(self);
(Prod::new(rc.clone()), Cons::new(rc))
Rc::new(self).split()
}
}
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Rc<LocalRb<S>> {
type Prod = Prod<Self>;
type Cons = Cons<Self>;

fn split(self) -> (Self::Prod, Self::Cons) {
(Prod::new(self.clone()), Cons::new(self))
}
}
#[cfg(feature = "alloc")]
impl<S: Storage + ?Sized> Split for Box<LocalRb<S>> {
type Prod = Prod<Rc<LocalRb<S>>>;
type Cons = Cons<Rc<LocalRb<S>>>;

fn split(self) -> (Self::Prod, Self::Cons) {
Rc::<LocalRb<S>>::from(self).split()
}
}
impl<S: Storage> SplitRef for LocalRb<S> {
impl<S: Storage + ?Sized> SplitRef for LocalRb<S> {
type RefProd<'a> = Prod<&'a Self> where Self: 'a;
type RefCons<'a> = Cons<&'a Self> where Self: 'a;

Expand All @@ -154,12 +176,12 @@ rb_impl_init!(LocalRb);
impl_producer_traits!(LocalRb<S: Storage>);
impl_consumer_traits!(LocalRb<S: Storage>);

impl<S: Storage> AsRef<Self> for LocalRb<S> {
impl<S: Storage + ?Sized> AsRef<Self> for LocalRb<S> {
fn as_ref(&self) -> &Self {
self
}
}
impl<S: Storage> AsMut<Self> for LocalRb<S> {
impl<S: Storage + ?Sized> AsMut<Self> for LocalRb<S> {
fn as_mut(&mut self) -> &mut Self {
self
}
Expand Down
Loading

0 comments on commit 2345b39

Please sign in to comment.