Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safer thread parkers #110

Merged
merged 9 commits into from
Dec 7, 2018
Merged
70 changes: 25 additions & 45 deletions core/src/thread_parker/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,94 +5,74 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::cell::Cell;
use std::sync::{Condvar, Mutex, MutexGuard};
//! A simple spin lock based thread parker. Used on platforms without better
//! parking facilities available.

use std::sync::atomic::spin_loop_hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;

// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
should_park: Cell<bool>,
mutex: Mutex<()>,
condvar: Condvar,
parked: AtomicBool,
}

impl ThreadParker {
pub fn new() -> ThreadParker {
ThreadParker {
should_park: Cell::new(false),
mutex: Mutex::new(()),
condvar: Condvar::new(),
parked: AtomicBool::new(false),
}
}

// Prepares the parker. This should be called before adding it to the queue.
pub unsafe fn prepare_park(&self) {
self.should_park.set(true);
pub fn prepare_park(&self) {
self.parked.store(true, Ordering::Relaxed);
}

// Checks if the park timed out. This should be called while holding the
// queue lock after park_until has returned false.
pub unsafe fn timed_out(&self) -> bool {
// We need to grab the mutex here because another thread may be
// concurrently executing UnparkHandle::unpark, which is done without
// holding the queue lock.
let _lock = self.mutex.lock().unwrap();
self.should_park.get()
pub fn timed_out(&self) -> bool {
self.parked.load(Ordering::Relaxed) != false
}

// Parks the thread until it is unparked. This should be called after it has
// been added to the queue, after unlocking the queue.
pub unsafe fn park(&self) {
let mut lock = self.mutex.lock().unwrap();
while self.should_park.get() {
lock = self.condvar.wait(lock).unwrap();
pub fn park(&self) {
while self.parked.load(Ordering::Acquire) != false {
spin_loop_hint();
}
}

// Parks the thread until it is unparked or the timeout is reached. This
// should be called after it has been added to the queue, after unlocking
// the queue. Returns true if we were unparked and false if we timed out.
pub unsafe fn park_until(&self, timeout: Instant) -> bool {
let mut lock = self.mutex.lock().unwrap();
while self.should_park.get() {
let now = Instant::now();
if timeout <= now {
pub fn park_until(&self, timeout: Instant) -> bool {
while self.parked.load(Ordering::Acquire) != false {
if Instant::now() >= timeout {
return false;
}
let (new_lock, _) = self.condvar.wait_timeout(lock, timeout - now).unwrap();
lock = new_lock;
spin_loop_hint();
}
true
}

// Locks the parker to prevent the target thread from exiting. This is
// necessary to ensure that thread-local ThreadData objects remain valid.
// This should be called while holding the queue lock.
pub unsafe fn unpark_lock(&self) -> UnparkHandle {
UnparkHandle {
thread_parker: self,
_guard: self.mutex.lock().unwrap(),
}
pub fn unpark_lock(&self) -> UnparkHandle {
// We don't need to lock anything, just clear the state
self.parked.store(false, Ordering::Release);
UnparkHandle(())
}
}

// Handle for a thread that is about to be unparked. We need to mark the thread
// as unparked while holding the queue lock, but we delay the actual unparking
// until after the queue lock is released.
pub struct UnparkHandle<'a> {
thread_parker: *const ThreadParker,
_guard: MutexGuard<'a, ()>,
}
pub struct UnparkHandle(());

impl<'a> UnparkHandle<'a> {
impl UnparkHandle {
// Wakes up the parked thread. This should be called after the queue lock is
// released to avoid blocking the queue for too long.
pub unsafe fn unpark(self) {
(*self.thread_parker).should_park.set(false);

// We notify while holding the lock here to avoid races with the target
// thread. In particular, the thread could exit after we unlock the
// mutex, which would make the condvar access invalid memory.
(*self.thread_parker).condvar.notify_one();
}
pub fn unpark(self) {}
}
60 changes: 31 additions & 29 deletions core/src/thread_parker/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// copied, modified, or distributed except according to those terms.

use libc;
use std::ptr;
use std::sync::atomic::{AtomicI32, Ordering};
use std::time::Instant;

Expand Down Expand Up @@ -35,41 +36,28 @@ impl ThreadParker {
}

// Prepares the parker. This should be called before adding it to the queue.
pub unsafe fn prepare_park(&self) {
pub fn prepare_park(&self) {
self.futex.store(1, Ordering::Relaxed);
}

// Checks if the park timed out. This should be called while holding the
// queue lock after park_until has returned false.
pub unsafe fn timed_out(&self) -> bool {
pub fn timed_out(&self) -> bool {
self.futex.load(Ordering::Relaxed) != 0
}

// Parks the thread until it is unparked. This should be called after it has
// been added to the queue, after unlocking the queue.
pub unsafe fn park(&self) {
pub fn park(&self) {
while self.futex.load(Ordering::Acquire) != 0 {
let r = libc::syscall(
libc::SYS_futex,
&self.futex,
FUTEX_WAIT | FUTEX_PRIVATE,
1,
0,
);
debug_assert!(r == 0 || r == -1);
if r == -1 {
debug_assert!(
*libc::__errno_location() == libc::EINTR
|| *libc::__errno_location() == libc::EAGAIN
);
}
self.futex_wait(None);
}
}

// Parks the thread until it is unparked or the timeout is reached. This
// should be called after it has been added to the queue, after unlocking
// the queue. Returns true if we were unparked and false if we timed out.
pub unsafe fn park_until(&self, timeout: Instant) -> bool {
pub fn park_until(&self, timeout: Instant) -> bool {
while self.futex.load(Ordering::Acquire) != 0 {
let now = Instant::now();
if timeout <= now {
Expand All @@ -85,29 +73,42 @@ impl ThreadParker {
tv_sec: diff.as_secs() as libc::time_t,
tv_nsec: diff.subsec_nanos() as tv_nsec_t,
};
let r = libc::syscall(
self.futex_wait(Some(ts));
}
true
}

#[inline]
fn futex_wait(&self, ts: Option<libc::timespec>) {
let ts_ptr = ts
.as_ref()
.map(|ts_ref| ts_ref as *const _)
.unwrap_or(ptr::null());
let r = unsafe {
libc::syscall(
libc::SYS_futex,
&self.futex,
FUTEX_WAIT | FUTEX_PRIVATE,
1,
&ts,
);
debug_assert!(r == 0 || r == -1);
if r == -1 {
ts_ptr,
)
};
debug_assert!(r == 0 || r == -1);
if r == -1 {
unsafe {
debug_assert!(
*libc::__errno_location() == libc::EINTR
|| *libc::__errno_location() == libc::EAGAIN
|| *libc::__errno_location() == libc::ETIMEDOUT
|| (ts.is_some() && *libc::__errno_location() == libc::ETIMEDOUT)
);
}
}
true
}

// Locks the parker to prevent the target thread from exiting. This is
// necessary to ensure that thread-local ThreadData objects remain valid.
// This should be called while holding the queue lock.
pub unsafe fn unpark_lock(&self) -> UnparkHandle {
pub fn unpark_lock(&self) -> UnparkHandle {
// We don't need to lock anything, just clear the state
self.futex.store(0, Ordering::Release);

Expand All @@ -125,13 +126,14 @@ pub struct UnparkHandle {
impl UnparkHandle {
// Wakes up the parked thread. This should be called after the queue lock is
// released to avoid blocking the queue for too long.
pub unsafe fn unpark(self) {
pub fn unpark(self) {
// The thread data may have been freed at this point, but it doesn't
// matter since the syscall will just return EFAULT in that case.
let r = libc::syscall(libc::SYS_futex, self.futex, FUTEX_WAKE | FUTEX_PRIVATE, 1);
let r =
unsafe { libc::syscall(libc::SYS_futex, self.futex, FUTEX_WAKE | FUTEX_PRIVATE, 1) };
debug_assert!(r == 0 || r == 1 || r == -1);
if r == -1 {
debug_assert_eq!(*libc::__errno_location(), libc::EFAULT);
debug_assert_eq!(unsafe { *libc::__errno_location() }, libc::EFAULT);
}
}
}
14 changes: 7 additions & 7 deletions core/src/thread_parker/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,33 +195,33 @@ impl UnparkHandle {

// Returns the current time on the clock used by pthread_cond_t as a timespec.
#[cfg(any(target_os = "macos", target_os = "ios"))]
unsafe fn timespec_now() -> libc::timespec {
let mut now: libc::timeval = mem::uninitialized();
let r = libc::gettimeofday(&mut now, ptr::null_mut());
fn timespec_now() -> libc::timespec {
let mut now: libc::timeval = unsafe { mem::uninitialized() };
let r = unsafe { libc::gettimeofday(&mut now, ptr::null_mut()) };
debug_assert_eq!(r, 0);
libc::timespec {
tv_sec: now.tv_sec,
tv_nsec: now.tv_usec as tv_nsec_t * 1000,
}
}
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
unsafe fn timespec_now() -> libc::timespec {
let mut now: libc::timespec = mem::uninitialized();
fn timespec_now() -> libc::timespec {
let mut now: libc::timespec = unsafe { mem::uninitialized() };
let clock = if cfg!(target_os = "android") {
// Android doesn't support pthread_condattr_setclock, so we need to
// specify the timeout in CLOCK_REALTIME.
libc::CLOCK_REALTIME
} else {
libc::CLOCK_MONOTONIC
};
let r = libc::clock_gettime(clock, &mut now);
let r = unsafe { libc::clock_gettime(clock, &mut now) };
debug_assert_eq!(r, 0);
now
}

// Converts a relative timeout into an absolute timeout in the clock used by
// pthread_cond_t.
unsafe fn timeout_to_timespec(timeout: Duration) -> Option<libc::timespec> {
fn timeout_to_timespec(timeout: Duration) -> Option<libc::timespec> {
// Handle overflows early on
if timeout.as_secs() > libc::time_t::max_value() as u64 {
return None;
Expand Down
87 changes: 45 additions & 42 deletions core/src/thread_parker/windows/keyed_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,56 +49,59 @@ impl KeyedEvent {
}

#[allow(non_snake_case)]
pub unsafe fn create() -> Option<KeyedEvent> {
let ntdll = GetModuleHandleA(b"ntdll.dll\0".as_ptr() as LPCSTR);
if ntdll.is_null() {
return None;
}
pub fn create() -> Option<KeyedEvent> {
unsafe {
let ntdll = GetModuleHandleA(b"ntdll.dll\0".as_ptr() as LPCSTR);
if ntdll.is_null() {
return None;
}

let NtCreateKeyedEvent = GetProcAddress(ntdll, b"NtCreateKeyedEvent\0".as_ptr() as LPCSTR);
if NtCreateKeyedEvent.is_null() {
return None;
}
let NtReleaseKeyedEvent =
GetProcAddress(ntdll, b"NtReleaseKeyedEvent\0".as_ptr() as LPCSTR);
if NtReleaseKeyedEvent.is_null() {
return None;
}
let NtWaitForKeyedEvent =
GetProcAddress(ntdll, b"NtWaitForKeyedEvent\0".as_ptr() as LPCSTR);
if NtWaitForKeyedEvent.is_null() {
return None;
}
let NtCreateKeyedEvent =
GetProcAddress(ntdll, b"NtCreateKeyedEvent\0".as_ptr() as LPCSTR);
if NtCreateKeyedEvent.is_null() {
return None;
}
let NtReleaseKeyedEvent =
GetProcAddress(ntdll, b"NtReleaseKeyedEvent\0".as_ptr() as LPCSTR);
if NtReleaseKeyedEvent.is_null() {
return None;
}
let NtWaitForKeyedEvent =
GetProcAddress(ntdll, b"NtWaitForKeyedEvent\0".as_ptr() as LPCSTR);
if NtWaitForKeyedEvent.is_null() {
return None;
}

let NtCreateKeyedEvent: extern "system" fn(
KeyedEventHandle: PHANDLE,
DesiredAccess: ACCESS_MASK,
ObjectAttributes: PVOID,
Flags: ULONG,
) -> NTSTATUS = mem::transmute(NtCreateKeyedEvent);
let mut handle = mem::uninitialized();
let status = NtCreateKeyedEvent(
&mut handle,
GENERIC_READ | GENERIC_WRITE,
ptr::null_mut(),
0,
);
if status != STATUS_SUCCESS {
return None;
}
let NtCreateKeyedEvent: extern "system" fn(
KeyedEventHandle: PHANDLE,
DesiredAccess: ACCESS_MASK,
ObjectAttributes: PVOID,
Flags: ULONG,
) -> NTSTATUS = mem::transmute(NtCreateKeyedEvent);
let mut handle = mem::uninitialized();
let status = NtCreateKeyedEvent(
&mut handle,
GENERIC_READ | GENERIC_WRITE,
ptr::null_mut(),
0,
);
if status != STATUS_SUCCESS {
return None;
}

Some(KeyedEvent {
handle,
NtReleaseKeyedEvent: mem::transmute(NtReleaseKeyedEvent),
NtWaitForKeyedEvent: mem::transmute(NtWaitForKeyedEvent),
})
Some(KeyedEvent {
handle,
NtReleaseKeyedEvent: mem::transmute(NtReleaseKeyedEvent),
NtWaitForKeyedEvent: mem::transmute(NtWaitForKeyedEvent),
})
}
}

pub unsafe fn prepare_park(&'static self, key: &AtomicUsize) {
pub fn prepare_park(&'static self, key: &AtomicUsize) {
key.store(STATE_PARKED, Ordering::Relaxed);
}

pub unsafe fn timed_out(&'static self, key: &AtomicUsize) -> bool {
pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
key.load(Ordering::Relaxed) == STATE_TIMED_OUT
}

Expand Down
Loading