Skip to content

Commit

Permalink
fix(client): drop connection if body not consumed
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Jan 30, 2025
1 parent 300c5cd commit c4712ec
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 53 deletions.
8 changes: 1 addition & 7 deletions client/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use crate::bytes::Bytes;
#[allow(clippy::large_enum_variant)]
pub enum ResponseBody {
#[cfg(feature = "http1")]
H1(crate::h1::body::ResponseBody<crate::connection::H1ConnectionWithKey>),
#[cfg(feature = "http1")]
H1Owned(crate::h1::body::ResponseBody<crate::connection::H1ConnectionWithoutKey>),
H1(crate::h1::body::ResponseBody),
#[cfg(feature = "http2")]
H2(crate::h2::body::ResponseBody),
#[cfg(feature = "http3")]
Expand All @@ -36,8 +34,6 @@ impl fmt::Debug for ResponseBody {
match *self {
#[cfg(feature = "http1")]
Self::H1(_) => f.write_str("ResponseBody::H1(..)"),
#[cfg(feature = "http1")]
Self::H1Owned(_) => f.write_str("ResponseBody::H1Owned(..)"),
#[cfg(feature = "http2")]
Self::H2(_) => f.write_str("ResponseBody::H2(..)"),
#[cfg(feature = "http3")]
Expand Down Expand Up @@ -73,8 +69,6 @@ impl Stream for ResponseBody {
match self.get_mut() {
#[cfg(feature = "http1")]
Self::H1(body) => Pin::new(body).poll_next(cx),
#[cfg(feature = "http1")]
Self::H1Owned(body) => Pin::new(body).poll_next(cx),
#[cfg(feature = "http2")]
Self::H2(body) => Pin::new(body).poll_next(cx),
#[cfg(feature = "http3")]
Expand Down
8 changes: 0 additions & 8 deletions client/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ use xitca_http::http::uri::{Authority, PathAndQuery};

use super::{tls::TlsStream, uri::Uri};

#[cfg(feature = "http1")]
/// A convince type alias for typing connection without interacting with pool.
pub type H1ConnectionWithKey = crate::pool::exclusive::Conn<ConnectionKey, ConnectionExclusive>;

#[cfg(feature = "http1")]
/// A convince type alias for typing connection without interacting with pool.
pub type H1ConnectionWithoutKey = crate::pool::exclusive::PooledConn<ConnectionExclusive>;

/// exclusive connection for http1 and in certain case they can be upgraded to [ConnectionShared]
pub type ConnectionExclusive = TlsStream;

Expand Down
34 changes: 22 additions & 12 deletions client/src/h1/body.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
io,
ops::DerefMut,
pin::Pin,
task::{ready, Context, Poll},
};
Expand All @@ -13,31 +12,34 @@ use xitca_http::{
};
use xitca_io::io::{AsyncIo, Interest};

pub struct ResponseBody<C> {
conn: C,
use crate::{
connection::{ConnectionExclusive, ConnectionKey},
pool::exclusive::Conn,
};

pub type Connection = Conn<ConnectionKey, ConnectionExclusive>;

pub struct ResponseBody {
conn: Connection,
buf: BytesMut,
decoder: TransferCoding,
}

impl<C> ResponseBody<C> {
pub(crate) fn new(conn: C, buf: BytesMut, decoder: TransferCoding) -> Self {
impl ResponseBody {
pub(crate) fn new(conn: Connection, buf: BytesMut, decoder: TransferCoding) -> Self {
Self { conn, buf, decoder }
}

pub(crate) fn conn(&self) -> &C {
pub(crate) fn conn(&self) -> &Connection {
&self.conn
}

pub(crate) fn conn_mut(&mut self) -> &mut C {
pub(crate) fn conn_mut(&mut self) -> &mut Connection {
&mut self.conn
}
}

impl<C> Stream for ResponseBody<C>
where
C: DerefMut + Unpin,
C::Target: AsyncIo + Sized,
{
impl Stream for ResponseBody {
type Item = Result<Bytes, BodyError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -69,3 +71,11 @@ where
}
}
}

impl Drop for ResponseBody {
fn drop(&mut self) {
if !self.decoder.is_eof() {
self.conn.destroy_on_drop()
}
}
}
20 changes: 0 additions & 20 deletions client/src/http_tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ where
let _io: &mut ConnectionExclusive = match inner.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut(),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut(),
#[cfg(feature = "http2")]
ResponseBody::H2(ref mut body) => {
while !inner.io.write_buf.chunk().is_empty() {
Expand Down Expand Up @@ -147,10 +145,6 @@ where
ResponseBody::H1(ref mut body) => {
xitca_io::io::AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), cx).map_err(Into::into)
}
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => {
xitca_io::io::AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), cx).map_err(Into::into)
}
#[cfg(feature = "http2")]
ResponseBody::H2(ref mut body) => {
body.send_data(xitca_http::bytes::Bytes::new(), true)?;
Expand All @@ -175,8 +169,6 @@ impl AsyncIo for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut().ready(_interest).await,
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut().ready(_interest).await,
#[cfg(feature = "http2")]
ResponseBody::H2(_) => core::future::poll_fn(|cx| self.poll_ready(_interest, cx)).await,
_ => Ok(Ready::ALL),
Expand All @@ -187,8 +179,6 @@ impl AsyncIo for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut().poll_ready(_interest, _cx),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut().poll_ready(_interest, _cx),
#[cfg(feature = "http2")]
ResponseBody::H2(ref mut body) => self.io.poll_ready(body, _interest, _cx),
_ => Poll::Ready(Ok(Ready::ALL)),
Expand All @@ -199,8 +189,6 @@ impl AsyncIo for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref body) => body.conn().is_vectored_write(),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref body) => body.conn().is_vectored_write(),
_ => false,
}
}
Expand All @@ -210,8 +198,6 @@ impl AsyncIo for HttpTunnel {
match this.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), _cx),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), _cx),
#[cfg(feature = "http2")]
ResponseBody::H2(ref mut body) => this.io.poll_shutdown(body, _cx),
_ => Poll::Ready(Ok(())),
Expand All @@ -224,8 +210,6 @@ impl io::Read for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut().read(_buf),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut().read(_buf),
#[cfg(feature = "http2")]
ResponseBody::H2(_) => self.io.read(_buf),
_ => Ok(0),
Expand All @@ -238,8 +222,6 @@ impl io::Write for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut().write(_buf),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut().write(_buf),
#[cfg(feature = "http2")]
ResponseBody::H2(ref mut body) => self.io.write(body, _buf),
_ => Ok(0),
Expand All @@ -250,8 +232,6 @@ impl io::Write for HttpTunnel {
match self.body {
#[cfg(feature = "http1")]
ResponseBody::H1(ref mut body) => body.conn_mut().flush(),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(ref mut body) => body.conn_mut().flush(),
#[cfg(feature = "http2")]
ResponseBody::H2(_) => self.io.flush(),
_ => Ok(()),
Expand Down
6 changes: 0 additions & 6 deletions client/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ impl Sink<Message> for WebSocketTunnel {
let _io: &mut ConnectionExclusive = match inner.recv_stream.inner_mut() {
#[cfg(feature = "http1")]
ResponseBody::H1(body) => body.conn_mut(),
#[cfg(feature = "http1")]
ResponseBody::H1Owned(body) => body.conn_mut(),
#[cfg(feature = "http2")]
ResponseBody::H2(body) => {
while !inner.send_buf.chunk().is_empty() {
Expand Down Expand Up @@ -160,10 +158,6 @@ impl Sink<Message> for WebSocketTunnel {
ResponseBody::H1(body) => {
xitca_io::io::AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), cx).map_err(Into::into)
}
#[cfg(feature = "http1")]
ResponseBody::H1Owned(body) => {
xitca_io::io::AsyncIo::poll_shutdown(Pin::new(&mut **body.conn_mut()), cx).map_err(Into::into)
}
#[cfg(feature = "http2")]
ResponseBody::H2(body) => {
body.send_data(xitca_http::bytes::Bytes::new(), true)?;
Expand Down
27 changes: 27 additions & 0 deletions test/tests/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,33 @@ async fn h1_get() -> Result<(), Error> {
Ok(())
}

#[tokio::test]
async fn h1_get_without_body_reading() -> Result<(), Error> {
let mut handle = test_h1_server(fn_service(handle))?;

let server_url = format!("http://{}/", handle.ip_port_string());

let c = Client::builder().set_pool_capacity(1).finish();

let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
assert_eq!(res.status().as_u16(), 200);
assert!(!res.can_close_connection());

// drop the response body without reading it.
drop(res);

let mut res = c.get(&server_url).version(Version::HTTP_11).send().await?;
assert_eq!(res.status().as_u16(), 200);
assert!(!res.can_close_connection());
let body = res.string().await?;
assert_eq!("GET Response", body);

handle.try_handle()?.stop(false);
handle.await?;

Ok(())
}

#[tokio::test]
async fn h1_head() -> Result<(), Error> {
let mut handle = test_h1_server(fn_service(handle))?;
Expand Down

0 comments on commit c4712ec

Please sign in to comment.