Skip to content

Commit 29ca6c2

Browse files
committed
feat: support poll-io feature to run epoll over iouring
1 parent c332fb2 commit 29ca6c2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1416
-546
lines changed

Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,5 @@ members = [
77

88
# Internal
99
"examples",
10-
"examples/tokio-io-compat",
1110
]
1211
resolver = "2"

docs/en/compatiable-with-tokio-eco.md

+31-19
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,50 @@
11
---
22
title: Compatible with Tokio ecology
33
date: 2021-12-17 18:00:00
4+
updated: 2024-01-30 15:00:00
45
author: ihciah
56
---
67

7-
# Compatible with Tokio ecology
8-
A large number of existing Rust components are compatible with Tokio, and they directly rely on Tokio's `AsyncRead` and `AsyncWrite` interfaces.
8+
# Compatible with the Tokio Ecosystem
9+
A large number of existing Rust components are compatible with Tokio and directly depend on Tokio's `AsyncRead` and `AsyncWrite` traits.
910

10-
In Monoio, since the bottom layer is an asynchronous system call, we chose a similar tokio-uring approach: providing an IO interface that transfers the ownership of the buffer. But at this stage, obviously there are not many available libraries that can work, so we need to quickly support functions with a certain performance sacrifice.
11+
In Monoio, due to the underlying asynchronous system calls, we chose an approach similar to tokio-uring: providing IO interfaces that transfer buffer ownership. However, at this stage, there are obviously not many libraries available that work with this, so we need some means to be compatible with existing interface components.
1112

12-
## monoio-compat
13-
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on the buffer ownership interface.
13+
Currently, there are 3 ways to achieve compatibility:
14+
15+
## tokio-compat
16+
`tokio-compat` is a feature of monoio. When the `iouring` is disabled and the `legacy` feature is enabled, after turning on the `tokio-compat` feature, `TcpStream`/`UnixStream` will implement `tokio::io::{AsyncRead, AsyncWrite}`.
17+
18+
If you explicitly do not use iouring, then you can provide compatibility in this way. This form of compatibility has no overhead. If you might use iouring, then you should use the `poll-io` feature.
1419

15-
### How it works
16-
For `poll_read`, the remaining capacity of the slice passed by the user will be read first, and then the buffer held by the user will be limited to this capacity and a future will be generated. After that, every time the user `poll_read` again, it will be forwarded to the `poll` method of this future. When returning to `Ready`, the contents of the buffer are copied to the slice passed by the user.
20+
## poll-io
21+
`poll-io` is a feature of monoio. After enabling this feature:
22+
1. `tokio::io` will be re-exported to `monoio::io::poll_io`
23+
2. `TcpStream`/`UnixStream` can be converted to and from `TcpStreamPoll`/`UnixStreamPoll`
24+
3. `TcpStreamPoll`/`UnixStreamPoll` implements `tokio::io::{AsyncRead, AsyncWrite}`
1725

18-
For `poll_write`, the content of the slice passed by the user is copied to its own buffer first, then a future is generated and stored, and Ready is returned immediately. After that, every time the user `poll_write` again, it will first wait for the last content to be sent, then copy the data and return to Ready immediately. It behaves like BufWriter, and may causing errors to be delayed to be perceived.
26+
The underlying implementation of this feature runs epoll to sense fd readiness on top of iouring and directly initiates a syscall. Although this form of compatibility cannot effectively utilize iouring for asynchronous io, its performance is similar to other epoll+syscall implementations without additional copy overhead.
27+
28+
## monoio-compat
29+
`monoio-compat` is a compatibility layer that implements Tokio's `AsyncRead` and `AsyncWrite` based on an interface that transfers buffer ownership.
1930

20-
At the cost, using this compatibility layer will cost you an extra buffer copy.
31+
### How It Works
32+
For `poll_read`, it first reads into the remaining capacity of the slice passed by the user, then restricts its own buffer to that capacity and generates a future. Afterwards, every time the user again calls `poll_read`, it will forward to the `poll` method of this future. When returning `Ready`, it copies the contents of the buffer into the user's passed slice.
2133

