Skip to content

Commit f4b6b84

Browse files
committed
feat: support asyncfd and refactor uring based poller
1 parent 1344092 commit f4b6b84

37 files changed

+907
-431
lines changed

monoio/src/driver/legacy/mod.rs

+33-84
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ use std::{
1010

1111
use super::{
1212
op::{CompletionMeta, Op, OpAble},
13+
poll::Poll as LegacyPoll,
1314
ready::{self, Ready},
14-
scheduled_io::ScheduledIo,
1515
Driver, Inner, CURRENT,
1616
};
17-
use crate::utils::slab::Slab;
1817

1918
#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
2019
#[cfg(windows)]
@@ -26,15 +25,7 @@ mod waker;
2625
pub(crate) use waker::UnparkHandle;
2726

2827
pub(crate) struct LegacyInner {
29-
pub(crate) io_dispatch: Slab<ScheduledIo>,
30-
#[cfg(unix)]
31-
events: mio::Events,
32-
#[cfg(unix)]
33-
poll: mio::Poll,
34-
#[cfg(windows)]
35-
events: iocp::Events,
36-
#[cfg(windows)]
37-
poll: iocp::Poller,
28+
pub(crate) poller: LegacyPoll,
3829

3930
#[cfg(feature = "sync")]
4031
shared_waker: std::sync::Arc<waker::EventWaker>,
@@ -66,14 +57,10 @@ impl LegacyDriver {
6657
}
6758

6859
pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
69-
#[cfg(unix)]
70-
let poll = mio::Poll::new()?;
71-
#[cfg(windows)]
72-
let poll = iocp::Poller::new()?;
73-
60+
let poller = LegacyPoll::with_capacity(entries as usize)?;
7461
#[cfg(all(unix, feature = "sync"))]
7562
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
76-
poll.registry(),
63+
poller.poll.registry(),
7764
TOKEN_WAKEUP,
7865
)?));
7966
#[cfg(all(windows, feature = "sync"))]
@@ -87,15 +74,7 @@ impl LegacyDriver {
8774
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
8875

8976
let inner = LegacyInner {
90-
io_dispatch: Slab::new(),
91-
#[cfg(unix)]
92-
events: mio::Events::with_capacity(entries as usize),
93-
#[cfg(unix)]
94-
poll,
95-
#[cfg(windows)]
96-
events: iocp::Events::with_capacity(entries as usize),
97-
#[cfg(windows)]
98-
poll,
77+
poller,
9978
#[cfg(feature = "sync")]
10079
shared_waker,
10180
#[cfg(feature = "sync")]
@@ -150,13 +129,12 @@ impl LegacyDriver {
150129
timeout = Some(Duration::ZERO);
151130
}
152131

153-
// here we borrow 2 mut self, but its safe.
154-
let events = unsafe { &mut (*self.inner.get()).events };
155-
match inner.poll.poll(events, timeout) {
132+
match inner.poller.poll_inside(timeout) {
156133
Ok(_) => {}
157134
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
158135
Err(e) => return Err(e),
159136
}
137+
let events = &inner.poller.events;
160138
#[cfg(unix)]
161139
let iter = events.iter();
162140
#[cfg(windows)]
@@ -166,103 +144,69 @@ impl LegacyDriver {
166144

167145
#[cfg(feature = "sync")]
168146
if token != TOKEN_WAKEUP {
169-
inner.dispatch(token, Ready::from_mio(event));
147+
LegacyPoll::dispatch(
148+
&mut inner.poller.io_dispatch,
149+
token.0,
150+
Ready::from_mio(event),
151+
);
170152
}
171153

172154
#[cfg(not(feature = "sync"))]
173-
inner.dispatch(token, Ready::from_mio(event));
155+
LegacyPoll::dispatch(
156+
&mut inner.poller.io_dispatch,
157+
token.0,
158+
Ready::from_mio(event),
159+
);
174160
}
175161
Ok(())
176162
}
177163

178164
#[cfg(windows)]
165+
#[inline]
179166
pub(crate) fn register(
180167
this: &Rc<UnsafeCell<LegacyInner>>,
181168
state: &mut iocp::SocketState,
182169
interest: mio::Interest,
183170
) -> io::Result<usize> {
184171
let inner = unsafe { &mut *this.get() };
185-
let io = ScheduledIo::default();
186-
let token = inner.io_dispatch.insert(io);
187-
188-
match inner.poll.register(state, mio::Token(token), interest) {
189-
Ok(_) => Ok(token),
190-
Err(e) => {
191-
inner.io_dispatch.remove(token);
192-
Err(e)
193-
}
194-
}
172+
inner.poller.register(state, interest)
195173
}
196174

197175
#[cfg(windows)]
176+
#[inline]
198177
pub(crate) fn deregister(
199178
this: &Rc<UnsafeCell<LegacyInner>>,
200179
token: usize,
201180
state: &mut iocp::SocketState,
202181
) -> io::Result<()> {
203182
let inner = unsafe { &mut *this.get() };
204-
205-
// try to deregister fd first, on success we will remove it from slab.
206-
match inner.poll.deregister(state) {
207-
Ok(_) => {
208-
inner.io_dispatch.remove(token);
209-
Ok(())
210-
}
211-
Err(e) => Err(e),
212-
}
183+
inner.poller.deregister(token, state)
213184
}
214185

215186
#[cfg(unix)]
187+
#[inline]
216188
pub(crate) fn register(
217189
this: &Rc<UnsafeCell<LegacyInner>>,
218190
source: &mut impl mio::event::Source,
219191
interest: mio::Interest,
220192
) -> io::Result<usize> {
221193
let inner = unsafe { &mut *this.get() };
222-
let token = inner.io_dispatch.insert(ScheduledIo::new());
223-
224-
let registry = inner.poll.registry();
225-
match registry.register(source, mio::Token(token), interest) {
226-
Ok(_) => Ok(token),
227-
Err(e) => {
228-
inner.io_dispatch.remove(token);
229-
Err(e)
230-
}
231-
}
194+
inner.poller.register(source, interest)
232195
}
233196

234197
#[cfg(unix)]
198+
#[inline]
235199
pub(crate) fn deregister(
236200
this: &Rc<UnsafeCell<LegacyInner>>,
237201
token: usize,
238202
source: &mut impl mio::event::Source,
239203
) -> io::Result<()> {
240204
let inner = unsafe { &mut *this.get() };
241-
242-
// try to deregister fd first, on success we will remove it from slab.
243-
match inner.poll.registry().deregister(source) {
244-
Ok(_) => {
245-
inner.io_dispatch.remove(token);
246-
Ok(())
247-
}
248-
Err(e) => Err(e),
249-
}
205+
inner.poller.deregister(source, token)
250206
}
251207
}
252208

253209
impl LegacyInner {
254-
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
255-
let mut sio = match self.io_dispatch.get(token.0) {
256-
Some(io) => io,
257-
None => {
258-
return;
259-
}
260-
};
261-
let ref_mut = sio.as_mut();
262-
ref_mut.set_readiness(|curr| curr | ready);
263-
ref_mut.wake(ready);
264-
}
265-
266210
pub(crate) fn poll_op<T: OpAble>(
267211
this: &Rc<UnsafeCell<Self>>,
268212
data: &mut T,
@@ -282,7 +226,11 @@ impl LegacyInner {
282226
};
283227

284228
// wait io ready and do syscall
285-
let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
229+
let mut scheduled_io = inner
230+
.poller
231+
.io_dispatch
232+
.get(index)
233+
.expect("scheduled_io lost");
286234
let ref_mut = scheduled_io.as_mut();
287235

288236
let readiness = ready!(ref_mut.poll_readiness(cx, direction));
@@ -316,15 +264,16 @@ impl LegacyInner {
316264

317265
pub(crate) fn cancel_op(
318266
this: &Rc<UnsafeCell<LegacyInner>>,
319-
index: usize,
267+
token: usize,
320268
direction: ready::Direction,
321269
) {
322270
let inner = unsafe { &mut *this.get() };
323271
let ready = match direction {
324272
ready::Direction::Read => Ready::READ_CANCELED,
325273
ready::Direction::Write => Ready::WRITE_CANCELED,
274+
ready::Direction::ReadOrWrite => Ready::CANCELED,
326275
};
327-
inner.dispatch(mio::Token(index), ready);
276+
LegacyPoll::dispatch(&mut inner.poller.io_dispatch, token, ready);
328277
}
329278

330279
pub(crate) fn submit_with_data<T>(

monoio/src/driver/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/// Monoio Driver.
22
#[allow(dead_code)]
33
pub(crate) mod op;
4-
#[cfg(all(feature = "poll-io", unix))]
4+
#[cfg(any(feature = "legacy", feature = "poll-io"))]
55
pub(crate) mod poll;
66
#[cfg(any(feature = "legacy", feature = "poll-io"))]
77
pub(crate) mod ready;

monoio/src/driver/op.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ mod accept;
1313
mod connect;
1414
mod fsync;
1515
mod open;
16-
mod poll;
16+
pub(crate) mod poll;
1717
mod read;
1818
mod recv;
1919
mod send;

monoio/src/driver/op/accept.rs

+19-15
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl OpAble for Accept {
7676
let addr = self.addr.0.as_mut_ptr() as *mut _;
7777
let len = &mut self.addr.1;
7878

79-
syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
79+
unsafe { syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) }
8080
}
8181

8282
#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
@@ -102,12 +102,14 @@ impl OpAble for Accept {
102102
target_os = "netbsd",
103103
target_os = "openbsd"
104104
))]
105-
return syscall_u32!(accept4(
106-
fd,
107-
addr,
108-
len,
109-
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
110-
));
105+
return unsafe {
106+
syscall_u32!(accept4(
107+
fd,
108+
addr,
109+
len,
110+
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
111+
))
112+
};
111113

112114
// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
113115
// OSes inherit the non-blocking flag from the listener, so we just have to
@@ -119,14 +121,16 @@ impl OpAble for Accept {
119121
target_os = "redox"
120122
))]
121123
return {
122-
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
123-
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
124-
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
125-
.map_err(|e| {
126-
let _ = syscall_u32!(close(stream_fd));
127-
e
128-
})?;
129-
Ok(stream_fd as _)
124+
unsafe {
125+
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
126+
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
127+
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
128+
.map_err(|e| {
129+
let _ = syscall_u32!(close(stream_fd));
130+
e
131+
})?;
132+
Ok(stream_fd as _)
133+
}
130134
};
131135
}
132136
}

