Skip to content

Commit

Permalink
feat(client): allow to set timeout per request
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Dec 26, 2024
1 parent 99f7c97 commit b84b020
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 40 deletions.
22 changes: 10 additions & 12 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::{net::SocketAddr, pin::Pin};
use core::{net::SocketAddr, pin::Pin, time::Duration};

use futures_core::stream::Stream;
use tokio::time::{Instant, Sleep};
Expand Down Expand Up @@ -254,18 +254,18 @@ impl Client {
connect: &mut Connect<'_>,
timer: &mut Pin<Box<Sleep>>,
expected_version: Version,
connect_timeout: Duration,
tls_connect_timeout: Duration,
) -> Result<(ConnectionExclusive, Version), Error> {
match connect.uri {
Uri::Tcp(_) | Uri::Tls(_) => {
let conn = self.make_tcp(connect, timer).await?;
let conn = self.make_tcp(connect, timer, connect_timeout).await?;

if matches!(connect.uri, Uri::Tcp(_)) {
return Ok((conn, expected_version));
}

timer
.as_mut()
.reset(Instant::now() + self.timeout_config.tls_connect_timeout);
timer.as_mut().reset(Instant::now() + tls_connect_timeout);

let (conn, version) = self
.connector
Expand All @@ -277,7 +277,7 @@ impl Client {
Ok((conn, version))
}
Uri::Unix(_) => self
.make_unix(connect, timer)
.make_unix(connect, timer, connect_timeout)
.await
.map(|conn| (conn, expected_version)),
}
Expand All @@ -287,16 +287,15 @@ impl Client {
&self,
connect: &mut Connect<'_>,
timer: &mut Pin<Box<Sleep>>,
connect_timeout: Duration,
) -> Result<ConnectionExclusive, Error> {
self.resolver
.call(connect)
.timeout(timer.as_mut())
.await
.map_err(|_| TimeoutError::Resolve)??;

timer
.as_mut()
.reset(Instant::now() + self.timeout_config.connect_timeout);
timer.as_mut().reset(Instant::now() + connect_timeout);

let stream = self
.make_tcp_inner(connect)
Expand Down Expand Up @@ -354,10 +353,9 @@ impl Client {
&self,
_connect: &Connect<'_>,
timer: &mut Pin<Box<Sleep>>,
connect_timeout: Duration,
) -> Result<ConnectionExclusive, Error> {
timer
.as_mut()
.reset(Instant::now() + self.timeout_config.connect_timeout);
timer.as_mut().reset(Instant::now() + connect_timeout);

#[cfg(unix)]
{
Expand Down
15 changes: 13 additions & 2 deletions client/src/middleware/redirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,23 @@ where
type Error = Error;

async fn call(&self, req: ServiceRequest<'r, 'c>) -> Result<Self::Response, Self::Error> {
let ServiceRequest { req, client, timeout } = req;
let ServiceRequest {
req,
client,
timeout_config,
} = req;
let mut headers = req.headers().clone();
let mut method = req.method().clone();
let mut uri = req.uri().clone();
loop {
let mut res = self.service.call(ServiceRequest { req, client, timeout }).await?;
let mut res = self
.service
.call(ServiceRequest {
req,
client,
timeout_config: timeout_config.clone(),
})
.await?;
match res.status() {
StatusCode::MOVED_PERMANENTLY | StatusCode::FOUND | StatusCode::SEE_OTHER => {
if method != Method::HEAD {
Expand Down
52 changes: 41 additions & 11 deletions client/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use crate::{
},
response::Response,
service::ServiceRequest,
timeout::PartialTimeoutConfig,
};

/// builder type for [http::Request] with extended functionalities.
pub struct RequestBuilder<'a, M = marker::Http> {
pub(crate) req: http::Request<BoxBody>,
err: Vec<Error>,
timeout_config: PartialTimeoutConfig,
client: &'a Client,
timeout: Duration,
_marker: PhantomData<M>,
}

Expand Down Expand Up @@ -103,7 +104,7 @@ impl<'a, M> RequestBuilder<'a, M> {
req: req.map(BoxBody::new),
err: Vec::new(),
client,
timeout: client.timeout_config.request_timeout,
timeout_config: PartialTimeoutConfig::new(),
_marker: PhantomData,
}
}
Expand All @@ -113,7 +114,7 @@ impl<'a, M> RequestBuilder<'a, M> {
req: self.req,
err: self.err,
client: self.client,
timeout: self.timeout,
timeout_config: self.timeout_config,
_marker: PhantomData,
}
}
Expand All @@ -124,7 +125,7 @@ impl<'a, M> RequestBuilder<'a, M> {
mut req,
err,
client,
timeout,
timeout_config,
..
} = self;

Expand All @@ -137,7 +138,7 @@ impl<'a, M> RequestBuilder<'a, M> {
.call(ServiceRequest {
req: &mut req,
client,
timeout,
timeout_config: timeout_config.to_timeout_config(&client.timeout_config),
})
.await
}
Expand Down Expand Up @@ -199,14 +200,43 @@ impl<'a, M> RequestBuilder<'a, M> {
self
}

/// Set timeout of this request.
/// Set timeout for DNS resolve.
///
/// The value passed would override global [ClientBuilder::set_request_timeout].
/// Default to client's [TimeoutConfig::resolve_timeout].
pub fn set_resolve_timeout(mut self, dur: Duration) -> Self {
self.timeout_config.resolve_timeout = Some(dur);
self
}

/// Set timeout for establishing connection.
///
/// [ClientBuilder::set_request_timeout]: crate::builder::ClientBuilder::set_request_timeout
#[inline]
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = dur;
/// Default to client's [TimeoutConfig::connect_timeout].
pub fn set_connect_timeout(mut self, dur: Duration) -> Self {
self.timeout_config.connect_timeout = Some(dur);
self
}

/// Set timeout for tls handshake.
///
/// Default to client's [TimeoutConfig::tls_connect_timeout].
pub fn set_tls_connect_timeout(mut self, dur: Duration) -> Self {
self.timeout_config.tls_connect_timeout = Some(dur);
self
}

/// Set timeout for request.
///
/// Default to client's [TimeoutConfig::request_timeout].
pub fn set_request_timeout(mut self, dur: Duration) -> Self {
self.timeout_config.request_timeout = Some(dur);
self
}

/// Set timeout for collecting response body.
///
/// Default to client's [TimeoutConfig::response_timeout].
pub fn set_response_timeout(mut self, dur: Duration) -> Self {
self.timeout_config.response_timeout = Some(dur);
self
}

Expand Down
50 changes: 35 additions & 15 deletions client/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::{future::Future, pin::Pin, time::Duration};
use core::{future::Future, pin::Pin};

use crate::{
body::BoxBody,
Expand All @@ -8,6 +8,7 @@ use crate::{
http::{Request, Version},
pool::{exclusive, shared},
response::Response,
timeout::TimeoutConfig,
uri::Uri,
};

Expand Down Expand Up @@ -67,7 +68,7 @@ where
pub struct ServiceRequest<'r, 'c> {
pub req: &'r mut Request<BoxBody>,
pub client: &'c Client,
pub timeout: Duration,
pub timeout_config: TimeoutConfig,
}

/// type alias for object safe wrapper of type implement [Service] trait.
Expand All @@ -85,7 +86,11 @@ pub(crate) fn base_service() -> HttpService {
#[cfg(any(feature = "http1", feature = "http2", feature = "http3"))]
use crate::{error::TimeoutError, timeout::Timeout};

let ServiceRequest { req, client, timeout } = req;
let ServiceRequest {
req,
client,
timeout_config,
} = req;

let uri = Uri::try_parse(req.uri())?;

Expand All @@ -102,7 +107,7 @@ pub(crate) fn base_service() -> HttpService {
match version {
Version::HTTP_2 | Version::HTTP_3 => match client.shared_pool.acquire(&connect.uri).await {
shared::AcquireOutput::Conn(mut _conn) => {
let mut _timer = Box::pin(tokio::time::sleep(timeout));
let mut _timer = Box::pin(tokio::time::sleep(timeout_config.request_timeout));
*req.version_mut() = version;
#[allow(unreachable_code)]
return match _conn.conn {
Expand All @@ -113,7 +118,7 @@ pub(crate) fn base_service() -> HttpService {
.await
{
Ok(Ok(res)) => {
let timeout = client.timeout_config.response_timeout;
let timeout = timeout_config.response_timeout;
Ok(Response::new(res, _timer, timeout))
}
Ok(Err(e)) => {
Expand All @@ -133,7 +138,7 @@ pub(crate) fn base_service() -> HttpService {
.await
.map_err(|_| TimeoutError::Request)??;

let timeout = client.timeout_config.response_timeout;
let timeout = timeout_config.response_timeout;
Ok(Response::new(res, _timer, timeout))
}
};
Expand All @@ -142,15 +147,15 @@ pub(crate) fn base_service() -> HttpService {
Version::HTTP_3 => {
#[cfg(feature = "http3")]
{
let mut timer = Box::pin(tokio::time::sleep(client.timeout_config.resolve_timeout));
let mut timer = Box::pin(tokio::time::sleep(timeout_config.resolve_timeout));

Service::call(&client.resolver, &mut connect)
.timeout(timer.as_mut())
.await
.map_err(|_| TimeoutError::Resolve)??;
timer
.as_mut()
.reset(tokio::time::Instant::now() + client.timeout_config.connect_timeout);
.reset(tokio::time::Instant::now() + timeout_config.connect_timeout);

if let Ok(Ok(conn)) = crate::h3::proto::connect(
&client.h3_client,
Expand Down Expand Up @@ -182,9 +187,16 @@ pub(crate) fn base_service() -> HttpService {
Version::HTTP_2 => {
#[cfg(feature = "http2")]
{
let mut timer = Box::pin(tokio::time::sleep(client.timeout_config.resolve_timeout));
let (conn, alpn_version) =
client.make_exclusive(&mut connect, &mut timer, Version::HTTP_2).await?;
let mut timer = Box::pin(tokio::time::sleep(timeout_config.resolve_timeout));
let (conn, alpn_version) = client
.make_exclusive(
&mut connect,
&mut timer,
Version::HTTP_2,
timeout_config.connect_timeout,
timeout_config.tls_connect_timeout,
)
.await?;

if alpn_version == Version::HTTP_2 {
let conn = crate::h2::proto::handshake(conn).await?;
Expand Down Expand Up @@ -218,7 +230,7 @@ pub(crate) fn base_service() -> HttpService {

#[cfg(feature = "http1")]
{
let mut timer = Box::pin(tokio::time::sleep(timeout));
let mut timer = Box::pin(tokio::time::sleep(timeout_config.request_timeout));
let res = crate::h1::proto::send(&mut *_conn, _date, req)
.timeout(timer.as_mut())
.await;
Expand All @@ -230,7 +242,7 @@ pub(crate) fn base_service() -> HttpService {
}
let body = crate::h1::body::ResponseBody::new(_conn, buf, decoder);
let res = res.map(|_| crate::body::ResponseBody::H1(body));
let timeout = client.timeout_config.response_timeout;
let timeout = timeout_config.response_timeout;
Ok(Response::new(res, timer, timeout))
}
Ok(Err(e)) => {
Expand All @@ -250,8 +262,16 @@ pub(crate) fn base_service() -> HttpService {
}
}
exclusive::AcquireOutput::Spawner(_spawner) => {
let mut timer = Box::pin(tokio::time::sleep(client.timeout_config.resolve_timeout));
let (conn, _) = client.make_exclusive(&mut connect, &mut timer, version).await?;
let mut timer = Box::pin(tokio::time::sleep(timeout_config.resolve_timeout));
let (conn, _) = client
.make_exclusive(
&mut connect,
&mut timer,
version,
timeout_config.connect_timeout,
timeout_config.tls_connect_timeout,
)
.await?;
_spawner.spawned(conn);
}
},
Expand Down
44 changes: 44 additions & 0 deletions client/src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use pin_project_lite::pin_project;
use tokio::time::Sleep;

/// Configuration for various timeout setting for http client.
#[derive(Clone)]
pub struct TimeoutConfig {
/// Timeout for resolve DNS look up for given address.
/// Default to 5 seconds.
Expand Down Expand Up @@ -45,6 +46,49 @@ impl Default for TimeoutConfig {
}
}

/// Configuration for various timeout setting for http request by client, all fields are optional,
/// the one not set will use the value from [TimeoutConfig] of the client.
pub struct PartialTimeoutConfig {
/// Timeout for resolve DNS look up for given address.
pub resolve_timeout: Option<Duration>,
/// Timeout for establishing http connection for the first time.
pub connect_timeout: Option<Duration>,
/// Timeout for tls handshake when tls features enabled.
pub tls_connect_timeout: Option<Duration>,
/// Timeout for request reach server and response head(all lines before response body) returns.
pub request_timeout: Option<Duration>,
/// Timeout for collecting response body.
pub response_timeout: Option<Duration>,
}

impl PartialTimeoutConfig {
pub const fn new() -> Self {
Self {
resolve_timeout: None,
connect_timeout: None,
tls_connect_timeout: None,
request_timeout: None,
response_timeout: None,
}
}

pub fn to_timeout_config(&self, client_timeout: &TimeoutConfig) -> TimeoutConfig {
TimeoutConfig {
resolve_timeout: self.resolve_timeout.unwrap_or(client_timeout.resolve_timeout),
connect_timeout: self.connect_timeout.unwrap_or(client_timeout.connect_timeout),
tls_connect_timeout: self.tls_connect_timeout.unwrap_or(client_timeout.tls_connect_timeout),
request_timeout: self.request_timeout.unwrap_or(client_timeout.request_timeout),
response_timeout: self.response_timeout.unwrap_or(client_timeout.response_timeout),
}
}
}

impl Default for PartialTimeoutConfig {
fn default() -> Self {
Self::new()
}
}

pub(crate) trait Timeout: Sized {
fn timeout(self, timer: Pin<&mut Sleep>) -> TimeoutFuture<'_, Self> {
TimeoutFuture { fut: self, timer }
Expand Down

0 comments on commit b84b020

Please sign in to comment.