22-
For `poll_write`, the content of the slice passed by the user is first copied to its own buffer, and then a future is generated and stored. After that, every time the user `poll_write` again, it will be forwarded to the `poll` method of this future. Return the result to the user when returning `Ready`.
34+
For `poll_write`, it first copies the contents of the user's passed slice into its own buffer, then generates a future and stores it, and immediately returns Ready. Thereafter, every time the user again calls `poll_write`, it will first wait for the content of the last write to be fully sent before copying data and immediately returning Ready. This behavior is similar to that of a BufWriter, which can lead to delayed error detection.
2335

24-
In other words, using this compatibility layer will cost you an extra buffer copy overhead.
36+
The cost of using this compatibility layer is an additional buffer copy overhead.
2537

26-
### Usage restrictions
27-
For write operations, users need to manually call poll_flush or poll_shutdown of AsyncWrite at the end (or the corresponding flush or shutdown methods of AsyncWriteExt), otherwise the data may not be submitted to the kernel (continuous writes do not require manual flushing).
38+
### Usage Restrictions
39+
For write operations, users need to manually call AsyncWrite's poll_flush or poll_shutdown (or the corresponding flush or shutdown methods in AsyncWriteExt) at the end; otherwise, data may not be submitted to the kernel (continuous writes do not require manual flushing).
2840

29-
## Poll-oriented interface and asynchronous system call
30-
There are two ways to express asynchrony in Rust, one is based on `poll` and the other is based on `async`. `poll` is synchronous, semantically expressing an immediate attempt; while `async` is essentially the syntactic sugar of poll, it will swallow the future and generate a state machine, which is executed in a loop during await.
41+
## Poll-Based Interfaces and Asynchronous System Calls
42+
There are two ways to express asynchrony in Rust: one is based on `poll`, and the other is based on `async`. `Poll` is synchronous and semantically expresses the trying immediately; while `async` is essentially syntactic sugar for poll which encapsulates the future and generates a state machine, executing this state machine in a loop when awaiting.
3143

32-
In Tokio, there are methods similar to `poll_read` and `poll_write`, both of which express the semantics of synchronous attempts.
44+
In Tokio, there are methods like `poll_read` and `poll_write` that both express the semantic of synchronous trying.
3345

34-
When it returns to `Pending`, it means that IO is not ready (and the waker of cx is registered for notification), and when it returns to `Ready` it means that IO has been completed. It is easy to implement these two interfaces based on synchronous system calls. Directly make the corresponding system calls and determine the return result. If the IO is not ready, it will be suspended to Reactor.
46+
When `Pending` is returned, it implies that the IO is not ready (and registers a notice with the waker in cx), and when `Ready` is returned, it means the IO has completed. It is easy to implement these two interfaces based on synchronous system calls, by directly making the corresponding system call and judging the return result, and if the IO is not ready, suspend to the Reactor.
3547

36-
However, these two interfaces are difficult to implement under asynchronous system calls. If we have pushed an Op into the io_uring SQ, the state of this syscall is uncertain before the corresponding CQE is consumed. We have no way to provide clear completed or unfinished semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities has led to our use restrictions. Under asynchronous system calls, it is more appropriate to pass the ownership of the buffer and cooperate with `async+await`.
48+
However, these two interfaces are difficult to implement under asynchronous system calls. If we have already pushed an Op into the io_uring SQ, then the status of that syscall is uncertain until we consume the corresponding CQE. We cannot provide a clear completed or not completed semantics. In `monoio-compat`, we provide a poll-like interface through some hacks, so the lack of capabilities leads to our usage restrictions. Under asynchronous system calls, transferring buffer ownership in combination with `async+await` is more appropriate.
3749

38-
At present, the Rust standard library does not have a interface for asynchronous system calls, and there is no related component ecology. We are working hard to solve this problem.
50+
Currently, Rust's standard library does not have a universal interface oriented towards asynchronous system calls, and neither does the related component ecosystem. We are working hard to improve this problem.

