diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 20d16a1dc7..4dd54fbe48 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -80,6 +80,7 @@ use futures_core::stream::Stream; use futures_core::task::{LocalWaker, Waker, Poll}; +use futures_core::task::__internal::AtomicWaker; use std::any::Any; use std::error::Error; use std::fmt; @@ -291,7 +292,7 @@ struct Inner { num_senders: AtomicUsize, // Handle to the receiver's task. - recv_task: Mutex, + recv_task: AtomicWaker, } // Struct representation of `Inner::state`. @@ -304,18 +305,6 @@ struct State { num_messages: usize, } -#[derive(Debug)] -struct ReceiverTask { - unparked: bool, - task: Option, -} - -// Returned from Receiver::try_park() -enum TryPark { - Parked, - NotEmpty, -} - // The `is_open` flag is stored in the left-most bit of `Inner::state` const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); @@ -394,10 +383,7 @@ fn channel2(buffer: Option) -> (Sender, Receiver) { message_queue: Queue::new(), parked_queue: Queue::new(), num_senders: AtomicUsize::new(1), - recv_task: Mutex::new(ReceiverTask { - unparked: false, - task: None, - }), + recv_task: AtomicWaker::new(), }); let tx = Sender { @@ -512,7 +498,7 @@ impl Sender { // Signal to the receiver that a message has been enqueued. If the // receiver is parked, this will unpark the task. - self.signal(); + self.inner.recv_task.wake(); } // Increment the number of queued messages. Returns the resulting number. @@ -545,35 +531,6 @@ impl Sender { } } - // Signal to the receiver task that a message has been enqueued - fn signal(&self) { - // TODO - // This logic can probably be improved by guarding the lock with an - // atomic. - // - // Do this step first so that the lock is dropped when - // `unpark` is called - let task = { - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - // If the receiver has already been unparked, then there is nothing - // more to do - if recv_task.unparked { - return; - } - - // Setting this flag enables the receiving end to detect that - // an unpark event happened in order to avoid unnecessarily - // parking. - recv_task.unparked = true; - recv_task.task.take() - }; - - if let Some(task) = task { - task.wake(); - } - } - fn park(&mut self, lw: Option<&LocalWaker>) { // TODO: clean up internal state if the task::current will fail @@ -633,7 +590,7 @@ impl Sender { // that stuff from `do_send`. self.inner.set_closed(); - self.signal(); + self.inner.recv_task.wake(); } fn poll_unparked(&mut self, lw: Option<&LocalWaker>) -> Poll<()> { @@ -680,7 +637,7 @@ impl UnboundedSender { /// Closes this channel from the sender side, preventing any new messages. pub fn close_channel(&self) { self.0.inner.set_closed(); - self.0.signal(); + self.0.inner.recv_task.wake(); } // Do the send without parking current task. @@ -847,21 +804,6 @@ impl Receiver { } } - // Try to park the receiver task - fn try_park(&self, lw: &LocalWaker) -> TryPark { - // First, track the task in the `recv_task` slot - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - if recv_task.unparked { - // Consume the `unpark` signal without actually parking - recv_task.unparked = false; - return TryPark::NotEmpty; - } - - recv_task.task = Some(lw.clone().into_waker()); - TryPark::Parked - } - fn dec_num_messages(&self) { // OPEN_MASK is highest bit, so it's unaffected by subtraction // unless there's underflow, and we know there's no underflow @@ -880,31 +822,17 @@ impl Stream for Receiver { mut self: Pin<&mut Self>, lw: &LocalWaker, ) -> Poll> { - loop { // Try to read a message off of the message queue. - let msg = match self.next_message() { - Poll::Ready(msg) => msg, - Poll::Pending => { - // There are no messages to read, in this case, attempt to - // park. The act of parking will verify that the channel is - // still empty after the park operation has completed. - match self.try_park(lw) { - TryPark::Parked => { - // The task was parked, and the channel is still - // empty, return Pending. - return Poll::Pending; - } - TryPark::NotEmpty => { - // A message has been sent while attempting to - // park. Loop again, the next iteration is - // guaranteed to get the message. - continue; - } - } - } - }; - // Return the message - return Poll::Ready(msg); + match self.next_message() { + Poll::Ready(msg) => Poll::Ready(msg), + Poll::Pending => { + // There are no messages to read, in this case, park. + self.inner.recv_task.register(lw); + // Check queue again after parking to prevent race condition: + // a message could be added to the queue after previous `next_message` + // before `register` call. + self.next_message() + } } } } diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index 8269780657..e14e0aa8b7 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -1,3 +1,5 @@ +cargo-features = ["rename-dependency"] + [package] name = "futures-core-preview" edition = "2018" @@ -17,6 +19,10 @@ name = "futures_core" [features] default = ["std"] std = ["either/use_std"] +nightly = [] [dependencies] either = { version = "1.4", default-features = false, optional = true } + +[dev-dependencies] +futures-preview = { path = "../futures", version = "0.3.0-alpha.9" } diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index 48813e8cb0..9b54d2aa54 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -1,6 +1,7 @@ //! Core traits and types for asynchronous operations in Rust. #![feature(pin, arbitrary_self_types, futures_api)] +#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] #![cfg_attr(not(feature = "std"), no_std)] diff --git a/futures-util/src/task/atomic_waker.rs b/futures-core/src/task/__internal/atomic_waker.rs similarity index 99% rename from futures-util/src/task/atomic_waker.rs rename to futures-core/src/task/__internal/atomic_waker.rs index 45d8695e60..7f6d945517 100644 --- a/futures-util/src/task/atomic_waker.rs +++ b/futures-core/src/task/__internal/atomic_waker.rs @@ -2,7 +2,7 @@ use core::fmt; use core::cell::UnsafeCell; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::{Acquire, Release, AcqRel}; -use futures_core::task::{LocalWaker, Waker}; +use crate::task::{LocalWaker, Waker}; /// A synchronization primitive for task wakeup. /// diff --git a/futures-core/src/task/__internal/mod.rs b/futures-core/src/task/__internal/mod.rs new file mode 100644 index 0000000000..822ff8fa68 --- /dev/null +++ b/futures-core/src/task/__internal/mod.rs @@ -0,0 +1,10 @@ +#[cfg_attr( + feature = "nightly", + cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) +)] +mod atomic_waker; +#[cfg_attr( + feature = "nightly", + cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) +)] +pub use self::atomic_waker::AtomicWaker; diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index be168862ba..95dc3cb57f 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -1,6 +1,8 @@ //! Task notification. mod spawn; +#[doc(hidden)] +pub mod __internal; pub use self::spawn::{Spawn, LocalSpawn, SpawnError}; pub use core::task::{Poll, Waker, LocalWaker, UnsafeWake}; diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index d3d4ac29c5..93266e6f16 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -15,9 +15,4 @@ pub use self::local_waker_ref::{local_waker_ref, local_waker_ref_from_nonlocal, feature = "nightly", cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) )] -mod atomic_waker; -#[cfg_attr( - feature = "nightly", - cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) -)] -pub use self::atomic_waker::AtomicWaker; +pub use futures_core::task::__internal::AtomicWaker; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 14cfcbe6f3..bb6e7a830b 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -36,7 +36,7 @@ futures-test-preview = { path = "../futures-test", version = "0.3.0-alpha.9", de tokio = "0.1.11" [features] -nightly = ["futures-util-preview/nightly"] +nightly = ["futures-util-preview/nightly", "futures-core-preview/nightly"] std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"] default = ["std"] compat = ["std", "futures-util-preview/compat"]