-
Notifications
You must be signed in to change notification settings - Fork 111
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(client): add
proxy::Tunnel
legacy util
- Loading branch information
1 parent
7afb1ed
commit 1ffb195
Showing
6 changed files
with
263 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
//! Proxy helpers | ||
mod tunnel; | ||
|
||
pub use self::tunnel::Tunnel; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
use std::future::Future; | ||
use std::marker::{PhantomData, Unpin}; | ||
use std::pin::Pin; | ||
use std::task::{self, Poll}; | ||
|
||
use http::{HeaderValue, Uri}; | ||
use hyper::rt::{Read, Write}; | ||
use pin_project_lite::pin_project; | ||
use tower_service::Service; | ||
|
||
/// Tunnel Proxy via HTTP CONNECT | ||
#[derive(Debug)] | ||
pub struct Tunnel<C> { | ||
auth: Option<HeaderValue>, | ||
inner: C, | ||
proxy_dst: Uri, | ||
} | ||
|
||
#[derive(Debug)] | ||
pub enum TunnelError<C> { | ||
Inner(C), | ||
Io(std::io::Error), | ||
MissingHost, | ||
ProxyAuthRequired, | ||
ProxyHeadersTooLong, | ||
TunnelUnexpectedEof, | ||
TunnelUnsuccessful, | ||
} | ||
|
||
pin_project! { | ||
// Not publicly exported (so missing_docs doesn't trigger). | ||
// | ||
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly | ||
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot | ||
// (and thus we can change the type in the future). | ||
#[must_use = "futures do nothing unless polled"] | ||
#[allow(missing_debug_implementations)] | ||
pub struct Tunneling<F, T, E> { | ||
#[pin] | ||
fut: BoxTunneling<T, E>, | ||
_marker: PhantomData<F>, | ||
} | ||
} | ||
|
||
type BoxTunneling<T, E> = Pin<Box<dyn Future<Output = Result<T, TunnelError<E>>> + Send>>; | ||
|
||
impl<C> Tunnel<C> { | ||
/// Create a new Tunnel service. | ||
pub fn new(proxy_dst: Uri, connector: C) -> Self { | ||
Self { | ||
auth: None, | ||
inner: connector, | ||
proxy_dst, | ||
} | ||
} | ||
|
||
/// Add `proxy-authorization` header value to the CONNECT request. | ||
pub fn with_auth(mut self, mut auth: HeaderValue) -> Self { | ||
// just in case the user forgot | ||
auth.set_sensitive(true); | ||
self.auth = Some(auth); | ||
self | ||
} | ||
} | ||
|
||
impl<C> Service<Uri> for Tunnel<C> | ||
where | ||
C: Service<Uri>, | ||
C::Future: Send + 'static, | ||
C::Response: Read + Write + Unpin + Send + 'static, | ||
C::Error: Send + 'static, | ||
{ | ||
type Response = C::Response; | ||
type Error = TunnelError<C::Error>; | ||
type Future = Tunneling<C::Future, C::Response, C::Error>; | ||
|
||
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
futures_util::ready!(self.inner.poll_ready(cx)).map_err(TunnelError::Inner)?; | ||
Poll::Ready(Ok(())) | ||
} | ||
|
||
fn call(&mut self, dst: Uri) -> Self::Future { | ||
let connecting = self.inner.call(self.proxy_dst.clone()); | ||
|
||
Tunneling { | ||
fut: Box::pin(async move { | ||
let conn = connecting.await.map_err(TunnelError::Inner)?; | ||
tunnel( | ||
conn, | ||
dst.host().ok_or(TunnelError::MissingHost)?, | ||
dst.port().map(|p| p.as_u16()).unwrap_or(443), | ||
None, | ||
None, | ||
) | ||
.await | ||
}), | ||
_marker: PhantomData, | ||
} | ||
} | ||
} | ||
|
||
impl<F, T, E> Future for Tunneling<F, T, E> | ||
where | ||
F: Future<Output = Result<T, E>>, | ||
{ | ||
type Output = Result<T, TunnelError<E>>; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||
self.project().fut.poll(cx) | ||
} | ||
} | ||
|
||
async fn tunnel<T, E>( | ||
mut conn: T, | ||
host: &str, | ||
port: u16, | ||
user_agent: Option<HeaderValue>, | ||
auth: Option<HeaderValue>, | ||
) -> Result<T, TunnelError<E>> | ||
where | ||
T: Read + Write + Unpin, | ||
{ | ||
let mut buf = format!( | ||
"\ | ||
CONNECT {host}:{port} HTTP/1.1\r\n\ | ||
Host: {host}:{port}\r\n\ | ||
" | ||
) | ||
.into_bytes(); | ||
|
||
// user-agent | ||
if let Some(user_agent) = user_agent { | ||
buf.extend_from_slice(b"User-Agent: "); | ||
buf.extend_from_slice(user_agent.as_bytes()); | ||
buf.extend_from_slice(b"\r\n"); | ||
} | ||
|
||
// proxy-authorization | ||
if let Some(value) = auth { | ||
//log::debug!("tunnel to {host}:{port} using basic auth"); | ||
buf.extend_from_slice(b"Proxy-Authorization: "); | ||
buf.extend_from_slice(value.as_bytes()); | ||
buf.extend_from_slice(b"\r\n"); | ||
} | ||
|
||
// headers end | ||
buf.extend_from_slice(b"\r\n"); | ||
|
||
crate::rt::write_all(&mut conn, &buf) | ||
.await | ||
.map_err(TunnelError::Io)?; | ||
|
||
let mut buf = [0; 8192]; | ||
let mut pos = 0; | ||
|
||
loop { | ||
let n = crate::rt::read(&mut conn, &mut buf[pos..]) | ||
.await | ||
.map_err(TunnelError::Io)?; | ||
|
||
if n == 0 { | ||
return Err(TunnelError::TunnelUnexpectedEof); | ||
} | ||
pos += n; | ||
|
||
let recvd = &buf[..pos]; | ||
if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") { | ||
if recvd.ends_with(b"\r\n\r\n") { | ||
return Ok(conn); | ||
} | ||
if pos == buf.len() { | ||
return Err(TunnelError::ProxyHeadersTooLong); | ||
} | ||
// else read more | ||
} else if recvd.starts_with(b"HTTP/1.1 407") { | ||
return Err(TunnelError::ProxyAuthRequired); | ||
} else { | ||
return Err(TunnelError::TunnelUnsuccessful); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
use std::marker::Unpin; | ||
use std::pin::Pin; | ||
use std::task::Poll; | ||
|
||
use futures_util::future; | ||
use futures_util::ready; | ||
use hyper::rt::{Read, ReadBuf, Write}; | ||
|
||
pub(crate) async fn read<T>(io: &mut T, buf: &mut [u8]) -> Result<usize, std::io::Error> | ||
where | ||
T: Read + Unpin, | ||
{ | ||
future::poll_fn(move |cx| { | ||
let mut buf = ReadBuf::new(buf); | ||
ready!(Pin::new(&mut *io).poll_read(cx, buf.unfilled()))?; | ||
Poll::Ready(Ok(buf.filled().len())) | ||
}) | ||
.await | ||
} | ||
|
||
pub(crate) async fn write_all<T>(io: &mut T, buf: &[u8]) -> Result<(), std::io::Error> | ||
where | ||
T: Write + Unpin, | ||
{ | ||
let mut n = 0; | ||
future::poll_fn(move |cx| { | ||
while n < buf.len() { | ||
n += ready!(Pin::new(&mut *io).poll_write(cx, &buf[n..])?); | ||
} | ||
Poll::Ready(Ok(())) | ||
}) | ||
.await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
use tokio::io::{AsyncReadExt, AsyncWriteExt}; | ||
use tokio::net::TcpListener; | ||
use tower_service::Service; | ||
|
||
use hyper_util::client::legacy::connect::{proxy::Tunnel, HttpConnector}; | ||
|
||
#[cfg(not(miri))] | ||
#[tokio::test] | ||
async fn test_tunnel_works() { | ||
let tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind"); | ||
let addr = tcp.local_addr().expect("local_addr"); | ||
|
||
let proxy_dst = format!("http://{}", addr).parse().expect("uri"); | ||
let mut connector = Tunnel::new(proxy_dst, HttpConnector::new()); | ||
let t1 = tokio::spawn(async move { | ||
let _conn = connector | ||
.call("https://hyper.rs".parse().unwrap()) | ||
.await | ||
.expect("tunnel"); | ||
}); | ||
|
||
let t2 = tokio::spawn(async move { | ||
let (mut io, _) = tcp.accept().await.expect("accept"); | ||
let mut buf = [0u8; 64]; | ||
let n = io.read(&mut buf).await.expect("read 1"); | ||
assert_eq!( | ||
&buf[..n], | ||
b"CONNECT hyper.rs:443 HTTP/1.1\r\nHost: hyper.rs:443\r\n\r\n" | ||
); | ||
io.write_all(b"HTTP/1.1 200 OK\r\n\r\n") | ||
.await | ||
.expect("write 1"); | ||
}); | ||
|
||
t1.await.expect("task 1"); | ||
t2.await.expect("task 2"); | ||
} |