Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport libstd style #106

Merged
merged 9 commits into from
Dec 8, 2018
225 changes: 111 additions & 114 deletions core/src/parking_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ struct ThreadData {
parked_with_timeout: Cell<bool>,

// Extra data for deadlock detection
// TODO: once supported in stable replace with #[cfg...] & remove dummy struct/impl
#[allow(dead_code)]
#[cfg(feature = "deadlock_detection")]
deadlock_data: deadlock::DeadlockData,
}

Expand All @@ -157,13 +156,17 @@ impl ThreadData {
unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
park_token: Cell::new(DEFAULT_PARK_TOKEN),
parked_with_timeout: Cell::new(false),
#[cfg(feature = "deadlock_detection")]
deadlock_data: deadlock::DeadlockData::new(),
}
}
}

// Returns a ThreadData structure for the current thread
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
// Invokes the given closure with a reference to the current thread `ThreadData`.
fn with_thread_data<F, T>(f: F) -> T
where
F: FnOnce(&ThreadData) -> T,
{
// Try to read from thread-local storage, but return None if the TLS has
// already been destroyed.
#[cfg(has_localkey_try_with)]
Expand All @@ -177,14 +180,19 @@ unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {

// Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
// to construct. Try to use a thread-local version if possible.
let mut thread_data_ptr = ptr::null();
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
if let Some(tls) = try_get_tls(&THREAD_DATA) {
return &*tls;
if let Some(tls_thread_data) = try_get_tls(&THREAD_DATA) {
thread_data_ptr = tls_thread_data;
}

// Otherwise just create a ThreadData on the stack
*local = Some(ThreadData::new());
local.as_ref().unwrap()
let mut thread_data_storage = None;
if thread_data_ptr.is_null() {
thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
}

f(unsafe { &*thread_data_ptr })
}

impl Drop for ThreadData {
Expand Down Expand Up @@ -579,106 +587,105 @@ unsafe fn park_internal(
timeout: Option<Instant>,
) -> ParkResult {
// Grab our thread data, this also ensures that the hash table exists
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
with_thread_data(|thread_data| {
// Lock the bucket for the given key
let bucket = lock_bucket(key);

// Lock the bucket for the given key
let bucket = lock_bucket(key);
// If the validation function fails, just return
if !validate() {
bucket.mutex.unlock();
return ParkResult::Invalid;
}

// If the validation function fails, just return
if !validate() {
// Append our thread data to the queue and unlock the bucket
thread_data.parked_with_timeout.set(timeout.is_some());
thread_data.next_in_queue.set(ptr::null());
thread_data.key.store(key, Ordering::Relaxed);
thread_data.park_token.set(park_token);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
return ParkResult::Invalid;
}

// Append our thread data to the queue and unlock the bucket
thread_data.parked_with_timeout.set(timeout.is_some());
thread_data.next_in_queue.set(ptr::null());
thread_data.key.store(key, Ordering::Relaxed);
thread_data.park_token.set(park_token);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
// Invoke the pre-sleep callback
before_sleep();

// Park our thread and determine whether we were woken up by an unpark or by
// our timeout. Note that this isn't precise: we can still be unparked since
// we are still in the queue.
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
// call deadlock detection on_unpark hook
deadlock::on_unpark(thread_data);
true
}
};

// Invoke the pre-sleep callback
before_sleep();

// Park our thread and determine whether we were woken up by an unpark or by
// our timeout. Note that this isn't precise: we can still be unparked since
// we are still in the queue.
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
// call deadlock detection on_unpark hook
deadlock::on_unpark(thread_data);
true
// If we were unparked, return now
if unparked {
return ParkResult::Unparked(thread_data.unpark_token.get());
}
};

// If we were unparked, return now
if unparked {
return ParkResult::Unparked(thread_data.unpark_token.get());
}

// Lock our bucket again. Note that the hashtable may have been rehashed in
// the meantime. Our key may also have changed if we were requeued.
let (key, bucket) = lock_bucket_checked(&thread_data.key);
// Lock our bucket again. Note that the hashtable may have been rehashed in
// the meantime. Our key may also have changed if we were requeued.
let (key, bucket) = lock_bucket_checked(&thread_data.key);

// Now we need to check again if we were unparked or timed out. Unlike the
// last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}
// Now we need to check again if we were unparked or timed out. Unlike the
// last check this is precise because we hold the bucket lock.
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}

// We timed out, so we now need to remove our thread from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
let mut was_last_thread = true;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
// Scan the rest of the queue to see if there are any other
// entries with the given key.
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
was_last_thread = false;
break;
// We timed out, so we now need to remove our thread from the queue
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
let mut was_last_thread = true;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
// Scan the rest of the queue to see if there are any other
// entries with the given key.
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
was_last_thread = false;
break;
}
scan = (*scan).next_in_queue.get();
}
scan = (*scan).next_in_queue.get();
}
}

