Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Jan 31, 2025
1 parent f0bd80e commit 2f236c9
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 35 deletions.
22 changes: 19 additions & 3 deletions http/src/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core::{
};

use futures_core::stream::Stream;
use std::io;
use std::{io, rc::Rc};
use tokio_util::sync::CancellationToken;
use tracing::trace;
use xitca_io::io::{AsyncIo, Interest, Ready};
Expand Down Expand Up @@ -41,6 +41,7 @@ use super::proto::{
encode::CONTINUE,
error::ProtoError,
};
use crate::util::futures::WaitOrPending;

type ExtRequest<B> = crate::http::Request<crate::http::RequestExt<B>>;

Expand Down Expand Up @@ -91,6 +92,7 @@ struct Dispatcher<'a, St, S, ReqB, W, D, const HEADER_LIMIT: usize, const READ_B
service: &'a S,
_phantom: PhantomData<ReqB>,
cancellation_token: CancellationToken,
request_guard: Rc<()>,
}

// timer state is transformed in following order:
Expand Down Expand Up @@ -186,6 +188,7 @@ where
service,
_phantom: PhantomData,
cancellation_token,
request_guard: Rc::new(()),
}
}

Expand All @@ -208,9 +211,18 @@ where
// TODO: add timeout for drain write?
self.io.drain_write().await?;

// shutdown io if connection is closed.
if self.ctx.is_connection_closed() {
return self.io.shutdown().await.map_err(Into::into);
}

// shutdown io if there is no more read buf
if self.io.read_buf.is_empty()
&& self.cancellation_token.is_cancelled()
&& Rc::strong_count(&self.request_guard) == 1
{
return self.io.shutdown().await.map_err(Into::into);
}
}
}

Expand All @@ -220,21 +232,25 @@ where
match self
.io
.read()
.select(self.cancellation_token.cancelled())
.select(WaitOrPending::new(
self.cancellation_token.cancelled(),
self.cancellation_token.is_cancelled(),
))
.timeout(self.timer.get())
.await
{
Err(_) => return Err(self.timer.map_to_err()),
Ok(SelectOutput::A(Ok(_))) => {}
Ok(SelectOutput::A(Err(_))) => return Err(Error::KeepAliveExpire),
Ok(SelectOutput::B(())) => self.ctx.set_close(),
Ok(SelectOutput::B(())) => {}
}

while let Some((req, decoder)) = self.ctx.decode_head::<READ_BUF_LIMIT>(&mut self.io.read_buf)? {
self.timer.reset_state();

let (mut body_reader, body) = BodyReader::from_coding(decoder);
let req = req.map(|ext| ext.map_body(|_| ReqB::from(body)));
let _guard = self.request_guard.clone();

let (parts, body) = match self
.service
Expand Down
53 changes: 35 additions & 18 deletions http/src/h1/dispatcher_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@ use xitca_io::{
use xitca_service::Service;
use xitca_unsafe_collection::futures::{Select, SelectOutput};

use crate::{
body::NoneBody,
bytes::Bytes,
config::HttpServiceConfig,
date::DateTime,
h1::{body::RequestBody, error::Error},
http::{response::Response, StatusCode},
util::timer::{KeepAlive, Timeout},
};

use super::{
dispatcher::{status_only, Timer},
proto::{
Expand All @@ -42,6 +32,16 @@ use super::{
error::ProtoError,
},
};
use crate::util::futures::WaitOrPending;
use crate::{
body::NoneBody,
bytes::Bytes,
config::HttpServiceConfig,
date::DateTime,
h1::{body::RequestBody, error::Error},
http::{response::Response, StatusCode},
util::timer::{KeepAlive, Timeout},
};

type ExtRequest<B> = crate::http::Request<crate::http::RequestExt<B>>;

Expand All @@ -56,6 +56,7 @@ pub(super) struct Dispatcher<'a, Io, S, ReqB, D, const H_LIMIT: usize, const R_L
notify: Notify<BufOwned>,
_phantom: PhantomData<ReqB>,
cancellation_token: CancellationToken,
request_guard: Rc<()>,
}

#[derive(Default)]
Expand Down Expand Up @@ -132,13 +133,14 @@ where
notify: Notify::new(),
_phantom: PhantomData,
cancellation_token,
request_guard: Rc::new(()),
}
}

