From c6980238ebe7d439c662b5c39f1550066b5e41d3 Mon Sep 17 00:00:00 2001 From: Brandon Ros Date: Sat, 24 Aug 2024 17:57:23 -0400 Subject: [PATCH] proof of concept broken async outline --- Cargo.lock | 216 +++++++++++++++++++++++++++++ Cargo.toml | 2 + src/async_stream.rs | 331 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/request.rs | 83 +++++++++-- src/tls.rs | 39 ++++++ 6 files changed, 660 insertions(+), 12 deletions(-) create mode 100644 src/async_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 5bc715d..ded3acd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,77 @@ dependencies = [ "memchr", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-io" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444b0228950ee6501b3568d3c93bf1176a1fdbc3b758dcd9475046d30f4dc7e8" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-net" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + [[package]] name = "aws-lc-rs" version = "1.8.1" @@ -73,6 +144,19 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "cc" version = "1.1.13" @@ -119,6 +203,15 @@ dependencies = [ "cc", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -135,6 +228,12 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "dunce" version = "1.0.5" @@ -157,6 +256,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.1.0" @@ -184,6 +304,31 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futures-core" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -201,6 +346,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "home" version = "0.5.9" @@ -214,6 +365,8 @@ dependencies = [ name = "http_req" version = "0.12.0" dependencies = [ + "async-net", + "futures-lite", "native-tls", "rustls", "rustls-pemfile", @@ -376,18 +529,56 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "paste" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "polling" +version = "3.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc2790cd301dec6cd3b7a025e4815cf825724a51c98dccfe6a3e55f05ffb6511" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "prettyplease" version = "0.2.20" @@ -560,6 +751,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "spin" version = "0.9.8" @@ -596,6 +796,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" + [[package]] name = "unicase" version = "2.7.0" diff --git a/Cargo.toml b/Cargo.toml index ee22a3e..5293783 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ keywords = ["http", "client", "request"] edition = "2021" [dependencies] +async-net = "2.0.0" +futures-lite = "2.3.0" unicase = "^2.7" [features] diff --git a/src/async_stream.rs b/src/async_stream.rs new file mode 100644 index 0000000..0d3180d --- /dev/null +++ b/src/async_stream.rs @@ -0,0 +1,331 @@ +//! TCP stream + +use async_net::TcpStream; +use futures_lite::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; + +use crate::{error::Error, tls, uri::Uri, CR_LF, LF}; +use std::{ + io::{self}, net::ToSocketAddrs, path::Path, pin::Pin, sync::mpsc::{Receiver, RecvTimeoutError, Sender}, time::{Duration, Instant} +}; + +const BUF_SIZE: usize = 16 * 1000; + +/// Wrapper around TCP stream for HTTP and HTTPS protocols. +/// Allows to perform common operations on underlying stream. +#[derive(Debug)] +pub enum AsyncStream { + Http(TcpStream), + Https(tls::AsyncConn), +} + +impl<'a> AsyncStream { + /// Opens a TCP connection to a remote host with a connection timeout (if specified). + #[deprecated( + since = "0.12.0", + note = "Stream::new(uri, connect_timeout) was replaced with Stream::connect(uri, connect_timeout)" + )] + pub async fn new(uri: &Uri<'a>, connect_timeout: Option) -> Result { + AsyncStream::connect(uri, connect_timeout).await + } + + /// Opens a TCP connection to a remote host with a connection timeout (if specified). + pub async fn connect(uri: &Uri<'a>, connect_timeout: Option) -> Result { + let host = uri.host().unwrap_or(""); + let port = uri.corr_port(); + + let stream = match connect_timeout { + Some(timeout) => connect_with_timeout(host, port, timeout).await?, + None => TcpStream::connect((host, port)).await?, + }; + + Ok(AsyncStream::Http(stream)) + } + + /// Tries to establish a secure connection over TLS. + /// + /// Checks if `uri` scheme denotes a HTTPS protocol: + /// - If yes, attemps to establish a secure connection + /// - Otherwise, returns the `stream` without any modification + pub async fn try_to_https( + stream: AsyncStream, + uri: &Uri<'a>, + root_cert_file_pem: Option<&Path>, + ) -> Result { + match stream { + AsyncStream::Http(http_stream) => { + if uri.scheme() == "https" { + let host = uri.host().unwrap_or(""); + let mut cnf = tls::Config::default(); + + let cnf = match root_cert_file_pem { + Some(p) => cnf.add_root_cert_file_pem(p)?, + None => &mut cnf, + }; + + let stream = cnf.async_connect(host, http_stream).await?; + Ok(AsyncStream::Https(stream)) + } else { + Ok(AsyncStream::Http(http_stream)) + } + } + AsyncStream::Https(_) => Ok(stream), + } + } + + /// Sets the read timeout on the underlying TCP stream. + pub fn set_read_timeout(&mut self, dur: Option) -> Result<(), Error> { + todo!() + } + + /// Sets the write timeout on the underlying TCP stream. + pub fn set_write_timeout(&mut self, dur: Option) -> Result<(), Error> { + todo!() + } +} + +impl AsyncRead for AsyncStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + match this { + AsyncStream::Http(stream) => Pin::new(stream).poll_read(cx, buf), + AsyncStream::Https(conn) => Pin::new(conn.get_mut()).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for AsyncStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + match this { + AsyncStream::Http(stream) => Pin::new(stream).poll_write(cx, buf), + AsyncStream::Https(conn) => Pin::new(conn.get_mut()).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let this = self.get_mut(); + match this { + AsyncStream::Http(stream) => Pin::new(stream).poll_flush(cx), + AsyncStream::Https(conn) => Pin::new(conn.get_mut()).poll_flush(cx), + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + let this = self.get_mut(); + match this { + AsyncStream::Http(stream) => Pin::new(stream).poll_close(cx), + AsyncStream::Https(conn) => Pin::new(conn.get_mut()).poll_close(cx), + } + } +} + +/// Trait that allows to send data from readers to other threads +pub trait AsyncThreadSend { + /// Reads `head` of the response and sends it via `sender` + async fn async_send_head(&mut self, sender: &Sender>); + + /// Reads all bytes until EOF and sends them via `sender` + async fn async_send_all(&mut self, sender: &Sender>); +} + +impl AsyncThreadSend for T +where + T: AsyncBufRead + Unpin, +{ + async fn async_send_head(&mut self, sender: &Sender>) { + let buf = read_head(self).await; + sender.send(buf).unwrap_or(()); + } + + async fn async_send_all(&mut self, sender: &Sender>) { + loop { + let mut buf = [0; BUF_SIZE]; + + match self.read(&mut buf).await { + Ok(0) | Err(_) => break, + Ok(len) => { + let filled_buf = buf[..len].to_vec(); + if let Err(_) = sender.send(filled_buf) { + break; + } + } + } + } + } +} + +/// Trait that allows to receive data from receivers +pub trait AsyncThreadReceive { + /// Receives data from `receiver` and writes them into this writer. + /// Fails if `deadline` is exceeded. + async fn async_receive(&mut self, receiver: &Receiver>, deadline: Instant) -> Result<(), Error>; + + /// Continuosly receives data from `receiver` until there is no more data + /// or `deadline` is exceeded. Writes received data into this writer. + async fn async_receive_all(&mut self, receiver: &Receiver>, deadline: Instant) + -> Result<(), Error>; +} + +impl AsyncThreadReceive for T +where + T: AsyncWriteExt + Unpin, +{ + async fn async_receive(&mut self, receiver: &Receiver>, deadline: Instant) -> Result<(), Error> { + let now = Instant::now(); + let data_read = receiver.recv_timeout(deadline - now)?; + + Ok(self.write_all(&data_read).await?) + } + + async fn async_receive_all( + &mut self, + receiver: &Receiver>, + deadline: Instant, + ) -> Result<(), Error> { + // TODO: can't do a closure + todo!() + /*async_execute_with_deadline(deadline, async |remaining_time| { + let data_read = match receiver.recv_timeout(remaining_time) { + Ok(data) => data, + Err(e) => match e { + RecvTimeoutError::Timeout => return Err(Error::Timeout), + RecvTimeoutError::Disconnected => return Ok(true), + }, + }; + + self.write_all(&data_read).await.map_err(|e| Error::IO(e))?; + Ok(false) + }).await*/ + } +} + +/// Connects to the target host with a specified timeout. +pub async fn connect_with_timeout(host: T, port: u16, timeout: U) -> io::Result +where + Duration: From, + T: AsRef, +{ + let host = host.as_ref(); + let timeout = Duration::from(timeout); + let addrs: Vec<_> = (host, port).to_socket_addrs()?.collect(); + let count = addrs.len(); + + for (idx, addr) in addrs.into_iter().enumerate() { + // TODO: don't have good timeout mechanism + /*match TcpStream::connect_timeout(&addr, timeout) { + Ok(stream) => return Ok(stream), + Err(err) => match err.kind() { + io::ErrorKind::TimedOut => return Err(err), + _ => { + if idx + 1 == count { + return Err(err); + } + } + }, + };*/ + todo!() + } + + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + format!("Could not resolve address for {:?}", host), + )) +} + +/// Exexcutes a function in a loop until operation is completed or deadline is exceeded. +/// +/// It checks if a timeout was exceeded every iteration, therefore it limits +/// how many time a specific function can be called before deadline. +/// For the `execute_with_deadline` to meet the deadline, each call +/// to `func` needs finish before the deadline. +/// +/// Key information about function `func`: +/// - is provided with information about remaining time +/// - must ensure that its execution will not take more time than specified in `remaining_time` +/// - needs to return `Some(true)` when the operation is complete, and `Some(false)` - when operation is in progress +pub fn execute_with_deadline(deadline: Instant, mut func: F) -> Result<(), Error> +where + F: FnMut(Duration) -> Result, +{ + loop { + let now = Instant::now(); + let remaining_time = deadline - now; + + if deadline < now { + return Err(Error::Timeout); + } + + match func(remaining_time) { + Ok(true) => break, + Ok(false) => continue, + Err(e) => return Err(e), + } + } + + Ok(()) +} + +/// Executes an asynchronous function in a loop until the operation is completed or the deadline is exceeded. +/// +/// It checks if a timeout was exceeded every iteration, therefore it limits +/// how many times a specific function can be called before the deadline. +/// For the `async_execute_with_deadline` to meet the deadline, each call +/// to `func` needs to finish before the deadline. +/// +/// Key information about the function `func`: +/// - is provided with information about remaining time +/// - must ensure that its execution will not take more time than specified in `remaining_time` +/// - needs to return `Ok(true)` when the operation is complete, and `Ok(false)` when the operation is in progress +pub async fn async_execute_with_deadline(deadline: Instant, mut func: F) -> Result<(), Error> +where + F: FnMut(Duration) -> Fut + Send, + Fut: std::future::Future> + Send, +{ + loop { + let now = Instant::now(); + let remaining_time = deadline - now; + + if remaining_time <= Duration::ZERO { + return Err(Error::Timeout); + } + + // no tokio timeout + todo!() + } + + Ok(()) +} + +/// Reads the head of HTTP response from `reader`. +/// +/// Reads from `reader` (line by line) until a blank line is identified, +/// which indicates that all meta-information has been read, +pub async fn read_head(reader: &mut B) -> Vec +where + B: AsyncBufRead + Unpin, +{ + let mut buf = Vec::with_capacity(BUF_SIZE); + + loop { + match reader.read_until(LF, &mut buf).await { + Ok(0) | Err(_) => break, + Ok(len) => { + let full_len = buf.len(); + + if len == 2 && &buf[full_len - 2..] == CR_LF { + break; + } + } + } + } + + buf +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 77ad98e..c46ac4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ pub mod chunked; pub mod error; pub mod request; pub mod response; +pub mod async_stream; pub mod stream; pub mod tls; pub mod uri; diff --git a/src/request.rs b/src/request.rs index 3add606..167712c 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,19 +1,11 @@ //! creating and sending HTTP requests +use futures_lite::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; + use crate::{ - chunked::ChunkReader, - error, - response::{Headers, Response}, - stream::{Stream, ThreadReceive, ThreadSend}, - uri::Uri, + async_stream::{AsyncStream, AsyncThreadReceive, AsyncThreadSend}, chunked::ChunkReader, error, response::{Headers, Response}, stream::{Stream, ThreadReceive, ThreadSend}, uri::Uri }; use std::{ - convert::TryFrom, - fmt, - io::{BufReader, Write}, - path::Path, - sync::mpsc, - thread, - time::{Duration, Instant}, + convert::TryFrom, fmt, io::{BufReader, Write}, path::Path, sync::mpsc, thread, time::{Duration, Instant} }; const CR_LF: &str = "\r\n"; @@ -703,6 +695,73 @@ impl<'a> Request<'a> { Ok(response) } + + pub async fn async_send(&mut self, writer: &mut T) -> Result + where + T: AsyncWrite + AsyncRead + AsyncThreadReceive + AsyncThreadSend, + { + // Set up a stream. + let mut stream = AsyncStream::connect(self.messsage.uri, self.connect_timeout).await?; + stream.set_read_timeout(self.read_timeout)?; + stream.set_write_timeout(self.write_timeout)?; + stream = AsyncStream::try_to_https(stream, self.messsage.uri, self.root_cert_file_pem).await?; + + // Send the request message to stream. + let request_msg = self.messsage.parse(); + stream.write_all(&request_msg).await?; + + // Set up variables + let deadline = Instant::now() + self.timeout; + let (sender, receiver) = mpsc::channel(); + let (sender_supp, receiver_supp) = mpsc::channel(); + let mut raw_response_head: Vec = Vec::new(); + let mut buf_reader = futures_lite::io::BufReader::new(stream); + + // Read from the stream and send over data via `sender`. + buf_reader.async_send_head(&sender).await; + + let params: Vec<&str> = receiver_supp.recv().unwrap_or(Vec::new()); + if params.contains(&"non-empty") { + if params.contains(&"chunked") { + // TODO: async chunked reader + todo!() + } else { + buf_reader.async_send_all(&sender); + } + } + + // Receive and process `head` of the response. + raw_response_head.receive(&receiver, deadline)?; + let response = Response::from_head(&raw_response_head)?; + + if response.status_code().is_redirect() && self.redirect_policy.follow() { + if let Some(location) = response.headers().get("Location") { + let mut raw_uri = location.to_string(); + let uri = if Uri::is_relative(&raw_uri) { + self.messsage.uri.from_relative(&mut raw_uri) + } else { + Uri::try_from(raw_uri.as_str()) + }?; + + // Use `Box::pin` here to handle recursion + return Box::pin(Request::new(&uri) + .redirect_policy(self.redirect_policy) + .async_send(writer)) + .await; + } + } + + let params = response.basic_info(&self.messsage.method).to_vec(); + sender_supp.send(params)?; + + // Receive and process `body` of the response. + let content_len = response.content_len().unwrap_or(1); + if content_len > 0 { + writer.async_receive_all(&receiver, deadline).await?; + } + + Ok(response) + } } /// Creates and sends GET request. Returns response for this request. diff --git a/src/tls.rs b/src/tls.rs index edb5732..720674a 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -10,6 +10,7 @@ use std::{ #[cfg(feature = "native-tls")] use std::io::prelude::*; +use futures_lite::{AsyncRead, AsyncWrite}; #[cfg(feature = "rust-tls")] use rustls::{ClientConnection, StreamOwned}; #[cfg(feature = "rust-tls")] @@ -30,6 +31,34 @@ pub struct Conn { stream: rustls::StreamOwned, } +/// Wrapper around TLS Stream, depends on selected TLS library: +/// - native_tls: `TlsStream` +/// - rustls: `StreamOwned` +#[derive(Debug)] +pub struct AsyncConn { + #[cfg(feature = "native-tls")] + stream: native_tls::TlsStream, + + #[cfg(feature = "rust-tls")] + stream: rustls::StreamOwned, +} + +impl AsyncConn +where + S: AsyncRead + AsyncWrite +{ + /// Returns a reference to the underlying socket. + pub fn get_ref(&self) -> &S { + self.stream.get_ref() + } + + /// Returns a mutable reference to the underlying socket. + pub fn get_mut(&mut self) -> &mut S { + self.stream.get_mut() + } +} + + impl Conn where S: io::Read + io::Write, @@ -154,6 +183,16 @@ impl Config { Ok(Conn { stream }) } + /// Establishes a secure connection. + #[cfg(feature = "native-tls")] + pub async fn async_connect(&self, hostname: H, stream: S) -> Result, HttpError> + where + H: AsRef, + S: AsyncRead + AsyncWrite, + { + todo!() + } + /// Adds root certificates (X.509) from a PEM file. #[cfg(feature = "rust-tls")] pub fn add_root_cert_file_pem(&mut self, file_path: &Path) -> Result<&mut Self, HttpError> {