Skip to content

Commit

Permalink
relax stream impl for boxed body. (#953)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow authored Feb 22, 2024
1 parent 89f7597 commit 64bd35f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 35 deletions.
1 change: 1 addition & 0 deletions http/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# unreleased 0.4.0
## Change
- `body::Either` doesn't expose it's enum variants in public API anymore.
- relax `Stream::Item` associated type when impl on `body::BoxBody::new` and `body::ResponseBody::boxed_stream` types. Instead of requiring the stream to yield `Ok<Bytes>` it now accepts types `Ok<impl Into<Bytes>>`.

# 0.3.0
## Add
Expand Down
72 changes: 38 additions & 34 deletions http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,18 +242,45 @@ pub struct BoxBody(LocalBoxStream<'static, Result<Bytes, BodyError>>);

impl Default for BoxBody {
fn default() -> Self {
Self::new(NoneBody::default())
Self::new(NoneBody::<Bytes>::default())
}
}

impl BoxBody {
#[inline]
pub fn new<B, E>(body: B) -> Self
pub fn new<B, T, E>(body: B) -> Self
where
B: Stream<Item = Result<Bytes, E>> + 'static,
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: Into<BodyError>,
{
Self(Box::pin(BoxStreamMapErr { body }))
pin_project! {
struct MapStream<B> {
#[pin]
body: B
}
}

impl<B, T, E> Stream for MapStream<B>
where
B: Stream<Item = Result<T, E>>,
T: Into<Bytes>,
E: Into<BodyError>,
{
type Item = Result<Bytes, BodyError>;

#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().body.poll_next(cx).map_ok(Into::into).map_err(Into::into)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.body.size_hint()
}
}

Self(Box::pin(MapStream { body }))
}
}

Expand All @@ -271,31 +298,6 @@ impl Stream for BoxBody {
}
}

pin_project! {
struct BoxStreamMapErr<B> {
#[pin]
body: B
}
}

impl<B, T, E> Stream for BoxStreamMapErr<B>
where
B: Stream<Item = Result<T, E>>,
E: Into<BodyError>,
{
type Item = Result<T, BodyError>;

#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().body.poll_next(cx).map_err(Into::into)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.body.size_hint()
}
}

pin_project! {
/// A unified response body type.
/// Generic type is for custom pinned response body(type implement [Stream](futures_core::Stream)).
Expand Down Expand Up @@ -329,9 +331,10 @@ impl<B> Default for ResponseBody<B> {
impl ResponseBody {
/// Construct a new Stream variant of ResponseBody with default type as [BoxBody]
#[inline]
pub fn box_stream<B, E>(stream: B) -> Self
pub fn box_stream<B, T, E>(stream: B) -> Self
where
B: Stream<Item = Result<Bytes, E>> + 'static,
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: Into<BodyError>,
{
Self::stream(BoxBody::new(stream))
Expand Down Expand Up @@ -361,7 +364,7 @@ impl<B> ResponseBody<B> {

/// Construct a new Stream variant of ResponseBody
#[inline]
pub fn stream(stream: B) -> Self {
pub const fn stream(stream: B) -> Self {
Self {
inner: ResponseBodyInner::Stream { stream },
}
Expand All @@ -382,9 +385,10 @@ impl<B> ResponseBody<B> {

/// erase generic body type by boxing the variant.
#[inline]
pub fn into_boxed<E>(self) -> ResponseBody
pub fn into_boxed<T, E>(self) -> ResponseBody
where
B: Stream<Item = Result<Bytes, E>> + 'static,
B: Stream<Item = Result<T, E>> + 'static,
T: Into<Bytes>,
E: error::Error + Send + Sync + 'static,
{
match self.inner {
Expand Down
2 changes: 1 addition & 1 deletion http/src/util/service/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ where
async fn respond(self, req: R) -> Result<Self::Response, Self::Error> {
match self {
Self::First(f) => f.respond(req).await,
Self::Second(s) => Ok(s.respond(req).await?),
Self::Second(s) => s.respond(req).await.map_err(From::from),
}
}
}
Expand Down

0 comments on commit 64bd35f

Please sign in to comment.