docs/zh/compatiable-with-tokio-eco.md

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
---
22
title: 与 Tokio 生态兼容
33
date: 2021-12-17 18:00:00
4+
updated: 2024-01-30 15:00:00
45
author: ihciah
56
---
67

78
# 与 Tokio 生态兼容
89
现有 Rust 组件中有大量的组件与 Tokio 是兼容的,它们直接依赖了 Tokio 的 `AsyncRead``AsyncWrite` 接口。
910

10-
而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要以一定的性能牺牲来快速支持功能。
11+
而在 Monoio,由于底层是异步系统调用,所以我们选择了类似 tokio-uring 的做法:提供传 buffer 所有权的 IO 接口。但现阶段明显没有很多可用的库可以工作,所以我们需要一些手段来兼容现有接口的组件。
12+
13+
当前共有 3 种兼容手段:
14+
15+
## tokio-compat
16+
`tokio-compat` 是 monoio 的一个 feature。当关闭 `iouring` 且打开 `legacy` feature 时,开启 `tokio-compat` feature 后,`TcpStream`/`UnixStream` 会实现 `tokio::io::{AsyncRead, AsyncWrite}`
17+
18+
如果你明确不使用 iouring,那么你可以通过这种方式提供兼容性。这种形式的兼容性是没有 overhead 的。如果你有可能会使用 iouring,那么你应该使用 `poll-io` feature。
19+
20+
## poll-io
21+
`poll-io` 是 monoio 的一个 feature。当开启该 feature 后:
22+
1. `tokio::io` 会被 reexport 到 `monoio::io::poll_io`
23+
2. `TcpStream`/`UnixStream` 可以与 `TcpStreamPoll`/`UnixStreamPoll` 互相转换
24+
3. `TcpStreamPoll`/`UnixStreamPoll` 实现 `tokio::io::{AsyncRead, AsyncWrite}`
25+
26+
这个 feature 的底层实现是在 iouring 上运行 epoll 来感知 fd readiness,并直接发起 syscall。这种形式的兼容虽然无法有效利用 iouring 做异步 io,但它的性能与其他基于 epoll+syscall 的实现是类似的,没有额外的拷贝开销。
1127

1228
## monoio-compat
1329
`monoio-compat` 是一个兼容层,它基于 buffer 所有权的接口实现 Tokio 的 `AsyncRead``AsyncWrite`

examples/Cargo.toml

+10-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ monoio = { path = "../monoio", default-features = false, features = [
1717
"legacy",
1818
"macros",
1919
"utils",
20+
"poll-io", # experimental
2021
] }
2122