// Callback to indicate that we timed out, and whether we were the
// last thread on the queue.
timed_out(key, was_last_thread);
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
// Callback to indicate that we timed out, and whether we were the
// last thread on the queue.
timed_out(key, was_last_thread);
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
}

// There should be no way for our thread to have been removed from the queue
// if we timed out.
debug_assert!(!current.is_null());
// There should be no way for our thread to have been removed from the queue
// if we timed out.
debug_assert!(!current.is_null());

// Unlock the bucket, we are done
bucket.mutex.unlock();
ParkResult::TimedOut
// Unlock the bucket, we are done
bucket.mutex.unlock();
ParkResult::TimedOut
})
}

/// Unparks one thread from the queue associated with the given key.
Expand Down Expand Up @@ -1102,16 +1109,6 @@ pub mod deadlock {
#[cfg(feature = "deadlock_detection")]
pub(super) use super::deadlock_impl::DeadlockData;

#[cfg(not(feature = "deadlock_detection"))]
pub(super) struct DeadlockData {}

#[cfg(not(feature = "deadlock_detection"))]
impl DeadlockData {
pub(super) fn new() -> Self {
DeadlockData {}
}
}

/// Acquire a resource identified by key in the deadlock detector
/// Noop if deadlock_detection feature isn't enabled.
/// Note: Call after the resource is acquired
Expand Down Expand Up @@ -1149,7 +1146,7 @@ pub mod deadlock {

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
use backtrace::Backtrace;
use petgraph;
use petgraph::graphmap::DiGraphMap;
Expand Down Expand Up @@ -1222,19 +1219,19 @@ mod deadlock_impl {
}

pub unsafe fn acquire_resource(key: usize) {
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
(*thread_data.deadlock_data.resources.get()).push(key);
with_thread_data(|thread_data| {
(*thread_data.deadlock_data.resources.get()).push(key);
});
}

pub unsafe fn release_resource(key: usize) {
let mut thread_data = None;
let thread_data = get_thread_data(&mut thread_data);
let resources = &mut (*thread_data.deadlock_data.resources.get());
match resources.iter().rposition(|x| *x == key) {
Some(p) => resources.swap_remove(p),
None => panic!("key {} not found in thread resources", key),
};
with_thread_data(|thread_data| {
let resources = &mut (*thread_data.deadlock_data.resources.get());
match resources.iter().rposition(|x| *x == key) {
Some(p) => resources.swap_remove(p),
None => panic!("key {} not found in thread resources", key),
};
});
}

pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
Expand Down
61 changes: 5 additions & 56 deletions core/src/spinwait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,8 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

#[cfg(unix)]
use libc;
use std::sync::atomic::spin_loop_hint;
#[cfg(not(any(windows, unix)))]
use std::thread;
#[cfg(windows)]
use winapi;

// Yields the rest of the current timeslice to the OS
#[cfg(windows)]
#[inline]
fn thread_yield() {
// Note that this is manually defined here rather than using the definition
// through `winapi`. The `winapi` definition comes from the `synchapi`
// header which enables the "synchronization.lib" library. It turns out,
// however that `Sleep` comes from `kernel32.dll` so this activation isn't
// necessary.
//
// This was originally identified in rust-lang/rust where on MinGW the
// libsynchronization.a library pulls in a dependency on a newer DLL not
// present in older versions of Windows. (see rust-lang/rust#49438)
//
// This is a bit of a hack for now and ideally we'd fix MinGW's own import
// libraries, but that'll probably take a lot longer than patching this here
// and avoiding the `synchapi` feature entirely.
extern "system" {
fn Sleep(a: winapi::shared::minwindef::DWORD);
}
unsafe {
// We don't use SwitchToThread here because it doesn't consider all
// threads in the system and the thread we are waiting for may not get
// selected.
Sleep(0);
}
}
#[cfg(unix)]
#[inline]
fn thread_yield() {
unsafe {
libc::sched_yield();
}
}
#[cfg(not(any(windows, unix)))]
#[inline]
fn thread_yield() {
thread::yield_now();
}
use thread_parker;

// Wastes some CPU time for the given number of iterations,
// using a hint to indicate to the CPU that we are spinning.
Expand All @@ -63,15 +18,16 @@ fn cpu_relax(iterations: u32) {
}

/// A counter used to perform exponential backoff in spin loops.
#[derive(Default)]
pub struct SpinWait {
counter: u32,
}

impl SpinWait {
/// Creates a new `SpinWait`.
#[inline]
pub fn new() -> SpinWait {
SpinWait { counter: 0 }
pub fn new() -> Self {
Self::default()
}

/// Resets a `SpinWait` to its initial state.
Expand All @@ -97,7 +53,7 @@ impl SpinWait {
if self.counter <= 3 {
cpu_relax(1 << self.counter);
} else {
thread_yield();
thread_parker::thread_yield();
}
true
}
Expand All @@ -116,10 +72,3 @@ impl SpinWait {
cpu_relax(1 << self.counter);
}
}

impl Default for SpinWait {
#[inline]
fn default() -> SpinWait {
SpinWait::new()
}
}
Loading