pub(super) async fn run(mut self) -> Result<(), Error<S::Error, BE>> {
loop {
match self._run().await {
Ok(shutdown) => shutdown,
Ok(_) => {}
Err(Error::KeepAliveExpire) => {
trace!(target: "h1_dispatcher", "Connection keep-alive expired. Shutting down");
return Ok(());
Expand All @@ -156,6 +158,14 @@ where
if self.ctx.is_connection_closed() {
return self.io.shutdown(Shutdown::Both).map_err(Into::into);
}

// shutdown io if there is no more read buf
if self.read_buf.is_empty()
&& self.cancellation_token.is_cancelled()
&& Rc::strong_count(&self.request_guard) == 1
{
return self.io.shutdown(Shutdown::Both).map_err(Into::into);
}
}
}

Expand All @@ -165,26 +175,32 @@ where
let read = match self
.read_buf
.read_io(&*self.io)
.select(self.cancellation_token.cancelled())
.select(WaitOrPending::new(
self.cancellation_token.cancelled(),
self.cancellation_token.is_cancelled(),
))
.timeout(self.timer.get())
.await
{
Err(_) => return Err(self.timer.map_to_err()),
Ok(SelectOutput::A(Ok(read))) => read,
Ok(SelectOutput::A(Err(_))) => return Err(Error::KeepAliveExpire),
Ok(SelectOutput::B(())) => {
self.ctx.set_close();

return Ok(());
}
Ok(SelectOutput::B(())) => 0,
};

if read == 0 {
self.ctx.set_close();
if !self.cancellation_token.is_cancelled() {
println!("set close");
self.ctx.set_close();
} else {
println!("cancelled");
}

return Ok(());
}

while let Some((req, decoder)) = self.ctx.decode_head::<R_LIMIT>(&mut self.read_buf)? {
println!("decode head");
self.timer.reset_state();

let (waiter, body) = if decoder.is_eof() {
Expand All @@ -204,6 +220,7 @@ where

let req = req.map(|ext| ext.map_body(|_| ReqB::from(body)));

let _guard = self.request_guard.clone();
let (parts, body) = self.service.call(req).await.map_err(Error::Service)?.into_parts();

let mut encoder = self.ctx.encode_head(parts, &body, &mut *self.write_buf)?;
Expand Down
24 changes: 21 additions & 3 deletions http/src/h2/proto/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use core::{
task::{ready, Context, Poll},
time::Duration,
};

use std::rc::Rc;
use ::h2::{
server::{Connection, SendResponse},
Ping, PingPong,
};
use futures_core::stream::Stream;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use xitca_io::io::{AsyncRead, AsyncWrite};
use xitca_service::Service;
Expand All @@ -30,6 +31,7 @@ use crate::{
},
util::{futures::Queue, timer::KeepAlive},
};
use crate::util::futures::WaitOrPending;

/// Http/2 dispatcher
pub(crate) struct Dispatcher<'a, TlsSt, S, ReqB> {
Expand All @@ -39,6 +41,7 @@ pub(crate) struct Dispatcher<'a, TlsSt, S, ReqB> {
ka_dur: Duration,
service: &'a S,
date: &'a DateTimeHandle,
cancellation_token: CancellationToken,
_req_body: PhantomData<ReqB>,
}

Expand All @@ -60,6 +63,7 @@ where
ka_dur: Duration,
service: &'a S,
date: &'a DateTimeHandle,
cancellation_token: CancellationToken,
) -> Self {
Self {
io,
Expand All @@ -69,6 +73,7 @@ where
service,
date,
_req_body: PhantomData,
cancellation_token,
}
}

Expand All @@ -80,6 +85,7 @@ where
ka_dur,
service,
date,
cancellation_token,
..
} = self;

Expand All @@ -101,7 +107,11 @@ where
let mut queue = Queue::new();

