From ffebc04ceac65f0b2445bd3b762ec6dae81fc239 Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Tue, 16 Jan 2024 18:31:06 -0700 Subject: [PATCH] feat(s2n-quic-core): add buffer reader --- quic/s2n-quic-core/src/buffer/mod.rs | 25 ++ quic/s2n-quic-core/src/buffer/reader.rs | 48 ++++ quic/s2n-quic-core/src/buffer/reader/chunk.rs | 51 ++++ .../src/buffer/reader/chunk/buf.rs | 113 +++++++++ .../src/buffer/reader/chunk/bytes.rs | 38 +++ .../src/buffer/reader/chunk/force_copy.rs | 43 ++++ .../src/buffer/reader/chunk/io_slice.rs | 228 ++++++++++++++++++ .../src/buffer/reader/chunk/slice.rs | 73 ++++++ .../src/buffer/reader/chunk/tests.rs | 155 ++++++++++++ .../src/buffer/reader/chunk/trailer.rs | 94 ++++++++ quic/s2n-quic-core/src/buffer/reader/empty.rs | 75 ++++++ .../src/buffer/reader/incremental.rs | 154 ++++++++++++ .../src/buffer/reader/max_data.rs | 101 ++++++++ quic/s2n-quic-core/src/buffer/reader/slice.rs | 103 ++++++++ .../src/buffer/receive_buffer.rs | 87 +++++-- quic/s2n-quic-core/src/buffer/writer.rs | 6 + quic/s2n-quic-core/src/buffer/writer/chunk.rs | 57 +++++ .../src/buffer/writer/chunk/buf.rs | 88 +++++++ .../src/buffer/writer/chunk/byte_queue.rs | 72 ++++++ .../src/buffer/writer/chunk/uninit_slice.rs | 36 +++ quic/s2n-quic-core/src/stream/testing.rs | 58 ++++- scripts/copyright_check | 2 +- 22 files changed, 1676 insertions(+), 31 deletions(-) create mode 100644 quic/s2n-quic-core/src/buffer/reader.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/force_copy.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/empty.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/incremental.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/max_data.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs diff --git a/quic/s2n-quic-core/src/buffer/mod.rs b/quic/s2n-quic-core/src/buffer/mod.rs index d252d8b391..342023ed67 100644 --- a/quic/s2n-quic-core/src/buffer/mod.rs +++ b/quic/s2n-quic-core/src/buffer/mod.rs @@ -1,6 +1,31 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +pub mod reader; mod receive_buffer; +pub mod writer; pub use receive_buffer::*; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Error { + /// An invalid data range was provided + OutOfRange, + /// The provided final size was invalid for the buffer's state + InvalidFin, +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), + Self::InvalidFin => write!( + f, + "write modifies the final offset in a non-compliant manner" + ), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs new file mode 100644 index 0000000000..ccd13c783a --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader.rs @@ -0,0 +1,48 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::varint::VarInt; + +pub mod chunk; +mod empty; +pub mod incremental; +mod max_data; +mod slice; + +pub use chunk::Chunk; +pub use empty::Empty; +pub use incremental::Incremental; +pub use max_data::MaxData; +pub use slice::Slice; + +pub trait Reader: Chunk { + /// Returns the currently read offset for the stream + fn current_offset(&self) -> VarInt; + + /// Returns the final offset for the stream + fn final_offset(&self) -> Option; + + /// Returns `true` if the reader has the final offset buffered + #[inline] + fn has_buffered_fin(&self) -> bool { + self.final_offset().map_or(false, |fin| { + let buffered_end = self + .current_offset() + .as_u64() + .saturating_add(self.buffered_len() as u64); + fin == buffered_end + }) + } + + /// Limits the maximum offset that the caller can read from the reader + #[inline] + fn with_max_data(&mut self, max_data: VarInt) -> MaxData { + MaxData::new(self, max_data) + } + + /// Temporarily clears the buffer for the reader, while preserving the offsets + #[inline] + fn with_empty_buffer(&self) -> Empty { + Empty::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk.rs b/quic/s2n-quic-core/src/buffer/reader/chunk.rs new file mode 100644 index 0000000000..3a93fdf9a0 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk.rs @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod buf; +mod bytes; +mod force_copy; +mod io_slice; +mod slice; +mod trailer; + +#[cfg(test)] +mod tests; + +pub use buf::Buf; +pub use force_copy::ForceCopy; +pub use io_slice::IoSlice; +pub use trailer::Trailer; + +pub trait Chunk { + type Error; + + /// Returns the length of the chunk + fn buffered_len(&self) -> usize; + + /// Returns if the chunk is empty + #[inline] + fn buffer_is_empty(&self) -> bool { + self.buffered_len() == 0 + } + + /// Reads the current trailer for the chunk + fn read_trailer(&mut self, watermark: usize) -> Result, Self::Error>; + + /// Copies the chunk of bytes into `dest`. + /// + /// `dest` must always be less than or equal to the value returned from `buffered_len`. + /// + /// The chunk may optionally return a `Trailer`, which can be used by the caller to defer + /// copying the trailing chunk until later. + fn copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Chunk; + + /// Forces the entire chunk to be copied + /// + /// The returned `Trailer` will always be empty. + #[inline] + fn force_copy(&mut self) -> ForceCopy { + ForceCopy::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs new file mode 100644 index 0000000000..5950ccb5eb --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs @@ -0,0 +1,113 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use crate::assume; +use core::cmp::Ordering; + +pub struct Buf<'a, B: bytes::Buf> { + buf: &'a mut B, + pending: usize, +} + +impl<'a, B> Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + pub fn new(buf: &'a mut B) -> Self { + Self { buf, pending: 0 } + } + + #[inline] + fn commit_pending(&mut self) { + if self.pending > 0 { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.advance(self.pending); + self.pending = 0; + } + } +} + +impl<'a, B> Chunk for Buf<'a, B> +where + B: bytes::Buf, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.remaining() - self.pending + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + self.commit_pending(); + let chunk = self.buf.chunk(); + let len = chunk.len().min(watermark); + if len > 0 { + self.pending = len; + } + Ok(chunk[..len].into()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + unsafe { + assume!(dest.remaining_capacity() <= self.buffered_len()); + } + + self.commit_pending(); + + loop { + let chunk_len = self.buf.chunk().len(); + + if chunk_len == 0 { + debug_assert_eq!( + self.buf.remaining(), + 0, + "buf returned empty chunk with remaining bytes" + ); + } + + match chunk_len.cmp(&dest.remaining_capacity()) { + Ordering::Less => { + dest.put_slice(self.buf.chunk()); + self.buf.advance(chunk_len); + continue; + } + Ordering::Equal => { + let chunk = self.buf.chunk(); + self.pending = chunk.len(); + return Ok(chunk.into()); + } + Ordering::Greater => { + let len = dest.remaining_capacity(); + let chunk = &self.buf.chunk()[..len]; + self.pending = len; + return Ok(chunk.into()); + } + } + } + } +} + +impl<'a, B> Drop for Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + fn drop(&mut self) { + self.commit_pending(); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs new file mode 100644 index 0000000000..c354e96938 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs @@ -0,0 +1,38 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; + +macro_rules! impl_bytes { + ($ty:ty) => { + impl Chunk for $ty { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + if self.len() == len { + Ok(core::mem::replace(self, Self::new()).into()) + } else { + Ok(self.split_to(len).into()) + } + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.read_trailer(dest.remaining_capacity()) + } + } + }; +} + +impl_bytes!(bytes::Bytes); +impl_bytes!(bytes::BytesMut); diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/force_copy.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/force_copy.rs new file mode 100644 index 0000000000..70cd09c5cc --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/force_copy.rs @@ -0,0 +1,43 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; + +#[derive(Debug)] +pub struct ForceCopy<'a, C: Chunk + ?Sized>(&'a mut C); + +impl<'a, C: Chunk + ?Sized> ForceCopy<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C) -> Self { + Self(chunk) + } +} + +impl<'a, C: Chunk + ?Sized> Chunk for ForceCopy<'a, C> { + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.0.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.0.buffer_is_empty() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + self.0.read_trailer(watermark) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let trailer = self.0.copy_into(dest)?; + dest.put_trailer(trailer); + Ok(Trailer::empty()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs new file mode 100644 index 0000000000..f0022ce14f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs @@ -0,0 +1,228 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use crate::assume; +use core::cmp::Ordering; + +pub struct IoSlice<'a, T> { + len: usize, + head: &'a [u8], + buf: &'a [T], +} + +impl<'a, T> IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + pub fn new(buf: &'a [T]) -> Self { + let mut len = 0; + + let mut first_non_empty = None; + for (idx, buf) in buf.iter().enumerate() { + len += buf.len(); + if !buf.is_empty() && first_non_empty.is_none() { + first_non_empty = Some(idx); + } + } + + if let Some(idx) = first_non_empty { + let buf = &buf[idx..]; + + unsafe { + assume!(!buf.is_empty()); + } + + let mut slice = Self { + len, + head: &[], + buf, + }; + slice.advance_buf_once(); + slice + } else { + Self { + len: 0, + head: &[], + buf: &[], + } + } + } + + #[inline(always)] + fn advance_buf(&mut self) { + while self.head.is_empty() && !self.buf.is_empty() { + self.advance_buf_once(); + } + } + + #[inline(always)] + fn advance_buf_once(&mut self) { + let (head, tail) = self.buf.split_at(1); + self.head = &head[0][..]; + self.buf = tail; + } + + #[inline] + fn sub_len(&mut self, len: usize) { + unsafe { + assume!(self.len >= len); + } + self.set_len(self.len - len); + } + + #[inline] + fn set_len(&mut self, len: usize) { + if cfg!(debug_assertions) { + let mut computed = self.head.len(); + for buf in self.buf.iter() { + computed += buf.len(); + } + assert_eq!(len, computed); + } + self.len = len; + } +} + +impl<'a, T> bytes::Buf for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + fn remaining(&self) -> usize { + self.len + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.head + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + assert!(cnt <= self.len); + let new_len = self.len - cnt; + + if new_len == 0 { + self.head = &[]; + self.buf = &[]; + self.set_len(new_len); + return; + } + + while cnt > 0 { + let len = self.head.len().min(cnt); + cnt -= len; + + if len >= self.head.len() { + unsafe { + assume!(!self.buf.is_empty()); + } + + self.head = &[]; + self.advance_buf(); + continue; + } + + self.head = &self.head[len..]; + break; + } + + self.set_len(new_len); + } +} + +impl<'a, T> Chunk for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(watermark); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + // head can be returned and we need to take the next buf entry + if self.head.len() >= watermark { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + + // we just need to split off the current head and return it + let (head, tail) = self.head.split_at(watermark); + self.head = tail; + self.sub_len(head.len()); + Ok(head.into()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + debug_assert!(dest.remaining_capacity() <= self.buffered_len()); + + loop { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(dest.remaining_capacity()); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + match self.head.len().cmp(&dest.remaining_capacity()) { + // head needs to be copied into dest and we need to take the next buf entry + Ordering::Less => { + let len = self.head.len(); + dest.put_slice(self.head); + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(len); + continue; + } + // head can be returned and we need to take the next buf entry + Ordering::Equal => { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + // we just need to split off the current head and return it + Ordering::Greater => { + let (head, tail) = self.head.split_at(dest.remaining_capacity()); + self.head = tail; + self.sub_len(head.len()); + return Ok(head.into()); + } + } + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs new file mode 100644 index 0000000000..04dfe8ed3f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs @@ -0,0 +1,73 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use crate::assume; + +impl Chunk for &[u8] { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + let (trailer, remaining) = self.split_at(len); + *self = remaining; + Ok(trailer.into()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + unsafe { + assume!(dest.remaining_capacity() <= self.len()); + } + let (trailer, remaining) = self.split_at(dest.remaining_capacity()); + *self = remaining; + Ok(trailer.into()) + } +} + +impl Chunk for &mut [u8] { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + let (trailer, remaining) = core::mem::take(self).split_at_mut(len); + *self = remaining; + Ok((&*trailer).into()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + unsafe { + assume!(dest.remaining_capacity() <= self.len()); + } + self.read_trailer(dest.remaining_capacity()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs new file mode 100644 index 0000000000..84740f4f96 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs @@ -0,0 +1,155 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +/// ensures each implementation returns a trailing chunk correctly +#[test] +fn trailer_test() { + let mut dest = vec![]; + let mut source = vec![]; + bolero::check!() + .with_type::<(u16, u16)>() + .for_each(|(dest_len, source_len)| { + source.resize(*source_len as usize, 42); + + let dest_len = (*source_len).min(*dest_len) as usize; + dest.resize(dest_len, 0); + let expected = &source[..dest_len]; + let dest = &mut dest[..]; + + // direct implementation + { + let mut chunk = &source[..]; + let mut target = &mut dest[..]; + + let trailer = chunk.copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // IoSlice implementation + { + let io_slice = [&source[..]]; + let mut chunk = IoSlice::new(&io_slice); + let mut target = &mut dest[..]; + + let trailer = chunk.copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // Buf implementation + { + let mut slice = &source[..]; + let mut chunk = Buf::new(&mut slice); + let mut target = &mut dest[..]; + + let trailer = chunk.copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // force_copy + { + let mut source = &source[..]; + let mut chunk = source.force_copy(); + let mut target = &mut dest[..]; + + let trailer = chunk.copy_into(&mut target).unwrap(); + assert!(trailer.is_empty()); + assert_eq!(expected, dest); + dest.fill(0); + } + }); +} + +/// ensures each chunk type correctly copies multiple chunks into the destination +#[test] +fn io_slice_test() { + let mut dest = vec![]; + let mut source: Vec> = vec![]; + let mut pool = vec![]; + let mut expected = vec![]; + bolero::check!() + .with_type::<(u16, Vec)>() + .for_each(|(dest_len, source_lens)| { + while source.len() > source_lens.len() { + pool.push(source.pop().unwrap()); + } + + while source.len() < source_lens.len() { + source.push(pool.pop().unwrap_or_default()); + } + + let mut source_len = 0; + let mut last_chunk_idx = 0; + let mut last_chunk_len = 0; + let mut remaining_len = *dest_len as usize; + for (idx, (len, source)) in source_lens.iter().zip(&mut source).enumerate() { + let fill = (idx + 1) as u8; + let len = *len as usize; + source.resize(len, fill); + source.fill(fill); + if len > 0 && remaining_len > 0 { + last_chunk_idx = idx; + last_chunk_len = len.min(remaining_len); + } + source_len += len; + remaining_len = remaining_len.saturating_sub(len); + } + + let dest_len = source_len.min(*dest_len as usize); + dest.resize(dest_len, 0); + dest.fill(0); + let dest = &mut dest[..]; + + expected.resize(dest_len, 0); + expected.fill(0); + + { + // don't copy the last chunk, since that should be returned + let source = &source[..last_chunk_idx]; + crate::slice::vectored_copy(source, &mut [&mut expected[..]]); + } + + let expected_chunk = source + .get(last_chunk_idx) + .map(|v| &v[..last_chunk_len]) + .unwrap_or(&[]); + + // IoSlice implementation + { + let mut source = IoSlice::new(&source); + let mut target = &mut dest[..]; + + let chunk = source.copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + // reset the destination + dest.fill(0); + } + + // Buf implementation + { + let mut source = IoSlice::new(&source); + let mut source = Buf::new(&mut source); + let mut target = &mut dest[..]; + + let chunk = source.copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + } + }); +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs new file mode 100644 index 0000000000..ea1523f655 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs @@ -0,0 +1,94 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::{Bytes, BytesMut}; + +/// Trailing chunk from a copy operation +/// +/// This is returned from [`Chunk::copy_into_slice`] to allow the caller to defer copying the final +/// chunk until later. +#[derive(Clone, Debug)] +#[must_use = "trailing data cannot be discarded"] +pub enum Trailer<'a> { + Slice(&'a [u8]), + Bytes(Bytes), + BytesMut(BytesMut), +} + +impl<'a> Default for Trailer<'a> { + #[inline] + fn default() -> Self { + Self::empty() + } +} + +impl<'a> Trailer<'a> { + #[inline] + pub fn empty() -> Self { + Self::Slice(&[]) + } +} + +impl<'a> From<&'a [u8]> for Trailer<'a> { + #[inline] + fn from(chunk: &'a [u8]) -> Self { + Self::Slice(chunk) + } +} + +impl<'a> From for Trailer<'a> { + #[inline] + fn from(chunk: Bytes) -> Self { + Self::Bytes(chunk) + } +} + +impl<'a> From for Trailer<'a> { + #[inline] + fn from(chunk: BytesMut) -> Self { + Self::BytesMut(chunk) + } +} + +impl<'a> core::ops::Deref for Trailer<'a> { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &Self::Target { + match self { + Self::Slice(t) => t, + Self::Bytes(t) => t, + Self::BytesMut(t) => t, + } + } +} + +impl<'a> super::Chunk for Trailer<'a> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + match self { + Self::Slice(v) => v.read_trailer(watermark), + Self::Bytes(v) => v.read_trailer(watermark), + Self::BytesMut(v) => v.read_trailer(watermark), + } + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + match self { + Self::Slice(v) => v.copy_into(dest), + Self::Bytes(v) => v.copy_into(dest), + Self::BytesMut(v) => v.copy_into(dest), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/empty.rs b/quic/s2n-quic-core/src/buffer/reader/empty.rs new file mode 100644 index 0000000000..264c25cb19 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/empty.rs @@ -0,0 +1,75 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; + +/// Returns an empty buffer for the current offset of an inner reader +#[derive(Debug)] +pub struct Empty<'a, R: Reader + ?Sized>(&'a R); + +impl<'a, R: Reader + ?Sized> Empty<'a, R> { + #[inline] + pub fn new(reader: &'a R) -> Self { + Self(reader) + } +} + +impl<'a, R: Reader + ?Sized> Chunk for Empty<'a, R> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline] + fn read_trailer(&mut self, _watermark: usize) -> Result { + Ok(chunk::Trailer::empty()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + debug_assert_eq!(dest.remaining_capacity(), 0); + Ok(chunk::Trailer::empty()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Empty<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.0.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.0.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn empty_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + { + assert_eq!(reader.with_empty_buffer().buffered_len(), 0); + } + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + + let reader = reader.with_empty_buffer(); + + assert_eq!(reader.buffered_len(), 0); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/incremental.rs b/quic/s2n-quic-core/src/buffer/reader/incremental.rs new file mode 100644 index 0000000000..9c32e85d7e --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/incremental.rs @@ -0,0 +1,154 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; +use crate::{buffer, ensure}; + +/// Implements an incremental reader that handles temporary chunks as the stream data +/// +/// This is useful for scenarios where the the stream isn't completely buffered in memory and +/// chunks come in gradually. +#[derive(Debug, Default)] +pub struct Incremental { + current_offset: VarInt, + final_offset: Option, +} + +impl Incremental { + #[inline] + pub fn with_chunk<'a, C: Chunk>(&'a mut self, chunk: &'a mut C) -> WithChunk<'a, C> { + WithChunk { + incremental: self, + chunk, + } + } +} + +impl Chunk for Incremental { + type Error = core::convert::Infallible; + + #[inline] + fn read_trailer(&mut self, _watermark: usize) -> Result { + Ok(Default::default()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + debug_assert_eq!(dest.remaining_capacity(), 0); + Ok(Default::default()) + } + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } +} + +impl Reader for Incremental { + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_offset + } +} + +pub struct WithChunk<'a, C: Chunk> { + incremental: &'a mut Incremental, + chunk: &'a mut C, +} + +impl<'a, C: Chunk> WithChunk<'a, C> { + #[inline] + pub fn set_fin(&mut self) -> Result<&mut Self, buffer::Error> { + let final_offset = self + .incremental + .current_offset + .checked_add_usize(self.buffered_len()) + .ok_or(buffer::Error::OutOfRange)?; + + // make sure the final length doesn't change + if let Some(current) = self.incremental.final_offset { + ensure!(final_offset == current, Err(buffer::Error::InvalidFin)); + } + + self.incremental.final_offset = Some(final_offset); + + Ok(self) + } +} + +impl<'a, C: Chunk> Chunk for WithChunk<'a, C> { + type Error = C::Error; + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let trailer = self.chunk.read_trailer(watermark)?; + self.incremental.current_offset += trailer.len(); + Ok(trailer) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let len = dest.remaining_capacity(); + let trailer = self.chunk.copy_into(dest)?; + self.incremental.current_offset += len; + Ok(trailer) + } + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } +} + +impl<'a, C: Chunk> Reader for WithChunk<'a, C> { + #[inline] + fn current_offset(&self) -> VarInt { + self.incremental.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.incremental.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn incremental_test() { + let mut incremental = Incremental::default(); + + assert_eq!(incremental.current_offset(), VarInt::ZERO); + assert_eq!(incremental.final_offset, None); + assert_eq!(incremental.buffered_len(), 0); + + { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut with_chunk = incremental.with_chunk(&mut chunk); + + assert_eq!(with_chunk.buffered_len(), 4); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailer = with_chunk.copy_into(&mut dest).unwrap(); + assert_eq!(&*trailer, &[1, 2, 3, 4]); + + assert_eq!(with_chunk.buffered_len(), 0); + } + + assert_eq!(incremental.current_offset(), VarInt::from_u8(4)); + assert_eq!(incremental.buffered_len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/max_data.rs b/quic/s2n-quic-core/src/buffer/reader/max_data.rs new file mode 100644 index 0000000000..3c8fba2538 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/max_data.rs @@ -0,0 +1,101 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; + +/// Wraps a reader and limits the max offset that can be read from it +/// +/// This can be used for applying back pressure to the reader with flow control. +pub struct MaxData<'a, R: Reader + ?Sized> { + buffered_len: usize, + reader: &'a mut R, +} + +impl<'a, R: Reader + ?Sized> MaxData<'a, R> { + #[inline] + pub fn new(reader: &'a mut R, max_data: VarInt) -> Self { + let max_buffered_len = max_data + .as_u64() + .saturating_sub(reader.current_offset().as_u64()); + + let buffered_len = max_buffered_len.min(reader.buffered_len() as u64) as usize; + + MaxData { + buffered_len, + reader, + } + } +} + +impl<'a, R: Reader + ?Sized> Chunk for MaxData<'a, R> { + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.buffered_len + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let trailer = self.reader.read_trailer(watermark)?; + unsafe { + assume!(trailer.len() <= self.buffered_len); + } + self.buffered_len -= trailer.len(); + Ok(trailer) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let len = dest.remaining_capacity(); + let trailer = self.reader.copy_into(dest)?; + unsafe { + assume!(len <= self.buffered_len); + } + self.buffered_len -= len; + Ok(trailer) + } +} + +impl<'a, R: Reader + ?Sized> Reader for MaxData<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.reader.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.reader.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn max_data_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + let max_data = 32usize; + + let mut reader = reader.with_max_data(VarInt::from_u8(max_data as _)); + assert_eq!(reader.buffered_len(), max_data); + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + + assert_eq!(reader.buffered_len(), max_data - 16); + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/slice.rs b/quic/s2n-quic-core/src/buffer/reader/slice.rs new file mode 100644 index 0000000000..702484992e --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/slice.rs @@ -0,0 +1,103 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; +use crate::buffer; + +/// Wraps a single slice as a reader. +/// +/// This can be used for scenarios where the entire stream is buffered and known up-front. +#[derive(Debug)] +pub struct Slice<'a, C> { + chunk: &'a mut C, + current_offset: VarInt, + final_offset: VarInt, +} + +impl<'a, C> Slice<'a, C> +where + C: Chunk, +{ + #[inline] + pub fn new(chunk: &'a mut C) -> Result { + let final_offset = VarInt::try_from(chunk.buffered_len()) + .ok() + .ok_or(buffer::Error::OutOfRange)?; + Ok(Self { + chunk, + current_offset: VarInt::ZERO, + final_offset, + }) + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.chunk.buffer_is_empty() + } +} + +impl<'a, C> Chunk for Slice<'a, C> +where + C: Chunk, +{ + type Error = C::Error; + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let trailer = self.chunk.read_trailer(watermark)?; + self.current_offset += trailer.len(); + Ok(trailer) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let len = dest.remaining_capacity(); + let trailer = self.chunk.copy_into(dest)?; + self.current_offset += len; + Ok(trailer) + } + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } +} + +impl<'a, C> Reader for Slice<'a, C> +where + C: Chunk, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.final_offset) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn slice_test() { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut reader = Slice::new(&mut chunk).unwrap(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), Some(VarInt::from_u8(4))); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailer = reader.copy_into(&mut dest).unwrap(); + assert_eq!(&*trailer, &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer.rs b/quic/s2n-quic-core/src/buffer/receive_buffer.rs index dcb86b5acc..6c34bf7fd1 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer.rs +++ b/quic/s2n-quic-core/src/buffer/receive_buffer.rs @@ -7,7 +7,6 @@ use crate::varint::VarInt; use alloc::collections::{vec_deque, VecDeque}; use bytes::BytesMut; -use core::fmt; mod probe; mod request; @@ -19,29 +18,7 @@ mod tests; use request::Request; use slot::Slot; -/// Enumerates error that can occur while inserting data into the Receive Buffer -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum ReceiveBufferError { - /// An invalid data range was provided - OutOfRange, - /// The provided final size was invalid for the buffer's state - InvalidFin, -} - -#[cfg(feature = "std")] -impl std::error::Error for ReceiveBufferError {} - -impl fmt::Display for ReceiveBufferError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), - Self::InvalidFin => write!( - f, - "write modifies the final offset in a non-compliant manner" - ), - } - } -} +pub use super::Error as ReceiveBufferError; /// The default buffer size for slots that the [`ReceiveBuffer`] uses. /// @@ -707,3 +684,65 @@ impl<'a> Iterator for Drain<'a> { (len, Some(len)) } } + +impl crate::buffer::reader::Chunk for ReceiveBuffer { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_trailer( + &mut self, + watermark: usize, + ) -> Result { + if let Some(chunk) = self.pop_watermarked(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + while let Some(chunk) = self.pop_watermarked(dest.remaining_capacity()) { + // if the chunk matches the same length as dest, then it's the trailer + if chunk.len() == dest.remaining_capacity() { + return Ok(chunk.into()); + } + + // otherwise copy the chunks into the destination + dest.put_bytes_mut(chunk); + } + + Ok(Default::default()) + } +} + +impl crate::buffer::reader::Reader for ReceiveBuffer { + #[inline] + fn current_offset(&self) -> VarInt { + unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(self.start_offset) + } + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_size().map(|v| unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(v) + }) + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer.rs b/quic/s2n-quic-core/src/buffer/writer.rs new file mode 100644 index 0000000000..f8bf840b48 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer.rs @@ -0,0 +1,6 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod chunk; + +pub use chunk::Chunk; diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk.rs b/quic/s2n-quic-core/src/buffer/writer/chunk.rs new file mode 100644 index 0000000000..292a6ae2f4 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk.rs @@ -0,0 +1,57 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::reader::chunk::Trailer; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +mod buf; +mod byte_queue; +mod uninit_slice; + +pub use buf::BufMut; + +pub trait Chunk { + const SPECIALIZES_BYTES: bool = false; + const SPECIALIZES_BYTES_MUT: bool = false; + + fn put_slice(&mut self, bytes: &[u8]); + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + // we can specialize on an empty payload + ensure!(payload_len == 0, Ok(false)); + + f(UninitSlice::new(&mut []))?; + + Ok(true) + } + + fn remaining_capacity(&self) -> usize; + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity() > 0 + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.put_slice(&bytes); + } + + #[inline] + fn put_trailer(&mut self, trailer: Trailer) { + match trailer { + Trailer::Slice(v) => self.put_slice(v), + Trailer::Bytes(v) => self.put_bytes(v), + Trailer::BytesMut(v) => self.put_bytes_mut(v), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs new file mode 100644 index 0000000000..76c3a1db10 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs @@ -0,0 +1,88 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, UninitSlice}; + +pub struct BufMut<'a, T: bytes::BufMut> { + buf_mut: &'a mut T, +} + +impl<'a, T: bytes::BufMut> BufMut<'a, T> { + #[inline] + pub fn new(buf_mut: &'a mut T) -> Self { + Self { buf_mut } + } +} + +impl<'a, T: bytes::BufMut> Chunk for BufMut<'a, T> { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.buf_mut.put_slice(bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.buf_mut.remaining_mut() + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let chunk = self.buf_mut.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.buf_mut.advance_mut(payload_len); + } + + Ok(true) + } +} + +macro_rules! impl_buf_mut { + ($ty:ty) => { + impl Chunk for $ty { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + bytes::BufMut::put_slice(self, bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + bytes::BufMut::remaining_mut(self) + } + + #[inline] + fn put_uninit_slice( + &mut self, + payload_len: usize, + f: F, + ) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + use bytes::BufMut; + + let chunk = self.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.advance_mut(payload_len); + } + + Ok(true) + } + } + }; +} + +impl_buf_mut!(bytes::BytesMut); +impl_buf_mut!(&mut [u8]); +impl_buf_mut!(&mut [core::mem::MaybeUninit]); +impl_buf_mut!(alloc::vec::Vec); diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs new file mode 100644 index 0000000000..0a8d6f21b1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Bytes, BytesMut, Chunk}; +use alloc::{collections::VecDeque, vec::Vec}; + +macro_rules! impl_queue { + ($ty:ident, $push:ident) => { + impl Chunk for $ty { + const SPECIALIZES_BYTES: bool = true; + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes(Bytes::copy_from_slice(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.$push(bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes.freeze()); + } + } + + impl Chunk for $ty { + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes_mut(BytesMut::from(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + // we can't convert Bytes into BytesMut so we'll need to copy it + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes); + } + } + }; +} + +impl_queue!(Vec, push); +impl_queue!(VecDeque, push_back); diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs new file mode 100644 index 0000000000..66b08fea76 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs @@ -0,0 +1,36 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Chunk; +use bytes::buf::UninitSlice; + +impl Chunk for &mut UninitSlice { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self[..bytes.len()].copy_from_slice(bytes); + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[bytes.len()..]; + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + ensure!(self.len() >= payload_len, Ok(false)); + + f(&mut self[..payload_len])?; + + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[payload_len..]; + + Ok(true) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.len() + } +} diff --git a/quic/s2n-quic-core/src/stream/testing.rs b/quic/s2n-quic-core/src/stream/testing.rs index d815b9f53f..c61318d8ae 100644 --- a/quic/s2n-quic-core/src/stream/testing.rs +++ b/quic/s2n-quic-core/src/stream/testing.rs @@ -3,14 +3,12 @@ //! A model that ensures stream data is correctly sent and received between peers -#[cfg(feature = "generator")] -use bolero_generator::*; - -#[cfg(test)] -use bolero::generator::*; - +use crate::buffer::reader; use bytes::Bytes; +#[cfg(any(test, feature = "generator"))] +use bolero_generator::*; + static DATA: Bytes = { const INNER: [u8; DATA_LEN] = { let mut data = [0; DATA_LEN]; @@ -152,6 +150,54 @@ impl Data { } } +impl reader::Chunk for Data { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + (self.len - self.offset).try_into().unwrap() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + if let Some(chunk) = self.send_one(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + while let Some(chunk) = self.send_one(dest.remaining_capacity()) { + // if the chunk matches the destination then it's a trailer + if chunk.len() == dest.remaining_capacity() { + return Ok(chunk.into()); + } + + // otherwise copy the chunk into the destination + dest.put_bytes(chunk); + } + + Ok(Default::default()) + } +} + +impl reader::Reader for Data { + #[inline] + fn current_offset(&self) -> crate::varint::VarInt { + self.offset().try_into().unwrap() + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.len.try_into().unwrap()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/scripts/copyright_check b/scripts/copyright_check index e2003e87de..1846e45b8c 100755 --- a/scripts/copyright_check +++ b/scripts/copyright_check @@ -7,7 +7,7 @@ set -e -S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not \( -path "*/s2n-quic/target/*" -o -path "*/s2n-tls-sys/s2n/*" \)) +S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not -path "*/target/*") FAILED=0