2223
# Enable tracing and tracing-subscriber for print out runtime debug
@@ -25,15 +26,16 @@ monoio = { path = "../monoio", default-features = false, features = [
2526
# tracing-subscriber = "0.3"
2627

2728
# For hyper examples
28-
hyper = { version = "0.14", features = ["http1", "client", "server", "stream"] }
29+
hyper = { version = "1.1", features = ["http1", "client", "server"] }
30+
http-body-util = "0.1"
2931

3032
# For h2 examples
3133
bytes = { version = "1" }
32-
h2 = { version = "0.3" }
33-
http = { version = "0.2" }
34+
h2 = { version = "0.4" }
35+
http = { version = "1" }
3436

3537
# For hyper and h2 examples
36-
monoio-compat = { path = "../monoio-compat" }
38+
monoio-compat = { path = "../monoio-compat", features = ["hyper"] }
3739

3840
tokio = { version = "1", default-features = false, features = ["io-util"] }
3941
tower-service = "0.3"
@@ -70,6 +72,10 @@ path = "echo.rs"
7072
name = "echo-tfo"
7173
path = "echo_tfo.rs"
7274

75+
[[example]]
76+
name = "echo-poll"
77+
path = "echo_poll.rs"
78+
7379
[[example]]
7480
name = "join"
7581
path = "join.rs"

examples/echo_poll.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
//! An echo example.
2+
//!
3+
//! Run the example and `nc 127.0.0.1 50002` in another shell.
4+
//! All your input will be echoed out.
5+
6+
use monoio::{
7+
io::{
8+
poll_io::{AsyncReadExt, AsyncWriteExt},
9+
IntoPollIo,
10+
},
11+
net::{TcpListener, TcpStream},
12+
};
13+
14+
#[monoio::main(driver = "fusion")]
15+
async fn main() {
16+
let listener = TcpListener::bind("127.0.0.1:50002").unwrap();
17+
println!("listening");
18+
loop {
19+
let incoming = listener.accept().await;
20+
match incoming {
21+
Ok((stream, addr)) => {
22+
println!("accepted a connection from {addr}");
23+
monoio::spawn(echo(stream));
24+
}
25+
Err(e) => {
26+
println!("accepted connection failed: {e}");
27+
return;
28+
}
29+
}
30+
}
31+
}
32+
33+
async fn echo(stream: TcpStream) -> std::io::Result<()> {
34+
// Convert completion-based io to poll-based io(which impl tokio::io)
35+
let mut stream = stream.into_poll_io()?;
36+
let mut buf: Vec<u8> = vec![0; 1024];
37+
let mut res;
38+
loop {
39+
// read
40+
res = stream.read(&mut buf).await?;
41+
if res == 0 {
42+
return Ok(());
43+
}
44+
45+
// write all
46+
stream.write_all(&buf[0..res]).await?;
47+
}
48+
}

examples/hyper_client.rs

+41-108
Original file line numberDiff line numberDiff line change
@@ -1,127 +1,60 @@
1-
//! HTTP client example with hyper in compatible mode.
1+
//! HTTP client example with hyper in poll-io mode.
22
//!
3-
//! It will try to fetch http://127.0.0.1:23300/monoio and print the
3+
//! It will try to fetch http://httpbin.org/ip and print the
44
//! response.
5-
//!
6-
//! Note:
7-
//! It is not recommended to use this example as a production code.
8-
//! The `hyper` require `Send` for a future and obviously the future
9-
//! is not `Send` in monoio. So we just use some unsafe code to let
10-
//! it pass which infact not a good solution but the only way to
11-
//! make it work without modifying hyper.
125
13-
use std::{future::Future, pin::Pin};
6+
use std::io::Write;
147

15-
use monoio_compat::TcpStreamCompat;
8+
use bytes::Bytes;
9+
use http_body_util::{BodyExt, Empty};
10+
use hyper::Request;
11+
use monoio::{io::IntoPollIo, net::TcpStream};
12+
use monoio_compat::hyper::MonoioIo;
1613

17-
#[derive(Clone)]
18-
struct HyperExecutor;
14+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1915

20-
impl<F> hyper::rt::Executor<F> for HyperExecutor
21-
where
22-
F: Future + 'static,
23-
F::Output: 'static,
24-
{
25-
fn execute(&self, fut: F) {
26-
monoio::spawn(fut);
27-
}
28-
}
16+
async fn fetch_url(url: hyper::Uri) -> Result<()> {
17+
let host = url.host().expect("uri has no host");
18+
let port = url.port_u16().unwrap_or(80);
19+
let addr = format!("{}:{}", host, port);
20+
let stream = TcpStream::connect(addr).await?.into_poll_io()?;
21+
let io = MonoioIo::new(stream);
2922

30-
#[derive(Clone)]
31-
struct HyperConnector;
23+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
24+
monoio::spawn(async move {
25+
if let Err(err) = conn.await {
26+
println!("Connection failed: {:?}", err);
27+
}
28+
});
3229

33-
impl tower_service::Service<hyper::Uri> for HyperConnector {
34-
type Response = HyperConnection;
30+
let authority = url.authority().unwrap().clone();
3531

36-
type Error = std::io::Error;
32+
let path = url.path();
33+
let req = Request::builder()
34+
.uri(path)
35+
.header(hyper::header::HOST, authority.as_str())
36+
.body(Empty::<Bytes>::new())?;
3737

38-
#[allow(clippy::type_complexity)]
39-
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
40-
41-
fn poll_ready(
42-
&mut self,
43-
_: &mut std::task::Context<'_>,
44-
) -> std::task::Poll<Result<(), Self::Error>> {
45-
std::task::Poll::Ready(Ok(()))
46-
}
38+
let mut res = sender.send_request(req).await?;
4739

48-
fn call(&mut self, uri: hyper::Uri) -> Self::Future {
49-
let host = uri.host().unwrap();
50-
let port = uri.port_u16().unwrap_or(80);
51-
let address = format!("{host}:{port}");
40+
println!("Response: {}", res.status());
41+
println!("Headers: {:#?}\n", res.headers());
5242

53-
#[allow(clippy::type_complexity)]
54-
let b: Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>> =
55-
Box::pin(async move {
56-
let conn = monoio::net::TcpStream::connect(address).await?;
57-
let hyper_conn = HyperConnection(TcpStreamCompat::new(conn));
58-
Ok(hyper_conn)
59-
});
60-
unsafe { std::mem::transmute(b) }
43+
// Stream the body, writing each chunk to stdout as we get it
44+
// (instead of buffering and printing at the end).
45+
while let Some(next) = res.frame().await {
46+
let frame = next?;
47+
if let Some(chunk) = frame.data_ref() {
48+
std::io::stdout().write_all(chunk)?;
49+
}
6150
}
62-
}
63-
64-
struct HyperConnection(TcpStreamCompat);
51+
println!("\n\nDone!");
6552

66-
impl tokio::io::AsyncRead for HyperConnection {
67-
fn poll_read(
68-
mut self: Pin<&mut Self>,
69-
cx: &mut std::task::Context<'_>,
70-
buf: &mut tokio::io::ReadBuf<'_>,
71-
) -> std::task::Poll<std::io::Result<()>> {
72-
Pin::new(&mut self.0).poll_read(cx, buf)
73-
}
53+
Ok(())
7454
}
7555

76-
impl tokio::io::AsyncWrite for HyperConnection {
77-
fn poll_write(
78-
mut self: Pin<&mut Self>,
79-
cx: &mut std::task::Context<'_>,
80-
buf: &[u8],
81-
) -> std::task::Poll<Result<usize, std::io::Error>> {
82-
Pin::new(&mut self.0).poll_write(cx, buf)
83-
}
84-
85-
fn poll_flush(
86-
mut self: Pin<&mut Self>,
87-
cx: &mut std::task::Context<'_>,
88-
) -> std::task::Poll<Result<(), std::io::Error>> {
89-
Pin::new(&mut self.0).poll_flush(cx)
90-
}
91-
92-
fn poll_shutdown(
93-
mut self: Pin<&mut Self>,
94-
cx: &mut std::task::Context<'_>,
95-
) -> std::task::Poll<Result<(), std::io::Error>> {
96-
Pin::new(&mut self.0).poll_shutdown(cx)
97-
}
98-
}
99-
100-
impl hyper::client::connect::Connection for HyperConnection {
101-
fn connected(&self) -> hyper::client::connect::Connected {
102-
hyper::client::connect::Connected::new()
103-
}
104-
}
105-
106-
#[allow(clippy::non_send_fields_in_send_ty)]
107-
unsafe impl Send for HyperConnection {}
108-
10956
#[monoio::main]
11057
async fn main() {
111-
println!("Running http client");
112-
let connector = HyperConnector;
113-
let client = hyper::Client::builder()
114-
.executor(HyperExecutor)
115-
.build::<HyperConnector, hyper::Body>(connector);
116-
let res = client
117-
.get("http://127.0.0.1:23300/monoio".parse().unwrap())
118-
.await
119-
.expect("failed to fetch");
120-
println!("Response status: {}", res.status());
121-
let body = hyper::body::to_bytes(res.into_body())
122-
.await
123-
.expect("failed to read body");
124-
let body =
125-
String::from_utf8(body.into_iter().collect()).expect("failed to convert body to string");
126-
println!("Response body: {body}");
58+
let url = "http://httpbin.org/ip".parse::<hyper::Uri>().unwrap();
59+
fetch_url(url).await.unwrap();
12760
}

0 commit comments

Comments
 (0)