loop {
match io.accept().select(try_poll_queue(&mut queue, &mut ping_pong)).await {
if queue.is_empty() && cancellation_token.is_cancelled() {
break;
}

match io.accept().select(try_poll_queue(&mut queue, &mut ping_pong, cancellation_token.clone())).await {
SelectOutput::A(Some(Ok((req, tx)))) => {
// Convert http::Request body type to crate::h2::Body
// and reconstruct as HttpRequest.
Expand Down Expand Up @@ -137,6 +147,7 @@ where
async fn try_poll_queue<F, E, S, B>(
queue: &mut Queue<F>,
ping_ping: &mut H2PingPong<'_>,
cancellation_token: CancellationToken,
) -> SelectOutput<(), Result<(), ::h2::Error>>
where
F: Future<Output = Result<ConnectionState, E>>,
Expand All @@ -146,7 +157,14 @@ where
{
loop {
if queue.is_empty() {
return SelectOutput::B(ping_ping.await);
return match ping_ping.select(WaitOrPending::new(cancellation_token.cancelled(), cancellation_token.is_cancelled())).await {
SelectOutput::A(res) => SelectOutput::B(res),
SelectOutput::B(_) => {
println!("cancelled");

SelectOutput::A(())
},
}
}

match queue.next2().await {
Expand Down
3 changes: 2 additions & 1 deletion http/src/h2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ where

async fn call(
&self,
((io, addr), _): ((St, SocketAddr), CancellationToken),
((io, addr), cancellation_token): ((St, SocketAddr), CancellationToken),
) -> Result<Self::Response, Self::Error> {
// tls accept timer.
let timer = self.keep_alive();
Expand Down Expand Up @@ -78,6 +78,7 @@ where
self.config.keep_alive_timeout,
&self.service,
self.date.get(),
cancellation_token
);

dispatcher.run().await?;
Expand Down
1 change: 1 addition & 0 deletions http/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ where
self.config.keep_alive_timeout,
&self.service,
self.date.get(),
cancellation_token,
)
.run()
.await
Expand Down
38 changes: 38 additions & 0 deletions http/src/util/futures.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use pin_project_lite::pin_project;
#[cfg(any(feature = "http2", feature = "http3"))]
pub(crate) use queue::*;
use std::future::Future;

#[cfg(any(feature = "http2", feature = "http3"))]
mod queue {
Expand Down Expand Up @@ -43,3 +45,39 @@ mod queue {
}
}
}

// A future that resolve only one time when the future is ready
pin_project! {
pub(crate) struct WaitOrPending<F> {
#[pin]
future: F,
is_pending: bool,
}
}

impl<F> WaitOrPending<F> {
pub fn new(future: F, is_pending: bool) -> Self {
Self { future, is_pending }
}
}

impl<F: Future> Future for WaitOrPending<F> {
type Output = F::Output;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if self.is_pending {
return std::task::Poll::Pending;
}

let this = self.as_mut().project();

match this.future.poll(cx) {
std::task::Poll::Ready(f) => {
*this.is_pending = true;

std::task::Poll::Ready(f)
}
poll => poll,
}
}
}
2 changes: 2 additions & 0 deletions server/src/server/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ impl ServerFuture {
match *self {
Self::Init { ref server, .. } => Ok(ServerHandle {
tx: server.tx_cmd.clone(),
cancellation_token: server.cancellation_token.clone(),
}),
Self::Running(ref inner) => Ok(ServerHandle {
tx: inner.server.tx_cmd.clone(),
cancellation_token: inner.server.cancellation_token.clone(),
}),
Self::Error(_) => match mem::take(self) {
Self::Error(e) => Err(e),
Expand Down
6 changes: 4 additions & 2 deletions server/src/server/handle.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use tokio::sync::mpsc::UnboundedSender;

use super::Command;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
pub struct ServerHandle {
pub(super) tx: UnboundedSender<Command>,
pub(super) cancellation_token: CancellationToken,
}

impl ServerHandle {
Expand All @@ -17,5 +18,6 @@ impl ServerHandle {
};

let _ = self.tx.send(cmd);
self.cancellation_token.cancel();
}
}
Loading

0 comments on commit 2f236c9

Please sign in to comment.