Skip to content

Commit 2c7e98a

Browse files
authored
fix: improve retrying of dying pooled connections (#133)
1 parent 53aadac commit 2c7e98a

File tree

2 files changed

+51
-43
lines changed

2 files changed

+51
-43
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ features = ["full"]
1818
rustdoc-args = ["--cfg", "docsrs"]
1919

2020
[dependencies]
21-
hyper = "1.3.0"
21+
hyper = "1.4.0"
2222
futures-util = { version = "0.3.16", default-features = false }
2323
http = "1.0"
2424
http-body = "1.0.0"
@@ -32,7 +32,7 @@ tower-service ={ version = "0.3", optional = true }
3232
tower = { version = "0.4.1", optional = true, default-features = false, features = ["make", "util"] }
3333

3434
[dev-dependencies]
35-
hyper = { version = "1.3.0", features = ["full"] }
35+
hyper = { version = "1.4.0", features = ["full"] }
3636
bytes = "1"
3737
http-body-util = "0.1.0"
3838
tokio = { version = "1", features = ["macros", "test-util", "signal"] }

src/client/legacy/client.rs

+49-41
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::time::Duration;
1313

1414
use futures_util::future::{self, Either, FutureExt, TryFutureExt};
1515
use http::uri::Scheme;
16+
use hyper::client::conn::TrySendError as ConnTrySendError;
1617
use hyper::header::{HeaderValue, HOST};
1718
use hyper::rt::Timer;
1819
use hyper::{body::Body, Method, Request, Response, Uri, Version};
@@ -86,6 +87,11 @@ macro_rules! e {
8687
// We might change this... :shrug:
8788
type PoolKey = (http::uri::Scheme, http::uri::Authority);
8889

90+
enum TrySendError<B> {
91+
Retryable { error: Error, req: Request<B> },
92+
Nope(Error),
93+
}
94+
8995
/// A `Future` that will resolve to an HTTP Response.
9096
///
9197
/// This is returned by `Client::request` (and `Client::get`).
@@ -223,47 +229,46 @@ where
223229
ResponseFuture::new(self.clone().send_request(req, pool_key))
224230
}
225231

226-
/*
227-
async fn retryably_send_request(
232+
async fn send_request(
228233
self,
229234
mut req: Request<B>,
230235
pool_key: PoolKey,
231236
) -> Result<Response<hyper::body::Incoming>, Error> {
232237
let uri = req.uri().clone();
233238

234239
loop {
235-
req = match self.send_request(req, pool_key.clone()).await {
240+
req = match self.try_send_request(req, pool_key.clone()).await {
236241
Ok(resp) => return Ok(resp),
237-
Err(ClientError::Normal(err)) => return Err(err),
238-
Err(ClientError::Canceled {
239-
connection_reused,
240-
mut req,
241-
reason,
242-
}) => {
243-
if !self.config.retry_canceled_requests || !connection_reused {
242+
Err(TrySendError::Nope(err)) => return Err(err),
243+
Err(TrySendError::Retryable { mut req, error }) => {
244+
if !self.config.retry_canceled_requests {
244245
// if client disabled, don't retry
245246
// a fresh connection means we definitely can't retry
246-
return Err(reason);
247+
return Err(error);
247248
}
248249

249250
trace!(
250251
"unstarted request canceled, trying again (reason={:?})",
251-
reason
252+
error
252253
);
253254
*req.uri_mut() = uri.clone();
254255
req
255256
}
256257
}
257258
}
258259
}
259-
*/
260260

261-
async fn send_request(
262-
self,
261+
async fn try_send_request(
262+
&self,
263263
mut req: Request<B>,
264264
pool_key: PoolKey,
265-
) -> Result<Response<hyper::body::Incoming>, Error> {
266-
let mut pooled = self.connection_for(pool_key).await?;
265+
) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
266+
let mut pooled = self
267+
.connection_for(pool_key)
268+
.await
269+
// `connection_for` already retries checkout errors, so if
270+
// it returns an error, there's not much else to retry
271+
.map_err(TrySendError::Nope)?;
267272

268273
req.extensions_mut()
269274
.get_mut::<CaptureConnectionExtension>()
@@ -272,7 +277,7 @@ where
272277
if pooled.is_http1() {
273278
if req.version() == Version::HTTP_2 {
274279
warn!("Connection is HTTP/1, but request requires HTTP/2");
275-
return Err(e!(UserUnsupportedVersion));
280+
return Err(TrySendError::Nope(e!(UserUnsupportedVersion)));
276281
}
277282

278283
if self.config.set_host {
@@ -301,18 +306,26 @@ where
301306
authority_form(req.uri_mut());
302307
}
303308

304-
let fut = pooled.send_request(req);
309+
let mut res = match pooled.try_send_request(req).await {
310+
Ok(res) => res,
311+
Err(mut err) => {
312+
return if let Some(req) = err.take_message() {
313+
Err(TrySendError::Retryable {
314+
error: e!(Canceled, err.into_error()),
315+
req,
316+
})
317+
} else {
318+
Err(TrySendError::Nope(e!(SendRequest, err.into_error())))
319+
}
320+
}
321+
};
305322
//.send_request_retryable(req)
306323
//.map_err(ClientError::map_with_reused(pooled.is_reused()));
307324

308325
// If the Connector included 'extra' info, add to Response...
309-
let extra_info = pooled.conn_info.extra.clone();
310-
let fut = fut.map_ok(move |mut res| {
311-
if let Some(extra) = extra_info {
312-
extra.set(res.extensions_mut());
313-
}
314-
res
315-
});
326+
if let Some(extra) = &pooled.conn_info.extra {
327+
extra.set(res.extensions_mut());
328+
}
316329

317330
// As of [email protected], there is a race condition in the mpsc
318331
// channel, such that sending when the receiver is closing can
@@ -322,11 +335,9 @@ where
322335
// To counteract this, we must check if our senders 'want' channel
323336
// has been closed after having tried to send. If so, error out...
324337
if pooled.is_closed() {
325-
return fut.await;
338+
return Ok(res);
326339
}
327340

328-
let res = fut.await?;
329-
330341
// If pooled is HTTP/2, we can toss this reference immediately.
331342
//
332343
// when pooled is dropped, it will try to insert back into the
@@ -749,37 +760,34 @@ impl<B> PoolClient<B> {
749760
}
750761

751762
impl<B: Body + 'static> PoolClient<B> {
752-
fn send_request(
763+
fn try_send_request(
753764
&mut self,
754765
req: Request<B>,
755-
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, Error>>
766+
) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
756767
where
757768
B: Send,
758769
{
759770
#[cfg(all(feature = "http1", feature = "http2"))]
760771
return match self.tx {
761772
#[cfg(feature = "http1")]
762-
PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request(req)),
773+
PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
763774
#[cfg(feature = "http2")]
764-
PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request(req)),
765-
}
766-
.map_err(Error::tx);
775+
PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
776+
};
767777

768778
#[cfg(feature = "http1")]
769779
#[cfg(not(feature = "http2"))]
770780
return match self.tx {
771781
#[cfg(feature = "http1")]
772-
PoolTx::Http1(ref mut tx) => tx.send_request(req),
773-
}
774-
.map_err(Error::tx);
782+
PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
783+
};
775784

776785
#[cfg(not(feature = "http1"))]
777786
#[cfg(feature = "http2")]
778787
return match self.tx {
779788
#[cfg(feature = "http2")]
780-
PoolTx::Http2(ref mut tx) => tx.send_request(req),
781-
}
782-
.map_err(Error::tx);
789+
PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
790+
};
783791
}
784792
/*
785793
//TODO: can we re-introduce this somehow? Or must people use tower::retry?

0 commit comments

Comments
 (0)