diff --git a/Cargo.toml b/Cargo.toml index 91dcd50..ce8043f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,10 +19,10 @@ name = "io" harness = false [dependencies] +async-lock = "2.6" concurrent-queue = "1.2.2" futures-lite = "1.11.0" log = "0.4.11" -once_cell = "1.4.1" parking = "2.0.0" polling = "2.0.0" slab = "0.4.2" diff --git a/src/driver.rs b/src/driver.rs index dd52621..3122f1f 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -6,8 +6,8 @@ use std::task::{Context, Poll}; use std::thread; use std::time::{Duration, Instant}; +use async_lock::OnceCell; use futures_lite::pin; -use once_cell::sync::Lazy; use waker_fn::waker_fn; use crate::reactor::Reactor; @@ -16,25 +16,29 @@ use crate::reactor::Reactor; static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0); /// Unparker for the "async-io" thread. -static UNPARKER: Lazy = Lazy::new(|| { - let (parker, unparker) = parking::pair(); - - // Spawn a helper thread driving the reactor. - // - // Note that this thread is not exactly necessary, it's only here to help push things - // forward if there are no `Parker`s around or if `Parker`s are just idling and never - // parking. - thread::Builder::new() - .name("async-io".to_string()) - .spawn(move || main_loop(parker)) - .expect("cannot spawn async-io thread"); - - unparker -}); +fn unparker() -> &'static parking::Unparker { + static UNPARKER: OnceCell = OnceCell::new(); + + UNPARKER.get_or_init_blocking(|| { + let (parker, unparker) = parking::pair(); + + // Spawn a helper thread driving the reactor. + // + // Note that this thread is not exactly necessary, it's only here to help push things + // forward if there are no `Parker`s around or if `Parker`s are just idling and never + // parking. + thread::Builder::new() + .name("async-io".to_string()) + .spawn(move || main_loop(parker)) + .expect("cannot spawn async-io thread"); + + unparker + }) +} /// Initializes the "async-io" thread. pub(crate) fn init() { - Lazy::force(&UNPARKER); + let _ = unparker(); } /// The main loop for the "async-io" thread. @@ -109,7 +113,7 @@ pub fn block_on(future: impl Future) -> T { // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread. let _guard = CallOnDrop(|| { BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst); - UNPARKER.unpark(); + unparker().unpark(); }); // Parker and unparker for notifying the current thread. @@ -205,7 +209,7 @@ pub fn block_on(future: impl Future) -> T { // Unpark the "async-io" thread in case no other thread is ready to start // processing I/O events. This way we prevent a potential latency spike. - UNPARKER.unpark(); + unparker().unpark(); // Wait for a notification. p.park(); diff --git a/src/reactor.rs b/src/reactor.rs index b6ae153..32a2a11 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -16,9 +16,9 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Context, Poll, Waker}; use std::time::{Duration, Instant}; +use async_lock::OnceCell; use concurrent_queue::ConcurrentQueue; use futures_lite::ready; -use once_cell::sync::Lazy; use polling::{Event, Poller}; use slab::Slab; @@ -67,7 +67,9 @@ pub(crate) struct Reactor { impl Reactor { /// Returns a reference to the reactor. pub(crate) fn get() -> &'static Reactor { - static REACTOR: Lazy = Lazy::new(|| { + static REACTOR: OnceCell = OnceCell::new(); + + REACTOR.get_or_init_blocking(|| { crate::driver::init(); Reactor { poller: Poller::new().expect("cannot initialize I/O event notification"), @@ -77,8 +79,7 @@ impl Reactor { timers: Mutex::new(BTreeMap::new()), timer_ops: ConcurrentQueue::bounded(1000), } - }); - &REACTOR + }) } /// Returns the current ticker.