Skip to content

Commit

Permalink
feat(client): allow to set response timeout per request
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Feb 13, 2025
1 parent cec4ef4 commit 94ec4c0
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 28 deletions.
17 changes: 15 additions & 2 deletions client/src/middleware/redirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,28 @@ where
type Error = Error;

async fn call(&self, req: ServiceRequest<'r, 'c>) -> Result<Self::Response, Self::Error> {
let ServiceRequest { req, client, timeout } = req;
let ServiceRequest {
req,
client,
request_timeout,
response_timeout,
} = req;
let mut headers = req.headers().clone();
let mut method = req.method().clone();
let mut uri = req.uri().clone();
let ext = req.extensions().clone();
let mut count = 0;

loop {
let mut res = self.service.call(ServiceRequest { req, client, timeout }).await?;
let mut res = self
.service
.call(ServiceRequest {
req,
client,
request_timeout,
response_timeout,
})
.await?;

if count == MAX {
return Ok(res);
Expand Down
32 changes: 21 additions & 11 deletions client/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub struct RequestBuilder<'a, M = marker::Http> {
pub(crate) req: http::Request<BoxBody>,
pub(crate) err: Vec<Error>,
client: &'a Client,
timeout: Duration,
request_timeout: Duration,
response_timeout: Duration,
_marker: PhantomData<M>,
}

Expand Down Expand Up @@ -103,7 +104,8 @@ impl<'a, M> RequestBuilder<'a, M> {
req: req.map(BoxBody::new),
err: Vec::new(),
client,
timeout: client.timeout_config.request_timeout,
request_timeout: client.timeout_config.request_timeout,
response_timeout: client.timeout_config.response_timeout,
_marker: PhantomData,
}
}
Expand All @@ -113,7 +115,8 @@ impl<'a, M> RequestBuilder<'a, M> {
req: self.req,
err: self.err,
client: self.client,
timeout: self.timeout,
request_timeout: self.request_timeout,
response_timeout: self.response_timeout,
_marker: PhantomData,
}
}
Expand All @@ -124,7 +127,8 @@ impl<'a, M> RequestBuilder<'a, M> {
mut req,
err,
client,
timeout,
request_timeout,
response_timeout,
..
} = self;

Expand All @@ -137,7 +141,8 @@ impl<'a, M> RequestBuilder<'a, M> {
.call(ServiceRequest {
req: &mut req,
client,
timeout,
request_timeout,
response_timeout,
})
.await
}
Expand Down Expand Up @@ -199,14 +204,19 @@ impl<'a, M> RequestBuilder<'a, M> {
self
}

/// Set timeout of this request.
/// Set timeout for request.
///
/// The value passed would override global [ClientBuilder::set_request_timeout].
/// Default to client's [TimeoutConfig::request_timeout].
pub fn set_request_timeout(mut self, dur: Duration) -> Self {
self.request_timeout = dur;
self
}

/// Set timeout for collecting response body.
///
/// [ClientBuilder::set_request_timeout]: crate::builder::ClientBuilder::set_request_timeout
#[inline]
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = dur;
/// Default to client's [TimeoutConfig::response_timeout].
pub fn set_response_timeout(mut self, dur: Duration) -> Self {
self.response_timeout = dur;
self
}

Expand Down
35 changes: 20 additions & 15 deletions client/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ where
pub struct ServiceRequest<'r, 'c> {
pub req: &'r mut Request<BoxBody>,
pub client: &'c Client,
pub timeout: Duration,
pub request_timeout: Duration,
pub response_timeout: Duration,
}

/// type alias for object safe wrapper of type implement [Service] trait.
Expand All @@ -85,7 +86,12 @@ pub(crate) fn base_service() -> HttpService {
#[cfg(any(feature = "http1", feature = "http2", feature = "http3"))]
use crate::{error::TimeoutError, timeout::Timeout};

let ServiceRequest { req, client, timeout } = req;
let ServiceRequest {
req,
client,
request_timeout,
response_timeout,
} = req;

let uri = Uri::try_parse(req.uri())?;

Expand All @@ -102,7 +108,7 @@ pub(crate) fn base_service() -> HttpService {
match version {
Version::HTTP_2 | Version::HTTP_3 => match client.shared_pool.acquire(&connect.uri).await {
shared::AcquireOutput::Conn(mut _conn) => {
let mut _timer = Box::pin(tokio::time::sleep(timeout));
let mut _timer = Box::pin(tokio::time::sleep(request_timeout));
*req.version_mut() = version;
#[allow(unreachable_code)]
return match _conn.conn {
Expand All @@ -112,10 +118,7 @@ pub(crate) fn base_service() -> HttpService {
.timeout(_timer.as_mut())
.await
{
Ok(Ok(res)) => {
let timeout = client.timeout_config.response_timeout;
Ok(Response::new(res, _timer, timeout))
}
Ok(Ok(res)) => Ok(Response::new(res, _timer, response_timeout)),
Ok(Err(e)) => {
_conn.destroy_on_drop();
Err(e.into())
Expand All @@ -133,8 +136,7 @@ pub(crate) fn base_service() -> HttpService {
.await
.map_err(|_| TimeoutError::Request)??;

let timeout = client.timeout_config.response_timeout;
Ok(Response::new(res, _timer, timeout))
Ok(Response::new(res, _timer, response_timeout))
}
};
}
Expand Down Expand Up @@ -218,7 +220,7 @@ pub(crate) fn base_service() -> HttpService {

#[cfg(feature = "http1")]
{
let mut timer = Box::pin(tokio::time::sleep(timeout));
let mut timer = Box::pin(tokio::time::sleep(request_timeout));
let res = crate::h1::proto::send(&mut *_conn, _date, req)
.timeout(timer.as_mut())
.await;
Expand All @@ -230,8 +232,8 @@ pub(crate) fn base_service() -> HttpService {
}
let body = crate::h1::body::ResponseBody::new(_conn, buf, decoder);
let res = res.map(|_| crate::body::ResponseBody::H1(body));
let timeout = client.timeout_config.response_timeout;
Ok(Response::new(res, timer, timeout))

Ok(Response::new(res, timer, response_timeout))
}
Ok(Err(e)) => {
_conn.destroy_on_drop();
Expand Down Expand Up @@ -306,7 +308,8 @@ mod test {
ServiceRequest {
req,
client: &self.0,
timeout: self.0.timeout_config.request_timeout,
request_timeout: self.0.timeout_config.request_timeout,
response_timeout: self.0.timeout_config.response_timeout,
}
}
}
Expand All @@ -317,7 +320,9 @@ mod test {

async fn call(
&self,
ServiceRequest { req, timeout, .. }: ServiceRequest<'r, 'c>,
ServiceRequest {
req, response_timeout, ..
}: ServiceRequest<'r, 'c>,
) -> Result<Self::Response, Self::Error> {
let handler = req.extensions().get::<HandlerFn>().unwrap().clone();

Expand All @@ -326,7 +331,7 @@ mod test {
Ok(Response::new(
res,
Box::pin(tokio::time::sleep(Duration::from_secs(0))),
timeout,
response_timeout,
))
}
}
Expand Down

0 comments on commit 94ec4c0

Please sign in to comment.