Skip to content

Commit

Permalink
refactor(body): move channel h2 incoming body implementation to H2Bod…
Browse files Browse the repository at this point in the history
…y type
  • Loading branch information
tottoto committed Mar 20, 2024
1 parent 9872591 commit b5a2248
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 65 deletions.
88 changes: 88 additions & 0 deletions src/body/incoming/h2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_util::ready;
use http_body::{Frame, SizeHint};

use crate::body::DecodedLength;
use crate::proto::h2::ping;

pub(super) struct H2Body {
content_length: DecodedLength,
data_done: bool,
ping: ping::Recorder,
recv: h2::RecvStream,
}

impl H2Body {
pub(super) fn new(
recv: h2::RecvStream,
mut content_length: DecodedLength,
ping: ping::Recorder,
) -> Self {
// If the stream is already EOS, then the "unknown length" is clearly
// actually ZERO.
if !content_length.is_exact() && recv.is_end_stream() {
content_length = DecodedLength::ZERO;
}

Self {
data_done: false,
ping,
content_length,
recv,
}
}

pub(super) fn poll_frame(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Bytes>, crate::Error>>> {
let Self {
ref mut data_done,
ref ping,
recv: ref mut h2,
content_length: ref mut len,
} = self;

if !*data_done {
match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
ping.record_data(bytes.len());
return Poll::Ready(Some(Ok(Frame::data(bytes))));
}
Some(Err(e)) => {
return match e.reason() {
// These reasons should cause the body reading to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None),
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
};
}
None => {
*data_done = true;
// fall through to trailers
}
}
}

// after data, check trailers
match ready!(h2.poll_trailers(cx)) {
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
}
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
}
}

pub(super) fn is_end_stream(&self) -> bool {
self.recv.is_end_stream()
}

pub(super) fn size_hint(&self) -> SizeHint {
super::opt_len(self.content_length)
}
}
77 changes: 12 additions & 65 deletions src/body/incoming/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
mod channel;
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
mod h2;

use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use futures_util::ready;
use http_body::{Body, Frame, SizeHint};

#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use self::channel::ChanBody;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) use self::channel::Sender;

#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use self::h2::H2Body;

#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "client", feature = "server")
Expand Down Expand Up @@ -48,12 +51,7 @@ enum Kind {
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
Chan(ChanBody),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
H2 {
content_length: DecodedLength,
data_done: bool,
ping: ping::Recorder,
recv: h2::RecvStream,
},
H2(H2Body),
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
}
Expand Down Expand Up @@ -81,22 +79,11 @@ impl Incoming {

#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
pub(crate) fn h2(
recv: h2::RecvStream,
mut content_length: DecodedLength,
recv: ::h2::RecvStream,
content_length: DecodedLength,
ping: ping::Recorder,
) -> Self {
// If the stream is already EOS, then the "unknown length" is clearly
// actually ZERO.
if !content_length.is_exact() && recv.is_end_stream() {
content_length = DecodedLength::ZERO;
}

Incoming::new(Kind::H2 {
data_done: false,
ping,
content_length,
recv,
})
Incoming::new(Kind::H2(H2Body::new(recv, content_length, ping)))
}

#[cfg(feature = "ffi")]
Expand Down Expand Up @@ -142,47 +129,7 @@ impl Body for Incoming {
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
Kind::Chan(ref mut body) => body.poll_frame(cx),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 {
ref mut data_done,
ref ping,
recv: ref mut h2,
content_length: ref mut len,
} => {
if !*data_done {
match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
ping.record_data(bytes.len());
return Poll::Ready(Some(Ok(Frame::data(bytes))));
}
Some(Err(e)) => {
return match e.reason() {
// These reasons should cause the body reading to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
Poll::Ready(None)
}
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
};
}
None => {
*data_done = true;
// fall through to trailers
}
}
}

// after data, check trailers
match ready!(h2.poll_trailers(cx)) {
Ok(t) => {
ping.record_non_data();
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
}
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
}
}

Kind::H2(ref mut body) => body.poll_frame(cx),
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),
}
Expand All @@ -194,7 +141,7 @@ impl Body for Incoming {
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
Kind::Chan(ref body) => body.is_end_stream(),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
Kind::H2(ref body) => body.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
}
Expand All @@ -206,7 +153,7 @@ impl Body for Incoming {
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
Kind::Chan(ref body) => body.size_hint(),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len(content_length),
Kind::H2(ref body) => body.size_hint(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => SizeHint::default(),
}
Expand Down

0 comments on commit b5a2248

Please sign in to comment.