Skip to content

Commit

Permalink
feat(client): add support for upgrade request
Browse files Browse the repository at this point in the history
  • Loading branch information
joelwurtz committed Feb 12, 2025
1 parent bc460ee commit f8b32ba
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 4 deletions.
59 changes: 59 additions & 0 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
service::HttpService,
timeout::{Timeout, TimeoutConfig},
tls::connector::Connector,
upgrade::UpgradeRequest,
uri::Uri,
};

Expand Down Expand Up @@ -184,6 +185,64 @@ impl Client {
self.request_builder(url, Method::CONNECT).mutate_marker()
}

#[cfg(feature = "http1")]
/// Start a new upgrade request.
///
/// # Example
/// ```rust
/// use xitca_client::{Client, bytes::Bytes, http::Method};
///
/// async fn _main() -> Result<(), xitca_client::error::Error> {
/// // construct a new client and initialize connect request.
/// let client = Client::new();
/// let mut upgrade_response = client
/// .upgrade("http://localhost:8080", Method::GET)
/// .protocol("protocol1, protocol2")
/// .send().await?
/// ;
///
/// if let Some(upgrade) = upgrade_response.headers.get(xitca_client::http::header::UPGRADE) {
/// // check which protocol it was upgraded to
/// }
///
/// // upgrade_response is a response that contains the http request head and tunnel connection.
///
/// // import Stream trait and call it's method on tunnel to receive bytes.
/// use futures::StreamExt;
/// if let Some(Ok(_)) = upgrade_response.tunnel().next().await {
/// // received bytes data.
/// }
///
/// // import Sink trait and call it's method on tunnel to send bytes data.
/// use futures::SinkExt;
/// // send bytes data.
/// upgrade_response.tunnel().send(b"996").await?;
///
/// // tunnel support split sending/receiving task into different parts to enable concurrent bytes data handling.
/// let (_head, mut tunnel) = upgrade_response.into_parts();
/// let (mut write, mut read) = tunnel.split();
///
/// // read part can operate with Stream trait implement.
/// if let Some(Ok(_)) = read.next().await {
/// // received bytes data.
/// }
///
/// // write part can operate with Sink trait implement.
/// write.send(b"996").await?;
///
/// Ok(())
/// # }
/// ```
pub fn upgrade<U>(&self, url: U, method: Method) -> UpgradeRequest<'_>
where
uri::Uri: TryFrom<U>,
Error: From<<uri::Uri as TryFrom<U>>::Error>,
{
self.request_builder(url, method)
.version(Version::HTTP_11)
.mutate_marker()
}

#[cfg(all(feature = "websocket", feature = "http1"))]
/// Start a new websocket request.
///
Expand Down
6 changes: 3 additions & 3 deletions client/src/http_tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ impl HttpTunnelRequest<'_> {
}

pub struct HttpTunnel {
body: ResponseBody,
io: TunnelIo,
pub(crate) body: ResponseBody,
pub(crate) io: TunnelIo,
}

// io type to bridge AsyncIo trait and h2 body's poll based read/write apis.
#[derive(Default)]
struct TunnelIo {
pub(crate) struct TunnelIo {
write_buf: BytesMut,
#[cfg(feature = "http2")]
adaptor: TunnelIoAdaptor,
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod service;
mod timeout;
mod tls;
mod tunnel;
mod upgrade;
mod uri;

#[cfg(feature = "http1")]
Expand Down
2 changes: 1 addition & 1 deletion client/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
/// builder type for [http::Request] with extended functionalities.
pub struct RequestBuilder<'a, M = marker::Http> {
pub(crate) req: http::Request<BoxBody>,
err: Vec<Error>,
pub(crate) err: Vec<Error>,
client: &'a Client,
timeout: Duration,
_marker: PhantomData<M>,
Expand Down
102 changes: 102 additions & 0 deletions client/src/upgrade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! http upgrade handling.
use super::{
error::{Error, ErrorResponse},
http::{
header::{self, HeaderValue},
response::Parts,
StatusCode,
},
http_tunnel::HttpTunnel,
request::RequestBuilder,
tunnel::Tunnel,
};
use std::ops::{Deref, DerefMut};

pub type UpgradeRequest<'a> = RequestBuilder<'a, marker::Upgrade>;
pub type UpgradeRequestWithProtocol<'a> = RequestBuilder<'a, marker::UpgradeWithProtocol>;

mod marker {
pub struct Upgrade;
pub struct UpgradeWithProtocol;
}

pub struct UpgradeResponse {
pub parts: Parts,
pub tunnel: Tunnel<HttpTunnel>,
}

impl<'a> UpgradeRequest<'a> {
pub fn protocol<V>(mut self, proto: V) -> UpgradeRequestWithProtocol<'a>
where
V: TryInto<HeaderValue>,
<V as TryInto<HeaderValue>>::Error: std::error::Error + Send + Sync + 'static,
{
match proto.try_into() {
Ok(v) => {
self.req.headers_mut().insert(header::UPGRADE, v);
}
Err(e) => {
self.push_error(Error::Std(Box::new(e)));
}
};

self.mutate_marker()
}
}

impl UpgradeRequestWithProtocol<'_> {
/// Send the request and wait for response asynchronously.
pub async fn send(mut self) -> Result<UpgradeResponse, Error> {
self.headers_mut()
.insert(header::CONNECTION, HeaderValue::from_static("upgrade"));

let res = self._send().await?;

let status = res.status();
let expect_status = StatusCode::SWITCHING_PROTOCOLS;
if status != expect_status {
return Err(Error::from(ErrorResponse {
expect_status,
status,
description: "upgrade tunnel can't be established",
}));
}

let (parts, body) = res.into_inner().into_parts();

Ok(UpgradeResponse {
parts,
tunnel: Tunnel::new(HttpTunnel {
body,
io: Default::default(),
}),
})
}
}

impl UpgradeResponse {
#[inline]
pub fn into_parts(self) -> (Parts, Tunnel<HttpTunnel>) {
(self.parts, self.tunnel)
}

#[inline]
pub fn tunnel(&mut self) -> &mut Tunnel<HttpTunnel> {
&mut self.tunnel
}
}

impl Deref for UpgradeResponse {
type Target = Parts;

fn deref(&self) -> &Self::Target {
&self.parts
}
}

impl DerefMut for UpgradeResponse {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.parts
}
}

0 comments on commit f8b32ba

Please sign in to comment.