Skip to content

Commit

Permalink
add HttpServiceBuilder::h2::io_uring. (#915)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow authored Feb 1, 2024
1 parent b5986c0 commit 2863b40
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 49 deletions.
2 changes: 2 additions & 0 deletions http/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub(crate) mod marker {
pub struct Http1;
#[cfg(all(feature = "io-uring", feature = "http1"))]
pub struct Http1Uring;
#[cfg(all(feature = "io-uring", feature = "http2"))]
pub struct Http2Uring;
#[cfg(feature = "http2")]
pub struct Http2;
}
Expand Down
45 changes: 45 additions & 0 deletions http/src/h2/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@ use super::service::H2Service;

type Error = Box<dyn fmt::Debug>;

#[cfg(feature = "io-uring")]
impl<St, FA, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
HttpServiceBuilder<marker::Http2, St, FA, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
{
/// transform Self to a http2 service builder that producing a service that able to handle [xitca_io::net::io_uring::TcpStream]
pub fn io_uring(
self,
) -> HttpServiceBuilder<
marker::Http2Uring,
xitca_io::net::io_uring::TcpStream,
FA,
HEADER_LIMIT,
READ_BUF_LIMIT,
WRITE_BUF_LIMIT,
>
where
FA: Service,
{
HttpServiceBuilder {
tls_factory: self.tls_factory,
config: self.config,
_body: std::marker::PhantomData,
}
}
}

impl<St, FA, S, E, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
Service<Result<S, E>> for HttpServiceBuilder<marker::Http2, St, FA, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
Expand All @@ -24,3 +50,22 @@ where
Ok(H2Service::new(self.config, service, tls_acceptor))
}
}

#[cfg(feature = "io-uring")]
impl<St, FA, S, E, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
Service<Result<S, E>>
for HttpServiceBuilder<marker::Http2Uring, St, FA, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
FA: Service,
FA::Error: fmt::Debug + 'static,
E: fmt::Debug + 'static,
{
type Response = super::service::H2UringService<S, FA::Response, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>;
type Error = Error;

async fn call(&self, res: Result<S, E>) -> Result<Self::Response, Self::Error> {
let service = res.map_err(|e| Box::new(e) as Error)?;
let tls_acceptor = self.tls_factory.call(()).await.map_err(|e| Box::new(e) as Error)?;
Ok(super::service::H2UringService::new(self.config, service, tls_acceptor))
}
}
90 changes: 42 additions & 48 deletions http/src/h2/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ mod io_uring {
}

/// Experimental h2 http layer.
pub async fn run<Io, S, ResB, ResBE>(io: Io, service: S) -> io::Result<()>
pub async fn run<Io, S, ResB, ResBE>(io: Io, service: &S) -> io::Result<()>
where
Io: AsyncBufRead + AsyncBufWrite,
S: Service<Request<RequestExt<RequestBody>>, Response = Response<ResB>>,
Expand Down Expand Up @@ -385,65 +385,59 @@ mod io_uring {
let mut write_task = pin!(async {
let mut encoder = hpack::Encoder::new(65535, 4096);

use core::ops::ControlFlow;

enum State {
Write,
WriteEof,
}

loop {
let state = poll_fn(|cx| match rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
match msg {
Message::Head(headers) => {
let mut buf = (&mut write_buf).limit(4096);
headers.encode(&mut encoder, &mut buf);
}
Message::Data(mut data) => {
data.encode_chunk(&mut write_buf);
}
Message::Trailer(headers) => {
let mut buf = (&mut write_buf).limit(4096);
headers.encode(&mut encoder, &mut buf);
}
Message::Reset(id, reason) => {
let reset = Reset::new(id, reason);
reset.encode(&mut write_buf);
}
Message::WindowUpdate(id, size) => {
debug_assert!(size > 0, "window update size not be 0");
// TODO: batch window update
let update = WindowUpdate::new(0.into(), size as _);
update.encode(&mut write_buf);
let update = WindowUpdate::new(id, size as _);
update.encode(&mut write_buf);
}
Message::Settings => {
let setting = Settings::ack();
setting.encode(&mut write_buf);
}
};
Poll::Ready(ControlFlow::Continue(()))
let state = poll_fn(|cx| loop {
match rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
match msg {
Message::Head(headers) => {
let mut buf = (&mut write_buf).limit(4096);
headers.encode(&mut encoder, &mut buf);
}
Message::Data(mut data) => {
data.encode_chunk(&mut write_buf);
}
Message::Trailer(headers) => {
let mut buf = (&mut write_buf).limit(4096);
headers.encode(&mut encoder, &mut buf);
}
Message::Reset(id, reason) => {
let reset = Reset::new(id, reason);
reset.encode(&mut write_buf);
}
Message::WindowUpdate(id, size) => {
debug_assert!(size > 0, "window update size not be 0");
// TODO: batch window update
let update = WindowUpdate::new(0.into(), size as _);
update.encode(&mut write_buf);
let update = WindowUpdate::new(id, size as _);
update.encode(&mut write_buf);
}
Message::Settings => {
let setting = Settings::ack();
setting.encode(&mut write_buf);
}
};
}
Poll::Pending if write_buf.is_empty() => return Poll::Pending,
Poll::Pending => return Poll::Ready(State::Write),
Poll::Ready(None) => return Poll::Ready(State::WriteEof),
}
Poll::Pending if write_buf.is_empty() => Poll::Pending,
Poll::Pending => Poll::Ready(ControlFlow::Break(State::Write)),
Poll::Ready(None) => Poll::Ready(ControlFlow::Break(State::WriteEof)),
})
.await;

match state {
ControlFlow::Continue(_) => continue,
ControlFlow::Break(state) => {
let (res, buf) = write_io(write_buf, &io).await;
write_buf = buf;
let (res, buf) = write_io(write_buf, &io).await;
write_buf = buf;

res?;
res?;

if matches!(state, State::WriteEof) {
return Ok::<_, io::Error>(());
}
}
if matches!(state, State::WriteEof) {
return Ok::<_, io::Error>(());
}
}
});
Expand Down
97 changes: 97 additions & 0 deletions http/src/h2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,100 @@ where
Ok(())
}
}

