diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71605920..d582dcb0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,7 +39,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Install Rust - run: rustup update 1.46.0 && rustup default 1.46.0 + run: rustup update 1.47.0 && rustup default 1.47.0 - name: Check run: cargo check --all-features diff --git a/Cargo.toml b/Cargo.toml index 52cef615..98df4a29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,3 +39,5 @@ serde = { version = "1.0.92", features = ["derive"], optional = true } serde_json = { version = "1.0.33", optional = true } futures-util = { version = "0.3.0", optional = true } + +once_cell = { version = "1.4.1" } \ No newline at end of file diff --git a/src/future/atomic_waker.rs b/src/future/atomic_waker.rs index 25698ee5..1770ab7a 100644 --- a/src/future/atomic_waker.rs +++ b/src/future/atomic_waker.rs @@ -21,8 +21,9 @@ impl AtomicWaker { } /// Registers the current task to be notified on calls to `wake`. + #[track_caller] pub fn register(&self, waker: Waker) { - if dbg!(!self.object.try_acquire_lock()) { + if dbg!(!self.object.try_acquire_lock(&trace!())) { waker.wake(); // yield the task and try again... this is a spin lock. thread::yield_now(); @@ -30,7 +31,7 @@ impl AtomicWaker { } *self.waker.lock().unwrap() = Some(waker); - dbg!(self.object.release_lock()); + dbg!(self.object.release_lock(&trace!())); } /// Registers the current task to be woken without consuming the value. @@ -47,12 +48,13 @@ impl AtomicWaker { /// Attempts to take the `Waker` value out of the `AtomicWaker` with the /// intention that the caller will wake the task later. + #[track_caller] pub fn take_waker(&self) -> Option { - dbg!(self.object.acquire_lock()); + dbg!(self.object.acquire_lock(&trace!())); let ret = self.waker.lock().unwrap().take(); - dbg!(self.object.release_lock()); + dbg!(self.object.release_lock(&trace!())); ret } diff --git a/src/future/mod.rs b/src/future/mod.rs index e72590c0..46740ed8 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -13,6 +13,7 @@ use std::mem; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; /// Block the current thread, driving `f` to completion. 
+#[track_caller] pub fn block_on(f: F) -> F::Output where F: Future, @@ -36,7 +37,7 @@ where Poll::Pending => {} } - notify.wait(); + notify.wait(&trace!()); } } @@ -61,15 +62,17 @@ unsafe fn clone_arc_raw(data: *const ()) -> RawWaker { RawWaker::new(data, waker_vtable()) } +#[track_caller] unsafe fn wake_arc_raw(data: *const ()) { let notify: Arc = Arc::from_raw(data as *const _); - notify.notify(); + notify.notify(&trace!()); } +#[track_caller] unsafe fn wake_by_ref_arc_raw(data: *const ()) { // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop let arc = mem::ManuallyDrop::new(Arc::::from_raw(data as *const _)); - arc.notify(); + arc.notify(&trace!()); } unsafe fn drop_arc_raw(data: *const ()) { diff --git a/src/lazy_static.rs b/src/lazy_static.rs index 06b821e7..6a183cf3 100644 --- a/src/lazy_static.rs +++ b/src/lazy_static.rs @@ -2,7 +2,6 @@ use crate::rt; pub use crate::rt::thread::AccessError; -pub use crate::rt::yield_now; use crate::sync::atomic::Ordering; pub use std::thread::panicking; diff --git a/src/model.rs b/src/model.rs index 0329c2f5..6874d357 100644 --- a/src/model.rs +++ b/src/model.rs @@ -194,7 +194,11 @@ impl Builder { let f = f.clone(); + let panic_in_drop_cell = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let _enter = rt::panic::with_panic_in_drop_cell(panic_in_drop_cell.clone()); + scheduler.run(&mut execution, move || { + let _enter = rt::panic::with_panic_in_drop_cell(panic_in_drop_cell); f(); let lazy_statics = rt::execution(|execution| execution.lazy_statics.drop()); @@ -205,8 +209,12 @@ impl Builder { rt::thread_done(); }); + execution.check_consistency(); + execution.check_for_leaks(); + rt::panic::check_panic_in_drop(); + if let Some(next) = execution.step() { execution = next; } else { diff --git a/src/rt/arc.rs b/src/rt/arc.rs index 51ba3cee..0b96ac8f 100644 --- a/src/rt/arc.rs +++ b/src/rt/arc.rs @@ -3,11 +3,19 @@ use crate::rt::{self, Access, Location, Synchronize, VersionVec}; use std::sync::atomic::Ordering::{Acquire, Release}; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug)] pub(crate) struct Arc { state: object::Ref, } +impl TraceEntity for Arc { + fn as_trace_ref(&self) -> rt::TraceRef { + self.state.as_trace_ref().relabel("Arc") + } +} + #[derive(Debug)] pub(super) struct State { /// Reference count @@ -59,8 +67,8 @@ impl Arc { }) } - pub(crate) fn ref_inc(&self) { - self.branch(Action::RefInc); + pub(crate) fn ref_inc(&self, trace: &Trace) { + self.branch(trace, Action::RefInc); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -69,8 +77,8 @@ impl Arc { } /// Validate a `get_mut` call - pub(crate) fn get_mut(&self) -> bool { - self.branch(Action::RefDec); + pub(crate) fn get_mut(&self, trace: &Trace) -> bool { + self.branch(trace, Action::RefDec); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -89,8 +97,8 @@ impl Arc { } /// Returns true if the memory should be dropped. - pub(crate) fn ref_dec(&self) -> bool { - self.branch(Action::RefDec); + pub(crate) fn ref_dec(&self, trace: &Trace) -> bool { + self.branch(trace, Action::RefDec); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -118,9 +126,9 @@ impl Arc { }) } - fn branch(&self, action: Action) { + fn branch(&self, trace: &Trace, action: Action) { let r = self.state; - r.branch_action(action); + r.branch_action(&trace.with_ref(&self.state), action); assert!( r.ref_eq(self.state), "Internal state mutated during branch. 
This is \ diff --git a/src/rt/atomic.rs b/src/rt/atomic.rs index cd22f273..5477d2c2 100644 --- a/src/rt/atomic.rs +++ b/src/rt/atomic.rs @@ -40,12 +40,20 @@ use std::marker::PhantomData; use std::sync::atomic::Ordering; use std::u16; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug)] pub(crate) struct Atomic { state: object::Ref, _p: PhantomData T>, } +impl TraceEntity for Atomic { + fn as_trace_ref(&self) -> rt::TraceRef { + self.state.as_trace_ref().relabel_implicit(self) + } +} + #[derive(Debug)] pub(super) struct State { /// Where the atomic was created @@ -175,8 +183,10 @@ impl Atomic { } /// Loads a value from the atomic cell. - pub(crate) fn load(&self, location: Location, ordering: Ordering) -> T { - self.branch(Action::Load); + pub(crate) fn load(&self, trace: &Trace, location: Location, ordering: Ordering) -> T { + let trace = trace.with_ref(self); + + self.branch(&trace, Action::Load); super::synchronize(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -187,11 +197,11 @@ impl Atomic { let n = state.match_load_to_stores(&execution.threads, &mut seed[..], ordering); - execution.path.push_load(&seed[..n]); + execution.path.push_load(&trace, &seed[..n]); } // Get the store to return from this load. - let index = execution.path.branch_load(); + let index = execution.path.branch_load(&trace); T::from_u64(state.load(&mut execution.threads, index, location, ordering)) }) @@ -216,8 +226,8 @@ impl Atomic { } /// Stores a value into the atomic cell. - pub(crate) fn store(&self, location: Location, val: T, ordering: Ordering) { - self.branch(Action::Store); + pub(crate) fn store(&self, trace: &Trace, location: Location, val: T, ordering: Ordering) { + self.branch(&trace.with_ref(self), Action::Store); super::synchronize(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -240,6 +250,7 @@ impl Atomic { pub(crate) fn rmw( &self, + trace: &Trace, location: Location, success: Ordering, failure: Ordering, @@ -248,7 +259,9 @@ impl Atomic { where F: FnOnce(T) -> Result, { - self.branch(Action::Rmw); + let trace = trace.with_ref(self); + + self.branch(&trace, Action::Rmw); super::synchronize(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -258,11 +271,11 @@ impl Atomic { let mut seed = [0; MAX_ATOMIC_HISTORY]; let n = state.match_rmw_to_stores(&mut seed[..]); - execution.path.push_load(&seed[..n]); + execution.path.push_load(&trace, &seed[..n]); } // Get the store to use for the read portion of the rmw operation. - let index = execution.path.branch_load(); + let index = execution.path.branch_load(&trace); state .rmw( @@ -324,9 +337,9 @@ impl Atomic { f(&mut reset.0) } - fn branch(&self, action: Action) { + fn branch(&self, trace: &Trace, action: Action) { let r = self.state; - r.branch_action(action); + r.branch_action(trace, action); assert!( r.ref_eq(self.state), "Internal state mutated during branch. 
This is \ diff --git a/src/rt/condvar.rs b/src/rt/condvar.rs index 11de8b2d..aedde7bb 100644 --- a/src/rt/condvar.rs +++ b/src/rt/condvar.rs @@ -3,11 +3,19 @@ use crate::rt::{self, thread, Access, Mutex, VersionVec}; use std::collections::VecDeque; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug, Copy, Clone)] pub(crate) struct Condvar { state: object::Ref, } +impl TraceEntity for Condvar { + fn as_trace_ref(&self) -> rt::TraceRef { + self.state.as_trace_ref().relabel("Condvar") + } +} + #[derive(Debug)] pub(super) struct State { /// Tracks access to the mutex @@ -31,8 +39,8 @@ impl Condvar { } /// Blocks the current thread until this condition variable receives a notification. - pub(crate) fn wait(&self, mutex: &Mutex) { - self.state.branch_opaque(); + pub(crate) fn wait(&self, trace: &Trace, mutex: &Mutex) { + self.state.branch_opaque(&trace.with_ref(self)); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -42,18 +50,18 @@ impl Condvar { }); // Release the lock - mutex.release_lock(); + mutex.release_lock(&trace.with_ref(self)); // Disable the current thread - rt::park(); + rt::park(&trace.with_ref(self)); // Acquire the lock again - mutex.acquire_lock(); + mutex.acquire_lock(trace); } /// Wakes up one blocked thread on this condvar. - pub(crate) fn notify_one(&self) { - self.state.branch_opaque(); + pub(crate) fn notify_one(&self, trace: &Trace) { + self.state.branch_opaque(&trace.with_ref(self)); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -68,8 +76,8 @@ impl Condvar { } /// Wakes up all blocked threads on this condvar. - pub(crate) fn notify_all(&self) { - self.state.branch_opaque(); + pub(crate) fn notify_all(&self, trace: &Trace) { + self.state.branch_opaque(&trace.with_ref(self)); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); diff --git a/src/rt/execution.rs b/src/rt/execution.rs index 87e07bc3..a66a0380 100644 --- a/src/rt/execution.rs +++ b/src/rt/execution.rs @@ -5,6 +5,8 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fmt; +use super::Trace; + pub(crate) struct Execution { /// Uniquely identifies an execution pub(super) id: Id, @@ -124,7 +126,7 @@ impl Execution { } /// Returns `true` if a switch is required - pub(crate) fn schedule(&mut self) -> bool { + pub(crate) fn schedule(&mut self, trace: &Trace) -> bool { use crate::rt::path::Thread; // Implementation of the DPOR algorithm. @@ -177,7 +179,7 @@ impl Execution { let path_id = self.path.pos(); - let next = self.path.branch_thread(self.id, { + let next = self.path.branch_thread(trace, self.id, { self.threads.iter().map(|(i, th)| { if initial.is_none() && th.is_runnable() { initial = Some(i); @@ -197,6 +199,14 @@ impl Execution { let switched = Some(self.threads.active_id()) != next; + if switched { + if let Some(thread_id) = next { + self.path.record_event( + &Trace::opaque("THREAD SWITCH").with_custom_ref("Thread", thread_id.as_usize()), + ) + } + } + self.threads.set_active(next); // There is no active thread. 
Unless all threads have terminated, the
@@ -246,6 +256,13 @@ impl Execution {
         curr_thread != self.threads.active_id()
     }
 
+    /// Panics if execution was determined to be non-deterministic
+    pub(crate) fn check_consistency(&self) {
+        if self.path.is_inconsistent() {
+            panic!("Aborting due to non-deterministic execution");
+        }
+    }
+
     /// Panics if any leaks were detected
     pub(crate) fn check_for_leaks(&self) {
         self.objects.check_for_leaks();
diff --git a/src/rt/mod.rs b/src/rt/mod.rs
index 704f7396..f31821fe 100644
--- a/src/rt/mod.rs
+++ b/src/rt/mod.rs
@@ -1,3 +1,7 @@
+#[macro_use]
+pub(crate) mod trace;
+pub(crate) use self::trace::{Trace, TraceRef};
+
 mod access;
 use self::access::Access;
@@ -62,13 +66,66 @@ pub(crate) const MAX_THREADS: usize = 4;
 /// Maximum number of atomic store history to track per-cell.
 pub(crate) const MAX_ATOMIC_HISTORY: usize = 7;
 
+/// In some cases, we may need to suppress panics that occur in Drop handlers.
+/// The thread-local state here provides a place to record that we did so, so
+/// that we can force the test to fail, while allowing the drop processing to
+/// proceed in the meantime.
+pub(crate) mod panic {
+    use std::cell::RefCell;
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::Arc;
+
+    thread_local! {
+        static PANIC_IN_DROP_CELL: RefCell<Option<Arc<AtomicBool>>> = RefCell::new(None);
+    }
+
+    pub(crate) fn paniced_in_drop() {
+        eprintln!("Suppressing panic occurring in drop handler");
+        PANIC_IN_DROP_CELL.with(|p| {
+            let borrow = p.borrow();
+            if let Some(atomic) = &*borrow {
+                atomic.store(true, Ordering::SeqCst);
+            }
+        });
+    }
+
+    pub(crate) fn get_panic_in_drop_cell() -> Option<Arc<AtomicBool>> {
+        PANIC_IN_DROP_CELL.with(|p| p.borrow().clone())
+    }
+
+    pub(crate) fn with_panic_in_drop_cell(cell: Arc<AtomicBool>) -> impl Drop {
+        struct ClearCell(Option<Arc<AtomicBool>>);
+        impl Drop for ClearCell {
+            fn drop(&mut self) {
+                PANIC_IN_DROP_CELL.with(|p| p.replace(self.0.take()));
+            }
+        }
+
+        PANIC_IN_DROP_CELL.with(|p| {
+            let mut p = p.borrow_mut();
+
+            let restore = ClearCell(p.take());
+            *p = Some(cell);
+            restore
+        })
+    }
+
+    pub(crate) fn check_panic_in_drop() {
+        if PANIC_IN_DROP_CELL.with(|p| p.borrow().as_ref().unwrap().load(Ordering::SeqCst)) {
+            panic!("Panicked in drop handler");
+        }
+    }
+}
+
 pub(crate) fn spawn<F>(f: F) -> crate::rt::thread::Id
 where
     F: FnOnce() + 'static,
 {
+    let panic_in_drop = panic::get_panic_in_drop_cell().unwrap();
     let id = execution(|execution| execution.new_thread());
 
     Scheduler::spawn(Box::new(move || {
+        let _enter = panic::with_panic_in_drop_cell(panic_in_drop);
         f();
         thread_done();
     }));
@@ -77,24 +134,24 @@
 }
 
 /// Marks the current thread as blocked
-pub fn park() {
+pub(crate) fn park(trace: &Trace) {
     execution(|execution| {
         execution.threads.active_mut().set_blocked();
         execution.threads.active_mut().operation = None;
 
-        execution.schedule()
+        execution.schedule(trace)
     });
 
     Scheduler::switch();
 }
 
 /// Add an execution branch point.
-fn branch<F, R>(f: F) -> R
+fn branch<F, R>(trace: &Trace, f: F) -> R
 where
     F: FnOnce(&mut Execution) -> R,
 {
     let (ret, switch) = execution(|execution| {
         let ret = f(execution);
-        (ret, execution.schedule())
+        (ret, execution.schedule(trace))
     });
 
     if switch {
@@ -119,11 +176,11 @@ where
 ///
 /// This enables concurrent algorithms that require other threads to make
 /// progress.
-pub fn yield_now() { +pub(crate) fn yield_now(trace: &Trace) { let switch = execution(|execution| { execution.threads.active_mut().set_yield(); execution.threads.active_mut().operation = None; - execution.schedule() + execution.schedule(trace) }); if switch { @@ -138,6 +195,7 @@ where Scheduler::with_execution(f) } +#[track_caller] pub fn thread_done() { let locals = execution(|execution| execution.threads.active_mut().drop_locals()); @@ -147,6 +205,6 @@ pub fn thread_done() { execution(|execution| { execution.threads.active_mut().operation = None; execution.threads.active_mut().set_terminated(); - execution.schedule(); + execution.schedule(&trace!()); }); } diff --git a/src/rt/mpsc.rs b/src/rt/mpsc.rs index 2a15e3ef..022c113b 100644 --- a/src/rt/mpsc.rs +++ b/src/rt/mpsc.rs @@ -2,11 +2,19 @@ use crate::rt::{object, Access, Synchronize, VersionVec}; use std::collections::VecDeque; use std::sync::atomic::Ordering::{Acquire, Release}; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug)] pub(crate) struct Channel { state: object::Ref, } +impl TraceEntity for Channel { + fn as_trace_ref(&self) -> super::TraceRef { + self.state.as_trace_ref().relabel("Entity") + } +} + #[derive(Debug)] pub(super) struct State { /// Count of messages in the channel. @@ -61,8 +69,9 @@ impl Channel { }) } - pub(crate) fn send(&self) { - self.state.branch_action(Action::MsgSend); + pub(crate) fn send(&self, trace: &Trace) { + self.state + .branch_action(&trace.with_ref(self), Action::MsgSend); super::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); state.msg_cnt = state.msg_cnt.checked_add(1).expect("overflow"); @@ -95,8 +104,9 @@ impl Channel { }) } - pub(crate) fn recv(&self) { - self.state.branch_disable(Action::MsgRecv, self.is_empty()); + pub(crate) fn recv(&self, trace: &Trace) { + self.state + .branch_disable(&trace.with_ref(self), Action::MsgRecv, self.is_empty()); super::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); let thread_id = execution.threads.active_id(); diff --git a/src/rt/mutex.rs b/src/rt/mutex.rs index f939258d..39927220 100644 --- a/src/rt/mutex.rs +++ b/src/rt/mutex.rs @@ -3,11 +3,19 @@ use crate::rt::{thread, Access, Synchronize, VersionVec}; use std::sync::atomic::Ordering::{Acquire, Release}; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug, Copy, Clone)] pub(crate) struct Mutex { state: object::Ref, } +impl TraceEntity for Mutex { + fn as_trace_ref(&self) -> super::TraceRef { + self.state.as_trace_ref().relabel("Mutex") + } +} + #[derive(Debug)] pub(super) struct State { /// If the mutex should establish sequential consistency. 
@@ -38,18 +46,21 @@ impl Mutex { }) } - pub(crate) fn acquire_lock(&self) { - self.state.branch_acquire(self.is_locked()); + pub(crate) fn acquire_lock(&self, trace: &Trace) { + self.state + .branch_acquire(&trace.with_ref(self), self.is_locked()); assert!(self.post_acquire(), "expected to be able to acquire lock"); } - pub(crate) fn try_acquire_lock(&self) -> bool { - self.state.branch_opaque(); + pub(crate) fn try_acquire_lock(&self, trace: &Trace) -> bool { + self.state.branch_opaque(&trace.with_ref(self)); self.post_acquire() } - pub(crate) fn release_lock(&self) { + pub(crate) fn release_lock(&self, trace: &Trace) { super::execution(|execution| { + execution.path.record_event(&trace.with_ref(self)); + let state = self.state.get_mut(&mut execution.objects); // Release the lock flag diff --git a/src/rt/notify.rs b/src/rt/notify.rs index ad55b53d..e2d0fa52 100644 --- a/src/rt/notify.rs +++ b/src/rt/notify.rs @@ -3,11 +3,19 @@ use crate::rt::{self, Access, Synchronize, VersionVec}; use std::sync::atomic::Ordering::{Acquire, Release}; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug, Copy, Clone)] pub(crate) struct Notify { state: object::Ref, } +impl TraceEntity for Notify { + fn as_trace_ref(&self) -> rt::TraceRef { + self.state.as_trace_ref().relabel("Notify") + } +} + #[derive(Debug)] pub(super) struct State { /// If true, spurious notifications are possible @@ -45,8 +53,8 @@ impl Notify { }) } - pub(crate) fn notify(self) { - self.state.branch_opaque(); + pub(crate) fn notify(self, trace: &Trace) { + self.state.branch_opaque(&trace.with_ref(&self)); rt::execution(|execution| { let state = self.state.get_mut(&mut execution.objects); @@ -76,10 +84,12 @@ impl Notify { }); } - pub(crate) fn wait(self) { + pub(crate) fn wait(self, trace: &Trace) { + let trace = trace.with_ref(&self); + let (notified, spurious) = rt::execution(|execution| { let spurious = if self.state.get(&execution.objects).might_spur() { - execution.path.branch_spurious() + execution.path.branch_spurious(&trace) } else { false }; @@ -94,15 +104,15 @@ impl Notify { }); if spurious { - rt::yield_now(); + rt::yield_now(&trace); return; } if notified { - self.state.branch_opaque(); + self.state.branch_opaque(&trace); } else { // This should become branch_disable - self.state.branch_acquire(true) + self.state.branch_acquire(&trace, true) } // Thread was notified diff --git a/src/rt/object.rs b/src/rt/object.rs index 85914a34..547792b8 100644 --- a/src/rt/object.rs +++ b/src/rt/object.rs @@ -7,6 +7,8 @@ use std::marker::PhantomData; #[cfg(feature = "checkpoint")] use serde::{Deserialize, Serialize}; +use super::Trace; + /// Stores objects #[derive(Debug)] #[cfg_attr(feature = "checkpoint", derive(Serialize, Deserialize))] @@ -265,6 +267,11 @@ impl Ref { pub(super) fn ref_eq(self, other: Ref) -> bool { self.index == other.index } + + /// Returns the Ref's unique index (for debugging output) + pub(super) fn index(self) -> usize { + self.index + } } impl Ref { @@ -324,8 +331,8 @@ impl fmt::Debug for Ref { // TODO: These fns shouldn't be on Ref impl> Ref { // TODO: rename `branch_disable` - pub(super) fn branch_acquire(self, is_locked: bool) { - super::branch(|execution| { + pub(super) fn branch_acquire(self, trace: &Trace, is_locked: bool) { + super::branch(trace, |execution| { self.set_action(execution, Action::Opaque); if is_locked { @@ -335,14 +342,19 @@ impl> Ref { }) } - pub(super) fn branch_action(self, action: impl Into) { - super::branch(|execution| { + pub(super) fn branch_action(self, trace: &Trace, action: 
impl Into<Action>) {
+        super::branch(trace, |execution| {
             self.set_action(execution, action.into());
         })
     }
 
-    pub(super) fn branch_disable(self, action: impl Into<Action> + std::fmt::Debug, disable: bool) {
-        super::branch(|execution| {
+    pub(super) fn branch_disable(
+        self,
+        trace: &Trace,
+        action: impl Into<Action> + std::fmt::Debug,
+        disable: bool,
+    ) {
+        super::branch(trace, |execution| {
             self.set_action(execution, action.into());
 
             if disable {
@@ -352,8 +364,8 @@ impl> Ref {
         })
     }
 
-    pub(super) fn branch_opaque(self) {
-        self.branch_action(Action::Opaque)
+    pub(super) fn branch_opaque(self, trace: &Trace) {
+        self.branch_action(trace, Action::Opaque)
     }
 
     fn set_action(self, execution: &mut Execution, action: Action) {
diff --git a/src/rt/path.rs b/src/rt/path.rs
index 61b43ec9..cca7b970 100644
--- a/src/rt/path.rs
+++ b/src/rt/path.rs
@@ -1,8 +1,18 @@
-use crate::rt::{execution, object, thread, MAX_ATOMIC_HISTORY, MAX_THREADS};
+use std::{cell::Cell, collections::VecDeque};
+
+use crate::rt::{execution, object, thread, Trace, MAX_ATOMIC_HISTORY, MAX_THREADS};
 
 #[cfg(feature = "checkpoint")]
 use serde::{Deserialize, Serialize};
 
+use super::object::Object;
+
+const DETAIL_TRACE_LEN: usize = 40;
+
+thread_local! {
+    static ABORTED_THREAD: Cell<bool> = Cell::new(false);
+}
+
 /// An execution path
 #[derive(Debug)]
 #[cfg_attr(feature = "checkpoint", derive(Serialize, Deserialize))]
@@ -22,6 +32,25 @@ pub(crate) struct Path {
     ///
     /// A branch is of type `Schedule`, `Load`, or `Spurious`
     branches: object::Store,
+
+    /// List of execution trace records.
+    ///
+    /// Each record indicates which point in the code (and what operation) is
+    /// associated with the corresponding entry in branches.
+    schedule_trace: Vec<Trace>,
+
+    /// List of detail trace records.
+    ///
+    /// These records include some non-scheduling events as well, but are
+    /// truncated to the last N records (or all records since the last schedule_trace
+    /// event)
+    #[cfg_attr(feature = "checkpoint", serde(skip))]
+    exec_trace: VecDeque<(Option<u32>, Trace)>,
+
+    /// True if a consistency check has failed (we'll need to unwind and
+    /// terminate the model, but need to avoid panicking-in-panicking in the
+    /// process)
+    inconsistent: bool,
 }
 
 #[derive(Debug)]
@@ -108,12 +137,17 @@ impl Path {
             preemption_bound,
             pos: 0,
             branches: object::Store::with_capacity(max_branches),
+            schedule_trace: Vec::with_capacity(max_branches),
+            exec_trace: VecDeque::with_capacity(DETAIL_TRACE_LEN),
+            inconsistent: false,
         }
     }
 
     pub(crate) fn set_max_branches(&mut self, max_branches: usize) {
         self.branches
             .reserve_exact(max_branches - self.branches.len());
+        self.schedule_trace
+            .reserve_exact(max_branches - self.branches.len())
     }
 
     /// Returns `true` if the execution has reached a point where the known path
@@ -122,35 +156,145 @@
         self.pos == self.branches.len()
     }
 
+    /// Returns `true` if nondeterministic execution was detected.
+    pub(crate) fn is_inconsistent(&self) -> bool {
+        self.inconsistent
+    }
+
     pub(super) fn pos(&self) -> usize {
         self.pos
     }
 
+    /// Records a trace originating from a new execution branching point. This
+    /// should be called whenever we add something to `branches`.
+    fn push_new_schedule_trace<O>(&mut self, trace: &Trace, obj: O) -> super::object::Ref<O>
+    where
+        O: Object,
+    {
+        let r = self.branches.insert(obj);
+        let index = self.schedule_trace.len() as u32;
+        self.schedule_trace.push(trace.clone());
+        self.record_schedule(index, trace);
+
+        r
+    }
+
+    /// Records a branching point to the execution trace. Unlike
+    /// `push_new_schedule_trace`, this is invoked also when revisiting entries
+    /// already in `branches`, and is used to maintain `exec_trace` (which is
+    /// used to provide more helpful debugging output).
+    fn record_schedule(&mut self, pos: u32, trace: &Trace) {
+        self.exec_trace.push_back((Some(pos), trace.clone()));
+
+        while self.exec_trace.len() > DETAIL_TRACE_LEN
+            && self.exec_trace.front().unwrap().0 != pos.checked_sub(1)
+        {
+            self.exec_trace.pop_front();
+        }
+    }
+
+    /// Records an arbitrary non-branching event. These events are used only for
+    /// debug information when nondeterministic execution does occur.
+    pub(crate) fn record_event(&mut self, trace: &Trace) {
+        self.exec_trace.push_back((None, trace.clone()));
+    }
+
+    /// Asserts that the current branching point matches the given trace. Should
+    /// be called when processing pre-existing entries in `branches`.
+    fn check_trace(&mut self, trace: &Trace) {
+        let expected = match self.schedule_trace.get(self.pos) {
+            Some(t) => t,
+            None => {
+                self.inconsistent = true;
+                self.trace_mismatch(Trace::opaque("{missing}"), trace);
+                return;
+            }
+        };
+
+        if expected != trace {
+            // drop the reference before making a call on mut self
+            let expected = expected.clone();
+
+            self.inconsistent = true;
+            self.trace_mismatch(expected, trace);
+            return;
+        }
+
+        self.record_schedule(self.pos as u32, trace);
+    }
+
+    #[cold]
+    fn trace_mismatch(&mut self, expected: Trace, trace: &Trace) {
+        if ABORTED_THREAD.with(|cell| cell.replace(true)) {
+            // We've already panicked on this thread; avoid doing it again to
+            // avoid panicking during destructors. This is not fully effective as
+            // nondeterministic execution tends to muck up the scheduling logic,
+            // which can lead to deadlock panics in drop routines, etc., but it
+            // helps a little.
+            return;
+        }
+
+        let mut err = String::new();
+        err.push_str("===== NONDETERMINISTIC EXECUTION DETECTED =====\n");
+        err.push_str("Previous execution:\n");
+        err.push_str(&format!(" {}: {}\n\n", self.pos, expected.simplify()));
+        err.push_str("Current execution:\n");
+        err.push_str(&format!(" {}: {}\n", self.pos, trace.simplify()));
+
+        err.push_str("\nRecent events:\n");
+        for (i, trace) in self.exec_trace.iter() {
+            let trace = trace.simplify();
+
+            if let Some(i) = i {
+                err.push_str(&format!(" {:4}: {}\n", i, trace));
+            } else {
+                err.push_str(&format!(" ...: {}\n", trace));
+            }
+        }
+
+        // We avoid panicking at the site of the problem, because this can lead
+        // to panics-while-panicking, and the resultant process termination.
+        // Since the normal rust test harness redirects and buffers output,
+        // terminating before the test completes hides this output and can
+        // therefore result in a wholly unhelpful test failure.
+        //
+        // Instead, we log that we're in an inconsistent state, discard our
+        // entire execution path (since the test is doomed, we don't need to do
+        // any further exploration), and let the current exploration run to
+        // completion.
+ eprintln!("{}", err); + } + /// Push a new atomic-load branch - pub(super) fn push_load(&mut self, seed: &[u8]) { + pub(super) fn push_load(&mut self, trace: &Trace, seed: &[u8]) { assert_path_len!(self.branches); - let load_ref = self.branches.insert(Load { - values: [0; MAX_ATOMIC_HISTORY], - pos: 0, - len: 0, - }); + let load_ref = self.push_new_schedule_trace( + trace, + Load { + values: [0; MAX_ATOMIC_HISTORY], + pos: 0, + len: 0, + }, + ); let load = load_ref.get_mut(&mut self.branches); for (i, &store) in seed.iter().enumerate() { - assert!( - store < MAX_ATOMIC_HISTORY as u8, - "[loom internal bug] store = {}; max = {}", - store, - MAX_ATOMIC_HISTORY - ); - assert!( - i < MAX_ATOMIC_HISTORY, - "[loom internal bug] i = {}; max = {}", - i, - MAX_ATOMIC_HISTORY - ); + if !self.inconsistent { + assert!( + store < MAX_ATOMIC_HISTORY as u8, + "[loom internal bug] store = {}; max = {}", + store, + MAX_ATOMIC_HISTORY + ); + assert!( + i < MAX_ATOMIC_HISTORY, + "[loom internal bug] i = {}; max = {}", + i, + MAX_ATOMIC_HISTORY + ); + } load.values[i] = store as u8; load.len += 1; @@ -158,9 +302,11 @@ impl Path { } /// Returns the atomic write to read - pub(super) fn branch_load(&mut self) -> usize { + pub(super) fn branch_load(&mut self, trace: &Trace) -> usize { assert!(!self.is_traversed(), "[loom internal bug]"); + self.check_trace(trace); + let load = object::Ref::from_usize(self.pos) .downcast::(&self.branches) .expect("Reached unexpected exploration state. Is the model fully determistic?") @@ -172,11 +318,13 @@ impl Path { } /// Branch on spurious notifications - pub(super) fn branch_spurious(&mut self) -> bool { + pub(super) fn branch_spurious(&mut self, trace: &Trace) -> bool { if self.is_traversed() { assert_path_len!(self.branches); - self.branches.insert(Spurious(false)); + self.push_new_schedule_trace(trace, Spurious(false)); + } else { + self.check_trace(trace); } let spurious = object::Ref::from_usize(self.pos) @@ -192,10 +340,25 @@ impl Path { /// Returns the thread identifier to schedule pub(super) fn branch_thread( &mut self, + trace: &Trace, execution_id: execution::Id, seed: impl ExactSizeIterator, ) -> Option { - if self.is_traversed() { + if !self.is_traversed() { + self.check_trace(trace); + } + + if self.is_traversed() || self.is_inconsistent() { + if self.is_inconsistent() { + // If we're inconsistent, we'll always re-seed the scheduler. + // This screws up the path, but that's okay - we just want to + // try to run the current execution to completion so drop + // handlers don't start panicing inside panics (and thus + // terminating the entire process, which might be running other + // tests as well) + self.pos = self.branches.len(); + } + assert_path_len!(self.branches); // Find the last thread scheduling branch in the path @@ -205,12 +368,15 @@ impl Path { // // Initialize a new branch. The initial field values don't matter // as they will be updated below. - let schedule_ref = self.branches.insert(Schedule { - preemptions: 0, - initial_active: None, - threads: [Thread::Disabled; MAX_THREADS], - prev, - }); + let schedule_ref = self.push_new_schedule_trace( + trace, + Schedule { + preemptions: 0, + initial_active: None, + threads: [Thread::Disabled; MAX_THREADS], + prev, + }, + ); // Get a reference to the branch in the object store. let schedule = schedule_ref.get_mut(&mut self.branches); @@ -329,10 +495,19 @@ impl Path { /// This function will also trim the object store, dropping any objects that /// are created in pruned sections of the path. 
pub(super) fn step(&mut self) -> bool { + if self.is_inconsistent() { + // We've corrupted our path trace, so abort. + panic!("Inconsistent execution detected"); + } + // Reset the position to zero, the path will start traversing from the // beginning self.pos = 0; + // Since we're re-running the user code, clear the execution trace to + // avoid confusion. + self.exec_trace.clear(); + // Set the final branch to try the next option. If all options have been // traversed, pop the final branch and try again w/ the one under it. // @@ -343,6 +518,7 @@ impl Path { // Remove all objects that were created **after** this branch self.branches.truncate(last); + self.schedule_trace.truncate(self.branches.len()); if let Some(schedule_ref) = last.downcast::(&self.branches) { let schedule = schedule_ref.get_mut(&mut self.branches); diff --git a/src/rt/rwlock.rs b/src/rt/rwlock.rs index d37d5eba..5ab3411c 100644 --- a/src/rt/rwlock.rs +++ b/src/rt/rwlock.rs @@ -4,11 +4,19 @@ use crate::rt::{thread, Access, Execution, Synchronize, VersionVec}; use std::collections::HashSet; use std::sync::atomic::Ordering::{Acquire, Release}; +use super::{trace::TraceEntity, Trace}; + #[derive(Debug, Copy, Clone)] pub(crate) struct RwLock { state: object::Ref, } +impl TraceEntity for RwLock { + fn as_trace_ref(&self) -> super::TraceRef { + self.state.as_trace_ref().relabel("RwLock") + } +} + #[derive(Debug, PartialEq)] enum Locked { Read(HashSet), @@ -53,9 +61,9 @@ impl RwLock { /// Acquire the read lock. /// Fail to acquire read lock if already *write* locked. - pub(crate) fn acquire_read_lock(&self) { + pub(crate) fn acquire_read_lock(&self, trace: &Trace) { self.state - .branch_disable(Action::Read, self.is_write_locked()); + .branch_disable(&trace.with_ref(self), Action::Read, self.is_write_locked()); assert!( self.post_acquire_read_lock(), @@ -65,8 +73,9 @@ impl RwLock { /// Acquire write lock. /// Fail to acquire write lock if either read or write locked. - pub(crate) fn acquire_write_lock(&self) { + pub(crate) fn acquire_write_lock(&self, trace: &Trace) { self.state.branch_disable( + &trace.with_ref(self), Action::Write, self.is_write_locked() || self.is_read_locked(), ); @@ -77,18 +86,22 @@ impl RwLock { ); } - pub(crate) fn try_acquire_read_lock(&self) -> bool { - self.state.branch_action(Action::Read); + pub(crate) fn try_acquire_read_lock(&self, trace: &Trace) -> bool { + self.state + .branch_action(&trace.with_ref(self), Action::Read); self.post_acquire_read_lock() } - pub(crate) fn try_acquire_write_lock(&self) -> bool { - self.state.branch_action(Action::Write); + pub(crate) fn try_acquire_write_lock(&self, trace: &Trace) -> bool { + self.state + .branch_action(&trace.with_ref(self), Action::Write); self.post_acquire_write_lock() } - pub(crate) fn release_read_lock(&self) { + pub(crate) fn release_read_lock(&self, trace: &Trace) { super::execution(|execution| { + execution.path.record_event(&trace.with_ref(self)); + let state = self.state.get_mut(&mut execution.objects); let thread_id = execution.threads.active_id(); @@ -114,8 +127,10 @@ impl RwLock { }); } - pub(crate) fn release_write_lock(&self) { + pub(crate) fn release_write_lock(&self, trace: &Trace) { super::execution(|execution| { + execution.path.record_event(&trace.with_ref(self)); + let state = self.state.get_mut(&mut execution.objects); state.lock = None; diff --git a/src/rt/trace.rs b/src/rt/trace.rs new file mode 100644 index 00000000..070c0484 --- /dev/null +++ b/src/rt/trace.rs @@ -0,0 +1,340 @@ +//! Execution tracing facilities. +//! +//! 
The types in this module are used as a lightweight execution trace to help +//! detect nondeterministic execution, and to print a useful debug trace if +//! nondeterministic execution does occur. + +use std::any::{Any, TypeId}; +use std::collections::HashSet; +use std::panic::Location; +use std::sync::Mutex; + +use once_cell::sync::OnceCell; + +// Needed to serialize 'static strings +#[cfg_attr(feature = "checkpoint", derive(Copy, Clone, Eq, PartialEq, Hash))] +#[cfg_attr(not(feature = "checkpoint"), derive(Copy, Clone, Eq, PartialEq, Hash))] +struct InternStr(&'static str); + +impl std::fmt::Debug for InternStr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(self.0, f) + } +} + +impl std::fmt::Display for InternStr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self.0, f) + } +} + +impl std::borrow::Borrow for InternStr { + fn borrow(&self) -> &str { + self.0 + } +} + +static INTERN_STR_CACHE: OnceCell>> = OnceCell::new(); + +impl InternStr { + pub(crate) fn from_static(s: &'static str) -> Self { + InternStr(s) + } + + pub(crate) fn from_string(s: String) -> Self { + let mut lock = INTERN_STR_CACHE + .get_or_init(|| Default::default()) + .lock() + .unwrap(); + + if let Some(static_ref) = lock.get(s.as_str()) { + *static_ref + } else { + let s = InternStr(Box::leak(Box::new(s)).as_str()); + + lock.insert(s); + + s + } + } + + #[cfg(feature = "checkpoint")] + pub(crate) fn from_str(s: &str) -> Self { + let mut lock = INTERN_STR_CACHE + .get_or_init(|| Default::default()) + .lock() + .unwrap(); + + if let Some(static_ref) = lock.get(s) { + *static_ref + } else { + let s = InternStr(Box::leak(Box::new(s.to_string())).as_str()); + + lock.insert(s); + + s + } + } +} + +#[cfg(feature = "checkpoint")] +impl serde::Serialize for InternStr { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(self.0) + } +} + +#[cfg(feature = "checkpoint")] +impl<'de> serde::Deserialize<'de> for InternStr { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + return deserializer.deserialize_str(StrVisitor); + + struct StrVisitor; + impl<'de> serde::de::Visitor<'de> for StrVisitor { + type Value = InternStr; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("a string") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + Ok(InternStr::from_str(v)) + } + + fn visit_string(self, v: String) -> Result + where + E: serde::de::Error, + { + Ok(InternStr::from_string(v)) + } + } + } +} + +/// References a specific tracked atomic object in memory +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[cfg_attr(feature = "checkpoint", derive(serde::Serialize, serde::Deserialize))] +pub(crate) struct TraceRef { + index: usize, + ty_name: InternStr, +} + +impl std::fmt::Display for TraceRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}::{}", self.ty_name, self.index) + } +} + +impl TraceRef { + pub fn new(ty_name: &'static str, index: usize) -> Self { + Self { + index, + ty_name: InternStr::from_static(ty_name), + } + } + + pub fn relabel(self, ty_name: &'static str) -> Self { + Self { + index: self.index, + ty_name: InternStr::from_static(ty_name), + } + } + + pub fn relabel_implicit(self, _ty: &T) -> Self { + Self { + index: self.index, + ty_name: InternStr::from_static(std::any::type_name::()), + } + } +} + +/// 
Represents an operation performed, potentially against a particular object +#[derive(Debug, Clone, Eq)] +#[cfg_attr(feature = "checkpoint", derive(serde::Serialize, serde::Deserialize))] +pub(crate) struct Trace { + operation: InternStr, + entity: Option, + #[cfg_attr(feature = "checkpoint", serde(skip))] + caller: Option<&'static Location<'static>>, +} + +impl PartialEq for Trace { + fn eq(&self, other: &Self) -> bool { + self.operation == other.operation + && self.entity == other.entity + && self + .caller + .and_then(|caller| other.caller.map(|other| caller == other)) + .unwrap_or(true) + } +} + +impl std::fmt::Display for Trace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(caller) = self.caller { + write!(f, "[{}] ", caller)?; + } + + if let Some(entity) = self.entity { + write!(f, "{} on {}", self.operation, entity) + } else { + write!(f, "{}", self.operation) + } + } +} + +macro_rules! enclosing_fn_path { + () => {{ + fn type_name(_: T) -> &'static str { + std::any::type_name::() + } + let closure_name = type_name(|| ()); // function_name::{{closure}} + + &closure_name[..(closure_name.len() - 13)] + }}; +} + +macro_rules! trace { + ( ) => { + $crate::rt::trace::Trace::new_unbound(enclosing_fn_path!(), std::panic::Location::caller()) + }; + ( $ref:expr ) => { + $crate::rt::trace::Trace::new(enclosing_fn_path!(), std::panic::Location::caller(), $ref) + }; +} + +// TODO - make const (see https://github.com/rust-lang/rust/issues/57563) +fn entity_ty_name() -> &'static str { + if TypeId::of::() == TypeId::of::<()>() { + "UNKNOWN" + } else { + let mut name = std::any::type_name::(); + if let Some(last_colon) = name.rfind(':') { + name = &name[last_colon + 1..]; + } + + name + } +} + +/// Trait for types which can be converted into [`TraceRef`]s +pub(crate) trait TraceEntity { + fn as_trace_ref(&self) -> TraceRef; +} + +impl TraceEntity for super::object::Ref { + fn as_trace_ref(&self) -> TraceRef { + TraceRef { + index: self.index(), + ty_name: InternStr::from_static(entity_ty_name::()), + } + } +} + +impl<'a> Trace { + /// Generates a trace record for an arbitrary operation name. This is + /// typically used for internal operations like thread exit events. + #[inline] + pub(crate) fn opaque(operation: &'static str) -> Self { + Self { + operation: InternStr::from_static(operation), + caller: None, + entity: None, + } + } + + /// Creates a new trace record with a known caller location and entity. + #[inline] + pub(crate) fn new( + operation: &'static str, + caller: &'static Location<'static>, + entity: &T, + ) -> Self { + Self { + operation: InternStr::from_static(operation), + caller: Some(caller), + entity: Some(entity.as_trace_ref()), + } + } + + /// Creates a new trace record with a known caller location, but not bound to any entity. + #[inline] + pub(crate) fn new_unbound(operation: &'static str, caller: &'static Location<'static>) -> Self { + Self { + operation: InternStr::from_static(operation), + caller: Some(caller), + entity: None, + } + } + + /// Frobs the trace record using some heuristics to make it a bit easier to read. 
+ pub(crate) fn simplify(&self) -> Self { + let mut this = self.clone(); + + if let Some(caller) = self.caller.as_ref() { + if caller + .file() + .ends_with("src/rust/library/core/src/ptr/mod.rs") + { + // hide Drop invocations to make things less noisy + this.caller = None; + } + } + + let frequent_traits = [" as core::ops::drop::Drop>", " as core::clone::Clone>"]; + + for matchstr in frequent_traits.iter() { + if let Some(index) = self.operation.0.find(matchstr) { + let mut s = String::with_capacity(self.operation.0.len()); + s.push_str(&self.operation.0[1..index]); + s.push_str(&self.operation.0[index + matchstr.len()..]); + + this.operation = InternStr::from_string(s); + } + } + + this + } + + /// Updates the trace record to contain a reference to this entity. If the + /// record aleady has an associated entity, the existing entity is left in + /// place (we assume code higher up the call stack has a more specific idea + /// of what the entity is). + #[inline] + pub(super) fn with_ref(&self, entity: &T) -> Self { + if self.entity.is_some() { + return self.clone(); + } + + let mut this = self.clone(); + this.entity = Some(entity.as_trace_ref()); + + this + } + + /// Updates the trace record to contain a manually constructed entity, if it + /// doesn't already have one associated with itself. + #[inline] + pub(crate) fn with_custom_ref(&self, entity_ty: &'static str, index: usize) -> Self { + if self.entity.is_some() { + return self.clone(); + } + + Self { + entity: Some(TraceRef { + index, + ty_name: InternStr::from_static(entity_ty), + }), + ..*self + } + } +} diff --git a/src/sync/arc.rs b/src/sync/arc.rs index 5ba4dbf3..792bc04c 100644 --- a/src/sync/arc.rs +++ b/src/sync/arc.rs @@ -37,7 +37,7 @@ impl Arc { /// Returns a mutable reference to the inner value, if there are /// no other `Arc` or [`Weak`][weak] pointers to the same value. 
pub fn get_mut(this: &mut Self) -> Option<&mut T> { - if this.inner.obj.get_mut() { + if this.inner.obj.get_mut(&trace!()) { assert_eq!(1, std::sync::Arc::strong_count(&this.inner)); Some(&mut std::sync::Arc::get_mut(&mut this.inner).unwrap().value) } else { @@ -81,8 +81,9 @@ impl ops::Deref for Arc { } impl Clone for Arc { + #[track_caller] fn clone(&self) -> Arc { - self.inner.obj.ref_inc(); + self.inner.obj.ref_inc(&trace!()); Arc { inner: self.inner.clone(), @@ -91,13 +92,21 @@ impl Clone for Arc { } impl Drop for Arc { + #[track_caller] fn drop(&mut self) { - if self.inner.obj.ref_dec() { - assert_eq!( - 1, - std::sync::Arc::strong_count(&self.inner), - "something odd is going on" - ); + let trace = trace!(); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + if self.inner.obj.ref_dec(&trace) { + assert_eq!( + 1, + std::sync::Arc::strong_count(&self.inner), + "something odd is going on" + ); + } + })); + + if result.is_err() { + crate::rt::panic::paniced_in_drop(); } } } diff --git a/src/sync/atomic/atomic.rs b/src/sync/atomic/atomic.rs index 20c8814a..5de54593 100644 --- a/src/sync/atomic/atomic.rs +++ b/src/sync/atomic/atomic.rs @@ -25,12 +25,13 @@ where #[track_caller] pub(crate) fn load(&self, order: Ordering) -> T { - self.state.load(location!(), order) + self.state.load(&trace!(&self.state), location!(), order) } #[track_caller] pub(crate) fn store(&self, value: T, order: Ordering) { - self.state.store(location!(), value, order) + self.state + .store(&trace!(&self.state), location!(), value, order) } #[track_caller] @@ -46,7 +47,11 @@ where where F: FnOnce(T) -> T, { - self.try_rmw::<_, ()>(order, order, |v| Ok(f(v))).unwrap() + self.state + .rmw::<_, ()>(&trace!(&self.state), location!(), order, order, |v| { + Ok(f(v)) + }) + .unwrap() } #[track_caller] @@ -54,7 +59,8 @@ where where F: FnOnce(T) -> Result, { - self.state.rmw(location!(), success, failure, f) + self.state + .rmw(&trace!(&self.state), location!(), success, failure, f) } #[track_caller] diff --git a/src/sync/atomic/mod.rs b/src/sync/atomic/mod.rs index 9a7d9a6e..15da4cba 100644 --- a/src/sync/atomic/mod.rs +++ b/src/sync/atomic/mod.rs @@ -15,8 +15,9 @@ pub use self::ptr::AtomicPtr; pub use std::sync::atomic::Ordering; /// Signals the processor that it is entering a busy-wait spin-loop. +#[track_caller] pub fn spin_loop_hint() { - crate::thread::yield_now(); + crate::rt::yield_now(&trace!()); } /// An atomic fence. diff --git a/src/sync/atomic/ptr.rs b/src/sync/atomic/ptr.rs index 68cff748..9f1dc991 100644 --- a/src/sync/atomic/ptr.rs +++ b/src/sync/atomic/ptr.rs @@ -73,7 +73,7 @@ impl AtomicPtr { } } -impl Default for AtomicPtr { +impl Default for AtomicPtr { fn default() -> AtomicPtr { use std::ptr; AtomicPtr::new(ptr::null_mut()) diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index a7bd7410..5a1a935d 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -24,13 +24,14 @@ impl Condvar { } /// Blocks the current thread until this condition variable receives a notification. + #[track_caller] pub fn wait<'a, T>(&self, mut guard: MutexGuard<'a, T>) -> LockResult> { // Release the RefCell borrow guard allowing another thread to lock the // data guard.unborrow(); // Wait until notified - self.object.wait(guard.rt()); + self.object.wait(&trace!(), guard.rt()); // Borrow the mutex guarded data again guard.reborrow(); @@ -40,6 +41,7 @@ impl Condvar { /// Waits on this condition variable for a notification, timing out after a /// specified duration. 
+ #[track_caller] pub fn wait_timeout<'a, T>( &self, guard: MutexGuard<'a, T>, @@ -52,13 +54,15 @@ impl Condvar { } /// Wakes up one blocked thread on this condvar. + #[track_caller] pub fn notify_one(&self) { - self.object.notify_one(); + self.object.notify_one(&trace!()); } /// Wakes up all blocked threads on this condvar. + #[track_caller] pub fn notify_all(&self) { - self.object.notify_all(); + self.object.notify_all(&trace!()); } } diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs index 17dbd7de..a2aeaf50 100644 --- a/src/sync/mpsc.rs +++ b/src/sync/mpsc.rs @@ -27,13 +27,15 @@ pub struct Sender { impl Sender { /// Attempts to send a value on this channel, returning it back if it could /// not be sent. + #[track_caller] pub fn send(&self, msg: T) -> Result<(), std::sync::mpsc::SendError> { - self.object.send(); + self.object.send(&trace!(&*self.object)); self.sender.send(msg) } } impl Clone for Sender { + #[track_caller] fn clone(&self) -> Sender { Sender { object: std::sync::Arc::clone(&self.object), @@ -52,8 +54,9 @@ pub struct Receiver { impl Receiver { /// Attempts to wait for a value on this receiver, returning an error if the /// corresponding channel has hung up. + #[track_caller] pub fn recv(&self) -> Result { - self.object.recv(); + self.object.recv(&trace!(&*self.object)); self.receiver.recv() } /// Attempts to wait for a value on this receiver, returning an error if the @@ -67,6 +70,7 @@ impl Receiver { } impl Drop for Receiver { + #[track_caller] fn drop(&mut self) { // Drain the channel. while !self.object.is_empty() { diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 9a3c9cb6..3cfd1aaa 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -29,8 +29,9 @@ impl Mutex { impl Mutex { /// Acquires a mutex, blocking the current thread until it is able to do so. + #[track_caller] pub fn lock(&self) -> LockResult> { - self.object.acquire_lock(); + self.object.acquire_lock(&trace!(&self.object)); Ok(MutexGuard { lock: self, @@ -45,8 +46,9 @@ impl Mutex { /// guard is dropped. /// /// This function does not block. + #[track_caller] pub fn try_lock(&self) -> TryLockResult> { - if self.object.try_acquire_lock() { + if self.object.try_acquire_lock(&trace!(&self.object)) { Ok(MutexGuard { lock: self, data: Some(self.data.lock().unwrap()), @@ -92,8 +94,9 @@ impl<'a, T> ops::DerefMut for MutexGuard<'a, T> { } impl<'a, T: 'a> Drop for MutexGuard<'a, T> { + #[track_caller] fn drop(&mut self) { self.data = None; - self.lock.object.release_lock(); + self.lock.object.release_lock(&trace!(&self.lock.object)); } } diff --git a/src/sync/notify.rs b/src/sync/notify.rs index da03a079..0213c463 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -27,16 +27,18 @@ impl Notify { } /// Notify the watier + #[track_caller] pub fn notify(&self) { - self.object.notify(); + self.object.notify(&trace!(&self.object)); } /// Wait for a notification + #[track_caller] pub fn wait(&self) { let actual = self.waiting.compare_and_swap(false, true, SeqCst); assert!(!actual, "only a single thread may wait on `Notify`"); - self.object.wait(); + self.object.wait(&trace!(&self.object)); self.waiting.store(false, SeqCst); } } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 18515968..6f01a80c 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -43,8 +43,9 @@ impl RwLock { /// lock when this method returns. This method does not provide any /// guarantees with respect to the ordering of whether contentious readers /// or writers will acquire the lock first. 
+ #[track_caller] pub fn read(&self) -> LockResult> { - self.object.acquire_read_lock(); + self.object.acquire_read_lock(&trace!(&self.object)); Ok(RwLockReadGuard { lock: self, @@ -59,8 +60,9 @@ impl RwLock { /// access when it is dropped. /// /// This function does not block. + #[track_caller] pub fn try_read(&self) -> TryLockResult> { - if self.object.try_acquire_read_lock() { + if self.object.try_acquire_read_lock(&trace!(&self.object)) { Ok(RwLockReadGuard { lock: self, data: Some(self.data.try_read().expect("loom::RwLock state corrupt")), @@ -75,8 +77,9 @@ impl RwLock { /// /// This function will not return while other writers or other readers /// currently have access to the lock. + #[track_caller] pub fn write(&self) -> LockResult> { - self.object.acquire_write_lock(); + self.object.acquire_write_lock(&trace!(&self.object)); Ok(RwLockWriteGuard { lock: self, @@ -91,8 +94,9 @@ impl RwLock { /// it is dropped. /// /// This function does not block. + #[track_caller] pub fn try_write(&self) -> TryLockResult> { - if self.object.try_acquire_write_lock() { + if self.object.try_acquire_write_lock(&trace!(&self.object)) { Ok(RwLockWriteGuard { lock: self, data: Some(self.data.try_write().expect("loom::RwLock state corrupt")), @@ -123,9 +127,12 @@ impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> { } impl<'a, T: 'a> Drop for RwLockReadGuard<'a, T> { + #[track_caller] fn drop(&mut self) { self.data = None; - self.lock.object.release_read_lock() + self.lock + .object + .release_read_lock(&trace!(&self.lock.object)) } } @@ -144,8 +151,11 @@ impl<'a, T> ops::DerefMut for RwLockWriteGuard<'a, T> { } impl<'a, T: 'a> Drop for RwLockWriteGuard<'a, T> { + #[track_caller] fn drop(&mut self) { self.data = None; - self.lock.object.release_write_lock() + self.lock + .object + .release_write_lock(&trace!(&self.lock.object)) } } diff --git a/src/thread.rs b/src/thread.rs index a570b72c..66fb8a6b 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,8 +1,7 @@ //! Mock implementation of `std::thread`. pub use crate::rt::thread::AccessError; -pub use crate::rt::yield_now; -use crate::rt::{self, Execution}; +use crate::rt::{self, Execution, Trace}; pub use std::thread::panicking; @@ -36,6 +35,12 @@ impl Thread { } } +impl crate::rt::trace::TraceEntity for Thread { + fn as_trace_ref(&self) -> rt::TraceRef { + rt::TraceRef::new("Thread", self.id.id.as_usize()) + } +} + /// Mock implementation of `std::thread::ThreadId`. #[derive(Clone, Copy, Eq, Hash, PartialEq)] pub struct ThreadId { @@ -87,6 +92,15 @@ fn init_current(execution: &mut Execution, name: Option) -> Thread { thread } +/// Yield the thread. +/// +/// This enables concurrent algorithms that require other threads to make +/// progress. +#[track_caller] +pub fn yield_now() { + rt::yield_now(&trace!()); +} + /// Returns a handle to the current thread. pub fn current() -> Thread { rt::execution(|execution| { @@ -129,7 +143,7 @@ where }); *result.lock().unwrap() = Some(Ok(f())); - notify.notify(); + notify.notify(&Trace::opaque("thread termination - notify JoinHandle")); }) }; @@ -177,8 +191,9 @@ impl Builder { impl JoinHandle { /// Waits for the associated thread to finish. 
+    #[track_caller]
     pub fn join(self) -> std::thread::Result<T> {
-        self.notify.wait();
+        self.notify.wait(&trace!(&self.thread));
         self.result.lock().unwrap().take().unwrap()
     }
 
diff --git a/tests/nondet.rs b/tests/nondet.rs
new file mode 100644
index 00000000..b69ae326
--- /dev/null
+++ b/tests/nondet.rs
@@ -0,0 +1,58 @@
+use std::collections::HashMap;
+
+use loom::model;
+use loom::sync::{Arc, Mutex};
+use loom::thread::{self, JoinHandle, ThreadId};
+
+#[test]
+#[should_panic]
+fn nondeterministic_execution_detected() {
+    #[derive(Default)]
+    struct State {
+        h: HashMap<ThreadId, JoinHandle<()>>,
+        prior: Option<JoinHandle<()>>,
+    }
+
+    fn spawn_one(s: &Arc<Mutex<State>>) {
+        let mut lock = s.lock().unwrap();
+        let s = s.clone();
+
+        let handle = thread::spawn(move || {
+            let mut lock = s.lock().unwrap();
+
+            let self_handle = lock.h.remove(&thread::current().id());
+            let prior_handle = std::mem::replace(&mut lock.prior, self_handle);
+
+            std::mem::drop(lock);
+
+            if let Some(prior_handle) = prior_handle {
+                let _ = prior_handle.join();
+            }
+        });
+
+        lock.h.insert(handle.thread().id(), handle);
+
+        thread::yield_now();
+    }
+
+    model(|| {
+        let state = Arc::new(Mutex::new(State::default()));
+
+        for _ in 0..3 {
+            spawn_one(&state);
+        }
+
+        let mut lock = state.lock().unwrap();
+        let prior = lock.prior.take();
+        let all_threads = std::mem::take(&mut lock.h);
+        std::mem::drop(lock);
+
+        if let Some(prior) = prior {
+            let _ = prior.join();
+        }
+
+        for (_, handle) in all_threads.into_iter() {
+            let _ = handle.join();
+        }
+    });
+}
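
Note (editorial, not part of the diff): the core mechanism the patch adds is that every scheduling branch records a Trace (caller location plus the operation and object involved), and on each re-execution Path::check_trace compares the trace observed at a position against the one recorded there previously, reporting a mismatch via trace_mismatch instead of panicking on the spot. The following self-contained sketch illustrates that record-and-compare idea in isolation; the Trace and Recorder types here are illustrative stand-ins, not loom's internal API.

// Illustrative sketch only; not part of the diff and not loom's API.
use std::panic::Location;

/// One recorded branch point: which operation ran, and from where in the code.
#[derive(Clone, Debug, PartialEq)]
struct Trace {
    operation: &'static str,
    caller: &'static Location<'static>,
}

#[derive(Default)]
struct Recorder {
    /// Trace recorded during the first execution.
    expected: Vec<Trace>,
    /// Position within `expected` for the current execution.
    pos: usize,
    /// Set when the current execution diverges from the recording.
    inconsistent: bool,
}

impl Recorder {
    /// Called at every branch point: records on the first run, checks on replays.
    #[track_caller]
    fn branch(&mut self, operation: &'static str) {
        let trace = Trace {
            operation,
            caller: Location::caller(),
        };

        if self.pos == self.expected.len() {
            // First visit to this position: extend the recording.
            self.expected.push(trace);
        } else if self.expected[self.pos] != trace {
            // The replay diverged from the recorded path.
            self.inconsistent = true;
            eprintln!(
                "nondeterministic execution at step {}: expected {:?}, got {:?}",
                self.pos, self.expected[self.pos], trace
            );
        }

        self.pos += 1;
    }

    /// Rewind to replay from the start of the recorded path.
    fn rewind(&mut self) {
        self.pos = 0;
    }
}

fn run_model(recorder: &mut Recorder, second_run: bool) {
    recorder.branch("Mutex::lock");

    if second_run {
        // A different operation (and call site) than the recorded one.
        recorder.branch("Atomic::store");
    } else {
        recorder.branch("Atomic::load");
    }
}

fn main() {
    let mut recorder = Recorder::default();

    run_model(&mut recorder, false); // first execution records the path
    recorder.rewind();
    run_model(&mut recorder, true); // replay diverges at the second branch point

    assert!(recorder.inconsistent);
}

Running the sketch prints a divergence report for the second branch point and leaves the recorder flagged as inconsistent, which mirrors how the diff defers the real panic to Execution::check_consistency rather than panicking at the point of detection.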