Added two custom queue implementations:
    DirtyQueue is a simple Michael-Scott (MS) queue with no memory reclamation
    EpochQueue uses crossbeam's epoch-based memory reclamation
PabstMatthew committed May 5, 2020
1 parent 202fce1 commit a19c33c
Showing 6 changed files with 168 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -6,6 +6,7 @@ version = "0.1.0"
clap = "2.33.0"
spin = "0.5.2"
crossbeam-queue = "0.2.1"
crossbeam-epoch = "0.8.2"
lockfree = "0.5.1"
stderrlog = "0.4.2"
log = "0.4.8"
6 changes: 3 additions & 3 deletions src/cmdoptions.rs
@@ -9,7 +9,6 @@ use clap::{Arg, App};

#[derive(Clone, Debug)]
pub struct CmdOptions {
// TODO command line options
pub impl_type: ImplType,
pub benchmark: String,
pub verbosity: usize,
@@ -36,7 +35,7 @@ impl CmdOptions {
.required(false)
.takes_value(true)
.help("specifies the implementation to evaluate
\n\toptions include mutex, spin, lockfree, crossbeam, and custom"))
\n\toptions include mutex, spin, lockfree, crossbeam, dirty, and epoch"))
.arg(Arg::with_name("bench")
.short("b")
.required(false)
@@ -56,7 +55,8 @@ impl CmdOptions {
"spin" => ImplType::SpinLock,
"lockfree" => ImplType::Lockfree,
"crossbeam" => ImplType::Crossbeam,
"custom" => ImplType::Custom,
"dirty" => ImplType::Dirty,
"epoch" => ImplType::Epoch,
_ => panic!("Invalid choice of implementation type!"),
};

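For context, a hypothetical invocation selecting the new implementations might look like the lines below. The -b flag for the benchmark is shown in the diff above; the long flag name for the implementation argument is an assumption, since that Arg definition lies outside these hunks:

    # hypothetical usage; --impl is assumed, -b is the bench flag shown above
    cargo run --release -- --impl dirty -b throughput
    cargo run --release -- --impl epoch -b throughput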
46 changes: 19 additions & 27 deletions src/custom_queue.rs → src/dirty_queue.rs
@@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
use std::mem::MaybeUninit;
use std::cell::UnsafeCell;
use sync_queue::SyncQueue;

/// Stores data and next pointers for items in the queue
// This will align nodes to cachelines, to avoid false sharing between cores.
@@ -24,23 +25,21 @@ impl<T> Node<T> {
}
}

/// Custom lockfree queue based on the Michael-Scott queue design
/// Dirty lockfree queue based on the Michael-Scott queue design
// Reference counting is difficult to implement in Rust, since there is no
// double-word CAS. Instead the lower bits of pointers are used to indicate
// whether objects are safe to destroy (TODO). This approach is based off of
// a blog post by Christian Hergert.
// double-word CAS. This approach is based off of a blog post by Christian Hergert.
// (http://www.hergert.me/blog/2009/12/25/intro-to-lock-free-wait-free-and-aba.html)
pub struct CustomQueue<T> {
pub struct DirtyQueue<T> {
head: AtomicPtr<Node<T>>,
tail: AtomicPtr<Node<T>>,
}

impl<T> CustomQueue<T> {
pub fn new() -> CustomQueue<T> {
impl<T> DirtyQueue<T> {
pub fn new() -> DirtyQueue<T> {
// Initializes the queue with an empty node. This makes the push/pop
// logic much simpler.
let empty_node = Box::into_raw(Box::new(Node::new()));
CustomQueue {
DirtyQueue {
head: AtomicPtr::new(empty_node),
tail: AtomicPtr::new(empty_node),
}
@@ -52,12 +51,7 @@ impl<T> CustomQueue<T> {
let mut tail: *mut Node<T>;
loop {
tail = self.tail.load(Ordering::SeqCst);
// TODO set tail as hazardous
// check that tail has not changed
if tail != self.tail.load(Ordering::SeqCst) {
continue
}


// grab the next pointer and make sure that tail has not changed under us
let next: *mut Node<T> = unsafe { (*tail).next.load(Ordering::SeqCst) };
if tail != self.tail.load(Ordering::SeqCst) {
@@ -66,7 +60,6 @@

// if next pointer is not null, someone else pushed, so we should retry
if next != ptr::null_mut() {
self.tail.compare_and_swap(tail, next, Ordering::SeqCst);
continue
}

@@ -85,28 +78,18 @@
let result: T;
loop {
head = self.head.load(Ordering::SeqCst);
// TODO set head as hazardous
// check that head hasn't changed
if head != self.head.load(Ordering::SeqCst) {
continue
}

let tail = self.tail.load(Ordering::SeqCst);
// grab the next pointer and make sure the head hasn't changed
let next = unsafe { (*head).next.load(Ordering::SeqCst) };
// TODO set next as hazardous
if head != self.head.load(Ordering::SeqCst) {
continue
}

// if there are no more nodes, the queue is empty
if next == ptr::null_mut() {
return None
}

// not completely sure why this is necessary...
// the tail is lagging behind the head; help advance it before retrying
if head == tail {
self.tail.compare_and_swap(tail, next, Ordering::SeqCst);
continue
}

@@ -117,7 +100,16 @@
break
}
}
// TODO unset hazard bit on head, and perform reclamation if needed
Some(result)
}
}

impl<T: Send + Sync> SyncQueue<T> for DirtyQueue<T> {
fn pop(&self) -> Option<T> {
self.pop()
}

fn push(&self, elem: T) {
self.push(elem)
}
}
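A minimal sketch of how DirtyQueue might be exercised from several threads (ours, not part of the commit; it assumes the module layout above, and glosses over the fact that without reclamation every popped node is leaked):

use std::sync::Arc;
use std::thread;
use dirty_queue::DirtyQueue;

fn dirty_queue_smoke() {
    // DirtyQueue only holds AtomicPtr fields, so it is Send + Sync and
    // an Arc is enough to share it across threads.
    let q = Arc::new(DirtyQueue::new());

    let producer = {
        let q = Arc::clone(&q);
        thread::spawn(move || for i in 0..1_000 { q.push(i); })
    };
    let consumer = {
        let q = Arc::clone(&q);
        thread::spawn(move || {
            let mut popped = 0;
            while popped < 1_000 {
                if q.pop().is_some() { popped += 1; }
            }
        })
    };
    producer.join().unwrap();
    consumer.join().unwrap();
}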
135 changes: 135 additions & 0 deletions src/epoch_queue.rs
@@ -0,0 +1,135 @@
use std::sync::atomic::Ordering;
use std::mem::MaybeUninit;
use crossbeam_epoch as epoch;
use crossbeam_epoch::{Atomic, Owned, Shared};
use sync_queue::SyncQueue;

/// Stores data and next pointers for items in the queue
// This will align nodes to cachelines, to avoid false sharing between cores.
// Experiments seem to show that this hurts performance for some reason.
//#[repr(align(64))]
pub struct Node<T> {
// The MaybeUninit wrapper allows for uninitialized nodes to be created.
pub data: MaybeUninit<T>,
// This pointer to the next node is atomic to allow CAS.
pub next: Atomic<Node<T>>,
}

impl<T> Node<T> {
fn new() -> Node<T> {
Node {
data: MaybeUninit::uninit(),
next: Atomic::null(),
}
}
}

/// Custom lockfree queue based on the Michael-Scott queue design
// Reference counting is difficult to implement in Rust, since there is no
// double-word CAS.
// Our implementation is based off of a blog post by Christian Hergert:
// (http://www.hergert.me/blog/2009/12/25/intro-to-lock-free-wait-free-and-aba.html)
pub struct EpochQueue<T> {
head: Atomic<Node<T>>,
tail: Atomic<Node<T>>,
}

impl<T> EpochQueue<T> {
pub fn new() -> EpochQueue<T> {
let queue = EpochQueue {
head: Atomic::null(),
tail: Atomic::null(),
};

// Initialize the queue with an empty (sentinel) node to simplify push/pop logic
let empty_node = Owned::new(Node::new());
unsafe {
let guard = &epoch::unprotected(); // current thread is active in data structure
let sentinel = empty_node.into_shared(guard); // move this node into the data structure
queue.head.store(sentinel, Ordering::Relaxed);
queue.tail.store(sentinel, Ordering::Relaxed);
queue
}
}

pub fn push(&self, item: T) {
// Create the new node
let mut new_node = Node::new();
new_node.data = MaybeUninit::new(item);

let guard = &epoch::pin(); // enter data structure
let new_node = Owned::new(new_node).into_shared(guard); // move the new node into the data structure
loop {
let shared_tail = self.tail.load(Ordering::SeqCst, guard);
let raw_tail = unsafe { shared_tail.deref() };
let shared_next = raw_tail.next.load(Ordering::SeqCst, guard);

// Have any threads pushed onto our snapshot of tail?
if unsafe { shared_next.as_ref().is_some() } {
// Someone beat us to it, so we should restart.
continue
}

// Try to add our new node.
if raw_tail.next.compare_and_set(Shared::null(), new_node, Ordering::SeqCst, guard).is_ok() {
// Success! Now we can link the global tail to our node.
let _ = self.tail.compare_and_set(shared_tail, new_node, Ordering::SeqCst, guard);
return
}
}
}

pub fn pop(&self) -> Option<T> {
let guard = &epoch::pin(); // enter data structure
loop {
let shared_head = self.head.load(Ordering::SeqCst, guard);
let raw_head = unsafe { shared_head.deref() };
let shared_next = raw_head.next.load(Ordering::SeqCst, guard);

// Are there any real nodes attached to the sentinel node?
match unsafe { shared_next.as_ref() } {
// Found something in the queue!
Some(raw_next) => {
// Let's try to disconnect the head node.
match self.head.compare_and_set(shared_head, shared_next, Ordering::SeqCst, guard) {
// Success! Now we can return the value in the new head.
Ok(_) => {
let shared_tail = self.tail.load(Ordering::SeqCst, guard);
if shared_head == shared_tail {
let _ = self.tail.compare_and_set(shared_tail, shared_next, Ordering::SeqCst, guard);
}
unsafe {
guard.defer_destroy(shared_head);
return Some(raw_next.data.as_ptr().read())
}
},
// Someone beat us to it! Let's retry.
Err(_) => continue,
}
},
// Nothing in the queue.
None => return None,
}
}
}
}

impl<T> Drop for EpochQueue<T> {
fn drop(&mut self) {
while let Some(_) = self.pop() {}
unsafe {
let sentinel = self.head.load(Ordering::SeqCst, &epoch::unprotected());
drop(sentinel.into_owned());
}
}
}

impl<T: Send + Sync> SyncQueue<T> for EpochQueue<T> {
fn pop(&self) -> Option<T> {
self.pop()
}

fn push(&self, elem: T) {
self.push(elem)
}
}
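For readers new to crossbeam-epoch, here is a tiny standalone sketch (ours, not part of the commit) of the pin/defer_destroy pattern the pop path above relies on, written against the crossbeam-epoch 0.8 API pinned in Cargo.toml:

use std::sync::atomic::Ordering;
use crossbeam_epoch::{self as epoch, Atomic, Owned};

fn swap_and_reclaim() {
    let slot = Atomic::new(1u64); // heap node managed by epoch reclamation
    let guard = &epoch::pin();    // pin: mark this thread active in the structure
    let old = slot.swap(Owned::new(2u64), Ordering::SeqCst, guard);
    unsafe {
        // Schedule `old` for destruction once every thread pinned in the
        // current or an earlier epoch has unpinned; until then, concurrent
        // readers holding a guard can still safely dereference it.
        guard.defer_destroy(old);
    }
    // (Freeing slot's final node on drop is elided in this sketch.)
}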
4 changes: 3 additions & 1 deletion src/main.rs
@@ -3,12 +3,14 @@ extern crate log;
extern crate stderrlog;
extern crate spin;
extern crate crossbeam_queue;
extern crate crossbeam_epoch;
extern crate lockfree;
pub mod cmdoptions;
pub mod benchmark;
pub mod kernels;
pub mod sync_queue;
pub mod custom_queue;
pub mod dirty_queue;
pub mod epoch_queue;
use kernels::{WorkloadType};
use benchmark::{Benchmark};
use log::{info};
30 changes: 7 additions & 23 deletions src/sync_queue.rs
@@ -2,8 +2,10 @@ use std::collections::VecDeque;
use std::sync::Mutex;
use spin::Mutex as Spinlock;
use crossbeam_queue::SegQueue;
use crossbeam_epoch::sync::queue::Queue as CBEpochQueue;
use lockfree::queue::Queue as LFQueue;
use custom_queue::CustomQueue;
use dirty_queue::DirtyQueue;
use epoch_queue::EpochQueue;

pub trait SyncQueue<T>: Send + Sync {
fn pop(&self) -> Option<T>;
@@ -16,7 +18,8 @@ pub enum ImplType {
SpinLock,
Crossbeam,
Lockfree,
Custom, // TODO eventually
Dirty,
Epoch,
}

/// Constructor function for building queues given an ImplType.
@@ -26,7 +29,8 @@ pub fn create_impl<T: 'static + Sync + Send>(t: &ImplType) -> Box<dyn SyncQueue<T>> {
ImplType::SpinLock => Box::new(SpinQueue::<T>::new()),
ImplType::Crossbeam => Box::new(CrossbeamQueue::<T>::new()),
ImplType::Lockfree => Box::new(LockfreeQueue::<T>::new()),
ImplType::Custom => Box::new(OurQueue::<T>::new()),
ImplType::Dirty => Box::new(DirtyQueue::<T>::new()),
ImplType::Epoch => Box::new(EpochQueue::<T>::new()),
}
}

@@ -123,23 +127,3 @@ impl<T: Send + Sync> SyncQueue<T> for LockfreeQueue<T> {
}
}

struct OurQueue<T> {
q: CustomQueue<T>,
}

impl<T> OurQueue<T> {
pub fn new() -> OurQueue<T> {
OurQueue { q: CustomQueue::new(), }
}
}


impl<T: Send + Sync> SyncQueue<T> for OurQueue<T> {
fn pop(&self) -> Option<T> {
self.q.pop()
}

fn push(&self, elem: T) {
self.q.push(elem)
}
}
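With the factory above, callers can pick either new implementation at runtime; a minimal smoke test (ours, not part of the commit) might read:

use sync_queue::{create_impl, ImplType};

fn factory_smoke() {
    let q = create_impl::<usize>(&ImplType::Epoch); // or ImplType::Dirty
    q.push(42);
    assert_eq!(q.pop(), Some(42));
    assert_eq!(q.pop(), None); // queue drained
}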
