From bf1ff3e47b44e69a667daa9a79dd99cfe40f3f5b Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 8 Mar 2024 15:42:05 +0900 Subject: [PATCH] [wip] Cumulative micro opts (1-2% perf. gain) --- crossbeam-channel/src/flavors/list.rs | 161 ++++++++++++++------------ 1 file changed, 84 insertions(+), 77 deletions(-) diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 34957fad4..e0e1325fe 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -5,7 +5,7 @@ use std::cell::UnsafeCell; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::ptr; -use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; +use std::sync::atomic::{self, AtomicPtr, AtomicU8, AtomicUsize, Ordering}; use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; @@ -25,14 +25,15 @@ use crate::waker::SyncWaker; // * If a message has been written into the slot, `WRITE` is set. // * If a message has been read from the slot, `READ` is set. // * If the block is being destroyed, `DESTROY` is set. -const WRITE: usize = 1; -const READ: usize = 2; -const DESTROY: usize = 4; +const WRITE: u8 = 1; +const READ: u8 = 2; +const DESTROY: u8 = 4; // Each block covers one "lap" of indices. const LAP: usize = 32; // The maximum number of messages a block can hold. const BLOCK_CAP: usize = LAP - 1; +const NEAR_BLOCK_CAP: usize = LAP - 2; // How many lower bits are reserved for metadata. const SHIFT: usize = 1; // Has two different purposes: @@ -41,20 +42,12 @@ const SHIFT: usize = 1; const MARK_BIT: usize = 1; /// A slot in a block. -struct Slot { - /// The message. - msg: UnsafeCell>, - - /// The state of the slot. - state: AtomicUsize, +struct Slot<'a, T> { + state: &'a AtomicU8, + msg: &'a UnsafeCell, } -impl Slot { - const UNINIT: Self = Self { - msg: UnsafeCell::new(MaybeUninit::uninit()), - state: AtomicUsize::new(0), - }; - +impl Slot<'_, T> { /// Waits until a message is written into the slot. fn wait_write(&self) { let backoff = Backoff::new(); @@ -71,16 +64,23 @@ struct Block { /// The next block in the linked list. next: AtomicPtr>, - /// Slots for messages. - slots: [Slot; BLOCK_CAP], + /// states for slots. + states: [AtomicU8; BLOCK_CAP], + + /// messages for slots. + msgs: MaybeUninit<[UnsafeCell; BLOCK_CAP]>, } impl Block { /// Creates an empty block. fn new() -> Self { + #[allow(clippy::declare_interior_mutable_const)] + const UNINIT_STATE: AtomicU8 = AtomicU8::new(0); + Self { next: AtomicPtr::new(ptr::null_mut()), - slots: [Slot::UNINIT; BLOCK_CAP], + states: [UNINIT_STATE; BLOCK_CAP], + msgs: MaybeUninit::uninit(), } } @@ -96,12 +96,19 @@ impl Block { } } + unsafe fn get_slot_unchecked(&self, i: usize) -> Slot<'_, T> { + Slot { + msg: unsafe { self.msgs.assume_init_ref().get_unchecked(i) }, + state: unsafe { self.states.get_unchecked(i) }, + } + } + /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. unsafe fn destroy(this: *mut Self, start: usize) { // It is not necessary to set the `DESTROY` bit in the last slot because that slot has // begun destruction of the block. for i in start..BLOCK_CAP - 1 { - let slot = unsafe { (*this).slots.get_unchecked(i) }; + let slot = unsafe { (*this).get_slot_unchecked(i) }; // Mark the `DESTROY` bit if a thread is still using the slot. if slot.state.load(Ordering::Acquire) & READ == 0 @@ -196,39 +203,32 @@ impl Channel { } /// Attempts to reserve a slot for sending a message. - fn start_send(&self, token: &mut Token) -> bool { + fn start_send(&self, token: &mut Token) { let backoff = Backoff::new(); - let mut tail = self.tail.index.load(Ordering::Acquire); - let mut block = self.tail.block.load(Ordering::Acquire); - let mut next_block = None; + let mut next_block = ptr::null_mut::>(); + let mut tail; + let mut block; loop { + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + // Check if the channel is disconnected. if tail & MARK_BIT != 0 { token.list.block = ptr::null(); - return true; + break; } // Calculate the offset of the index into the block. let offset = (tail >> SHIFT) % LAP; - // If we reached the end of the block, wait until the next one is installed. - if offset == BLOCK_CAP { + if offset > NEAR_BLOCK_CAP { + // If we reached the end of the block, wait until the next one is installed. backoff.snooze(); - tail = self.tail.index.load(Ordering::Acquire); - block = self.tail.block.load(Ordering::Acquire); continue; - } - - // If we're going to have to install the next block, allocate it in advance in order to - // make the wait for other threads as short as possible. - if offset + 1 == BLOCK_CAP && next_block.is_none() { - next_block = Some(Box::new(Block::::new())); - } - - // If this is the first message to be sent into the channel, we need to allocate the - // first block and install it. - if block.is_null() { + } else if block.is_null() { + // If this is the first message to be sent into the channel, we need to allocate + // the first block and install it. let new = Box::into_raw(Box::new(Block::::new())); if self @@ -240,16 +240,17 @@ impl Channel { self.head.block.store(new, Ordering::Release); block = new; } else { - next_block = unsafe { Some(Box::from_raw(new)) }; - tail = self.tail.index.load(Ordering::Acquire); - block = self.tail.block.load(Ordering::Acquire); + next_block = new; continue; } + } else if offset == NEAR_BLOCK_CAP && next_block.is_null() { + // If we're going to have to install the next block, allocate it in advance in + // order to make the wait for other threads as short as possible. + next_block = Box::into_raw(Box::new(Block::::new())); } - let new_tail = tail + (1 << SHIFT); - // Try advancing the tail forward. + let new_tail = tail + (1 << SHIFT); match self.tail.index.compare_exchange_weak( tail, new_tail, @@ -258,24 +259,26 @@ impl Channel { ) { Ok(_) => unsafe { // If we've reached the end of the block, install the next one. - if offset + 1 == BLOCK_CAP { - let next_block = Box::into_raw(next_block.unwrap()); - self.tail.block.store(next_block, Ordering::Release); + if offset == NEAR_BLOCK_CAP { + let n = std::mem::replace(&mut next_block, ptr::null_mut()); + self.tail.block.store(n, Ordering::Release); self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); - (*block).next.store(next_block, Ordering::Release); + (*block).next.store(n, Ordering::Release); } token.list.block = block as *const u8; token.list.offset = offset; - return true; + break; }, - Err(t) => { - tail = t; - block = self.tail.block.load(Ordering::Acquire); + Err(_) => { backoff.spin(); } } } + + if !next_block.is_null() { + drop(unsafe { Box::from_raw(next_block) }); + } } /// Writes a message into the channel. @@ -288,8 +291,8 @@ impl Channel { // Write the message into the slot. let block = token.list.block.cast::>(); let offset = token.list.offset; - let slot = unsafe { (*block).slots.get_unchecked(offset) }; - unsafe { slot.msg.get().write(MaybeUninit::new(msg)) } + let slot = unsafe { (*block).get_slot_unchecked(offset) }; + unsafe { slot.msg.get().write(msg) } slot.state.fetch_or(WRITE, Ordering::Release); // Wake a sleeping receiver. @@ -300,18 +303,19 @@ impl Channel { /// Attempts to reserve a slot for receiving a message. fn start_recv(&self, token: &mut Token) -> bool { let backoff = Backoff::new(); - let mut head = self.head.index.load(Ordering::Acquire); - let mut block = self.head.block.load(Ordering::Acquire); + let mut head; + let mut block; loop { + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + // Calculate the offset of the index into the block. let offset = (head >> SHIFT) % LAP; // If we reached the end of the block, wait until the next one is installed. if offset == BLOCK_CAP { backoff.snooze(); - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); continue; } @@ -344,8 +348,6 @@ impl Channel { // In that case, just wait until it gets initialized. if block.is_null() { backoff.snooze(); - head = self.head.index.load(Ordering::Acquire); - block = self.head.block.load(Ordering::Acquire); continue; } @@ -358,7 +360,7 @@ impl Channel { ) { Ok(_) => unsafe { // If we've reached the end of the block, move to the next one. - if offset + 1 == BLOCK_CAP { + if offset == NEAR_BLOCK_CAP { let next = (*block).wait_next(); let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); if !(*next).next.load(Ordering::Relaxed).is_null() { @@ -373,9 +375,7 @@ impl Channel { token.list.offset = offset; return true; }, - Err(h) => { - head = h; - block = self.head.block.load(Ordering::Acquire); + Err(_) => { backoff.spin(); } } @@ -392,14 +392,14 @@ impl Channel { // Read the message. let block = token.list.block as *mut Block; let offset = token.list.offset; - let slot = unsafe { (*block).slots.get_unchecked(offset) }; + let slot = unsafe { (*block).get_slot_unchecked(offset) }; slot.wait_write(); - let msg = unsafe { slot.msg.get().read().assume_init() }; + let msg = unsafe { slot.msg.get().read() }; // Destroy the block if we've reached the end, or if another thread wanted to destroy but // couldn't because we were busy reading from the slot. unsafe { - if offset + 1 == BLOCK_CAP { + if offset == NEAR_BLOCK_CAP { Block::destroy(block, 0); } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { Block::destroy(block, offset + 1); @@ -423,8 +423,10 @@ impl Channel { msg: T, _deadline: Option, ) -> Result<(), SendTimeoutError> { - let token = &mut Token::default(); - assert!(self.start_send(token)); + let token = MaybeUninit::uninit(); + let token = &mut unsafe { token.assume_init() }; + + self.start_send(token); unsafe { self.write(token, msg) .map_err(SendTimeoutError::Disconnected) @@ -433,7 +435,8 @@ impl Channel { /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { - let token = &mut Token::default(); + let token = MaybeUninit::uninit(); + let token = &mut unsafe { token.assume_init() }; if self.start_recv(token) { unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } @@ -444,7 +447,9 @@ impl Channel { /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { - let token = &mut Token::default(); + let token = MaybeUninit::uninit(); + let token = &mut unsafe { token.assume_init() }; + loop { // Try receiving a message several times. let backoff = Backoff::new(); @@ -609,9 +614,9 @@ impl Channel { if offset < BLOCK_CAP { // Drop the message in the slot. - let slot = (*block).slots.get_unchecked(offset); + let slot = (*block).get_slot_unchecked(offset); slot.wait_write(); - (*slot.msg.get()).assume_init_drop(); + drop(ptr::read(slot.msg.get())); } else { (*block).wait_next(); // Deallocate the block and move to the next one. @@ -667,8 +672,9 @@ impl Drop for Channel { if offset < BLOCK_CAP { // Drop the message in the slot. - let slot = (*block).slots.get_unchecked(offset); - (*slot.msg.get()).assume_init_drop(); + let msg = + ptr::read((*block).msgs.assume_init_ref().get_unchecked(offset).get()); + drop(msg); } else { // Deallocate the block and move to the next one. let next = *(*block).next.get_mut(); @@ -731,7 +737,8 @@ impl SelectHandle for Receiver<'_, T> { impl SelectHandle for Sender<'_, T> { fn try_select(&self, token: &mut Token) -> bool { - self.0.start_send(token) + self.0.start_send(token); + true } fn deadline(&self) -> Option {