From 64bd35f406f9ac4e46db4ae52bd34908095ab1a7 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 23 Feb 2024 03:55:46 +0800 Subject: [PATCH] relax stream impl for boxed body. (#953) --- http/CHANGES.md | 1 + http/src/body.rs | 72 +++++++++++++++++--------------- http/src/util/service/handler.rs | 2 +- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/http/CHANGES.md b/http/CHANGES.md index 664f5660..de7609f6 100644 --- a/http/CHANGES.md +++ b/http/CHANGES.md @@ -1,6 +1,7 @@ # unreleased 0.4.0 ## Change - `body::Either` doesn't expose it's enum variants in public API anymore. +- relax `Stream::Item` associated type when impl on `body::BoxBody::new` and `body::ResponseBody::boxed_stream` types. Instead of requiring the stream to yield `Ok` it now accepts types `Ok>`. # 0.3.0 ## Add diff --git a/http/src/body.rs b/http/src/body.rs index 69cab77d..f264750f 100644 --- a/http/src/body.rs +++ b/http/src/body.rs @@ -242,18 +242,45 @@ pub struct BoxBody(LocalBoxStream<'static, Result>); impl Default for BoxBody { fn default() -> Self { - Self::new(NoneBody::default()) + Self::new(NoneBody::::default()) } } impl BoxBody { #[inline] - pub fn new(body: B) -> Self + pub fn new(body: B) -> Self where - B: Stream> + 'static, + B: Stream> + 'static, + T: Into, E: Into, { - Self(Box::pin(BoxStreamMapErr { body })) + pin_project! { + struct MapStream { + #[pin] + body: B + } + } + + impl Stream for MapStream + where + B: Stream>, + T: Into, + E: Into, + { + type Item = Result; + + #[inline] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().body.poll_next(cx).map_ok(Into::into).map_err(Into::into) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.body.size_hint() + } + } + + Self(Box::pin(MapStream { body })) } } @@ -271,31 +298,6 @@ impl Stream for BoxBody { } } -pin_project! { - struct BoxStreamMapErr { - #[pin] - body: B - } -} - -impl Stream for BoxStreamMapErr -where - B: Stream>, - E: Into, -{ - type Item = Result; - - #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().body.poll_next(cx).map_err(Into::into) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.body.size_hint() - } -} - pin_project! { /// A unified response body type. /// Generic type is for custom pinned response body(type implement [Stream](futures_core::Stream)). @@ -329,9 +331,10 @@ impl Default for ResponseBody { impl ResponseBody { /// Construct a new Stream variant of ResponseBody with default type as [BoxBody] #[inline] - pub fn box_stream(stream: B) -> Self + pub fn box_stream(stream: B) -> Self where - B: Stream> + 'static, + B: Stream> + 'static, + T: Into, E: Into, { Self::stream(BoxBody::new(stream)) @@ -361,7 +364,7 @@ impl ResponseBody { /// Construct a new Stream variant of ResponseBody #[inline] - pub fn stream(stream: B) -> Self { + pub const fn stream(stream: B) -> Self { Self { inner: ResponseBodyInner::Stream { stream }, } @@ -382,9 +385,10 @@ impl ResponseBody { /// erase generic body type by boxing the variant. #[inline] - pub fn into_boxed(self) -> ResponseBody + pub fn into_boxed(self) -> ResponseBody where - B: Stream> + 'static, + B: Stream> + 'static, + T: Into, E: error::Error + Send + Sync + 'static, { match self.inner { diff --git a/http/src/util/service/handler.rs b/http/src/util/service/handler.rs index 984c1d7c..c8f9cd51 100644 --- a/http/src/util/service/handler.rs +++ b/http/src/util/service/handler.rs @@ -227,7 +227,7 @@ where async fn respond(self, req: R) -> Result { match self { Self::First(f) => f.respond(req).await, - Self::Second(s) => Ok(s.respond(req).await?), + Self::Second(s) => s.respond(req).await.map_err(From::from), } } }