diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a41e51..28e27df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Unreleased -Nothing. +- Add `Full` which is a `Body` that consists of a single chunk. +- Add `AsyncReadBody` which turns an `AsyncRead` into a `Body`. # 0.4.1 (March 18, 2021) diff --git a/Cargo.toml b/Cargo.toml index ef1b598..030bc1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ categories = ["web-programming"] bytes = "1" http = "0.2" pin-project-lite = "0.2" +tokio = "1" +tokio-util = { version = "0.6", features = ["io"] } [dev-dependencies] -tokio = { version = "1", features = ["macros", "rt"] } +tokio = { version = "1", features = ["macros", "rt", "fs"] } diff --git a/src/adapters/async_read.rs b/src/adapters/async_read.rs new file mode 100644 index 0000000..353ab10 --- /dev/null +++ b/src/adapters/async_read.rs @@ -0,0 +1,132 @@ +use crate::Body; +use bytes::{Bytes, BytesMut}; +use pin_project_lite::pin_project; +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::AsyncRead; +use tokio_util::io::poll_read_buf; + +pin_project! { + /// Adapter that converts an [`AsyncRead`] into a [`Body`]. + /// + /// [`AsyncRead`]: tokio::io::AsyncRead; + #[derive(Debug, Clone)] + pub struct AsyncReadBody { + #[pin] + inner: R, + buf: BytesMut, + } +} + +impl AsyncReadBody { + /// Create a new `AsyncReadBody`. + pub fn new(read: R) -> Self + where + R: AsyncRead, + { + Self::new_with_buffer_size(read, 1024) + } + + /// Create a new `AsyncReadBody` using `capacity` as the initial capacity of the internal + /// buffer. + pub fn new_with_buffer_size(read: R, capacity: usize) -> Self + where + R: AsyncRead, + { + Self { + inner: read, + buf: BytesMut::with_capacity(capacity), + } + } + + /// Get a reference to the inner value. + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Get a mutable reference to the inner value. + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Get a pinned mutable reference to the inner value. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().inner + } + + /// Consumes `self`, returning the inner value. + pub fn into_inner(self) -> R { + self.inner + } +} + +impl Body for AsyncReadBody +where + R: AsyncRead, +{ + type Data = Bytes; + type Error = io::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + + this.buf.clear(); + + let bytes_read = match poll_read_buf(this.inner, cx, &mut this.buf) { + Poll::Ready(bytes_read) => bytes_read?, + Poll::Pending => return Poll::Pending, + }; + + if bytes_read == 0 { + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(this.buf.clone().freeze()))) + } + } + + #[inline] + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn works() { + let read = tokio::fs::File::open("Cargo.toml").await.unwrap(); + let body = AsyncReadBody::new(read); + + let bytes = to_bytes(body).await.unwrap(); + let s = String::from_utf8(bytes.to_vec()).unwrap(); + assert!(s.contains("name = \"http-body\"")); + } + + async fn to_bytes(mut body: B) -> Result + where + B: Body + Unpin, + { + let mut buf = BytesMut::new(); + + loop { + let chunk = body.data().await.transpose()?; + + if let Some(chunk) = chunk { + buf.extend(&chunk[..]); + } else { + return Ok(buf.freeze()); + } + } + } +} diff --git a/src/adapters/mod.rs b/src/adapters/mod.rs new file mode 100644 index 0000000..6bb907c --- /dev/null +++ b/src/adapters/mod.rs @@ -0,0 +1,7 @@ +//! Adapters for turning various types into [`Body`]s. +//! +//! [`Body`]: crate::Body + +mod async_read; + +pub use async_read::AsyncReadBody; diff --git a/src/lib.rs b/src/lib.rs index 28e4763..f7149c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ mod full; mod next; mod size_hint; +pub mod adapters; pub mod combinators; pub use self::empty::Empty;