Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import tokio-util for buffered read. #991

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[features]
default = ["http1"]
http1 = ["httparse", "xitca-http/http1"]
http1 = ["httparse", "tokio-util", "xitca-http/http1"]
http2 = ["h2", "itoa", "xitca-http/http2"]
http3 = ["h3", "h3-quinn", "quinn/tls-rustls", "itoa", "async-stream", "rustls_0dot21", "webpki_roots_0dot25"]
openssl = ["openssl-crate", "tokio-openssl"]
Expand All @@ -31,6 +31,7 @@ tracing = { version = "0.1.40", default-features = false }

# http/1 support
httparse = { version = "1.8.0", optional = true }
tokio-util = { version = "0.7", features = ["io"], optional = true }

# http/2 support
h2 = { version = "0.4", optional = true }
Expand Down
27 changes: 9 additions & 18 deletions client/src/h1/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use futures_core::stream::Stream;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::io::AsyncRead;
use xitca_http::{
bytes::{Bytes, BytesMut},
error::BodyError,
Expand All @@ -16,20 +16,12 @@ use xitca_http::{
pub struct ResponseBody<C> {
conn: C,
buf: BytesMut,
// a chunker reader is used to provide a safe rust only api for reading into buf.
// this is less efficient than reading directly into BytesMut.
chunk: Vec<u8>,
decoder: TransferCoding,
}

impl<C> ResponseBody<C> {
pub(crate) fn new(conn: C, buf: BytesMut, chunk: Vec<u8>, decoder: TransferCoding) -> Self {
Self {
conn,
buf,
chunk,
decoder,
}
pub(crate) fn new(conn: C, buf: BytesMut, decoder: TransferCoding) -> Self {
Self { conn, buf, decoder }
}

pub(crate) fn conn(&mut self) -> &mut C {
Expand All @@ -51,15 +43,14 @@ where
match this.decoder.decode(&mut this.buf) {
ChunkResult::Ok(bytes) => return Poll::Ready(Some(Ok(bytes))),
ChunkResult::InsufficientData => {
let mut buf = ReadBuf::new(this.chunk.as_mut_slice());
ready!(Pin::new(&mut *this.conn).poll_read(cx, &mut buf))?;
let filled = buf.filled();

if filled.is_empty() {
let n = ready!(tokio_util::io::poll_read_buf(
Pin::new(&mut *this.conn),
cx,
&mut this.buf
))?;
if n == 0 {
return Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::UnexpectedEof).into())));
}

this.buf.extend_from_slice(filled);
}
ChunkResult::Err(e) => return Poll::Ready(Some(Err(e.into()))),
_ => return Poll::Ready(None),
Expand Down
28 changes: 9 additions & 19 deletions client/src/h1/proto/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::{future::poll_fn, pin::Pin};
use std::io;

use futures_core::stream::Stream;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{AsyncRead, AsyncWrite};
use xitca_http::{body::BodySize, bytes::Buf, h1::proto::codec::TransferCoding};

use crate::{
Expand All @@ -23,7 +23,7 @@ pub(crate) async fn send<S, B, E>(
stream: &mut S,
date: DateTimeHandle<'_>,
req: &mut Request<B>,
) -> Result<(Response<()>, BytesMut, Vec<u8>, TransferCoding, bool), Error>
) -> Result<(Response<()>, BytesMut, TransferCoding, bool), Error>
where
S: AsyncRead + AsyncWrite + Unpin,
B: Stream<Item = Result<Bytes, E>> + Unpin,
Expand Down Expand Up @@ -83,14 +83,11 @@ where

write_all_buf(stream.as_mut(), &mut buf).await?;

let mut chunk = vec![0; 4096];

if is_expect {
poll_fn(|cx| stream.as_mut().poll_flush(cx)).await?;

loop {
if let Some((res, mut decoder)) = try_read_response(stream.as_mut(), &mut buf, &mut chunk, &mut ctx).await?
{
if let Some((res, mut decoder)) = try_read_response(stream.as_mut(), &mut buf, &mut ctx).await? {
if res.status() == StatusCode::CONTINUE {
break;
}
Expand All @@ -101,7 +98,7 @@ where
decoder = TransferCoding::eof();
}

return Ok((res, buf, chunk, decoder, is_close));
return Ok((res, buf, decoder, is_close));
}
}
}
Expand All @@ -124,7 +121,7 @@ where

// read response head and get body decoder.
loop {
if let Some((res, mut decoder)) = try_read_response(stream.as_mut(), &mut buf, &mut chunk, &mut ctx).await? {
if let Some((res, mut decoder)) = try_read_response(stream.as_mut(), &mut buf, &mut ctx).await? {
// check if server sent connection close header.

// *. If send_body function produces error, Context has already set
Expand All @@ -138,7 +135,7 @@ where
decoder = TransferCoding::eof();
}

return Ok((res, buf, chunk, decoder, is_close));
return Ok((res, buf, decoder, is_close));
}
}
}
Expand Down Expand Up @@ -180,32 +177,25 @@ where
{
while buf.has_remaining() {
let n = poll_fn(|cx| stream.as_mut().poll_write(cx, buf.chunk())).await?;
buf.advance(n);
if n == 0 {
return Err(io::Error::from(io::ErrorKind::WriteZero));
}
buf.advance(n);
}
Ok(())
}

async fn try_read_response<S>(
mut stream: Pin<&mut S>,
buf: &mut BytesMut,
chunk: &mut [u8],
ctx: &mut Context<'_, '_, 128>,
) -> Result<Option<(Response<()>, TransferCoding)>, Error>
where
S: AsyncRead,
{
let mut b = ReadBuf::new(chunk);
poll_fn(|cx| stream.as_mut().poll_read(cx, &mut b)).await?;
let filled = b.filled();

if filled.is_empty() {
let n = poll_fn(|cx| tokio_util::io::poll_read_buf(stream.as_mut(), cx, buf)).await?;
if n == 0 {
return Err(Error::from(io::Error::from(io::ErrorKind::UnexpectedEof)));
}

buf.extend_from_slice(filled);

ctx.decode_head(buf).map_err(Into::into)
}
4 changes: 2 additions & 2 deletions client/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ pub(crate) fn base_service() -> HttpService {

#[cfg(feature = "http1")]
match _res {
Ok(Ok((res, buf, chunk, decoder, is_close))) => {
Ok(Ok((res, buf, decoder, is_close))) => {
if is_close {
conn.destroy_on_drop();
}

let body = crate::h1::body::ResponseBody::new(conn, buf, chunk, decoder);
let body = crate::h1::body::ResponseBody::new(conn, buf, decoder);
let res = res.map(|_| crate::body::ResponseBody::H1(body));
let timeout = client.timeout_config.response_timeout;

Expand Down
Loading