From f2d64c5111fd8a2d13fa6b980b7a82bd13b10aa8 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 24 Jan 2024 01:47:50 +0800 Subject: [PATCH] remove push. (#908) --- http/src/h2/proto/headers.rs | 560 +++++++++++------------------------ http/src/h2/proto/mod.rs | 2 +- 2 files changed, 175 insertions(+), 387 deletions(-) diff --git a/http/src/h2/proto/headers.rs b/http/src/h2/proto/headers.rs index f92cc257..257c96ae 100644 --- a/http/src/h2/proto/headers.rs +++ b/http/src/h2/proto/headers.rs @@ -9,8 +9,8 @@ use xitca_unsafe_collection::bytes::BytesStr; use crate::{ bytes::{buf::Limit, BufMut, Bytes, BytesMut}, http::{ - header::{self, HeaderName, HeaderValue}, - uri, HeaderMap, Method, Request, StatusCode, Uri, + header::{self, HeaderName}, + uri, HeaderMap, Method, StatusCode, Uri, }, }; @@ -30,13 +30,10 @@ type EncodeBuf<'a> = Limit<&'a mut BytesMut>; pub struct Headers

{ /// The ID of the stream with which this frame is associated. stream_id: StreamId, - /// The stream dependency information, if any. stream_dep: Option, - /// The header block fragment header_block: HeaderBlock

, - /// The associated flags flags: HeadersFlag, } @@ -44,29 +41,10 @@ pub struct Headers

{ #[derive(Copy, Clone, Eq, PartialEq)] pub struct HeadersFlag(u8); -#[derive(Eq, PartialEq)] -pub struct PushPromise { - /// The ID of the stream with which this frame is associated. - stream_id: StreamId, - - /// The ID of the stream being reserved by this PushPromise. - promised_id: StreamId, - - /// The header block fragment - header_block: HeaderBlock, - - /// The associated flags - flags: PushPromiseFlag, -} - -#[derive(Copy, Clone, Eq, PartialEq)] -pub struct PushPromiseFlag(u8); - #[derive(Debug)] pub struct Continuation { /// Stream ID of continuation frame stream_id: StreamId, - header_block: EncodingHeaderBlock, } @@ -90,23 +68,12 @@ pub struct ResponsePseudo { status: StatusCode, } -#[derive(Debug)] -pub struct Iter { - /// Pseudo headers - pseudo: Option, - - /// Header fields - fields: header::IntoIter, -} - #[derive(Debug, PartialEq, Eq)] struct HeaderBlock

{ /// The decoded header fields fields: HeaderMap, - /// Set to true if decoding went over the max header list size. is_over_size: bool, - /// Pseudo headers, these are broken out as they must be sent as part of the /// headers frame. pseudo: P, @@ -123,9 +90,12 @@ const PADDED: u8 = 0x8; const PRIORITY: u8 = 0x20; const ALL: u8 = END_STREAM | END_HEADERS | PADDED | PRIORITY; -impl Headers { +impl

Headers

+where + P: _Pseudo + Default, +{ /// Create a new HEADERS frame - pub fn request(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { + pub fn new(stream_id: StreamId, pseudo: P, fields: HeaderMap) -> Self { Self { stream_id, stream_dep: None, @@ -146,49 +116,7 @@ impl Headers { ) -> Result<(), Error> { self.header_block.load(src, max_header_list_size, decoder) } -} - -impl Headers { - pub fn response(stream_id: StreamId, pseudo: ResponsePseudo, fields: HeaderMap) -> Self { - Self { - stream_id, - stream_dep: None, - header_block: HeaderBlock { - fields, - is_over_size: false, - pseudo, - }, - flags: HeadersFlag::default(), - } - } - - /// Whether it has status 1xx - pub(crate) fn is_informational(&self) -> bool { - self.header_block.pseudo.is_informational() - } -} - -impl Headers<()> { - pub fn trailers(stream_id: StreamId, fields: HeaderMap) -> Self { - let mut flags = HeadersFlag::default(); - flags.set_end_stream(); - Self { - stream_id, - stream_dep: None, - header_block: HeaderBlock { - fields, - is_over_size: false, - pseudo: (), - }, - flags, - } - } -} -impl

Headers

-where - P: IntoHeaderIter + Default, -{ /// Loads the header frame but doesn't actually do HPACK decoding. /// /// HPACK decoding is done in the `load_hpack` step. @@ -316,7 +244,31 @@ where } } -impl fmt::Debug for Headers { +impl Headers { + /// Whether it has status 1xx + pub(crate) fn is_informational(&self) -> bool { + self.header_block.pseudo.status.is_informational() + } +} + +impl Headers<()> { + pub fn trailers(stream_id: StreamId, fields: HeaderMap) -> Self { + let mut flags = HeadersFlag::default(); + flags.set_end_stream(); + Self { + stream_id, + stream_dep: None, + header_block: HeaderBlock { + fields, + is_over_size: false, + pseudo: (), + }, + flags, + } + } +} + +impl

fmt::Debug for Headers

{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let mut builder = f.debug_struct("Headers"); builder.field("stream_id", &self.stream_id).field("flags", &self.flags); @@ -356,185 +308,6 @@ pub fn parse_u64(src: &[u8]) -> Result { Ok(ret) } -// ===== impl PushPromise ===== - -#[derive(Debug)] -pub enum PushPromiseHeaderError { - InvalidContentLength(Result), - NotSafeAndCacheable, -} - -impl PushPromise { - pub fn new(stream_id: StreamId, promised_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { - PushPromise { - flags: PushPromiseFlag::default(), - header_block: HeaderBlock { - fields, - is_over_size: false, - pseudo, - }, - promised_id, - stream_id, - } - } - - pub fn validate_request(req: &Request<()>) -> Result<(), PushPromiseHeaderError> { - use PushPromiseHeaderError::*; - // The spec has some requirements for promised request headers - // [https://httpwg.org/specs/rfc7540.html#PushRequests] - - // A promised request "that indicates the presence of a request body - // MUST reset the promised stream with a stream error" - if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) { - let parsed_length = parse_u64(content_length.as_bytes()); - if parsed_length != Ok(0) { - return Err(InvalidContentLength(parsed_length)); - } - } - // "The server MUST include a method in the :method pseudo-header field - // that is safe and cacheable" - if !Self::safe_and_cacheable(req.method()) { - return Err(NotSafeAndCacheable); - } - - Ok(()) - } - - fn safe_and_cacheable(method: &Method) -> bool { - // Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods - // Safe: https://httpwg.org/specs/rfc7231.html#safe.methods - method == Method::GET || method == Method::HEAD - } - - pub fn fields(&self) -> &HeaderMap { - &self.header_block.fields - } - - #[cfg(feature = "unstable")] - pub fn into_fields(self) -> HeaderMap { - self.header_block.fields - } - - /// Loads the push promise frame but doesn't actually do HPACK decoding. - /// - /// HPACK decoding is done in the `load_hpack` step. - pub fn load(head: Head, mut src: BytesMut) -> Result<(Self, BytesMut), Error> { - let flags = PushPromiseFlag(head.flag()); - let mut pad = 0; - - if head.stream_id().is_zero() { - todo!() - // return Err(Error::InvalidStreamId); - } - - // Read the padding length - if flags.is_padded() { - if src.is_empty() { - todo!() - // return Err(Error::MalformedMessage); - } - - // TODO: Ensure payload is sized correctly - pad = src[0] as usize; - - // Drop the padding - let _ = src.split_to(1); - } - - if src.len() < 5 { - todo!() - // return Err(Error::MalformedMessage); - } - - let (promised_id, _) = StreamId::parse(&src[..4]); - // Drop promised_id bytes - let _ = src.split_to(4); - - if pad > 0 { - if pad > src.len() { - todo!() - // return Err(Error::TooMuchPadding); - } - - let len = src.len() - pad; - src.truncate(len); - } - - let frame = PushPromise { - flags, - header_block: HeaderBlock { - fields: HeaderMap::new(), - is_over_size: false, - pseudo: Pseudo::default(), - }, - promised_id, - stream_id: head.stream_id(), - }; - Ok((frame, src)) - } - - pub fn load_hpack( - &mut self, - src: &mut BytesMut, - max_header_list_size: usize, - decoder: &mut hpack::Decoder, - ) -> Result<(), Error> { - self.header_block.load(src, max_header_list_size, decoder) - } - - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - pub fn promised_id(&self) -> StreamId { - self.promised_id - } - - pub fn is_end_headers(&self) -> bool { - self.flags.is_end_headers() - } - - pub fn set_end_headers(&mut self) { - self.flags.set_end_headers(); - } - - pub fn is_over_size(&self) -> bool { - self.header_block.is_over_size - } - - pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut EncodeBuf<'_>) -> Option { - // At this point, the `is_end_headers` flag should always be set - debug_assert!(self.flags.is_end_headers()); - - let head = self.head(); - let promised_id = self.promised_id; - - self.header_block.into_encoding(encoder).encode(&head, dst, |dst| { - dst.put_u32(promised_id.into()); - }) - } - - fn head(&self) -> Head { - Head::new(Kind::PushPromise, self.flags.into(), self.stream_id) - } - - /// Consume `self`, returning the parts of the frame - pub fn into_parts(self) -> (Pseudo, HeaderMap) { - (self.header_block.pseudo, self.header_block.fields) - } -} - -impl fmt::Debug for PushPromise { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("PushPromise") - .field("stream_id", &self.stream_id) - .field("promised_id", &self.promised_id) - .field("flags", &self.flags) - // `fields` and `pseudo` purposefully not included - .finish() - } -} - // ===== impl Continuation ===== impl Continuation { @@ -617,15 +390,6 @@ impl Pseudo { } } -impl ResponsePseudo { - /// Whether it has status 1xx - pub(crate) fn is_informational(&self) -> bool { - self.status.is_informational() - } -} - -// ===== impl EncodingHeaderBlock ===== - impl EncodingHeaderBlock { fn encode(mut self, head: &Head, dst: &mut EncodeBuf<'_>, f: F) -> Option where @@ -676,8 +440,6 @@ impl EncodingHeaderBlock { } } -// ===== impl HeadersFlag ===== - impl HeadersFlag { pub fn empty() -> HeadersFlag { HeadersFlag(0) @@ -737,56 +499,10 @@ impl fmt::Debug for HeadersFlag { } } -// ===== impl PushPromiseFlag ===== - -impl PushPromiseFlag { - pub fn empty() -> PushPromiseFlag { - PushPromiseFlag(0) - } - - pub fn load(bits: u8) -> PushPromiseFlag { - PushPromiseFlag(bits & ALL) - } - - pub fn is_end_headers(&self) -> bool { - self.0 & END_HEADERS == END_HEADERS - } - - pub fn set_end_headers(&mut self) { - self.0 |= END_HEADERS; - } - - pub fn is_padded(&self) -> bool { - self.0 & PADDED == PADDED - } -} - -impl Default for PushPromiseFlag { - /// Returns a `PushPromiseFlag` value with `END_HEADERS` set. - fn default() -> Self { - PushPromiseFlag(END_HEADERS) - } -} - -impl From for u8 { - fn from(src: PushPromiseFlag) -> u8 { - src.0 - } -} - -impl fmt::Debug for PushPromiseFlag { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - // util::debug_flags(fmt, self.0) - // .flag_if(self.is_end_headers(), "END_HEADERS") - // .flag_if(self.is_padded(), "PADDED") - // .finish() - todo!() - } -} - -// ===== HeaderBlock ===== - -impl HeaderBlock { +impl

HeaderBlock

+where + P: _Pseudo, +{ fn load( &mut self, src: &mut BytesMut, @@ -797,27 +513,6 @@ impl HeaderBlock { let mut malformed = false; let mut headers_size = self.calculate_header_list_size(); - macro_rules! set_pseudo { - ($field:ident, $val:expr) => {{ - if reg { - tracing::trace!("load_hpack; header malformed -- pseudo not at head of block"); - malformed = true; - } else if self.pseudo.$field.is_some() { - tracing::trace!("load_hpack; header malformed -- repeated pseudo"); - malformed = true; - } else { - let __val = $val; - headers_size += decoded_header_size(stringify!($field).len() + 1, __val.as_str().len()); - if headers_size < max_header_list_size { - self.pseudo.$field = Some(__val); - } else if !self.is_over_size { - tracing::trace!("load_hpack; header list size over max"); - self.is_over_size = true; - } - } - }}; - } - let mut cursor = Cursor::new(src); // If the header frame is malformed, we still have to continue decoding @@ -855,15 +550,14 @@ impl HeaderBlock { } } } - Authority(v) => set_pseudo!(authority, v), - Method(v) => set_pseudo!(method, v), - Scheme(v) => set_pseudo!(scheme, v), - Path(v) => set_pseudo!(path, v), - Protocol(v) => { - todo!() - // set_pseudo!(protocol, v) - } - Status(v) => set_pseudo!(status, v), + header => self.pseudo.from_header( + header, + max_header_list_size, + &mut self.is_over_size, + reg, + &mut malformed, + &mut headers_size, + ), } }); @@ -880,6 +574,18 @@ impl HeaderBlock { Ok(()) } + fn into_encoding(self, encoder: &mut hpack::Encoder) -> EncodingHeaderBlock { + let mut hpack = BytesMut::new(); + let headers = self.pseudo.into_iter().chain( + self.fields + .into_iter() + .map(|(name, value)| super::hpack::Header::Field { name, value }), + ); + encoder.encode(headers, &mut hpack); + + EncodingHeaderBlock { hpack: hpack.freeze() } + } + /// Calculates the size of the currently decoded header list. /// /// According to http://httpwg.org/specs/rfc7540.html#SETTINGS_MAX_HEADER_LIST_SIZE @@ -888,21 +594,7 @@ impl HeaderBlock { /// > including the length of the name and value in octets plus an /// > overhead of 32 octets for each header field. fn calculate_header_list_size(&self) -> usize { - macro_rules! pseudo_size { - ($name:ident) => {{ - self.pseudo - .$name - .as_ref() - .map(|m| decoded_header_size(stringify!($name).len() + 1, m.as_str().len())) - .unwrap_or(0) - }}; - } - - pseudo_size!(method) - + pseudo_size!(scheme) - + pseudo_size!(status) - + pseudo_size!(authority) - + pseudo_size!(path) + self.pseudo.as_header_size() + self .fields .iter() @@ -911,28 +603,23 @@ impl HeaderBlock { } } -impl

HeaderBlock

-where - P: IntoHeaderIter, -{ - fn into_encoding(self, encoder: &mut hpack::Encoder) -> EncodingHeaderBlock { - let mut hpack = BytesMut::new(); - let headers = self.pseudo.into_iter().chain( - self.fields - .into_iter() - .map(|(name, value)| super::hpack::Header::Field { name, value }), - ); - encoder.encode(headers, &mut hpack); +pub trait _Pseudo { + fn into_iter(self) -> impl Iterator>> + Send; - EncodingHeaderBlock { hpack: hpack.freeze() } - } -} + fn from_header( + &mut self, + header: super::hpack::Header, + max_header_list_size: usize, + is_over_size: &mut bool, + reg: bool, + malformed: &mut bool, + headers_size: &mut usize, + ); -pub trait IntoHeaderIter { - fn into_iter(self) -> impl Iterator>> + Send; + fn as_header_size(&self) -> usize; } -impl IntoHeaderIter for Pseudo { +impl _Pseudo for Pseudo { fn into_iter(self) -> impl Iterator>> { struct Iter(Pseudo); @@ -972,24 +659,125 @@ impl IntoHeaderIter for Pseudo { Iter(self) } + + fn from_header( + &mut self, + header: super::hpack::Header, + max_header_list_size: usize, + is_over_size: &mut bool, + reg: bool, + malformed: &mut bool, + headers_size: &mut usize, + ) { + use super::hpack::Header::*; + + macro_rules! set_pseudo { + ($field:ident, $val:expr) => {{ + if reg { + tracing::trace!("load_hpack; header malformed -- pseudo not at head of block"); + *malformed = true; + } else if self.$field.is_some() { + tracing::trace!("load_hpack; header malformed -- repeated pseudo"); + *malformed = true; + } else { + let __val = $val; + *headers_size += decoded_header_size(stringify!($field).len() + 1, __val.as_str().len()); + if *headers_size < max_header_list_size { + self.$field = Some(__val); + } else if !*is_over_size { + tracing::trace!("load_hpack; header list size over max"); + *is_over_size = true; + } + } + }}; + } + + match header { + Authority(v) => set_pseudo!(authority, v), + Method(v) => set_pseudo!(method, v), + Scheme(v) => set_pseudo!(scheme, v), + Path(v) => set_pseudo!(path, v), + Protocol(v) => { + // set_pseudo!(protocol, v) + } + Status(v) => set_pseudo!(status, v), + _ => unreachable!(), + } + } + + fn as_header_size(&self) -> usize { + macro_rules! pseudo_size { + ($name:ident) => {{ + self.$name + .as_ref() + .map(|m| decoded_header_size(stringify!($name).len() + 1, m.as_str().len())) + .unwrap_or(0) + }}; + } + + pseudo_size!(method) + + pseudo_size!(scheme) + + pseudo_size!(status) + + pseudo_size!(authority) + + pseudo_size!(path) + } } -impl IntoHeaderIter for ResponsePseudo { +impl _Pseudo for ResponsePseudo { fn into_iter(self) -> impl Iterator>> { core::iter::once_with(move || super::hpack::Header::Status(self.status)) } + + fn from_header( + &mut self, + header: super::hpack::Header, + max_header_list_size: usize, + is_over_size: &mut bool, + reg: bool, + malformed: &mut bool, + headers_size: &mut usize, + ) { + use super::hpack::Header; + match header { + Header::Status(status) => { + if reg { + tracing::trace!("load_hpack; header malformed -- pseudo not at head of block"); + *malformed = true; + } else { + *headers_size += self.as_header_size(); + if *headers_size < max_header_list_size { + self.status = status; + } else if !*is_over_size { + tracing::trace!("load_hpack; header list size over max"); + *is_over_size = true; + } + } + } + _ => unreachable!(), + } + } + + fn as_header_size(&self) -> usize { + decoded_header_size("status".len() + 1, self.status.as_str().len()) + } } -impl IntoHeaderIter for () { +impl _Pseudo for () { fn into_iter(self) -> impl Iterator>> { core::iter::empty() } + + fn from_header(&mut self, _: super::hpack::Header, _: usize, _: &mut bool, _: bool, _: &mut bool, _: &mut usize) {} + + fn as_header_size(&self) -> usize { + 0 + } } fn decoded_header_size(name: usize, value: usize) -> usize { name + value + 32 } -// + // #[cfg(test)] // mod test { // use std::iter::FromIterator; diff --git a/http/src/h2/proto/mod.rs b/http/src/h2/proto/mod.rs index e7240ac6..940b9b6c 100644 --- a/http/src/h2/proto/mod.rs +++ b/http/src/h2/proto/mod.rs @@ -479,7 +479,7 @@ mod io_uring { } let pseudo = headers::Pseudo::response(parts.status); - let mut headers = headers::Headers::response(stream_id, pseudo, parts.headers); + let mut headers = headers::Headers::new(stream_id, pseudo, parts.headers); match size { BodySize::None => {