Skip to content

Commit

Permalink
Use AtomicU64 for head/tail index in deque, channel, and queues
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jan 23, 2022
1 parent 85d0bdc commit 2a94555
Show file tree
Hide file tree
Showing 23 changed files with 334 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ default-features = false
optional = true

[dependencies.crossbeam-utils]
version = "0.8.5"
version = "0.8.6"
path = "./crossbeam-utils"
default-features = false

Expand Down
3 changes: 2 additions & 1 deletion ci/no_atomic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cat >"${file}" <<EOF
// This file is @generated by $(basename "$0").
// It is not intended for manual editing.
#[allow(dead_code)] // Only crossbeam-{epoch,queue,skiplist,utils} use this.
const NO_ATOMIC_CAS: &[&str] = &[
EOF
for target in "${no_atomic_cas[@]}"; do
Expand All @@ -49,7 +50,7 @@ done
cat >>"${file}" <<EOF
];
#[allow(dead_code)] // Only crossbeam-utils uses this.
#[allow(dead_code)] // Only crossbeam-{channel,deque,queue,utils} use this.
const NO_ATOMIC_64: &[&str] = &[
EOF
for target in "${no_atomic_64[@]}"; do
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ std = ["crossbeam-utils/std"]
cfg-if = "1"

[dependencies.crossbeam-utils]
version = "0.8"
version = "0.8.6"
path = "../crossbeam-utils"
default-features = false
optional = true
Expand Down
43 changes: 43 additions & 0 deletions crossbeam-channel/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// The rustc-cfgs listed below are considered public API, but they are *unstable*
// and outside of the normal semver guarantees:
//
// - `crossbeam_no_atomic_64`
// Assume the target does *not* support AtomicU64/AtomicI64.
// This is usually detected automatically by the build script, but you may
// need to enable it manually when building for custom targets or using
// non-cargo build systems that don't run the build script.
//
// With the exceptions mentioned above, the rustc-cfgs emitted by the build
// script are *not* public API.

#![warn(rust_2018_idioms)]

use std::env;

include!("no_atomic.rs");

/// Build-script entry point.
///
/// Reads the `TARGET` environment variable and, when that target is listed in
/// `NO_ATOMIC_64`, emits the `crossbeam_no_atomic_64` cfg. If `TARGET` cannot
/// be read, a cargo warning is printed and nothing else is emitted.
fn main() {
    let target = match env::var("TARGET") {
        Err(err) => {
            // Without a target there is nothing to decide; warn and stop here.
            println!(
                "cargo:warning={}: unable to get TARGET environment variable: {}",
                env!("CARGO_PKG_NAME"),
                err
            );
            return;
        }
        Ok(triple) => triple,
    };

    // The cfg is deliberately spelled `no_*` rather than `has_*`: when the
    // build script never runs (e.g. under a non-cargo build system), the cfg
    // is simply absent and "max-atomic-width" is treated as 64.
    let lacks_atomic_64 = NO_ATOMIC_64
        .iter()
        .any(|&known| known == target.as_str());
    if lacks_atomic_64 {
        println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
    }
    // Targets not in the list are assumed to have `"max-atomic-width" == 64`
    // or `"max-atomic-width" == 128`, so nothing is emitted for them.

    println!("cargo:rerun-if-changed=no_atomic.rs");
}
1 change: 1 addition & 0 deletions crossbeam-channel/no_atomic.rs
39 changes: 20 additions & 19 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::atomic::{self, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::utils::AtomicU64;
use crate::waker::SyncWaker;

/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
stamp: AtomicU64,

/// The message in this slot.
msg: UnsafeCell<MaybeUninit<T>>,
Expand All @@ -37,7 +38,7 @@ pub(crate) struct ArrayToken {
slot: *const u8,

/// Stamp to store into the slot after reading or writing.
stamp: usize,
stamp: u64,
}

impl Default for ArrayToken {
Expand All @@ -55,20 +56,20 @@ pub(crate) struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: CachePadded<AtomicUsize>,
head: CachePadded<AtomicU64>,

/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicU64>,

/// The buffer holding slots.
buffer: Box<[Slot<T>]>,
Expand All @@ -77,10 +78,10 @@ pub(crate) struct Channel<T> {
cap: usize,

/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
one_lap: u64,

/// If this bit is set in the tail, that means the channel is disconnected.
mark_bit: usize,
mark_bit: u64,

/// Senders waiting while the channel is full.
senders: SyncWaker,
Expand All @@ -95,7 +96,7 @@ impl<T> Channel<T> {
assert!(cap > 0, "capacity must be positive");

// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let mark_bit = (cap as u64 + 1).next_power_of_two();
let one_lap = mark_bit * 2;

// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
Expand All @@ -105,11 +106,11 @@ impl<T> Channel<T> {

// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer: Box<[Slot<T>]> = (0..cap)
let buffer: Box<[Slot<T>]> = (0..cap as u64)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot {
stamp: AtomicUsize::new(i),
stamp: AtomicU64::new(i),
msg: UnsafeCell::new(MaybeUninit::uninit()),
}
})
Expand All @@ -120,8 +121,8 @@ impl<T> Channel<T> {
cap,
one_lap,
mark_bit,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
head: CachePadded::new(AtomicU64::new(head)),
tail: CachePadded::new(AtomicU64::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
}
Expand Down Expand Up @@ -151,7 +152,7 @@ impl<T> Channel<T> {
}

// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let index = (tail & (self.mark_bit - 1)) as usize;
let lap = tail & !(self.one_lap - 1);

// Inspect the corresponding slot.
Expand Down Expand Up @@ -234,7 +235,7 @@ impl<T> Channel<T> {

loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let index = (head & (self.mark_bit - 1)) as usize;
let lap = head & !(self.one_lap - 1);

// Inspect the corresponding slot.
Expand Down Expand Up @@ -452,8 +453,8 @@ impl<T> Channel<T> {

// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
let hix = (head & (self.mark_bit - 1)) as usize;
let tix = (tail & (self.mark_bit - 1)) as usize;

return if hix < tix {
tix - hix
Expand Down Expand Up @@ -522,7 +523,7 @@ impl<T> Channel<T> {
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
let hix = (self.head.load(Ordering::Relaxed) & (self.mark_bit - 1)) as usize;

// Loop over all slots that hold a message and drop them.
for i in 0..self.len() {
Expand Down
31 changes: 16 additions & 15 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::utils::AtomicU64;
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
Expand All @@ -29,15 +30,15 @@ const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
const LAP: u64 = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
const BLOCK_CAP: usize = LAP as usize - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
const SHIFT: u64 = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
const MARK_BIT: u64 = 1;

/// A slot in a block.
struct Slot<T> {
Expand Down Expand Up @@ -66,7 +67,7 @@ struct Block<T> {
next: AtomicPtr<Block<T>>,

/// Slots for messages.
slots: [Slot<T>; BLOCK_CAP],
slots: [Slot<T>; BLOCK_CAP as usize],
}

impl<T> Block<T> {
Expand Down Expand Up @@ -97,7 +98,7 @@ impl<T> Block<T> {
unsafe fn destroy(this: *mut Block<T>, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
for i in start..BLOCK_CAP as usize - 1 {
let slot = (*this).slots.get_unchecked(i);

// Mark the `DESTROY` bit if a thread is still using the slot.
Expand All @@ -118,7 +119,7 @@ impl<T> Block<T> {
#[derive(Debug)]
struct Position<T> {
/// The index in the channel.
index: AtomicUsize,
index: AtomicU64,

/// The block in the linked list.
block: AtomicPtr<Block<T>>,
Expand Down Expand Up @@ -171,11 +172,11 @@ impl<T> Channel<T> {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
index: AtomicU64::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
index: AtomicU64::new(0),
}),
receivers: SyncWaker::new(),
_marker: PhantomData,
Expand Down Expand Up @@ -207,7 +208,7 @@ impl<T> Channel<T> {
}

// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;
let offset = ((tail >> SHIFT) % LAP) as usize;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
Expand Down Expand Up @@ -302,7 +303,7 @@ impl<T> Channel<T> {

loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
Expand Down Expand Up @@ -520,7 +521,7 @@ impl<T> Channel<T> {
head >>= SHIFT;

// Return the difference minus the number of blocks between tail and head.
return tail - head - tail / LAP;
return (tail - head - tail / LAP) as usize;
}
}
}
Expand Down Expand Up @@ -567,7 +568,7 @@ impl<T> Channel<T> {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
let offset = (tail >> SHIFT) % LAP;
let offset = ((tail >> SHIFT) % LAP) as usize;
if offset != BLOCK_CAP {
break;
}
Expand All @@ -585,7 +586,7 @@ impl<T> Channel<T> {
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

if offset < BLOCK_CAP {
// Drop the message in the slot.
Expand Down Expand Up @@ -645,7 +646,7 @@ impl<T> Drop for Channel<T> {
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

if offset < BLOCK_CAP {
// Drop the message in the slot.
Expand Down
40 changes: 40 additions & 0 deletions crossbeam-channel/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,46 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
}
}

#[cfg(not(crossbeam_no_atomic_64))]
pub(crate) use core::sync::atomic::AtomicU64;

#[cfg(crossbeam_no_atomic_64)]
#[derive(Debug)]
#[repr(transparent)]
/// Crate-local replacement for `core::sync::atomic::AtomicU64`.
///
/// Only compiled when the `crossbeam_no_atomic_64` cfg is set; otherwise the
/// name `AtomicU64` refers to the `core` type re-exported just above.
pub(crate) struct AtomicU64 {
/// Cell holding the current value; backed by crossbeam-utils' `AtomicCell<u64>`.
inner: crossbeam_utils::atomic::AtomicCell<u64>,
}

#[cfg(crossbeam_no_atomic_64)]
impl AtomicU64 {
    /// Creates a new atomic initialized to `v`.
    pub(crate) const fn new(v: u64) -> Self {
        AtomicU64 {
            inner: crossbeam_utils::atomic::AtomicCell::new(v),
        }
    }

    /// Borrows the wrapped cell that every operation below delegates to.
    fn cell(&self) -> &crossbeam_utils::atomic::AtomicCell<u64> {
        &self.inner
    }

    /// Returns the current value. The requested ordering is not forwarded to
    /// the underlying cell.
    pub(crate) fn load(&self, _order: Ordering) -> u64 {
        self.cell().load()
    }

    /// Replaces the current value with `val`. The requested ordering is not
    /// forwarded to the underlying cell.
    pub(crate) fn store(&self, val: u64, _order: Ordering) {
        self.cell().store(val)
    }

    /// Stores `new` if the current value equals `current`.
    ///
    /// Delegates to the cell's `compare_exchange` and returns its result
    /// unchanged; both ordering arguments are ignored.
    pub(crate) fn compare_exchange_weak(
        &self,
        current: u64,
        new: u64,
        _success: Ordering,
        _failure: Ordering,
    ) -> Result<u64, u64> {
        self.cell().compare_exchange(current, new)
    }

    /// Delegates to the cell's `fetch_add` and returns what it returns. The
    /// requested ordering is ignored.
    pub(crate) fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
        self.cell().fetch_add(val)
    }

    /// Delegates to the cell's `fetch_or` and returns what it returns. The
    /// requested ordering is ignored.
    pub(crate) fn fetch_or(&self, val: u64, _order: Ordering) -> u64 {
        self.cell().fetch_or(val)
    }
}

/// A simple spinlock.
pub(crate) struct Spinlock<T> {
flag: AtomicBool,
Expand Down
Loading

0 comments on commit 2a94555

Please sign in to comment.