Skip to content

Commit

Permalink
feat(client): allow !Send IO with HTTP/1 client (#3371)
Browse files Browse the repository at this point in the history
This removes the requirement of the IO type from being `Send` for the
HTTP/1 client connection. To do so, the ability to perform
`hyper::upgrade`s had to be moved to a separate type which does require
the `Send` bound. This mirrors how the server types do it.

The `Connection` type now has a `with_upgrades()` method to convert.

Closes #3363

BREAKING CHANGE: If you use client HTTP/1 upgrades, you must call
  `Connection::with_upgrades()` to still work the same.
  • Loading branch information
seanmonstar authored Oct 26, 2023
1 parent 569a42d commit cf87eda
Showing 3 changed files with 83 additions and 32 deletions.
4 changes: 2 additions & 2 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ async fn http1_server() -> Result<(), Box<dyn std::error::Error>> {
loop {
let (stream, _) = listener.accept().await?;

let io = TokioIo::new(stream);
let io = IOTypeNotSend::new(TokioIo::new(stream));

let cnt = counter.clone();

@@ -166,7 +166,7 @@ async fn http1_client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>>
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let io = TokioIo::new(stream);
let io = IOTypeNotSend::new(TokioIo::new(stream));

let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;

3 changes: 2 additions & 1 deletion examples/upgrades.rs
Original file line number Diff line number Diff line change
@@ -107,7 +107,8 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;

tokio::task::spawn(async move {
if let Err(err) = conn.await {
// Don't forget to enable upgrades on the connection.
if let Err(err) = conn.with_upgrades().await {
println!("Connection failed: {:?}", err);
}
});
108 changes: 79 additions & 29 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ use httparse::ParserConfig;
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::proto;
use crate::upgrade::Upgraded;

type Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
@@ -51,23 +50,23 @@ pub struct Parts<T> {
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: Read + Write + Send + 'static,
T: Read + Write + 'static,
B: Body + 'static,
{
inner: Option<Dispatcher<T, B>>,
inner: Dispatcher<T, B>,
}

impl<T, B> Connection<T, B>
where
T: Read + Write + Send + Unpin + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Return the inner IO object, and additional information.
///
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
let (io, read_buf, _) = self.inner.into_inner();
Parts {
io,
read_buf,
@@ -87,10 +86,7 @@ where
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.inner
.as_mut()
.expect("already upgraded")
.poll_without_shutdown(cx)
self.inner.poll_without_shutdown(cx)
}
}

@@ -119,7 +115,7 @@ pub struct Builder {
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -240,9 +236,23 @@ impl<B> fmt::Debug for SendRequest<B> {

// ===== impl Connection

impl<T, B> Connection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Enable this connection to support higher-level HTTP upgrades.
///
/// See [the `upgrade` module](crate::upgrade) for more.
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
upgrades::UpgradeableConnection { inner: Some(self) }
}
}

impl<T, B> fmt::Debug for Connection<T, B>
where
T: Read + Write + fmt::Debug + Send + 'static,
T: Read + Write + fmt::Debug + 'static,
B: Body + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -252,27 +262,24 @@ where

impl<T, B> Future for Connection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
match ready!(Pin::new(&mut self.inner).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Some(h1) => {
let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
_ => {
drop(pending);
unreachable!("Upgraded twice");
}
},
proto::Dispatched::Upgrade(pending) => {
// With no `Send` bound on `I`, we can't try to do
// upgrades here. In case a user was trying to use
// `upgrade` with this API, send a special
// error letting them know about that.
pending.manual();
Poll::Ready(Ok(()))
}
}
}
}
@@ -474,7 +481,7 @@ impl Builder {
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -518,10 +525,53 @@ impl Builder {
let cd = proto::h1::dispatch::Client::new(rx);
let proto = proto::h1::Dispatcher::new(cd, conn);

Ok((
SendRequest { dispatch: tx },
Connection { inner: Some(proto) },
))
Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
}
}
}

mod upgrades {
use crate::upgrade::Upgraded;

use super::*;

// A future binding a connection with a Service with Upgrade support.
//
// This type is unnameable outside the crate.
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
pub(super) inner: Option<Connection<T, B>>,
}

impl<I, B> Future for UpgradeableConnection<I, B>
where
I: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let Parts {
io,
read_buf,
_inner,
} = self.inner.take().unwrap().into_parts();
pending.fulfill(Upgraded::new(io, read_buf));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
}

0 comments on commit cf87eda

Please sign in to comment.