Skip to content

Commit 4d54f3a

Browse files
committed
util: add track_caller to public APIs
Functions that may panic can be annotated with `#[track_caller]` so that in the event of a panic, the function where the user called the panicking function is shown instead of the file and line within Tokio source. This change adds `#[track_caller]` to all the non-unstable public APIs in tokio-util where the documentation describes how the function may panic due to incorrect context or inputs. In one place, an assert was added where the described behavior appeared not to be implemented. The documentation for `DelayQueue::reserve` states that the function will panic if the new capacity exceeds the maximum number of entries the queue can contain. However, the function didn't panic until a higher number caused by an allocation failure. This is inconsistent with `DelayQueue::insert_at` which will panic if the number of entries were to go over MAX_ENTRIES. Tests are included to cover each potentially panicking function. Refs: #4413
1 parent 159508b commit 4d54f3a

File tree

7 files changed

+255
-1
lines changed

7 files changed

+255
-1
lines changed

tokio-util/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
5555
async-stream = "0.3.0"
5656
futures = "0.3.0"
5757
futures-test = "0.3.5"
58+
parking_lot = "0.12.0"
5859

5960
[package.metadata.docs.rs]
6061
all-features = true

tokio-util/src/io/sync_bridge.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ impl<T: Unpin> SyncIoBridge<T> {
8585
///
8686
/// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
8787
///
88-
/// # Panic
88+
/// # Panics
8989
///
9090
/// This will panic if called outside the context of a Tokio runtime.
91+
#[track_caller]
9192
pub fn new(src: T) -> Self {
9293
Self::new_with_handle(src, tokio::runtime::Handle::current())
9394
}

tokio-util/src/sync/mpsc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ impl<T: Send + 'static> PollSender<T> {
136136
///
137137
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method
138138
/// will panic.
139+
#[track_caller]
139140
pub fn send_item(&mut self, value: T) -> Result<(), PollSendError<T>> {
140141
let (result, next_state) = match self.take_state() {
141142
State::Idle(_) | State::Acquiring => {

tokio-util/src/task/spawn_pinned.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ impl LocalPoolHandle {
5757
/// pool via [`LocalPoolHandle::spawn_pinned`].
5858
///
5959
/// # Panics
60+
///
6061
/// Panics if the pool size is less than one.
62+
#[track_caller]
6163
pub fn new(pool_size: usize) -> LocalPoolHandle {
6264
assert!(pool_size > 0);
6365

@@ -167,6 +169,7 @@ impl LocalPoolHandle {
167169
/// }
168170
/// ```
169171
///
172+
#[track_caller]
170173
pub fn spawn_pinned_by_idx<F, Fut>(&self, create_task: F, idx: usize) -> JoinHandle<Fut::Output>
171174
where
172175
F: FnOnce() -> Fut,
@@ -196,6 +199,7 @@ struct LocalPool {
196199

197200
impl LocalPool {
198201
/// Spawn a `?Send` future onto a worker
202+
#[track_caller]
199203
fn spawn_pinned<F, Fut>(
200204
&self,
201205
create_task: F,
@@ -324,6 +328,7 @@ impl LocalPool {
324328
}
325329
}
326330

331+
#[track_caller]
327332
fn find_worker_by_idx(&self, idx: usize) -> (&LocalWorkerHandle, JobCountGuard) {
328333
let worker = &self.workers[idx];
329334
worker.task_count.fetch_add(1, Ordering::SeqCst);

tokio-util/src/time/delay_queue.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ impl<T> DelayQueue<T> {
531531
/// [`reset`]: method@Self::reset
532532
/// [`Key`]: struct@Key
533533
/// [type]: #
534+
#[track_caller]
534535
pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
535536
assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");
536537

@@ -649,10 +650,12 @@ impl<T> DelayQueue<T> {
649650
/// [`reset`]: method@Self::reset
650651
/// [`Key`]: struct@Key
651652
/// [type]: #
653+
#[track_caller]
652654
pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
653655
self.insert_at(value, Instant::now() + timeout)
654656
}
655657

658+
#[track_caller]
656659
fn insert_idx(&mut self, when: u64, key: Key) {
657660
use self::wheel::{InsertError, Stack};
658661

@@ -674,6 +677,7 @@ impl<T> DelayQueue<T> {
674677
/// # Panics
675678
///
676679
/// Panics if the key is not contained in the expired queue or the wheel.
680+
#[track_caller]
677681
fn remove_key(&mut self, key: &Key) {
678682
use crate::time::wheel::Stack;
679683

@@ -713,6 +717,7 @@ impl<T> DelayQueue<T> {
713717
/// assert_eq!(*item.get_ref(), "foo");
714718
/// # }
715719
/// ```
720+
#[track_caller]
716721
pub fn remove(&mut self, key: &Key) -> Expired<T> {
717722
let prev_deadline = self.next_deadline();
718723

@@ -769,6 +774,7 @@ impl<T> DelayQueue<T> {
769774
/// // "foo" is now scheduled to be returned in 10 seconds
770775
/// # }
771776
/// ```
777+
#[track_caller]
772778
pub fn reset_at(&mut self, key: &Key, when: Instant) {
773779
self.remove_key(key);
774780

@@ -873,6 +879,7 @@ impl<T> DelayQueue<T> {
873879
/// // "foo"is now scheduled to be returned in 10 seconds
874880
/// # }
875881
/// ```
882+
#[track_caller]
876883
pub fn reset(&mut self, key: &Key, timeout: Duration) {
877884
self.reset_at(key, Instant::now() + timeout);
878885
}
@@ -978,7 +985,12 @@ impl<T> DelayQueue<T> {
978985
/// assert!(delay_queue.capacity() >= 11);
979986
/// # }
980987
/// ```
988+
#[track_caller]
981989
pub fn reserve(&mut self, additional: usize) {
990+
assert!(
991+
self.slab.capacity() + additional <= MAX_ENTRIES,
992+
"max queue capacity exceeded"
993+
);
982994
self.slab.reserve(additional);
983995
}
984996

@@ -1117,6 +1129,7 @@ impl<T> wheel::Stack for Stack<T> {
11171129
}
11181130
}
11191131

1132+
#[track_caller]
11201133
fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
11211134
let key = *item;
11221135
assert!(store.contains(item));

tokio-util/src/time/wheel/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ where
118118
}
119119

120120
/// Remove `item` from the timing wheel.
121+
#[track_caller]
121122
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122123
let when = T::when(item, store);
123124

tokio-util/tests/panic.rs

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(feature = "full")]
3+
4+
use parking_lot::{const_mutex, Mutex};
5+
use std::error::Error;
6+
use std::panic;
7+
use std::sync::Arc;
8+
use tokio::runtime::Runtime;
9+
use tokio::sync::mpsc::channel;
10+
use tokio::time::{Duration, Instant};
11+
use tokio_test::task;
12+
use tokio_util::io::SyncIoBridge;
13+
use tokio_util::sync::PollSender;
14+
use tokio_util::task::LocalPoolHandle;
15+
use tokio_util::time::DelayQueue;
16+
17+
fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
18+
static PANIC_MUTEX: Mutex<()> = const_mutex(());
19+
20+
{
21+
let _guard = PANIC_MUTEX.lock();
22+
let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
23+
24+
let prev_hook = panic::take_hook();
25+
{
26+
let panic_file = panic_file.clone();
27+
panic::set_hook(Box::new(move |panic_info| {
28+
let panic_location = panic_info.location().unwrap();
29+
panic_file
30+
.lock()
31+
.clone_from(&Some(panic_location.file().to_string()));
32+
}));
33+
}
34+
35+
let result = panic::catch_unwind(func);
36+
// Return to the previously set panic hook (maybe default) so that we get nice error
37+
// messages in the tests.
38+
panic::set_hook(prev_hook);
39+
40+
if result.is_err() {
41+
panic_file.lock().clone()
42+
} else {
43+
None
44+
}
45+
}
46+
}
47+
48+
#[test]
49+
fn sync_bridge_new_panic_caller() -> Result<(), Box<dyn Error>> {
50+
let panic_location_file = test_panic(|| {
51+
let _ = SyncIoBridge::new(tokio::io::empty());
52+
});
53+
54+
// The panic location should be in this file
55+
assert_eq!(&panic_location_file.unwrap(), file!());
56+
57+
Ok(())
58+
}
59+
60+
#[test]
61+
fn poll_sender_send_item_panic_caller() -> Result<(), Box<dyn Error>> {
62+
let panic_location_file = test_panic(|| {
63+
let (send, _) = channel::<u32>(3);
64+
let mut send = PollSender::new(send);
65+
66+
let _ = send.send_item(42);
67+
});
68+
69+
// The panic location should be in this file
70+
assert_eq!(&panic_location_file.unwrap(), file!());
71+
72+
Ok(())
73+
}
74+
75+
#[test]
76+
77+
fn local_pool_handle_new_panic_caller() -> Result<(), Box<dyn Error>> {
78+
let panic_location_file = test_panic(|| {
79+
let _ = LocalPoolHandle::new(0);
80+
});
81+
82+
// The panic location should be in this file
83+
assert_eq!(&panic_location_file.unwrap(), file!());
84+
85+
Ok(())
86+
}
87+
88+
#[test]
89+
90+
fn local_pool_handle_spawn_pinned_by_idx_panic_caller() -> Result<(), Box<dyn Error>> {
91+
let panic_location_file = test_panic(|| {
92+
let rt = basic();
93+
94+
rt.block_on(async {
95+
let handle = LocalPoolHandle::new(2);
96+
handle.spawn_pinned_by_idx(|| async { "test" }, 3);
97+
});
98+
});
99+
100+
// The panic location should be in this file
101+
assert_eq!(&panic_location_file.unwrap(), file!());
102+
103+
Ok(())
104+
}
105+
#[test]
106+
fn delay_queue_insert_at_panic_caller() -> Result<(), Box<dyn Error>> {
107+
let panic_location_file = test_panic(|| {
108+
let rt = basic();
109+
rt.block_on(async {
110+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
111+
112+
let _k = queue.insert_at(
113+
"1",
114+
// ~24,855 days in the future
115+
Instant::now() + Duration::from_secs(2_u64.pow(31)),
116+
);
117+
});
118+
});
119+
120+
// The panic location should be in this file
121+
assert_eq!(&panic_location_file.unwrap(), file!());
122+
123+
Ok(())
124+
}
125+
126+
#[test]
127+
fn delay_queue_insert_panic_caller() -> Result<(), Box<dyn Error>> {
128+
let panic_location_file = test_panic(|| {
129+
let rt = basic();
130+
rt.block_on(async {
131+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
132+
133+
let _k = queue.insert(
134+
"1",
135+
// ~24,855 days
136+
Duration::from_secs(2_u64.pow(31)),
137+
);
138+
});
139+
});
140+
141+
// The panic location should be in this file
142+
assert_eq!(&panic_location_file.unwrap(), file!());
143+
144+
Ok(())
145+
}
146+
147+
#[test]
148+
fn delay_queue_remove_panic_caller() -> Result<(), Box<dyn Error>> {
149+
let panic_location_file = test_panic(|| {
150+
let rt = basic();
151+
rt.block_on(async {
152+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
153+
154+
let key = queue.insert_at("1", Instant::now());
155+
queue.remove(&key);
156+
queue.remove(&key);
157+
});
158+
});
159+
160+
// The panic location should be in this file
161+
assert_eq!(&panic_location_file.unwrap(), file!());
162+
163+
Ok(())
164+
}
165+
166+
#[test]
167+
fn delay_queue_reset_at_panic_caller() -> Result<(), Box<dyn Error>> {
168+
let panic_location_file = test_panic(|| {
169+
let rt = basic();
170+
rt.block_on(async {
171+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
172+
173+
let key = queue.insert_at("1", Instant::now());
174+
queue.reset_at(
175+
&key,
176+
// ~24,855 days in the future
177+
Instant::now() + Duration::from_secs(2_u64.pow(31)),
178+
);
179+
});
180+
});
181+
182+
// The panic location should be in this file
183+
assert_eq!(&panic_location_file.unwrap(), file!());
184+
185+
Ok(())
186+
}
187+
188+
#[test]
189+
fn delay_queue_reset_panic_caller() -> Result<(), Box<dyn Error>> {
190+
let panic_location_file = test_panic(|| {
191+
let rt = basic();
192+
rt.block_on(async {
193+
let mut queue = task::spawn(DelayQueue::with_capacity(3));
194+
195+
let key = queue.insert_at("1", Instant::now());
196+
queue.reset(
197+
&key,
198+
// ~24,855 days
199+
Duration::from_secs(2_u64.pow(31)),
200+
);
201+
});
202+
});
203+
204+
// The panic location should be in this file
205+
assert_eq!(&panic_location_file.unwrap(), file!());
206+
207+
Ok(())
208+
}
209+
210+
#[test]
211+
fn delay_queue_reserve_panic_caller() -> Result<(), Box<dyn Error>> {
212+
let panic_location_file = test_panic(|| {
213+
let rt = basic();
214+
rt.block_on(async {
215+
let mut queue = task::spawn(DelayQueue::<u32>::with_capacity(3));
216+
217+
queue.reserve((1 << 30) as usize);
218+
});
219+
});
220+
221+
// The panic location should be in this file
222+
assert_eq!(&panic_location_file.unwrap(), file!());
223+
224+
Ok(())
225+
}
226+
227+
fn basic() -> Runtime {
228+
tokio::runtime::Builder::new_current_thread()
229+
.enable_all()
230+
.build()
231+
.unwrap()
232+
}

0 commit comments

Comments
 (0)