Skip to content

Commit 22658fd

Browse files
committed
Use AtomicWaker in mpsc
AFAIU AtomicWaker is a perfect candidate to use in mpsc receiver. This patch is good because: * it enabled `AtomicWaker` dogfooding * it is a good example of `AtomicWaker` proper (I hope) usage * it kills overcomplicated logic of `ReceiverTask`
1 parent 8021bb8 commit 22658fd

File tree

1 file changed

+16
-88
lines changed
  • futures-channel/src/mpsc

1 file changed

+16
-88
lines changed

futures-channel/src/mpsc/mod.rs

+16-88
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080

8181
use futures_core::stream::Stream;
8282
use futures_core::task::{LocalWaker, Waker, Poll};
83+
use futures_core::task::__internal::AtomicWaker;
8384
use std::any::Any;
8485
use std::error::Error;
8586
use std::fmt;
@@ -291,7 +292,7 @@ struct Inner<T> {
291292
num_senders: AtomicUsize,
292293

293294
// Handle to the receiver's task.
294-
recv_task: Mutex<ReceiverTask>,
295+
recv_task: AtomicWaker,
295296
}
296297

297298
// Struct representation of `Inner::state`.
@@ -304,18 +305,6 @@ struct State {
304305
num_messages: usize,
305306
}
306307

307-
#[derive(Debug)]
308-
struct ReceiverTask {
309-
unparked: bool,
310-
task: Option<Waker>,
311-
}
312-
313-
// Returned from Receiver::try_park()
314-
enum TryPark {
315-
Parked,
316-
NotEmpty,
317-
}
318-
319308
// The `is_open` flag is stored in the left-most bit of `Inner::state`
320309
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
321310

@@ -394,10 +383,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
394383
message_queue: Queue::new(),
395384
parked_queue: Queue::new(),
396385
num_senders: AtomicUsize::new(1),
397-
recv_task: Mutex::new(ReceiverTask {
398-
unparked: false,
399-
task: None,
400-
}),
386+
recv_task: AtomicWaker::new(),
401387
});
402388

403389
let tx = Sender {
@@ -512,7 +498,7 @@ impl<T> Sender<T> {
512498

513499
// Signal to the receiver that a message has been enqueued. If the
514500
// receiver is parked, this will unpark the task.
515-
self.signal();
501+
self.inner.recv_task.wake();
516502
}
517503

518504
// Increment the number of queued messages. Returns the resulting number.
@@ -545,35 +531,6 @@ impl<T> Sender<T> {
545531
}
546532
}
547533

548-
// Signal to the receiver task that a message has been enqueued
549-
fn signal(&self) {
550-
// TODO
551-
// This logic can probably be improved by guarding the lock with an
552-
// atomic.
553-
//
554-
// Do this step first so that the lock is dropped when
555-
// `unpark` is called
556-
let task = {
557-
let mut recv_task = self.inner.recv_task.lock().unwrap();
558-
559-
// If the receiver has already been unparked, then there is nothing
560-
// more to do
561-
if recv_task.unparked {
562-
return;
563-
}
564-
565-
// Setting this flag enables the receiving end to detect that
566-
// an unpark event happened in order to avoid unnecessarily
567-
// parking.
568-
recv_task.unparked = true;
569-
recv_task.task.take()
570-
};
571-
572-
if let Some(task) = task {
573-
task.wake();
574-
}
575-
}
576-
577534
fn park(&mut self, lw: Option<&LocalWaker>) {
578535
// TODO: clean up internal state if the task::current will fail
579536

@@ -633,7 +590,7 @@ impl<T> Sender<T> {
633590
// that stuff from `do_send`.
634591

635592
self.inner.set_closed();
636-
self.signal();
593+
self.inner.recv_task.wake();
637594
}
638595

639596
fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> {
@@ -680,7 +637,7 @@ impl<T> UnboundedSender<T> {
680637
/// Closes this channel from the sender side, preventing any new messages.
681638
pub fn close_channel(&self) {
682639
self.0.inner.set_closed();
683-
self.0.signal();
640+
self.0.inner.recv_task.wake();
684641
}
685642

686643
// Do the send without parking current task.
@@ -847,21 +804,6 @@ impl<T> Receiver<T> {
847804
}
848805
}
849806

850-
// Try to park the receiver task
851-
fn try_park(&self, lw: &LocalWaker) -> TryPark {
852-
// First, track the task in the `recv_task` slot
853-
let mut recv_task = self.inner.recv_task.lock().unwrap();
854-
855-
if recv_task.unparked {
856-
// Consume the `unpark` signal without actually parking
857-
recv_task.unparked = false;
858-
return TryPark::NotEmpty;
859-
}
860-
861-
recv_task.task = Some(lw.clone().into_waker());
862-
TryPark::Parked
863-
}
864-
865807
fn dec_num_messages(&self) {
866808
// OPEN_MASK is highest bit, so it's unaffected by subtraction
867809
// unless there's underflow, and we know there's no underflow
@@ -880,31 +822,17 @@ impl<T> Stream for Receiver<T> {
880822
mut self: Pin<&mut Self>,
881823
lw: &LocalWaker,
882824
) -> Poll<Option<T>> {
883-
loop {
884825
// Try to read a message off of the message queue.
885-
let msg = match self.next_message() {
886-
Poll::Ready(msg) => msg,
887-
Poll::Pending => {
888-
// There are no messages to read, in this case, attempt to
889-
// park. The act of parking will verify that the channel is
890-
// still empty after the park operation has completed.
891-
match self.try_park(lw) {
892-
TryPark::Parked => {
893-
// The task was parked, and the channel is still
894-
// empty, return Pending.
895-
return Poll::Pending;
896-
}
897-
TryPark::NotEmpty => {
898-
// A message has been sent while attempting to
899-
// park. Loop again, the next iteration is
900-
// guaranteed to get the message.
901-
continue;
902-
}
903-
}
904-
}
905-
};
906-
// Return the message
907-
return Poll::Ready(msg);
826+
match self.next_message() {
827+
Poll::Ready(msg) => Poll::Ready(msg),
828+
Poll::Pending => {
829+
// There are no messages to read, in this case, park.
830+
self.inner.recv_task.register(lw);
831+
// Check queue again after parking to prevent race condition:
832+
// a message could be added to the queue after previous `next_message`
833+
// before `register` call.
834+
self.next_message()
835+
}
908836
}
909837
}
910838
}

0 commit comments

Comments
 (0)