Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use once_cell from async-lock #95

Merged
merged 4 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ name = "io"
harness = false

[dependencies]
async-lock = "2.6"
concurrent-queue = "1.2.2"
futures-lite = "1.11.0"
log = "0.4.11"
once_cell = "1.4.1"
parking = "2.0.0"
polling = "2.0.0"
slab = "0.4.2"
Expand Down
42 changes: 23 additions & 19 deletions src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use once_cell::sync::Lazy;
use waker_fn::waker_fn;

use crate::reactor::Reactor;
Expand All @@ -16,25 +16,29 @@ use crate::reactor::Reactor;
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {
let (parker, unparker) = parking::pair();

// Spawn a helper thread driving the reactor.
//
// Note that this thread is not exactly necessary, it's only here to help push things
// forward if there are no `Parker`s around or if `Parker`s are just idling and never
// parking.
thread::Builder::new()
.name("async-io".to_string())
.spawn(move || main_loop(parker))
.expect("cannot spawn async-io thread");

unparker
});
fn unparker() -> &'static parking::Unparker {
static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();

UNPARKER.get_or_init_blocking(|| {
let (parker, unparker) = parking::pair();

// Spawn a helper thread driving the reactor.
//
// Note that this thread is not exactly necessary, it's only here to help push things
// forward if there are no `Parker`s around or if `Parker`s are just idling and never
// parking.
thread::Builder::new()
.name("async-io".to_string())
.spawn(move || main_loop(parker))
.expect("cannot spawn async-io thread");

unparker
})
}

/// Initializes the "async-io" thread.
pub(crate) fn init() {
Lazy::force(&UNPARKER);
let _ = unparker();
}

/// The main loop for the "async-io" thread.
Expand Down Expand Up @@ -109,7 +113,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
let _guard = CallOnDrop(|| {
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
UNPARKER.unpark();
unparker().unpark();
});

// Parker and unparker for notifying the current thread.
Expand Down Expand Up @@ -205,7 +209,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {

// Unpark the "async-io" thread in case no other thread is ready to start
// processing I/O events. This way we prevent a potential latency spike.
UNPARKER.unpark();
unparker().unpark();

// Wait for a notification.
p.park();
Expand Down
11 changes: 7 additions & 4 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use once_cell::sync::Lazy;
use polling::{Event, Poller};
use slab::Slab;

Expand Down Expand Up @@ -67,7 +67,9 @@ pub(crate) struct Reactor {
impl Reactor {
/// Returns a reference to the reactor.
pub(crate) fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
static REACTOR: OnceCell<Reactor> = OnceCell::new();

REACTOR.get_or_init_blocking(|| {
crate::driver::init();
Reactor {
poller: Poller::new().expect("cannot initialize I/O event notification"),
Expand All @@ -76,9 +78,10 @@ impl Reactor {
events: Mutex::new(Vec::new()),
timers: Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(1000),
#[cfg(loom)]
mock_events: ConcurrentQueue::unbounded(),
taiki-e marked this conversation as resolved.
Show resolved Hide resolved
}
});
&REACTOR
})
}

/// Returns the current ticker.
Expand Down