Skip to content

Commit c5e394e

Browse files
committed
Do not hang in poll if event loop is destroyed
`Loop` destructor marks all loop handles dead, so any further `poll` requests fail (they currently panic, but should probably return an error). Also `Loop` destructor unparks all tasks waiting for IO on self. It has no effect on tasks bound to self event loop, but tasks running on different executors call `poll` right afterwards, so they immediately fail instead of hanging.
1 parent 03ac1ea commit c5e394e

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

src/event_loop/mod.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct Loop {
3737
id: usize,
3838
io: mio::Poll,
3939
events: mio::Events,
40-
tx: Arc<Sender<Message>>,
40+
inner: Arc<LoopHandleInner>,
4141
rx: Receiver<Message>,
4242
io_dispatch: RefCell<Slab<ScheduledIo, usize>>,
4343
task_dispatch: RefCell<Slab<ScheduledTask, usize>>,
@@ -62,6 +62,31 @@ pub struct Loop {
6262
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>,
6363
}
6464

65+
impl Drop for Loop {
66+
fn drop(&mut self) {
67+
// mark event loop as dead, so all schedule operations will be rejected
68+
self.inner.alive.store(0, Ordering::SeqCst);
69+
70+
// Unpark all tasks.
71+
// It has no effect for tasks in this event loop,
72+
// however tasks in another executors get an error
73+
// when they do `poll` right after wakeup.
74+
for io in self.io_dispatch.borrow_mut().iter_mut() {
75+
if let Some(ref mut reader) = io.reader {
76+
reader.unpark();
77+
}
78+
if let Some(ref mut writer) = io.writer {
79+
writer.unpark();
80+
}
81+
}
82+
}
83+
}
84+
85+
struct LoopHandleInner {
86+
tx: Arc<Sender<Message>>,
87+
alive: AtomicUsize, // 1 iff even loop is alive
88+
}
89+
6590
/// Handle to an event loop, used to construct I/O objects, send messages, and
6691
/// otherwise interact indirectly with the event loop itself.
6792
///
@@ -70,7 +95,7 @@ pub struct Loop {
7095
#[derive(Clone)]
7196
pub struct LoopHandle {
7297
id: usize,
73-
tx: Arc<Sender<Message>>,
98+
inner: Arc<LoopHandleInner>,
7499
}
75100

76101
/// A non-sendable handle to an event loop, useful for manufacturing instances
@@ -145,7 +170,10 @@ impl Loop {
145170
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
146171
io: io,
147172
events: mio::Events::with_capacity(1024),
148-
tx: Arc::new(tx),
173+
inner: Arc::new(LoopHandleInner {
174+
tx: Arc::new(tx),
175+
alive: AtomicUsize::new(1),
176+
}),
149177
rx: rx,
150178
io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
151179
task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
@@ -169,7 +197,7 @@ impl Loop {
169197
pub fn handle(&self) -> LoopHandle {
170198
LoopHandle {
171199
id: self.id,
172-
tx: self.tx.clone(),
200+
inner: self.inner.clone(),
173201
}
174202
}
175203

@@ -516,7 +544,7 @@ impl LoopHandle {
516544
lp.notify(msg);
517545
}
518546
None => {
519-
match self.tx.send(msg) {
547+
match self.inner.tx.send(msg) {
520548
Ok(()) => {}
521549

522550
// This should only happen when there was an error

src/event_loop/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ impl LoopHandle {
7575
/// This function will also panic if there is not a currently running future
7676
/// task.
7777
pub fn schedule_read(&self, tok: &IoToken) {
78+
assert!(self.inner.alive.load(Ordering::SeqCst) == 1, "Loop is destroyed");
7879
self.send(Message::Schedule(tok.token, task::park(), Direction::Read));
7980
}
8081

0 commit comments

Comments
 (0)