#[cfg(feature = "io-uring")]
pub(crate) use io_uring::H2UringService;

#[cfg(feature = "io-uring")]
mod io_uring {
use {
xitca_io::{
io_uring::{AsyncBufRead, AsyncBufWrite},
net::io_uring::TcpStream,
},
xitca_service::ready::ReadyService,
};

use crate::{
config::HttpServiceConfig,
date::{DateTime, DateTimeService},
util::timer::KeepAlive,
};

use super::*;

pub struct H2UringService<
S,
A,
const HEADER_LIMIT: usize,
const READ_BUF_LIMIT: usize,
const WRITE_BUF_LIMIT: usize,
> {
pub(crate) config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
pub(crate) date: DateTimeService,
pub(crate) service: S,
pub(crate) tls_acceptor: A,
}

impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
H2UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
{
pub(crate) fn new(
config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
service: S,
tls_acceptor: A,
) -> Self {
Self {
config,
date: DateTimeService::new(),
service,
tls_acceptor,
}
}
}

impl<S, B, BE, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
Service<(TcpStream, SocketAddr)> for H2UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
S: Service<Request<RequestExt<crate::h2::proto::RequestBody>>, Response = Response<B>>,
A: Service<TcpStream>,
A::Response: AsyncBufRead + AsyncBufWrite + 'static,
B: Stream<Item = Result<Bytes, BE>>,
HttpServiceError<S::Error, BE>: From<A::Error>,

S::Error: fmt::Debug,
BE: fmt::Debug,
{
type Response = ();
type Error = HttpServiceError<S::Error, BE>;
async fn call(&self, (io, _): (TcpStream, SocketAddr)) -> Result<Self::Response, Self::Error> {
let accept_dur = self.config.tls_accept_timeout;
let deadline = self.date.get().now() + accept_dur;
let mut timer = pin!(KeepAlive::new(deadline));

let io = self
.tls_acceptor
.call(io)
.timeout(timer.as_mut())
.await
.map_err(|_| HttpServiceError::Timeout(TimeoutError::TlsAccept))??;

crate::h2::proto::run(io, &self.service).await.unwrap();

Ok(())
}
}

impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> ReadyService
for H2UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
S: ReadyService,
{
type Ready = S::Ready;

#[inline]
async fn ready(&self) -> Self::Ready {
self.service.ready().await
}
}
}
2 changes: 1 addition & 1 deletion test/tests/h2_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn h2_v2_post() {
let service = fn_service(move |(stream, _): (TcpStream, SocketAddr)| {
let tx2 = tx2.clone();
async move {
h2::run(stream, fn_service(handler).call(()).now_or_panic().unwrap())
h2::run(stream, &fn_service(handler).call(()).now_or_panic().unwrap())
.await
.map(|_| {
let _ = tx2.send(());
Expand Down

0 comments on commit 2863b40

Please sign in to comment.