From b5a2248e8aa0a9865f1127af74869cebbad21222 Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 20:23:47 +0900 Subject: [PATCH] refactor(body): move channel h2 incoming body implementation to H2Body type --- src/body/incoming/h2.rs | 88 ++++++++++++++++++++++++++++++++++++++++ src/body/incoming/mod.rs | 77 ++++++----------------------------- 2 files changed, 100 insertions(+), 65 deletions(-) create mode 100644 src/body/incoming/h2.rs diff --git a/src/body/incoming/h2.rs b/src/body/incoming/h2.rs new file mode 100644 index 0000000000..767441916b --- /dev/null +++ b/src/body/incoming/h2.rs @@ -0,0 +1,88 @@ +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures_util::ready; +use http_body::{Frame, SizeHint}; + +use crate::body::DecodedLength; +use crate::proto::h2::ping; + +pub(super) struct H2Body { + content_length: DecodedLength, + data_done: bool, + ping: ping::Recorder, + recv: h2::RecvStream, +} + +impl H2Body { + pub(super) fn new( + recv: h2::RecvStream, + mut content_length: DecodedLength, + ping: ping::Recorder, + ) -> Self { + // If the stream is already EOS, then the "unknown length" is clearly + // actually ZERO. + if !content_length.is_exact() && recv.is_end_stream() { + content_length = DecodedLength::ZERO; + } + + Self { + data_done: false, + ping, + content_length, + recv, + } + } + + pub(super) fn poll_frame( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, crate::Error>>> { + let Self { + ref mut data_done, + ref ping, + recv: ref mut h2, + content_length: ref mut len, + } = self; + + if !*data_done { + match ready!(h2.poll_data(cx)) { + Some(Ok(bytes)) => { + let _ = h2.flow_control().release_capacity(bytes.len()); + len.sub_if(bytes.len() as u64); + ping.record_data(bytes.len()); + return Poll::Ready(Some(Ok(Frame::data(bytes)))); + } + Some(Err(e)) => { + return match e.reason() { + // These reasons should cause the body reading to stop, but not fail it. + // The same logic as for `Read for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None), + _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + }; + } + None => { + *data_done = true; + // fall through to trailers + } + } + } + + // after data, check trailers + match ready!(h2.poll_trailers(cx)) { + Ok(t) => { + ping.record_non_data(); + Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) + } + Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), + } + } + + pub(super) fn is_end_stream(&self) -> bool { + self.recv.is_end_stream() + } + + pub(super) fn size_hint(&self) -> SizeHint { + super::opt_len(self.content_length) + } +} diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index 5d9defb20c..ec8c75f3f1 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -1,13 +1,13 @@ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] mod channel; +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] +mod h2; use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] -use futures_util::ready; use http_body::{Body, Frame, SizeHint}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] @@ -15,6 +15,9 @@ use self::channel::ChanBody; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) use self::channel::Sender; +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] +use self::h2::H2Body; + #[cfg(all( any(feature = "http1", feature = "http2"), any(feature = "client", feature = "server") @@ -48,12 +51,7 @@ enum Kind { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Chan(ChanBody), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - H2 { - content_length: DecodedLength, - data_done: bool, - ping: ping::Recorder, - recv: h2::RecvStream, - }, + H2(H2Body), #[cfg(feature = "ffi")] Ffi(crate::ffi::UserBody), } @@ -81,22 +79,11 @@ impl Incoming { #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] pub(crate) fn h2( - recv: h2::RecvStream, - mut content_length: DecodedLength, + recv: ::h2::RecvStream, + content_length: DecodedLength, ping: ping::Recorder, ) -> Self { - // If the stream is already EOS, then the "unknown length" is clearly - // actually ZERO. - if !content_length.is_exact() && recv.is_end_stream() { - content_length = DecodedLength::ZERO; - } - - Incoming::new(Kind::H2 { - data_done: false, - ping, - content_length, - recv, - }) + Incoming::new(Kind::H2(H2Body::new(recv, content_length, ping))) } #[cfg(feature = "ffi")] @@ -142,47 +129,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref mut body) => body.poll_frame(cx), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { - ref mut data_done, - ref ping, - recv: ref mut h2, - content_length: ref mut len, - } => { - if !*data_done { - match ready!(h2.poll_data(cx)) { - Some(Ok(bytes)) => { - let _ = h2.flow_control().release_capacity(bytes.len()); - len.sub_if(bytes.len() as u64); - ping.record_data(bytes.len()); - return Poll::Ready(Some(Ok(Frame::data(bytes)))); - } - Some(Err(e)) => { - return match e.reason() { - // These reasons should cause the body reading to stop, but not fail it. - // The same logic as for `Read for H2Upgraded` is applied here. - Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { - Poll::Ready(None) - } - _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), - }; - } - None => { - *data_done = true; - // fall through to trailers - } - } - } - - // after data, check trailers - match ready!(h2.poll_trailers(cx)) { - Ok(t) => { - ping.record_non_data(); - Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) - } - Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), - } - } - + Kind::H2(ref mut body) => body.poll_frame(cx), #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_data(cx), } @@ -194,7 +141,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref body) => body.is_end_stream(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), + Kind::H2(ref body) => body.is_end_stream(), #[cfg(feature = "ffi")] Kind::Ffi(..) => false, } @@ -206,7 +153,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref body) => body.size_hint(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { content_length, .. } => opt_len(content_length), + Kind::H2(ref body) => body.size_hint(), #[cfg(feature = "ffi")] Kind::Ffi(..) => SizeHint::default(), }