diff --git a/.github/workflows/sdd.yml b/.github/workflows/sdd.yml
index 1f0841d..29bb90a 100644
--- a/.github/workflows/sdd.yml
+++ b/.github/workflows/sdd.yml
@@ -77,7 +77,7 @@ jobs:
       - name: Loom
         run: cargo test --features loom --release --lib
       - name: Miri
-        run: MIRIFLAGS="-Zmiri-disable-data-race-detector" cargo +nightly miri test --lib --bins --tests
+        run: MIRIFLAGS="-Zmiri-disable-data-race-detector" cargo +nightly miri test
   benchmark:
     runs-on: ubuntu-latest
     timeout-minutes: 15
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d84d23e..35695c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+3.0.0
+
+* Make `Collectible` private since it is unsafe.
+* Remove `Guard::defer` which depends on `Collectible`.
+* Remove `prepare`.
+
 2.1.0
 
 * Minor performance optimization.
diff --git a/Cargo.toml b/Cargo.toml
index bea877f..658d647 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 name = "sdd"
 description = "Scalable lock-free delayed memory reclaimer"
 documentation = "https://docs.rs/sdd"
-version = "2.1.0"
+version = "3.0.0"
 authors = ["wvwwvwwv "]
 edition = "2021"
 rust-version = "1.65.0"
diff --git a/README.md b/README.md
index 13b5f44..4767aba 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ Its delayed deallocation algorithm is based on a variant of epoch-based reclamat
 This crate can be used _without an `unsafe` block_.
 
 ```rust
-use sdd::{suspend, AtomicOwned, AtomicShared, Guard, Ptr, Shared, Tag};
+use sdd::{suspend, AtomicOwned, AtomicShared, Guard, Owned, Ptr, Shared, Tag};
 use std::sync::atomic::Ordering::Relaxed;
 
 // `atomic_shared` holds a strong reference to `17`.
@@ -51,7 +51,7 @@ ptr.set_tag(Tag::First);
 // The ownership of the contained instance is transferred to the return value of CAS.
 let prev: Shared<usize> = atomic_shared.compare_exchange(
     ptr,
-    (Some(Shared::new(18)), Tag::Second),
+    (Some(Shared::new(19)), Tag::Second),
     Relaxed,
     Relaxed,
     &guard).unwrap().0.unwrap();
@@ -62,7 +62,7 @@ drop(prev);
 
 // `sdd::AtomicShared` can be converted into `sdd::Shared`.
 let shared: Shared<usize> = atomic_shared.into_shared(Relaxed).unwrap();
-assert_eq!(*shared, 18);
+assert_eq!(*shared, 19);
 
 // `18` and `19` will be garbage-collected later.
 drop(shared);
@@ -75,6 +75,10 @@ assert_eq!(*ptr.as_ref().unwrap(), 17);
 guard.defer_execute(|| println!("deferred"));
 drop(guard);
 
+// `sdd::Owned` and `sdd::Shared` can be nested.
+let shared_nested: Shared<Owned<Shared<usize>>> = Shared::new(Owned::new(Shared::new(20)));
+assert_eq!(***shared_nested, 20);
+
 // If the thread is expected to lie dormant for a while, call `suspend()` to allow
 // others to reclaim the memory.
 suspend();
diff --git a/examples/src/ebr.rs b/examples/src/ebr.rs
index 86854fa..866bcfa 100644
--- a/examples/src/ebr.rs
+++ b/examples/src/ebr.rs
@@ -3,7 +3,7 @@ mod examples {
     use sdd::{AtomicShared, Guard, Owned, Shared, Tag};
     use std::sync::atomic::AtomicIsize;
     use std::sync::atomic::Ordering::{Acquire, Relaxed};
-    use std::thread;
+    use std::thread::{self, yield_now};
 
     struct R(&'static AtomicIsize);
     impl Drop for R {
@@ -35,8 +35,8 @@
         drop(guard);
 
         while DROP_CNT.load(Relaxed) != 1 {
-            let guard = Guard::new();
-            drop(guard);
+            Guard::new().accelerate();
+            yield_now();
         }
         assert_eq!(DROP_CNT.load(Relaxed), 1);
     }
@@ -81,8 +81,8 @@
         });
 
         while DROP_CNT.load(Relaxed) != 2 {
-            let guard = Guard::new();
-            drop(guard);
+            Guard::new().accelerate();
+            yield_now();
         }
         assert_eq!(DROP_CNT.load(Relaxed), 2);
     }
diff --git a/src/collectible.rs b/src/collectible.rs
index 065b1f2..ab15ab6 100644
--- a/src/collectible.rs
+++ b/src/collectible.rs
@@ -4,38 +4,7 @@
 use std::sync::atomic::{AtomicPtr, AtomicUsize};
 
 /// [`Collectible`] defines the memory layout for the type in order to be passed to the garbage
 /// collector.
-///
-/// # Examples
-///
-/// ```
-/// use sdd::{Collectible, Guard, Link};
-/// use std::ptr::NonNull;
-///
-/// struct LazyString(String, Link);
-///
-/// impl Collectible for LazyString {
-///     fn next_ptr(&self) -> Option<NonNull<dyn Collectible>> {
-///         self.1.next_ptr()
-///     }
-///     fn set_next_ptr(&self, next_ptr: Option<NonNull<dyn Collectible>>) {
-///         self.1.set_next_ptr(next_ptr);
-///     }
-/// }
-///
-/// let boxed: Box<LazyString> = Box::new(LazyString(String::from("Lazy"), Link::default()));
-///
-/// let static_ref: &'static LazyString = unsafe { std::mem::transmute(&*boxed) };
-/// let guard_for_ref = Guard::new();
-///
-/// let guard_to_drop = Guard::new();
-/// guard_to_drop.defer(boxed);
-/// drop(guard_to_drop);
-///
-/// // The reference is valid as long as a `Guard` that had been created before `boxed` was
-/// // passed to a `Guard` survives.
-/// assert_eq!(static_ref.0, "Lazy");
-/// ```
-pub trait Collectible {
+pub(super) trait Collectible {
     /// Returns the next [`Collectible`] pointer.
     fn next_ptr(&self) -> Option<NonNull<dyn Collectible>>;
diff --git a/src/collector.rs b/src/collector.rs
index 9797773..f2ed110 100644
--- a/src/collector.rs
+++ b/src/collector.rs
@@ -1,13 +1,14 @@
+use super::collectible::{Collectible, Link};
 use super::exit_guard::ExitGuard;
 use super::maybe_std::fence as maybe_std_fence;
-use super::{Collectible, Epoch, Link, Tag};
-use std::ptr::{self, NonNull};
+use super::{Epoch, Tag};
+use std::ptr::{self, addr_of_mut, NonNull};
 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
 use std::sync::atomic::{AtomicPtr, AtomicU8};
 
 /// [`Collector`] is a garbage collector that reclaims thread-locally unreachable instances
 /// when they are globally unreachable.
-#[derive(Debug)]
+#[derive(Debug, Default)]
 #[repr(align(128))]
 pub(super) struct Collector {
     state: AtomicU8,
@@ -205,18 +206,9 @@ impl Collector {
     /// Allocates a new [`Collector`].
     fn alloc() -> *mut Collector {
-        let boxed = Box::new(Collector {
-            state: AtomicU8::new(Self::INACTIVE),
-            announcement: Epoch::default(),
-            next_epoch_update: Self::CADENCE,
-            has_garbage: false,
-            num_readers: 0,
-            previous_instance_link: Link::default().next_ptr(),
-            current_instance_link: Link::default().next_ptr(),
-            next_instance_link: Link::default().next_ptr(),
-            next_link: AtomicPtr::default(),
-            link: Link::default(),
-        });
+        let boxed = Box::new(Collector::default());
+        boxed.state.store(Self::INACTIVE, Relaxed);
+
         let ptr = Box::into_raw(boxed);
 
         let mut current = GLOBAL_ROOT.chain_head.load(Relaxed);
         loop {
@@ -297,44 +289,13 @@
     /// Scans the [`Collector`] instances to update the global epoch.
     unsafe fn scan(collector_ptr: *mut Collector) -> bool {
-        debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INACTIVE, 0);
-        debug_assert_eq!(
-            (*collector_ptr).state.load(Relaxed),
-            u8::from((*collector_ptr).announcement)
-        );
+        debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INVALID, 0);
 
         // Only one thread that acquires the chain lock is allowed to scan the thread-local
         // collectors.
-        let lock_result = GLOBAL_ROOT
-            .chain_head
-            .fetch_update(Acquire, Acquire, |p| {
-                let tag = Tag::into_tag(p);
-                if tag == Tag::First || tag == Tag::Both {
-                    None
-                } else {
-                    Some(Tag::update_tag(p, Tag::First).cast_mut())
-                }
-            })
-            .map(|p| Tag::unset_tag(p).cast_mut());
+        let lock_result = Self::lock_chain();
 
         if let Ok(mut current_collector_ptr) = lock_result {
-            let _guard = ExitGuard::new((), |()| {
-                // Unlock the chain.
-                loop {
-                    let result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
-                        let tag = Tag::into_tag(p);
-                        debug_assert!(tag == Tag::First || tag == Tag::Both);
-                        let new_tag = if tag == Tag::First {
-                            Tag::None
-                        } else {
-                            Tag::Second
-                        };
-                        Some(Tag::update_tag(p, new_tag).cast_mut())
-                    });
-                    if result.is_ok() {
-                        break;
-                    }
-                }
-            });
+            let _guard = ExitGuard::new((), |()| Self::unlock_chain());
 
             let known_epoch = (*collector_ptr).state.load(Relaxed);
             let mut update_global_epoch = true;
@@ -396,6 +357,80 @@
 
         false
     }
+
+    /// Clears the [`Collector`] chain if all the [`Collector`] instances in it are invalid.
+    unsafe fn clear_chain() -> bool {
+        let lock_result = Self::lock_chain();
+        if let Ok(collector_head) = lock_result {
+            let _guard = ExitGuard::new((), |()| Self::unlock_chain());
+
+            let mut current_collector_ptr = collector_head;
+            while !current_collector_ptr.is_null() {
+                if ((*current_collector_ptr).state.load(Relaxed) & Self::INVALID) == 0 {
+                    return false;
+                }
+                current_collector_ptr = (*current_collector_ptr).next_link.load(Relaxed);
+            }
+
+            // Reaching here means that there is no `Ptr` that can possibly see any garbage
+            // instances in those `Collector` instances in the chain.
+            let result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
+                if Tag::unset_tag(p) == collector_head {
+                    let tag = Tag::into_tag(p);
+                    debug_assert!(tag == Tag::First || tag == Tag::Both);
+                    Some(Tag::update_tag(ptr::null::<Collector>(), tag).cast_mut())
+                } else {
+                    None
+                }
+            });
+
+            if result.is_ok() {
+                let mut current_collector_ptr = collector_head;
+                while !current_collector_ptr.is_null() {
+                    let next_collector_ptr = (*current_collector_ptr).next_link.load(Relaxed);
+                    drop(Box::from_raw(current_collector_ptr));
+                    current_collector_ptr = next_collector_ptr;
+                }
+                return true;
+            }
+        }
+        false
+    }
+
+    /// Locks the chain.
+    fn lock_chain() -> Result<*mut Collector, *mut Collector> {
+        GLOBAL_ROOT
+            .chain_head
+            .fetch_update(Acquire, Acquire, |p| {
+                let tag = Tag::into_tag(p);
+                if tag == Tag::First || tag == Tag::Both {
+                    None
+                } else {
+                    Some(Tag::update_tag(p, Tag::First).cast_mut())
+                }
+            })
+            .map(|p| Tag::unset_tag(p).cast_mut())
+    }
+
+    /// Unlocks the chain.
+    fn unlock_chain() {
+        loop {
+            let result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
+                let tag = Tag::into_tag(p);
+                debug_assert!(tag == Tag::First || tag == Tag::Both);
+                let new_tag = if tag == Tag::First {
+                    Tag::None
+                } else {
+                    // Retain the mark.
+                    Tag::Second
+                };
+                Some(Tag::update_tag(p, new_tag).cast_mut())
+            });
+            if result.is_ok() {
+                break;
+            }
+        }
+    }
 }
 
 impl Drop for Collector {
@@ -431,7 +466,7 @@ impl Drop for CollectorAnchor {
     #[inline]
     fn drop(&mut self) {
         unsafe {
-            try_drop_local_collector();
+            clear_local_collector();
         }
     }
 }
@@ -449,35 +484,23 @@ fn mark_scan_enforced() {
     });
 }
 
-/// Tries to drop the local [`Collector`] if it is the sole survivor.
-///
-/// # Safety
-///
-/// The function is safe to call only when the thread is being joined.
-unsafe fn try_drop_local_collector() {
-    let collector_ptr = LOCAL_COLLECTOR.with(|local_collector| local_collector.load(Relaxed));
-    if collector_ptr.is_null() {
-        return;
-    }
-    let chain_head_ptr = GLOBAL_ROOT.chain_head.load(Relaxed);
-    if (*collector_ptr).next_link.load(Relaxed).is_null()
-        && ptr::eq(collector_ptr, chain_head_ptr)
-        && GLOBAL_ROOT
-            .chain_head
-            .compare_exchange(chain_head_ptr, ptr::null_mut(), Relaxed, Relaxed)
-            .is_ok()
-    {
-        // If it is the head, and the only `Collector` in the chain, drop it here.
-        //
-        // The `Collector` needs to be cleared before being dropped since nested `Collectible`s may
-        // access the `Collector`, causing trouble with `MIRI`.
-        Collector::clear_for_drop(collector_ptr);
-        drop(Box::from_raw(collector_ptr));
-        return;
-    }
+/// Tries to clear the local [`Collector`] and the chain.
+unsafe fn clear_local_collector() {
+    LOCAL_COLLECTOR.with(|local_collector| {
+        let collector_ptr = local_collector.load(Relaxed);
+        if !collector_ptr.is_null() {
+            (*collector_ptr).state.fetch_or(Collector::INVALID, Release);
+        }
+
+        let mut temp_collector = Collector::default();
+        local_collector.store(addr_of_mut!(temp_collector), Relaxed);
+        if !Collector::clear_chain() {
+            mark_scan_enforced();
+        }
 
-    (*collector_ptr).state.fetch_or(Collector::INVALID, Release);
-    mark_scan_enforced();
+        Collector::clear_for_drop(addr_of_mut!(temp_collector));
+        local_collector.store(ptr::null_mut(), Relaxed);
+    });
 }
 
 thread_local! {
diff --git a/src/exit_guard.rs b/src/exit_guard.rs
index 4645d91..c753cbb 100644
--- a/src/exit_guard.rs
+++ b/src/exit_guard.rs
@@ -4,7 +4,7 @@
 use std::ops::{Deref, DerefMut};
 
 /// [`ExitGuard`] captures the environment and invokes the defined closure at the end of the scope.
-pub(crate) struct ExitGuard<T, F: FnOnce(T)> {
+pub(super) struct ExitGuard<T, F: FnOnce(T)> {
     drop_callback: Option<(T, F)>,
 }
 
diff --git a/src/guard.rs b/src/guard.rs
index caf2465..b70db97 100644
--- a/src/guard.rs
+++ b/src/guard.rs
@@ -1,6 +1,6 @@
 use super::collectible::DeferredClosure;
 use super::collector::Collector;
-use super::{Collectible, Epoch};
+use super::Epoch;
 use std::panic::UnwindSafe;
 
 /// [`Guard`] allows the user to read [`AtomicShared`](super::AtomicShared) and keeps the
@@ -43,16 +43,15 @@ impl Guard {
     /// This method can be used to check whether a retired memory region is potentially reachable or
     /// not. A chunk of memory retired in a witnessed [`Epoch`] can be deallocated after the thread
     /// has observed three new epochs. For instance, if the witnessed epoch value is `1` in the
-    /// current thread where the global epoch value is `2`, and an instance of a [`Collectible`]
-    /// type is retired in the same thread, the instance can be dropped when the thread witnesses
-    /// `0` which is three epochs away from `1`.
+    /// current thread where the global epoch value is `2`, and an instance is retired in the same
+    /// thread, the instance can be dropped when the thread witnesses `0` which is three epochs away
+    /// from `1`.
     ///
     /// In other words, there can be potential readers of the memory chunk until the current thread
     /// witnesses the previous epoch. In the above example, the global epoch can be in `2`
     /// while the current thread has only witnessed `1`, and therefore there can a reader of the
     /// memory chunk in another thread in epoch `2`. The reader can survive until the global epoch
-    /// reaches `0` again, because the thread being in `2` prevents the global epoch from reaching
-    /// `0`.
+    /// reaches `0`, because the thread being in `2` prevents the global epoch from reaching `0`.
     ///
     /// # Examples
     ///
@@ -92,7 +91,6 @@
     ///
     /// assert!(DROPPED.load(Relaxed));
     /// assert_eq!(Guard::new().epoch(), epoch_before.prev());
-    ///
     /// ```
     #[inline]
     #[must_use]
@@ -123,42 +121,6 @@
         }
     }
 
-    /// Defers dropping and memory reclamation of the supplied [`Box`] of a type implementing
-    /// [`Collectible`].
-    ///
-    /// # Examples
-    ///
-    /// ```
-    /// use sdd::{Collectible, Guard, Link};
-    /// use std::ptr::NonNull;
-    ///
-    /// struct C(usize, Link);
-    ///
-    /// impl Collectible for C {
-    ///     fn next_ptr(&self) -> Option<NonNull<dyn Collectible>> {
-    ///         self.1.next_ptr()
-    ///     }
-    ///     fn set_next_ptr(&self, next_ptr: Option<NonNull<dyn Collectible>>) {
-    ///         self.1.set_next_ptr(next_ptr);
-    ///     }
-    /// }
-    ///
-    /// let boxed: Box<C> = Box::new(C(7, Link::default()));
-    ///
-    /// let static_ref: &'static C = unsafe { std::mem::transmute(&*boxed) };
-    ///
-    /// let guard = Guard::new();
-    /// guard.defer(boxed);
-    ///
-    /// assert_eq!(static_ref.0, 7);
-    /// ```
-    #[inline]
-    pub fn defer(&self, collectible: Box<dyn Collectible>) {
-        unsafe {
-            Collector::collect(self.collector_ptr, Box::into_raw(collectible));
-        }
-    }
-
     /// Executes the supplied closure at a later point of time.
     ///
     /// It is guaranteed that the closure will be executed after every [`Guard`] at the moment when
@@ -175,7 +137,12 @@
     /// ```
     #[inline]
     pub fn defer_execute<F: FnOnce() + UnwindSafe>(&self, f: F) {
-        self.defer(Box::new(DeferredClosure::new(f)));
+        unsafe {
+            Collector::collect(
+                self.collector_ptr,
+                Box::into_raw(Box::new(DeferredClosure::new(f))),
+            );
+        }
     }
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index 50ab3ea..6fdcffb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,9 +10,6 @@ pub use atomic_shared::AtomicShared;
 
 mod guard;
 pub use guard::Guard;
 
-mod collectible;
-pub use collectible::{Collectible, Link};
-
 mod epoch;
 pub use epoch::Epoch;
@@ -28,34 +25,11 @@ pub use shared::Shared;
 mod tag;
 pub use tag::Tag;
 
+mod collectible;
 mod collector;
 mod exit_guard;
 mod ref_counted;
 
-/// Prepares a garbage collector for the current thread.
-///
-/// This method is useful in an environment where heap memory allocation is strictly controlled.
-/// [`Guard::new`] will never fail afterwards in the current thread until [`suspend`] is called as
-/// long as [`drop`] of every [`Collectible`] type is infallible.
-///
-/// # Panics
-///
-/// Panics if memory allocation failed.
-///
-/// # Examples
-///
-/// ```
-/// use sdd::{prepare, Guard};
-///
-/// prepare();
-///
-/// let guard = Guard::new();
-/// ```
-#[inline]
-pub fn prepare() {
-    collector::Collector::current();
-}
-
 /// Suspends the garbage collector of the current thread.
 ///
 /// If returns `false` if there is an active [`Guard`] in the thread. Otherwise, it passes all its
diff --git a/src/ref_counted.rs b/src/ref_counted.rs
index 14ef407..d20d6dc 100644
--- a/src/ref_counted.rs
+++ b/src/ref_counted.rs
@@ -1,5 +1,5 @@
+use super::collectible::{Collectible, Link};
 use super::collector::Collector;
-use super::{Collectible, Link};
 use std::ops::Deref;
 use std::ptr::{self, addr_of, NonNull};
 use std::sync::atomic::AtomicUsize;
diff --git a/src/tests/model.rs b/src/tests/model.rs
index 7651d79..0ce718e 100644
--- a/src/tests/model.rs
+++ b/src/tests/model.rs
@@ -5,7 +5,7 @@ mod test_model {
     use loom::sync::atomic::AtomicUsize;
     use loom::thread::{spawn, yield_now};
    use std::sync::atomic::Ordering::Relaxed;
-    use std::sync::Arc;
+    use std::sync::{Arc, Mutex};
 
     struct A(String, Arc<AtomicUsize>);
     impl Drop for A {
@@ -14,8 +14,11 @@ mod test_model {
         }
     }
 
+    static SERIALIZER: Mutex<()> = Mutex::new(());
+
     #[test]
-    fn ebr() {
+    fn ebr_owned() {
+        let _guard = SERIALIZER.lock().unwrap();
         loom::model(|| {
             let str = "HOW ARE YOU HOW ARE YOU";
             let drop_count = Arc::new(AtomicUsize::new(0));
@@ -48,7 +51,11 @@ mod test_model {
             assert!(thread.join().is_ok());
             assert_eq!(drop_count.load(Relaxed), 1);
         });
+    }
 
+    #[test]
+    fn ebr_shared() {
+        let _guard = SERIALIZER.lock().unwrap();
         loom::model(|| {
             let str = "HOW ARE YOU HOW ARE YOU";
             let drop_count = Arc::new(AtomicUsize::new(0));
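For reference, a minimal migration sketch for the 3.0.0 breaking changes listed in the CHANGELOG hunk above (removal of `prepare` and `Guard::defer`). It only uses APIs that appear in this diff (`Guard::new` and `Guard::defer_execute`); the `Resource` type and the surrounding `main` are purely illustrative and not part of the crate.

```rust
use sdd::Guard;

// Hypothetical user type; in 2.x it could have implemented `Collectible` and been
// handed to `Guard::defer` as a `Box`. `Collectible` is private as of 3.0.0.
struct Resource(String);

fn main() {
    // 2.x offered `sdd::prepare()` to set up the thread-local collector eagerly;
    // in 3.0.0 the collector is simply created lazily by `Guard::new`.
    let resource = Resource(String::from("retired later"));

    let guard = Guard::new();
    // Instead of `guard.defer(boxed_collectible)`, move the instance into a closure:
    // `defer_execute` runs it (dropping `resource`) once every guard that was active
    // at this point has been dropped.
    guard.defer_execute(move || drop(resource));
    drop(guard);
}
```

Routing everything through boxed closures keeps the unsafe `Collectible` linked-list plumbing out of the public API, which is the motivation stated in the CHANGELOG entry.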