monoio/src/driver/op/close.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ impl OpAble for Close {
4747
#[cfg(any(feature = "legacy", feature = "poll-io"))]
4848
fn legacy_call(&mut self) -> io::Result<u32> {
4949
#[cfg(unix)]
50-
return crate::syscall_u32!(close(self.fd));
50+
return unsafe { crate::syscall_u32!(close(self.fd)) };
5151

5252
#[cfg(windows)]
53-
return syscall!(closesocket(self.fd as _), PartialEq::ne, 0);
53+
return unsafe { syscall!(closesocket(self.fd as _), PartialEq::ne, 0) };
5454
}
5555
}

monoio/src/driver/op/connect.rs

+26-20
Original file line numberDiff line numberDiff line change
@@ -70,27 +70,31 @@ impl OpAble for Connect {
7070
endpoints.sae_dstaddr = self.socket_addr.as_ptr();
7171
endpoints.sae_dstaddrlen = self.socket_addr_len;
7272

73-
return match crate::syscall_u32!(connectx(
74-
self.fd.raw_fd(),
75-
&endpoints as *const _,
76-
libc::SAE_ASSOCID_ANY,
77-
libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE,
78-
std::ptr::null(),
79-
0,
80-
std::ptr::null_mut(),
81-
std::ptr::null_mut(),
82-
)) {
73+
return match unsafe {
74+
crate::syscall_u32!(connectx(
75+
self.fd.raw_fd(),
76+
&endpoints as *const _,
77+
libc::SAE_ASSOCID_ANY,
78+
libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE,
79+
std::ptr::null(),
80+
0,
81+
std::ptr::null_mut(),
82+
std::ptr::null_mut(),
83+
))
84+
} {
8385
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
8486
_ => Ok(self.fd.raw_fd() as u32),
8587
};
8688
}
8789

8890
#[cfg(unix)]
89-
match crate::syscall_u32!(connect(
90-
self.fd.raw_fd(),
91-
self.socket_addr.as_ptr(),
92-
self.socket_addr_len,
93-
)) {
91+
match unsafe {
92+
crate::syscall_u32!(connect(
93+
self.fd.raw_fd(),
94+
self.socket_addr.as_ptr(),
95+
self.socket_addr_len,
96+
))
97+
} {
9498
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
9599
_ => Ok(self.fd.raw_fd() as u32),
96100
}
@@ -159,11 +163,13 @@ impl OpAble for ConnectUnix {
159163

160164
#[cfg(any(feature = "legacy", feature = "poll-io"))]
161165
fn legacy_call(&mut self) -> io::Result<u32> {
162-
match crate::syscall_u32!(connect(
163-
self.fd.raw_fd(),
164-
&self.socket_addr.0 as *const _ as *const _,
165-
self.socket_addr.1
166-
)) {
166+
match unsafe {
167+
crate::syscall_u32!(connect(
168+
self.fd.raw_fd(),
169+
&self.socket_addr.0 as *const _ as *const _,
170+
self.socket_addr.1
171+
))
172+
} {
167173
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
168174
_ => Ok(self.fd.raw_fd() as u32),
169175
}

0 commit comments

Comments
 (0)