From 322424ef65f194e884c358cbfa689dd18b6f6b3c Mon Sep 17 00:00:00 2001 From: jtnunley Date: Wed, 28 Sep 2022 08:44:55 -0700 Subject: [PATCH 1/4] Use once_cell from async-lock --- Cargo.toml | 5 ++++- src/driver.rs | 42 +++++++++++++++++++++++------------------- src/reactor.rs | 11 +++++++---- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 91dcd50..152242f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,16 +19,19 @@ name = "io" harness = false [dependencies] +async-lock = { git = "https://github.com/smol-rs/async-lock.git" } concurrent-queue = "1.2.2" futures-lite = "1.11.0" log = "0.4.11" -once_cell = "1.4.1" parking = "2.0.0" polling = "2.0.0" slab = "0.4.2" socket2 = { version = "0.4.2", features = ["all"] } waker-fn = "1.1.0" +[target.'cfg(loom)'.dependencies] +loom = "0.5" + [build-dependencies] autocfg = "1" diff --git a/src/driver.rs b/src/driver.rs index dd52621..3122f1f 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -6,8 +6,8 @@ use std::task::{Context, Poll}; use std::thread; use std::time::{Duration, Instant}; +use async_lock::OnceCell; use futures_lite::pin; -use once_cell::sync::Lazy; use waker_fn::waker_fn; use crate::reactor::Reactor; @@ -16,25 +16,29 @@ use crate::reactor::Reactor; static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0); /// Unparker for the "async-io" thread. -static UNPARKER: Lazy = Lazy::new(|| { - let (parker, unparker) = parking::pair(); - - // Spawn a helper thread driving the reactor. - // - // Note that this thread is not exactly necessary, it's only here to help push things - // forward if there are no `Parker`s around or if `Parker`s are just idling and never - // parking. - thread::Builder::new() - .name("async-io".to_string()) - .spawn(move || main_loop(parker)) - .expect("cannot spawn async-io thread"); - - unparker -}); +fn unparker() -> &'static parking::Unparker { + static UNPARKER: OnceCell = OnceCell::new(); + + UNPARKER.get_or_init_blocking(|| { + let (parker, unparker) = parking::pair(); + + // Spawn a helper thread driving the reactor. + // + // Note that this thread is not exactly necessary, it's only here to help push things + // forward if there are no `Parker`s around or if `Parker`s are just idling and never + // parking. + thread::Builder::new() + .name("async-io".to_string()) + .spawn(move || main_loop(parker)) + .expect("cannot spawn async-io thread"); + + unparker + }) +} /// Initializes the "async-io" thread. pub(crate) fn init() { - Lazy::force(&UNPARKER); + let _ = unparker(); } /// The main loop for the "async-io" thread. @@ -109,7 +113,7 @@ pub fn block_on(future: impl Future) -> T { // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread. let _guard = CallOnDrop(|| { BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst); - UNPARKER.unpark(); + unparker().unpark(); }); // Parker and unparker for notifying the current thread. @@ -205,7 +209,7 @@ pub fn block_on(future: impl Future) -> T { // Unpark the "async-io" thread in case no other thread is ready to start // processing I/O events. This way we prevent a potential latency spike. - UNPARKER.unpark(); + unparker().unpark(); // Wait for a notification. p.park(); diff --git a/src/reactor.rs b/src/reactor.rs index b6ae153..5e7adcf 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -16,9 +16,9 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; +use async_lock::OnceCell; use concurrent_queue::ConcurrentQueue; use futures_lite::ready; -use once_cell::sync::Lazy; use polling::{Event, Poller}; use slab::Slab; @@ -67,7 +67,9 @@ pub(crate) struct Reactor { impl Reactor { /// Returns a reference to the reactor. pub(crate) fn get() -> &'static Reactor { - static REACTOR: Lazy = Lazy::new(|| { + static REACTOR: OnceCell = OnceCell::new(); + + REACTOR.get_or_init_blocking(|| { crate::driver::init(); Reactor { poller: Poller::new().expect("cannot initialize I/O event notification"), @@ -76,9 +78,10 @@ impl Reactor { events: Mutex::new(Vec::new()), timers: Mutex::new(BTreeMap::new()), timer_ops: ConcurrentQueue::bounded(1000), + #[cfg(loom)] + mock_events: ConcurrentQueue::unbounded(), } - }); - &REACTOR + }) } /// Returns the current ticker. From d5fee70e96302c68b2670b3e45ec3a55de04854d Mon Sep 17 00:00:00 2001 From: jtnunley Date: Wed, 28 Sep 2022 09:01:17 -0700 Subject: [PATCH 2/4] Remove accidental loom experiment. --- Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 152242f..6e9791d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,6 @@ slab = "0.4.2" socket2 = { version = "0.4.2", features = ["all"] } waker-fn = "1.1.0" -[target.'cfg(loom)'.dependencies] -loom = "0.5" - [build-dependencies] autocfg = "1" From d501988dbec3c799ce12d93be9e24b50f13a382d Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 25 Oct 2022 09:42:32 -0700 Subject: [PATCH 3/4] Use released version of async-lock --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6e9791d..ce8043f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ name = "io" harness = false [dependencies] -async-lock = { git = "https://github.com/smol-rs/async-lock.git" } +async-lock = "2.6" concurrent-queue = "1.2.2" futures-lite = "1.11.0" log = "0.4.11" From 273d4feb250f403b9116050edbcba0def04d1283 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 25 Oct 2022 09:46:21 -0700 Subject: [PATCH 4/4] Remove loom experiment --- src/reactor.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 5e7adcf..32a2a11 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -78,8 +78,6 @@ impl Reactor { events: Mutex::new(Vec::new()), timers: Mutex::new(BTreeMap::new()), timer_ops: ConcurrentQueue::bounded(1000), - #[cfg(loom)] - mock_events: ConcurrentQueue::unbounded(), } }) }