Skip to content

Commit eb9a0b2

Browse files
committed
feat: support asyncfd and refactor uring based poller
1 parent 1344092 commit eb9a0b2

File tree

19 files changed

+613
-191
lines changed

19 files changed

+613
-191
lines changed

monoio/src/driver/legacy/mod.rs

+33-84
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ use std::{
1010

1111
use super::{
1212
op::{CompletionMeta, Op, OpAble},
13+
poll::Poll as LegacyPoll,
1314
ready::{self, Ready},
14-
scheduled_io::ScheduledIo,
1515
Driver, Inner, CURRENT,
1616
};
17-
use crate::utils::slab::Slab;
1817

1918
#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
2019
#[cfg(windows)]
@@ -26,15 +25,7 @@ mod waker;
2625
pub(crate) use waker::UnparkHandle;
2726

2827
pub(crate) struct LegacyInner {
29-
pub(crate) io_dispatch: Slab<ScheduledIo>,
30-
#[cfg(unix)]
31-
events: mio::Events,
32-
#[cfg(unix)]
33-
poll: mio::Poll,
34-
#[cfg(windows)]
35-
events: iocp::Events,
36-
#[cfg(windows)]
37-
poll: iocp::Poller,
28+
pub(crate) poller: LegacyPoll,
3829

3930
#[cfg(feature = "sync")]
4031
shared_waker: std::sync::Arc<waker::EventWaker>,
@@ -66,14 +57,10 @@ impl LegacyDriver {
6657
}
6758

6859
pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
69-
#[cfg(unix)]
70-
let poll = mio::Poll::new()?;
71-
#[cfg(windows)]
72-
let poll = iocp::Poller::new()?;
73-
60+
let poller = LegacyPoll::with_capacity(entries as usize)?;
7461
#[cfg(all(unix, feature = "sync"))]
7562
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
76-
poll.registry(),
63+
poller.poll.registry(),
7764
TOKEN_WAKEUP,
7865
)?));
7966
#[cfg(all(windows, feature = "sync"))]
@@ -87,15 +74,7 @@ impl LegacyDriver {
8774
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
8875

8976
let inner = LegacyInner {
90-
io_dispatch: Slab::new(),
91-
#[cfg(unix)]
92-
events: mio::Events::with_capacity(entries as usize),
93-
#[cfg(unix)]
94-
poll,
95-
#[cfg(windows)]
96-
events: iocp::Events::with_capacity(entries as usize),
97-
#[cfg(windows)]
98-
poll,
77+
poller,
9978
#[cfg(feature = "sync")]
10079
shared_waker,
10180
#[cfg(feature = "sync")]
@@ -150,13 +129,12 @@ impl LegacyDriver {
150129
timeout = Some(Duration::ZERO);
151130
}
152131

153-
// here we borrow 2 mut self, but its safe.
154-
let events = unsafe { &mut (*self.inner.get()).events };
155-
match inner.poll.poll(events, timeout) {
132+
match inner.poller.poll_inside(timeout) {
156133
Ok(_) => {}
157134
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
158135
Err(e) => return Err(e),
159136
}
137+
let events = &inner.poller.events;
160138
#[cfg(unix)]
161139
let iter = events.iter();
162140
#[cfg(windows)]
@@ -166,103 +144,69 @@ impl LegacyDriver {
166144

167145
#[cfg(feature = "sync")]
168146
if token != TOKEN_WAKEUP {
169-
inner.dispatch(token, Ready::from_mio(event));
147+
LegacyPoll::dispatch(
148+
&mut inner.poller.io_dispatch,
149+
token.0,
150+
Ready::from_mio(event),
151+
);
170152
}
171153

172154
#[cfg(not(feature = "sync"))]
173-
inner.dispatch(token, Ready::from_mio(event));
155+
LegacyPoll::dispatch(
156+
&mut inner.poller.io_dispatch,
157+
token.0,
158+
Ready::from_mio(event),
159+
);
174160
}
175161
Ok(())
176162
}
177163

178164
#[cfg(windows)]
165+
#[inline]
179166
pub(crate) fn register(
180167
this: &Rc<UnsafeCell<LegacyInner>>,
181168
state: &mut iocp::SocketState,
182169
interest: mio::Interest,
183170
) -> io::Result<usize> {
184171
let inner = unsafe { &mut *this.get() };
185-
let io = ScheduledIo::default();
186-
let token = inner.io_dispatch.insert(io);
187-
188-
match inner.poll.register(state, mio::Token(token), interest) {
189-
Ok(_) => Ok(token),
190-
Err(e) => {
191-
inner.io_dispatch.remove(token);
192-
Err(e)
193-
}
194-
}
172+
inner.poller.register(state, interest)
195173
}
196174

197175
#[cfg(windows)]
176+
#[inline]
198177
pub(crate) fn deregister(
199178
this: &Rc<UnsafeCell<LegacyInner>>,
200179
token: usize,
201180
state: &mut iocp::SocketState,
202181
) -> io::Result<()> {
203182
let inner = unsafe { &mut *this.get() };
204-
205-
// try to deregister fd first, on success we will remove it from slab.
206-
match inner.poll.deregister(state) {
207-
Ok(_) => {
208-
inner.io_dispatch.remove(token);
209-
Ok(())
210-
}
211-
Err(e) => Err(e),
212-
}
183+
inner.poller.deregister(token, state)
213184
}
214185

215186
#[cfg(unix)]
187+
#[inline]
216188
pub(crate) fn register(
217189
this: &Rc<UnsafeCell<LegacyInner>>,
218190
source: &mut impl mio::event::Source,
219191
interest: mio::Interest,
220192
) -> io::Result<usize> {
221193
let inner = unsafe { &mut *this.get() };
222-
let token = inner.io_dispatch.insert(ScheduledIo::new());
223-
224-
let registry = inner.poll.registry();
225-
match registry.register(source, mio::Token(token), interest) {
226-
Ok(_) => Ok(token),
227-
Err(e) => {
228-
inner.io_dispatch.remove(token);
229-
Err(e)
230-
}
231-
}
194+
inner.poller.register(source, interest)
232195
}
233196

234197
#[cfg(unix)]
198+
#[inline]
235199
pub(crate) fn deregister(
236200
this: &Rc<UnsafeCell<LegacyInner>>,
237201
token: usize,
238202
source: &mut impl mio::event::Source,
239203
) -> io::Result<()> {
240204
let inner = unsafe { &mut *this.get() };
241-
242-
// try to deregister fd first, on success we will remove it from slab.
243-
match inner.poll.registry().deregister(source) {
244-
Ok(_) => {
245-
inner.io_dispatch.remove(token);
246-
Ok(())
247-
}
248-
Err(e) => Err(e),
249-
}
205+
inner.poller.deregister(source, token)
250206
}
251207
}
252208

253209
impl LegacyInner {
254-
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
255-
let mut sio = match self.io_dispatch.get(token.0) {
256-
Some(io) => io,
257-
None => {
258-
return;
259-
}
260-
};
261-
let ref_mut = sio.as_mut();
262-
ref_mut.set_readiness(|curr| curr | ready);
263-
ref_mut.wake(ready);
264-
}
265-
266210
pub(crate) fn poll_op<T: OpAble>(
267211
this: &Rc<UnsafeCell<Self>>,
268212
data: &mut T,
@@ -282,7 +226,11 @@ impl LegacyInner {
282226
};
283227

284228
// wait io ready and do syscall
285-
let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
229+
let mut scheduled_io = inner
230+
.poller
231+
.io_dispatch
232+
.get(index)
233+
.expect("scheduled_io lost");
286234
let ref_mut = scheduled_io.as_mut();
287235

288236
let readiness = ready!(ref_mut.poll_readiness(cx, direction));
@@ -316,15 +264,16 @@ impl LegacyInner {
316264

317265
pub(crate) fn cancel_op(
318266
this: &Rc<UnsafeCell<LegacyInner>>,
319-
index: usize,
267+
token: usize,
320268
direction: ready::Direction,
321269
) {
322270
let inner = unsafe { &mut *this.get() };
323271
let ready = match direction {
324272
ready::Direction::Read => Ready::READ_CANCELED,
325273
ready::Direction::Write => Ready::WRITE_CANCELED,
274+
ready::Direction::ReadOrWrite => Ready::CANCELED,
326275
};
327-
inner.dispatch(mio::Token(index), ready);
276+
LegacyPoll::dispatch(&mut inner.poller.io_dispatch, token, ready);
328277
}
329278

330279
pub(crate) fn submit_with_data<T>(

monoio/src/driver/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/// Monoio Driver.
22
#[allow(dead_code)]
33
pub(crate) mod op;
4-
#[cfg(all(feature = "poll-io", unix))]
4+
#[cfg(any(feature = "legacy", feature = "poll-io"))]
55
pub(crate) mod poll;
66
#[cfg(any(feature = "legacy", feature = "poll-io"))]
77
pub(crate) mod ready;

monoio/src/driver/op.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ mod accept;
1313
mod connect;
1414
mod fsync;
1515
mod open;
16-
mod poll;
16+
pub(crate) mod poll;
1717
mod read;
1818
mod recv;
1919
mod send;

monoio/src/driver/op/poll.rs

+49-33
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,39 @@ use super::{super::shared_fd::SharedFd, Op, OpAble};
1616
#[cfg(any(feature = "legacy", feature = "poll-io"))]
1717
use crate::driver::ready::Direction;
1818

19+
/// Interest for PollAdd and AsyncFd.
20+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21+
pub(crate) enum PollAddInterest {
22+
Read,
23+
Write,
24+
ReadOrWrite,
25+
}
26+
27+
impl PollAddInterest {
28+
pub(crate) fn to_flags(self) -> i16 {
29+
match self {
30+
PollAddInterest::Read => libc::POLLIN,
31+
PollAddInterest::Write => libc::POLLOUT,
32+
PollAddInterest::ReadOrWrite => libc::POLLIN | libc::POLLOUT,
33+
}
34+
}
35+
36+
#[cfg(any(feature = "legacy", feature = "poll-io"))]
37+
pub(crate) fn to_direction(self) -> Direction {
38+
match self {
39+
PollAddInterest::Read => Direction::Read,
40+
PollAddInterest::Write => Direction::Write,
41+
PollAddInterest::ReadOrWrite => Direction::ReadOrWrite,
42+
}
43+
}
44+
}
45+
1946
pub(crate) struct PollAdd {
2047
/// Holds a strong ref to the FD, preventing the file from being closed
2148
/// while the operation is in-flight.
2249
#[allow(unused)]
2350
fd: SharedFd,
24-
// true: read; false: write
25-
is_read: bool,
51+
interest: PollAddInterest,
2652
#[cfg(any(feature = "legacy", feature = "poll-io"))]
2753
relaxed: bool,
2854
}
@@ -31,7 +57,7 @@ impl Op<PollAdd> {
3157
pub(crate) fn poll_read(fd: &SharedFd, _relaxed: bool) -> io::Result<Op<PollAdd>> {
3258
Op::submit_with(PollAdd {
3359
fd: fd.clone(),
34-
is_read: true,
60+
interest: PollAddInterest::Read,
3561
#[cfg(any(feature = "legacy", feature = "poll-io"))]
3662
relaxed: _relaxed,
3763
})
@@ -40,7 +66,20 @@ impl Op<PollAdd> {
4066
pub(crate) fn poll_write(fd: &SharedFd, _relaxed: bool) -> io::Result<Op<PollAdd>> {
4167
Op::submit_with(PollAdd {
4268
fd: fd.clone(),
43-
is_read: false,
69+
interest: PollAddInterest::Write,
70+
#[cfg(any(feature = "legacy", feature = "poll-io"))]
71+
relaxed: _relaxed,
72+
})
73+
}
74+
75+
pub(crate) fn poll_with_interest(
76+
fd: &SharedFd,
77+
interest: PollAddInterest,
78+
_relaxed: bool,
79+
) -> io::Result<Op<PollAdd>> {
80+
Op::submit_with(PollAdd {
81+
fd: fd.clone(),
82+
interest,
4483
#[cfg(any(feature = "legacy", feature = "poll-io"))]
4584
relaxed: _relaxed,
4685
})
@@ -55,30 +94,15 @@ impl Op<PollAdd> {
5594
impl OpAble for PollAdd {
5695
#[cfg(all(target_os = "linux", feature = "iouring"))]
5796
fn uring_op(&mut self) -> io_uring::squeue::Entry {
58-
opcode::PollAdd::new(
59-
types::Fd(self.fd.raw_fd()),
60-
if self.is_read {
61-
libc::POLLIN as _
62-
} else {
63-
libc::POLLOUT as _
64-
},
65-
)
66-
.build()
97+
opcode::PollAdd::new(types::Fd(self.fd.raw_fd()), self.interest.to_flags() as _).build()
6798
}
6899

69100
#[cfg(any(feature = "legacy", feature = "poll-io"))]
70101
#[inline]
71102
fn legacy_interest(&self) -> Option<(Direction, usize)> {
72-
self.fd.registered_index().map(|idx| {
73-
(
74-
if self.is_read {
75-
Direction::Read
76-
} else {
77-
Direction::Write
78-
},
79-
idx,
80-
)
81-
})
103+
self.fd
104+
.registered_index()
105+
.map(|idx| (self.interest.to_direction(), idx))
82106
}
83107

84108
#[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))]
@@ -88,11 +112,7 @@ impl OpAble for PollAdd {
88112

89113
let mut pollfd = libc::pollfd {
90114
fd: self.fd.as_raw_fd(),
91-
events: if self.is_read {
92-
libc::POLLIN as _
93-
} else {
94-
libc::POLLOUT as _
95-
},
115+
events: self.interest.to_flags(),
96116
revents: 0,
97117
};
98118
let ret = crate::syscall_u32!(poll(&mut pollfd as *mut _, 1, 0))?;
@@ -108,11 +128,7 @@ impl OpAble for PollAdd {
108128
if !self.relaxed {
109129
let mut pollfd = WSAPOLLFD {
110130
fd: self.fd.as_raw_socket() as _,
111-
events: if self.is_read {
112-
POLLIN as _
113-
} else {
114-
POLLOUT as _
115-
},
131+
events: self.interest.to_flags(),
116132
revents: 0,
117133
};
118134
let ret = unsafe { WSAPoll(&mut pollfd as *mut _, 1, 0) };

0 commit comments

Comments
 (0)