From a19c33cae1cf2e13170d17cd2bbb7bff961e9e4b Mon Sep 17 00:00:00 2001
From: Matthew Pabst
Date: Tue, 5 May 2020 18:05:04 -0500
Subject: [PATCH] Added two custom queue implementations:

DirtyQueue is a simple MS (Michael-Scott) queue with no memory management.
EpochQueue uses crossbeam's epoch-based memory management.
---
 Cargo.toml                              |   1 +
 src/cmdoptions.rs                       |   6 +-
 src/{custom_queue.rs => dirty_queue.rs} |  46 ++++----
 src/epoch_queue.rs                      | 135 ++++++++++++++++++++++++
 src/main.rs                             |   4 +-
 src/sync_queue.rs                       |  29 ++----
 6 files changed, 167 insertions(+), 54 deletions(-)
 rename src/{custom_queue.rs => dirty_queue.rs} (74%)
 create mode 100644 src/epoch_queue.rs

diff --git a/Cargo.toml b/Cargo.toml
index 431f7d6..f4462f1 100755
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ version = "0.1.0"
 clap = "2.33.0"
 spin = "0.5.2"
 crossbeam-queue = "0.2.1"
+crossbeam-epoch = "0.8.2"
 lockfree = "0.5.1"
 stderrlog = "0.4.2"
 log = "0.4.8"
diff --git a/src/cmdoptions.rs b/src/cmdoptions.rs
index 9ff24bc..5b91690 100755
--- a/src/cmdoptions.rs
+++ b/src/cmdoptions.rs
@@ -9,7 +9,6 @@ use clap::{Arg, App};
 
 #[derive(Clone, Debug)]
 pub struct CmdOptions {
-    // TODO command line options
     pub impl_type: ImplType,
     pub benchmark: String,
     pub verbosity: usize,
@@ -36,7 +35,7 @@ impl CmdOptions {
                 .required(false)
                 .takes_value(true)
                 .help("specifies the implementation to evaluate
-                    \n\toptions include mutex, spin, lockfree, crossbeam, and custom"))
+                    \n\toptions include mutex, spin, lockfree, crossbeam, dirty, and epoch"))
             .arg(Arg::with_name("bench")
                 .short("b")
                 .required(false)
@@ -56,7 +55,8 @@ impl CmdOptions {
             "spin" => ImplType::SpinLock,
             "lockfree" => ImplType::Lockfree,
             "crossbeam" => ImplType::Crossbeam,
-            "custom" => ImplType::Custom,
+            "dirty" => ImplType::Dirty,
+            "epoch" => ImplType::Epoch,
             _ => panic!("Invalid choice of implementation type!"),
         };
 
diff --git a/src/custom_queue.rs b/src/dirty_queue.rs
similarity index 74%
rename from src/custom_queue.rs
rename to src/dirty_queue.rs
index 1a0edb9..12270ff 100644
--- a/src/custom_queue.rs
+++ b/src/dirty_queue.rs
@@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicPtr, Ordering};
 use std::ptr;
 use std::mem::MaybeUninit;
 use std::cell::UnsafeCell;
+use sync_queue::SyncQueue;
 
 /// Stores data and next pointers for items in the queue
 // This will align nodes to cachelines, to avoid false sharing between cores.
@@ -24,23 +25,21 @@ impl<T> Node<T> {
     }
 }
 
-/// Custom lockfree queue based on the Michael-Scott queue design
+/// Dirty lockfree queue based on the Michael-Scott queue design
 // Reference counting is difficult to implement in Rust, since there are no
-// double-word CAS. Instead the lower bits of pointers are used to indicate
-// whether objects are safe to destroy (TODO). This approach is based off of
-// a blog post by Christian Hergert.
+// double-word CAS. This approach is based on a blog post by Christian Hergert.
 // (http://www.hergert.me/blog/2009/12/25/intro-to-lock-free-wait-free-and-aba.html)
-pub struct CustomQueue<T> {
+pub struct DirtyQueue<T> {
     head: AtomicPtr<Node<T>>,
     tail: AtomicPtr<Node<T>>,
 }
 
-impl<T> CustomQueue<T> {
-    pub fn new() -> CustomQueue<T> {
+impl<T> DirtyQueue<T> {
+    pub fn new() -> DirtyQueue<T> {
         // Initializes the queue with an empty node. This makes the push/pop
         // logic much simpler.
         let empty_node = Box::into_raw(Box::new(Node::new()));
-        CustomQueue {
+        DirtyQueue {
             head: AtomicPtr::new(empty_node),
             tail: AtomicPtr::new(empty_node),
         }
     }
@@ -52,12 +51,7 @@
         let mut tail: *mut Node<T>;
         loop {
            tail = self.tail.load(Ordering::SeqCst);
-            // TODO set tail has hazardous
-            // check that tail has not changed
-            if tail != self.tail.load(Ordering::SeqCst) {
-                continue
-            }
-
+            // grab the next pointer and make sure that tail has not changed under us
            let next: *mut Node<T> = unsafe { (*tail).next.load(Ordering::SeqCst) };
            if tail != self.tail.load(Ordering::SeqCst) {
@@ -66,7 +60,6 @@
             // if next pointer is not null, someone else pushed, so we should retry
             if next != ptr::null_mut() {
-                self.tail.compare_and_swap(tail, next, Ordering::SeqCst);
                 continue
             }
 
@@ -85,28 +78,18 @@
         let result: T;
         loop {
             head = self.head.load(Ordering::SeqCst);
-            // TODO set head has hazardous
-            // check that head hasn't changed
-            if head != self.head.load(Ordering::SeqCst) {
-                continue
-            }
             let tail = self.tail.load(Ordering::SeqCst);
             // grab the next pointer and make sure the head hasn't changed
             let next = unsafe { (*head).next.load(Ordering::SeqCst) };
-            // TODO set next has hazardous
-            if head != self.head.load(Ordering::SeqCst) {
-                continue
-            }
 
             // if there are no more nodes, the queue is empty
             if next == ptr::null_mut() {
                 return None
             }
 
-            // not completely sure why this is necessary...
+            // the tail is lagging behind the last node; wait for the pusher to advance it
             if head == tail {
-                self.tail.compare_and_swap(tail, next, Ordering::SeqCst);
                 continue
             }
 
@@ -117,7 +100,16 @@
                 break
             }
         }
-        // TODO unset hazard bit on head, and perform reclamation if needed
         Some(result)
     }
 }
+
+impl<T> SyncQueue<T> for DirtyQueue<T> {
+    fn pop(&self) -> Option<T> {
+        self.pop()
+    }
+
+    fn push(&self, elem: T) {
+        self.push(elem)
+    }
+}
diff --git a/src/epoch_queue.rs b/src/epoch_queue.rs
new file mode 100644
index 0000000..523d93d
--- /dev/null
+++ b/src/epoch_queue.rs
@@ -0,0 +1,135 @@
+use std::sync::atomic::Ordering;
+use std::mem::MaybeUninit;
+use crossbeam_epoch as epoch;
+use crossbeam_epoch::{Atomic, Owned, Shared};
+use sync_queue::SyncQueue;
+
+/// Stores data and next pointers for items in the queue
+// This will align nodes to cachelines, to avoid false sharing between cores.
+// Experiments seem to show that this hurts performance for some reason.
+//#[repr(align(64))]
+pub struct Node<T> {
+    // The MaybeUninit wrapper allows for uninitialized nodes to be created.
+    pub data: MaybeUninit<T>,
+    // This pointer to the next node is atomic to allow CAS.
+    pub next: Atomic<Node<T>>,
+}
+
+impl<T> Node<T> {
+    fn new() -> Node<T> {
+        Node {
+            data: MaybeUninit::uninit(),
+            next: Atomic::null(),
+        }
+    }
+}
+
+/// Epoch-based lockfree queue based on the Michael-Scott queue design
+// Reference counting is difficult to implement in Rust, since there is no
+// double-word CAS.
+// Our implementation is based on a blog post by Christian Hergert:
+// (http://www.hergert.me/blog/2009/12/25/intro-to-lock-free-wait-free-and-aba.html)
+pub struct EpochQueue<T> {
+    head: Atomic<Node<T>>,
+    tail: Atomic<Node<T>>,
+}
+
+impl<T> EpochQueue<T> {
+    pub fn new() -> EpochQueue<T> {
+        let queue = EpochQueue {
+            head: Atomic::null(),
+            tail: Atomic::null(),
+        };
+
+        // Initialize the queue with an empty (sentinel) node to simplify push/pop logic
+        let empty_node = Owned::new(Node::new());
+        unsafe {
+            let guard = epoch::unprotected(); // no other thread can touch the queue before it is built
+            let sentinel = empty_node.into_shared(guard); // move this node into the data structure
+            queue.head.store(sentinel, Ordering::Relaxed);
+            queue.tail.store(sentinel, Ordering::Relaxed);
+            queue
+        }
+    }
+
+    pub fn push(&self, item: T) {
+        // Create the new node
+        let mut new_node = Node::new();
+        new_node.data = MaybeUninit::new(item);
+
+        let guard = &epoch::pin(); // enter the data structure
+        let new_node = Owned::new(new_node).into_shared(guard); // move the new node into the data structure
+        loop {
+            let shared_tail = self.tail.load(Ordering::SeqCst, guard);
+            let raw_tail = unsafe { shared_tail.deref() };
+            let shared_next = raw_tail.next.load(Ordering::SeqCst, guard);
+
+            // Have any other threads pushed onto our snapshot of tail?
+            if unsafe { shared_next.as_ref().is_some() } {
+                // Someone beat us to it, so we should restart.
+                continue
+            }
+
+            // Try to add our new node.
+            if raw_tail.next.compare_and_set(Shared::null(), new_node, Ordering::SeqCst, guard).is_ok() {
+                // Success! Now we can link the global tail to our node.
+                let _ = self.tail.compare_and_set(shared_tail, new_node, Ordering::SeqCst, guard);
+                return
+            }
+        }
+    }
+
+    pub fn pop(&self) -> Option<T> {
+        let guard = &epoch::pin(); // enter the data structure
+        loop {
+            let shared_head = self.head.load(Ordering::SeqCst, guard);
+            let raw_head = unsafe { shared_head.deref() };
+            let shared_next = raw_head.next.load(Ordering::SeqCst, guard);
+
+            // Are there any real nodes attached to the sentinel node?
+            match unsafe { shared_next.as_ref() } {
+                // Found something in the queue!
+                Some(raw_next) => {
+                    // Let's try to disconnect the head node.
+                    match self.head.compare_and_set(shared_head, shared_next, Ordering::SeqCst, guard) {
+                        // Success! Now we can return the value in the new head.
+                        Ok(_) => {
+                            let shared_tail = self.tail.load(Ordering::SeqCst, guard);
+                            if shared_head == shared_tail {
+                                let _ = self.tail.compare_and_set(shared_tail, shared_next, Ordering::SeqCst, guard);
+                            }
+                            unsafe {
+                                guard.defer_destroy(shared_head);
+                                return Some(raw_next.data.as_ptr().read())
+                            }
+                        },
+                        // Someone beat us to it! Let's retry.
+                        Err(_) => continue,
+                    }
+                },
+                // Nothing in the queue.
+                None => return None,
+            }
+        }
+    }
+}
+
+impl<T> Drop for EpochQueue<T> {
+    fn drop(&mut self) {
+        while let Some(_) = self.pop() {}
+        unsafe {
+            let sentinel = self.head.load(Ordering::SeqCst, epoch::unprotected());
+            drop(sentinel.into_owned());
+        }
+    }
+}
+
+impl<T> SyncQueue<T> for EpochQueue<T> {
+    fn pop(&self) -> Option<T> {
+        self.pop()
+    }
+
+    fn push(&self, elem: T) {
+        self.push(elem)
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 9352993..2faeefe 100755
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,12 +3,14 @@ extern crate log;
 extern crate stderrlog;
 extern crate spin;
 extern crate crossbeam_queue;
+extern crate crossbeam_epoch;
 extern crate lockfree;
 pub mod cmdoptions;
 pub mod benchmark;
 pub mod kernels;
 pub mod sync_queue;
-pub mod custom_queue;
+pub mod dirty_queue;
+pub mod epoch_queue;
 use kernels::{WorkloadType};
 use benchmark::{Benchmark};
 use log::{info};
diff --git a/src/sync_queue.rs b/src/sync_queue.rs
index d2b6cce..ce76a50 100644
--- a/src/sync_queue.rs
+++ b/src/sync_queue.rs
@@ -2,8 +2,9 @@ use std::collections::VecDeque;
 use std::sync::Mutex;
 use spin::Mutex as Spinlock;
 use crossbeam_queue::SegQueue;
 use lockfree::queue::Queue as LFQueue;
-use custom_queue::CustomQueue;
+use dirty_queue::DirtyQueue;
+use epoch_queue::EpochQueue;
 
 pub trait SyncQueue<T>: Send + Sync {
     fn pop(&self) -> Option<T>;
     fn push(&self, elem: T);
 }
@@ -16,7 +17,8 @@ pub enum ImplType {
     Mutex,
     SpinLock,
     Crossbeam,
     Lockfree,
-    Custom, // TODO eventually
+    Dirty,
+    Epoch,
 }
 
 /// Constructor function for building queues given an ImplType.
@@ -26,7 +28,8 @@ pub fn create_impl<T>(t: &ImplType) -> Box<SyncQueue<T>> {
         ImplType::Mutex => Box::new(MutexQueue::<T>::new()),
         ImplType::SpinLock => Box::new(SpinQueue::<T>::new()),
         ImplType::Crossbeam => Box::new(CrossbeamQueue::<T>::new()),
         ImplType::Lockfree => Box::new(LockfreeQueue::<T>::new()),
-        ImplType::Custom => Box::new(OurQueue::<T>::new()),
+        ImplType::Dirty => Box::new(DirtyQueue::<T>::new()),
+        ImplType::Epoch => Box::new(EpochQueue::<T>::new()),
     }
 }
@@ -123,23 +126,3 @@ impl<T> SyncQueue<T> for LockfreeQueue<T> {
     }
 }
 
-struct OurQueue<T> {
-    q: CustomQueue<T>,
-}
-
-impl<T> OurQueue<T> {
-    pub fn new() -> OurQueue<T> {
-        OurQueue { q: CustomQueue::new(), }
-    }
-}
-
-
-impl<T> SyncQueue<T> for OurQueue<T> {
-    fn pop(&self) -> Option<T> {
-        self.q.pop()
-    }
-
-    fn push(&self, elem: T) {
-        self.q.push(elem)
-    }
-}
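Usage sketch (illustrative, not part of the patch): the snippet below drives both new queues through the SyncQueue trait objects returned by create_impl, roughly the way the benchmark harness would. The main() wrapper, the u64 element type, and the producer/item counts are assumptions made up for this example; create_impl, ImplType, and SyncQueue are the items added or touched by the patch, with the generic signature of create_impl as reconstructed above.

// Hypothetical smoke test; assumes it lives in the crate's main.rs
// alongside the existing `pub mod sync_queue;` declaration.
use std::sync::Arc;
use std::thread;

use sync_queue::{create_impl, ImplType, SyncQueue};

fn main() {
    for ty in &[ImplType::Dirty, ImplType::Epoch] {
        // SyncQueue requires Send + Sync, so the boxed trait object
        // can be shared across threads through an Arc.
        let queue: Arc<Box<SyncQueue<u64>>> = Arc::new(create_impl(ty));

        // Four producers push 1000 items each, concurrently.
        let producers: Vec<_> = (0..4)
            .map(|_| {
                let q = Arc::clone(&queue);
                thread::spawn(move || {
                    for i in 0..1000 {
                        q.push(i);
                    }
                })
            })
            .collect();
        for p in producers {
            p.join().unwrap();
        }

        // Drain single-threaded and check that no items were lost.
        let mut popped = 0;
        while queue.pop().is_some() {
            popped += 1;
        }
        assert_eq!(popped, 4 * 1000);
    }
}

Because both implementations hide behind the same trait object, a test like this also exercises the epoch queue's Drop path (the sentinel node is reclaimed once the Arc count reaches zero), which is where leaks would show up under a sanitizer.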