diff --git a/quic/s2n-quic-bench/src/buffer.rs b/quic/s2n-quic-bench/src/buffer.rs index a3835f22fd..c40c942dd6 100644 --- a/quic/s2n-quic-bench/src/buffer.rs +++ b/quic/s2n-quic-bench/src/buffer.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use criterion::{black_box, BenchmarkId, Criterion, Throughput}; -use s2n_quic_core::{buffer::ReceiveBuffer, varint::VarInt}; +use s2n_quic_core::{ + buffer::{reader::Storage as _, writer, Reassembler}, + varint::VarInt, +}; pub fn benchmarks(c: &mut Criterion) { let mut group = c.benchmark_group("buffer"); @@ -13,19 +16,21 @@ pub fn benchmarks(c: &mut Criterion) { group.throughput(Throughput::Bytes(input.len() as _)); group.bench_with_input(BenchmarkId::new("skip", size), &input, |b, _input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); + let size = VarInt::try_from(size).unwrap(); b.iter(move || { buffer.skip(black_box(size)).unwrap(); }); }); group.bench_with_input(BenchmarkId::new("write_at", size), &input, |b, input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::from_u8(0); let len = VarInt::new(input.len() as _).unwrap(); b.iter(move || { buffer.write_at(offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + buffer.copy_into(&mut writer::storage::Discard).unwrap(); offset += len; }); }); @@ -36,7 +41,7 @@ pub fn benchmarks(c: &mut Criterion) { BenchmarkId::new("write_at_fragmented", size), &input, |b, input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::from_u8(0); let len = VarInt::new(input.len() as _).unwrap(); b.iter(move || { @@ -44,34 +49,11 @@ pub fn benchmarks(c: &mut Criterion) { buffer.write_at(first_offset, input).unwrap(); let second_offset = offset; buffer.write_at(second_offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + buffer.copy_into(&mut writer::storage::Discard).unwrap(); offset = first_offset + len; }); }, ); } } - -/// A BufMut implementation that doesn't actually copy data into it -/// -/// This is used to avoid oversampling the `pop` implementation for -/// `write_at` benchmarks. -struct NoOpBuf; - -unsafe impl bytes::BufMut for NoOpBuf { - #[inline] - fn remaining_mut(&self) -> usize { - usize::MAX - } - - #[inline] - unsafe fn advance_mut(&mut self, _cnt: usize) {} - - #[inline] - fn put_slice(&mut self, _slice: &[u8]) {} - - #[inline] - fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { - unimplemented!() - } -} diff --git a/quic/s2n-quic-core/src/buffer.rs b/quic/s2n-quic-core/src/buffer.rs new file mode 100644 index 0000000000..3ae9be32d3 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer.rs @@ -0,0 +1,14 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod duplex; +mod error; +pub mod reader; +pub mod reassembler; +pub mod writer; + +pub use duplex::Duplex; +pub use error::Error; +pub use reader::Reader; +pub use reassembler::Reassembler; +pub use writer::Writer; diff --git a/quic/s2n-quic-core/src/buffer/duplex.rs b/quic/s2n-quic-core/src/buffer/duplex.rs new file mode 100644 index 0000000000..186a22d2b7 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex.rs @@ -0,0 +1,17 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Error, Reader, Writer}; +use crate::varint::VarInt; + +mod split; + +pub use split::Split; + +pub trait Duplex: Reader + Writer {} + +impl Duplex for T {} + +pub trait Skip: Duplex { + fn skip(&mut self, len: VarInt, final_offset: Option) -> Result<(), Error>; +} diff --git a/quic/s2n-quic-core/src/buffer/duplex/split.rs b/quic/s2n-quic-core/src/buffer/duplex/split.rs new file mode 100644 index 0000000000..c312e38eaa --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex/split.rs @@ -0,0 +1,164 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + duplex, + reader::{self, storage::Infallible as _, Reader, Storage as _}, + writer::{self, Writer}, + Error, + }, + varint::VarInt, +}; +use core::convert::Infallible; + +pub struct Split<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + chunk: &'a mut C, + duplex: &'a mut D, +} + +impl<'a, C, D> Split<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + pub fn new(chunk: &'a mut C, duplex: &'a mut D) -> Self { + Self { chunk, duplex } + } +} + +/// Delegates to the inner Duplex +impl<'a, C, D> reader::Storage for Split<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + type Error = D::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.duplex.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.duplex.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + self.duplex.read_chunk(watermark) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.duplex.partial_copy_into(dest) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.duplex.copy_into(dest) + } +} + +/// Delegates to the inner Duplex +impl<'a, C, D> Reader for Split<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.duplex.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.duplex.final_offset() + } + + #[inline] + fn has_buffered_fin(&self) -> bool { + self.duplex.has_buffered_fin() + } + + #[inline] + fn is_consumed(&self) -> bool { + self.duplex.is_consumed() + } +} + +impl<'a, C, D> Writer for Split<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + fn copy_from(&mut self, reader: &mut R) -> Result<(), Error> + where + R: Reader + ?Sized, + { + // enable reader checks + let mut reader = reader.with_checks(); + let reader = &mut reader; + + let initial_offset = reader.current_offset(); + let final_offset = reader.final_offset(); + let is_contiguous = initial_offset == self.duplex.current_offset(); + + { + // if the chunk specializes writing zero-copy Bytes/BytesMut, then just write to the + // receive buffer, since that's what it stores + let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT; + + // if this packet is non-contiguous, then delegate to the wrapped writer + should_delegate |= !is_contiguous; + + // if the chunk doesn't have any remaining capacity, then delegate + should_delegate |= !self.chunk.has_remaining_capacity(); + + if should_delegate { + self.duplex.copy_from(reader)?; + + if !self.duplex.buffer_is_empty() && self.chunk.has_remaining_capacity() { + self.duplex.infallible_copy_into(self.chunk); + } + + return Ok(()); + } + } + + debug_assert!( + self.chunk.has_remaining_capacity(), + "this code should only be executed if the chunk is empty" + ); + + reader.copy_into(self.chunk)?; + + let write_len = reader.current_offset() - initial_offset; + + self.duplex + .skip(write_len, final_offset) + .map_err(Error::mapped)?; + + if !reader.buffer_is_empty() { + self.duplex.copy_from(reader)?; + } + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/error.rs b/quic/s2n-quic-core/src/buffer/error.rs new file mode 100644 index 0000000000..1c321d2118 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/error.rs @@ -0,0 +1,46 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Error { + /// An invalid data range was provided + OutOfRange, + /// The provided final size was invalid for the buffer's state + InvalidFin, + /// The provided reader failed + ReaderError(Reader), +} + +impl From for Error { + #[inline] + fn from(reader: Reader) -> Self { + Self::ReaderError(reader) + } +} + +impl Error { + #[inline] + pub fn mapped(error: Error) -> Error { + match error { + Error::OutOfRange => Error::OutOfRange, + Error::InvalidFin => Error::InvalidFin, + Error::ReaderError(_) => unreachable!(), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), + Self::InvalidFin => write!( + f, + "write modifies the final offset in a non-compliant manner" + ), + Self::ReaderError(reader) => write!(f, "the provided reader failed with: {reader}"), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/mod.rs b/quic/s2n-quic-core/src/buffer/mod.rs deleted file mode 100644 index d252d8b391..0000000000 --- a/quic/s2n-quic-core/src/buffer/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -mod receive_buffer; - -pub use receive_buffer::*; diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs new file mode 100644 index 0000000000..995b894661 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader.rs @@ -0,0 +1,71 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::varint::VarInt; + +pub mod checked; +mod empty; +pub mod incremental; +mod limit; +mod slice; +pub mod storage; + +pub use checked::Checked; +pub use empty::Empty; +pub use incremental::Incremental; +pub use limit::Limit; +pub use slice::Slice; +pub use storage::Storage; + +pub trait Reader: Storage { + /// Returns the currently read offset for the stream + fn current_offset(&self) -> VarInt; + + /// Returns the final offset for the stream + fn final_offset(&self) -> Option; + + /// Returns `true` if the reader has the final offset buffered + #[inline] + fn has_buffered_fin(&self) -> bool { + self.final_offset().map_or(false, |fin| { + let buffered_end = self + .current_offset() + .as_u64() + .saturating_add(self.buffered_len() as u64); + fin == buffered_end + }) + } + + /// Returns `true` if the reader is finished producing data + #[inline] + fn is_consumed(&self) -> bool { + self.final_offset() + .map_or(false, |fin| fin == self.current_offset()) + } + + /// Limits the maximum offset that the caller can read from the reader + #[inline] + fn with_max_data(&mut self, max_data: VarInt) -> Limit { + let max_buffered_len = max_data.saturating_sub(self.current_offset()); + let max_buffered_len = max_buffered_len.as_u64().min(self.buffered_len() as u64) as usize; + self.with_limit(max_buffered_len) + } + + /// Limits the maximum amount of data that the caller can read from the reader + #[inline] + fn with_limit(&mut self, max_buffered_len: usize) -> Limit { + Limit::new(self, max_buffered_len) + } + + /// Temporarily clears the buffer for the reader, while preserving the offsets + #[inline] + fn with_empty_buffer(&self) -> Empty { + Empty::new(self) + } + + /// Enables checking the reader for correctness invariants + #[inline] + fn with_checks(&mut self) -> Checked { + Checked::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/checked.rs b/quic/s2n-quic-core/src/buffer/reader/checked.rs new file mode 100644 index 0000000000..49fa8f646b --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/checked.rs @@ -0,0 +1,210 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Reader, Storage}; +use crate::{buffer::writer, varint::VarInt}; + +#[cfg(debug_assertions)] +use crate::buffer::reader::storage::Infallible; + +pub struct Checked<'a, R> +where + R: Reader + ?Sized, +{ + inner: &'a mut R, + #[cfg(debug_assertions)] + chunk: alloc::vec::Vec, +} + +impl<'a, R> Checked<'a, R> +where + R: Reader + ?Sized, +{ + #[inline(always)] + pub fn new(inner: &'a mut R) -> Self { + Self { + inner, + #[cfg(debug_assertions)] + chunk: Default::default(), + } + } +} + +#[cfg(not(debug_assertions))] +impl<'a, R> Storage for Checked<'a, R> +where + R: Reader + ?Sized, +{ + type Error = R::Error; + + #[inline(always)] + fn buffered_len(&self) -> usize { + self.inner.buffered_len() + } + + #[inline(always)] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + self.inner.read_chunk(watermark) + } + + #[inline(always)] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.inner.partial_copy_into(dest) + } + + #[inline(always)] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.inner.copy_into(dest) + } +} + +#[cfg(debug_assertions)] +impl<'a, R> Storage for Checked<'a, R> +where + R: Reader + ?Sized, +{ + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.inner.buffered_len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + let snapshot = Snapshot::new(self.inner, watermark); + + let mut chunk = self.inner.read_chunk(watermark)?; + + // copy the returned chunk into another buffer so we can read the `inner` state + self.chunk.clear(); + chunk.infallible_copy_into(&mut self.chunk); + + snapshot.check(self.inner, 0, self.chunk.len()); + + Ok(self.chunk[..].into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let snapshot = Snapshot::new(self.inner, dest.remaining_capacity()); + let mut dest = dest.tracked(); + + let mut chunk = self.inner.partial_copy_into(&mut dest)?; + + // copy the returned chunk into another buffer so we can read the `inner` state + self.chunk.clear(); + chunk.infallible_copy_into(&mut self.chunk); + + snapshot.check(self.inner, dest.written_len(), self.chunk.len()); + + Ok(self.chunk[..].into()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let snapshot = Snapshot::new(self.inner, dest.remaining_capacity()); + let mut dest = dest.tracked(); + + self.inner.copy_into(&mut dest)?; + + snapshot.check(self.inner, dest.written_len(), 0); + + Ok(()) + } +} + +impl<'a, R> Reader for Checked<'a, R> +where + R: Reader + ?Sized, +{ + #[inline(always)] + fn current_offset(&self) -> VarInt { + self.inner.current_offset() + } + + #[inline(always)] + fn final_offset(&self) -> Option { + self.inner.final_offset() + } + + #[inline(always)] + fn has_buffered_fin(&self) -> bool { + self.inner.has_buffered_fin() + } + + #[inline(always)] + fn is_consumed(&self) -> bool { + self.inner.is_consumed() + } +} + +#[cfg(debug_assertions)] +struct Snapshot { + current_offset: VarInt, + final_offset: Option, + buffered_len: usize, + dest_capacity: usize, +} + +#[cfg(debug_assertions)] +impl Snapshot { + #[inline] + fn new(reader: &R, dest_capacity: usize) -> Self { + let current_offset = reader.current_offset(); + let final_offset = reader.final_offset(); + let buffered_len = reader.buffered_len(); + Self { + current_offset, + final_offset, + buffered_len, + dest_capacity, + } + } + + #[inline] + fn check(&self, reader: &R, dest_written_len: usize, chunk_len: usize) { + assert!( + chunk_len <= self.dest_capacity, + "chunk exceeded destination" + ); + + let write_len = reader.current_offset() - self.current_offset; + + assert_eq!( + dest_written_len as u64 + chunk_len as u64, + write_len.as_u64(), + "{} reader misreporting offsets", + core::any::type_name::(), + ); + + assert!(write_len <= self.buffered_len as u64); + + if self.final_offset.is_some() { + assert_eq!( + reader.final_offset(), + self.final_offset, + "{} reader changed final offset", + core::any::type_name::(), + ) + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/empty.rs b/quic/s2n-quic-core/src/buffer/reader/empty.rs new file mode 100644 index 0000000000..685f78f129 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/empty.rs @@ -0,0 +1,77 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{storage, Reader, Storage, VarInt}; + +/// Returns an empty buffer for the current offset of an inner reader +#[derive(Debug)] +pub struct Empty<'a, R: Reader + ?Sized>(&'a R); + +impl<'a, R: Reader + ?Sized> Empty<'a, R> { + #[inline] + pub fn new(reader: &'a R) -> Self { + Self(reader) + } +} + +impl<'a, R: Reader + ?Sized> Storage for Empty<'a, R> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline] + fn read_chunk(&mut self, _watermark: usize) -> Result { + Ok(storage::Chunk::empty()) + } + + #[inline] + fn partial_copy_into(&mut self, _dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + Ok(storage::Chunk::empty()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Empty<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.0.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.0.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn empty_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + { + assert_eq!(reader.with_empty_buffer().buffered_len(), 0); + } + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + + let mut reader = reader.with_empty_buffer(); + + assert_eq!(reader.buffered_len(), 0); + assert!(reader.buffer_is_empty()); + + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/incremental.rs b/quic/s2n-quic-core/src/buffer/reader/incremental.rs new file mode 100644 index 0000000000..5e60022011 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/incremental.rs @@ -0,0 +1,179 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{storage, Reader, Storage, VarInt}; +use crate::{buffer, ensure}; + +/// Implements an incremental reader that handles temporary chunks as the stream data +/// +/// This is useful for scenarios where the the stream isn't completely buffered in memory and +/// chunks come in gradually. +#[derive(Debug, Default)] +pub struct Incremental { + current_offset: VarInt, + final_offset: Option, +} + +impl Incremental { + #[inline] + pub fn with_chunk<'a, C: Storage>( + &'a mut self, + chunk: &'a mut C, + is_fin: bool, + ) -> Result, buffer::Error> { + let mut chunk = WithChunk { + incremental: self, + chunk, + }; + + if is_fin { + chunk.set_fin()?; + } else { + ensure!( + chunk.incremental.final_offset.is_none(), + Err(buffer::Error::InvalidFin) + ); + } + + Ok(chunk) + } +} + +impl Storage for Incremental { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline] + fn read_chunk(&mut self, _watermark: usize) -> Result { + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into(&mut self, _dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + Ok(Default::default()) + } +} + +impl Reader for Incremental { + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_offset + } +} + +pub struct WithChunk<'a, C: Storage> { + incremental: &'a mut Incremental, + chunk: &'a mut C, +} + +impl<'a, C: Storage> WithChunk<'a, C> { + #[inline] + pub fn set_fin(&mut self) -> Result<&mut Self, buffer::Error> { + let final_offset = self + .incremental + .current_offset + .checked_add_usize(self.buffered_len()) + .ok_or(buffer::Error::OutOfRange)?; + + // make sure the final length doesn't change + if let Some(current) = self.incremental.final_offset { + ensure!(final_offset == current, Err(buffer::Error::InvalidFin)); + } + + self.incremental.final_offset = Some(final_offset); + + Ok(self) + } +} + +impl<'a, C: Storage> Storage for WithChunk<'a, C> { + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let chunk = self.chunk.read_chunk(watermark)?; + self.incremental.current_offset += chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let len = self.buffered_len().min(dest.remaining_capacity()); + let chunk = self.chunk.partial_copy_into(dest)?; + self.incremental.current_offset += len; + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let len = self.buffered_len().min(dest.remaining_capacity()); + self.chunk.copy_into(dest)?; + self.incremental.current_offset += len; + Ok(()) + } +} + +impl<'a, C: Storage> Reader for WithChunk<'a, C> { + #[inline] + fn current_offset(&self) -> VarInt { + self.incremental.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.incremental.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn incremental_test() { + let mut incremental = Incremental::default(); + + assert_eq!(incremental.current_offset(), VarInt::ZERO); + assert_eq!(incremental.final_offset, None); + assert_eq!(incremental.buffered_len(), 0); + + { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut with_chunk = incremental.with_chunk(&mut chunk, false).unwrap(); + + assert_eq!(with_chunk.buffered_len(), 4); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailing_chunk = with_chunk.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*trailing_chunk, &[1, 2, 3, 4]); + + assert_eq!(with_chunk.buffered_len(), 0); + } + + assert_eq!(incremental.current_offset(), VarInt::from_u8(4)); + assert_eq!(incremental.buffered_len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/limit.rs b/quic/s2n-quic-core/src/buffer/reader/limit.rs new file mode 100644 index 0000000000..3b22cd2abc --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/limit.rs @@ -0,0 +1,115 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{storage, Reader, Storage, VarInt}; +use crate::buffer::writer::Storage as _; + +/// Wraps a reader and limits the amount of that can be read from it +/// +/// This can be used for applying back pressure to the reader with flow control. +pub struct Limit<'a, R: Reader + ?Sized> { + buffered_len: usize, + reader: &'a mut R, +} + +impl<'a, R: Reader + ?Sized> Limit<'a, R> { + #[inline] + pub fn new(reader: &'a mut R, max_buffered_len: usize) -> Self { + let buffered_len = max_buffered_len.min(reader.buffered_len()); + + Self { + buffered_len, + reader, + } + } +} + +impl<'a, R: Reader + ?Sized> Storage for Limit<'a, R> { + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.buffered_len + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let watermark = self.buffered_len.min(watermark); + let chunk = self.reader.read_chunk(watermark)?; + unsafe { + assume!(chunk.len() <= self.buffered_len); + } + self.buffered_len -= chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut dest = dest.limit(self.buffered_len); + let len = dest.remaining_capacity(); + let chunk = self.reader.partial_copy_into(&mut dest)?; + unsafe { + assume!(len <= self.buffered_len); + } + self.buffered_len -= len; + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut dest = dest.limit(self.buffered_len); + let len = dest.remaining_capacity(); + self.reader.copy_into(&mut dest)?; + unsafe { + assume!(len <= self.buffered_len); + } + self.buffered_len -= len; + Ok(()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Limit<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.reader.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.reader.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn max_data_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + let max_data = 32usize; + + let mut reader = reader.with_max_data(VarInt::from_u8(max_data as _)); + assert_eq!(reader.buffered_len(), max_data); + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + + assert_eq!(reader.buffered_len(), max_data - 16); + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/slice.rs b/quic/s2n-quic-core/src/buffer/reader/slice.rs new file mode 100644 index 0000000000..bdcc64da3a --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/slice.rs @@ -0,0 +1,114 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{storage, Reader, Storage, VarInt}; +use crate::buffer; + +/// Wraps a single slice as a reader. +/// +/// This can be used for scenarios where the entire stream is buffered and known up-front. +#[derive(Debug)] +pub struct Slice<'a, C> { + chunk: &'a mut C, + current_offset: VarInt, + final_offset: VarInt, +} + +impl<'a, C> Slice<'a, C> +where + C: Storage, +{ + #[inline] + pub fn new(chunk: &'a mut C) -> Result { + let final_offset = VarInt::try_from(chunk.buffered_len()) + .ok() + .ok_or(buffer::Error::OutOfRange)?; + Ok(Self { + chunk, + current_offset: VarInt::ZERO, + final_offset, + }) + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.chunk.buffer_is_empty() + } +} + +impl<'a, C> Storage for Slice<'a, C> +where + C: Storage, +{ + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let chunk = self.chunk.read_chunk(watermark)?; + self.current_offset += chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let len = self.buffered_len().min(dest.remaining_capacity()); + let chunk = self.chunk.partial_copy_into(dest)?; + self.current_offset += len; + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let len = self.buffered_len().min(dest.remaining_capacity()); + self.chunk.copy_into(dest)?; + self.current_offset += len; + Ok(()) + } +} + +impl<'a, C> Reader for Slice<'a, C> +where + C: Storage, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.final_offset) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn slice_test() { + let mut storage: &[u8] = &[1, 2, 3, 4]; + let mut reader = Slice::new(&mut storage).unwrap(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), Some(VarInt::from_u8(4))); + + let mut dest: &mut [u8] = &mut [0; 4]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*chunk, &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage.rs b/quic/s2n-quic-core/src/buffer/reader/storage.rs new file mode 100644 index 0000000000..17954a01eb --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage.rs @@ -0,0 +1,66 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod buf; +mod bytes; +mod chunk; +mod full_copy; +mod infallible; +mod io_slice; +mod slice; + +#[cfg(test)] +mod tests; + +pub use buf::Buf; +pub use chunk::Chunk; +pub use full_copy::FullCopy; +pub use infallible::Infallible; +pub use io_slice::IoSlice; + +pub trait Storage { + type Error; + + /// Returns the length of the chunk + fn buffered_len(&self) -> usize; + + /// Returns if the chunk is empty + #[inline] + fn buffer_is_empty(&self) -> bool { + self.buffered_len() == 0 + } + + /// Reads the current contiguous chunk + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error>; + + /// Copies the reader into `dest`, with a trailing chunk of bytes. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + /// + /// The storage may optionally return a `Chunk`, which can be used by the caller to defer + /// copying the trailing chunk until later. + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized; + + /// Copies the reader into `dest`. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut chunk = self.partial_copy_into(dest)?; + chunk.infallible_copy_into(dest); + Ok(()) + } + + /// Forces the entire reader to be copied, even when calling `partial_copy_into`. + /// + /// The returned `Chunk` from `partial_copy_into` will always be empty. + #[inline] + fn full_copy(&mut self) -> FullCopy { + FullCopy::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs b/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs new file mode 100644 index 0000000000..4c21eb46f9 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs @@ -0,0 +1,129 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; +use crate::assume; +use core::cmp::Ordering; + +pub struct Buf<'a, B: bytes::Buf> { + buf: &'a mut B, + pending: usize, +} + +impl<'a, B> Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + pub fn new(buf: &'a mut B) -> Self { + Self { buf, pending: 0 } + } + + #[inline] + fn commit_pending(&mut self) { + if self.pending > 0 { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.advance(self.pending); + self.pending = 0; + } + } +} + +impl<'a, B> Storage for Buf<'a, B> +where + B: bytes::Buf, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.remaining() - self.pending + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + self.commit_pending(); + let chunk = self.buf.chunk(); + let len = chunk.len().min(watermark); + self.pending = len; + Ok(chunk[..len].into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.commit_pending(); + + ensure!(dest.has_remaining_capacity(), Ok(Chunk::empty())); + + loop { + let chunk_len = self.buf.chunk().len(); + + if chunk_len == 0 { + debug_assert_eq!( + self.buf.remaining(), + 0, + "buf returned empty chunk with remaining bytes" + ); + return Ok(Chunk::empty()); + } + + match chunk_len.cmp(&dest.remaining_capacity()) { + // if there's more chunks left, then copy this one out and keep going + Ordering::Less if self.buf.remaining() > chunk_len => { + dest.put_slice(self.buf.chunk()); + self.buf.advance(chunk_len); + continue; + } + Ordering::Less | Ordering::Equal => { + let chunk = self.buf.chunk(); + self.pending = chunk.len(); + return Ok(chunk.into()); + } + Ordering::Greater => { + let len = dest.remaining_capacity(); + let chunk = &self.buf.chunk()[..len]; + self.pending = len; + return Ok(chunk.into()); + } + } + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.commit_pending(); + + loop { + let chunk = self.buf.chunk(); + let len = chunk.len().min(dest.remaining_capacity()); + + ensure!(len > 0, Ok(())); + + dest.put_slice(&chunk[..len]); + self.buf.advance(len); + } + } +} + +impl<'a, B> Drop for Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + fn drop(&mut self) { + self.commit_pending(); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs new file mode 100644 index 0000000000..3ef68439fc --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs @@ -0,0 +1,94 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; +use bytes::{Bytes, BytesMut}; + +impl Storage for BytesMut { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES_MUT { + let buffer = self.split_to(watermark); + dest.put_bytes_mut(buffer); + } else if Dest::SPECIALIZES_BYTES { + let buffer = self.split_to(watermark); + dest.put_bytes(buffer.freeze()); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} + +impl Storage for Bytes { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES { + let buffer = self.split_to(watermark); + dest.put_bytes(buffer); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs new file mode 100644 index 0000000000..1a001cdf22 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs @@ -0,0 +1,105 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::{Bytes, BytesMut}; + +/// Concrete chunk of bytes +/// +/// This can be returned to allow the caller to defer copying the data until later. +#[derive(Clone, Debug)] +#[must_use = "trailing data cannot be discarded"] +pub enum Chunk<'a> { + Slice(&'a [u8]), + Bytes(Bytes), + BytesMut(BytesMut), +} + +impl<'a> Default for Chunk<'a> { + #[inline] + fn default() -> Self { + Self::empty() + } +} + +impl<'a> Chunk<'a> { + #[inline] + pub fn empty() -> Self { + Self::Slice(&[]) + } +} + +impl<'a> From<&'a [u8]> for Chunk<'a> { + #[inline] + fn from(chunk: &'a [u8]) -> Self { + Self::Slice(chunk) + } +} + +impl<'a> From for Chunk<'a> { + #[inline] + fn from(chunk: Bytes) -> Self { + Self::Bytes(chunk) + } +} + +impl<'a> From for Chunk<'a> { + #[inline] + fn from(chunk: BytesMut) -> Self { + Self::BytesMut(chunk) + } +} + +impl<'a> core::ops::Deref for Chunk<'a> { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &Self::Target { + match self { + Self::Slice(t) => t, + Self::Bytes(t) => t, + Self::BytesMut(t) => t, + } + } +} + +impl<'a> super::Storage for Chunk<'a> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + match self { + Self::Slice(v) => v.read_chunk(watermark), + Self::Bytes(v) => v.read_chunk(watermark), + Self::BytesMut(v) => v.read_chunk(watermark), + } + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + match self { + Self::Slice(v) => v.partial_copy_into(dest), + Self::Bytes(v) => v.partial_copy_into(dest), + Self::BytesMut(v) => v.partial_copy_into(dest), + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + match self { + Self::Slice(v) => v.copy_into(dest), + Self::Bytes(v) => v.copy_into(dest), + Self::BytesMut(v) => v.copy_into(dest), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs b/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs new file mode 100644 index 0000000000..5cf06e9788 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; + +#[derive(Debug)] +pub struct FullCopy<'a, C: Storage + ?Sized>(&'a mut C); + +impl<'a, C: Storage + ?Sized> FullCopy<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C) -> Self { + Self(chunk) + } +} + +impl<'a, C: Storage + ?Sized> Storage for FullCopy<'a, C> { + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.0.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.0.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + self.0.read_chunk(watermark) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + // force the full copy + self.0.copy_into(dest)?; + Ok(Chunk::empty()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.0.copy_into(dest) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs b/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs new file mode 100644 index 0000000000..a60e94bcf3 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs @@ -0,0 +1,32 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; + +// unwrapping an infallible error doesn't panic +// https://godbolt.org/z/7v5MWdvGa + +pub trait Infallible: Storage { + #[inline(always)] + fn infallible_read_chunk(&mut self, watermark: usize) -> Chunk<'_> { + self.read_chunk(watermark).unwrap() + } + + #[inline(always)] + fn infallible_partial_copy_into(&mut self, dest: &mut Dest) -> Chunk<'_> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.partial_copy_into(dest).unwrap() + } + + #[inline] + fn infallible_copy_into(&mut self, dest: &mut Dest) + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.copy_into(dest).unwrap() + } +} + +impl Infallible for T where T: Storage + ?Sized {} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs b/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs new file mode 100644 index 0000000000..a7f16aafbf --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs @@ -0,0 +1,228 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; +use crate::assume; +use core::cmp::Ordering; + +pub struct IoSlice<'a, T> { + len: usize, + head: &'a [u8], + buf: &'a [T], +} + +impl<'a, T> IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + pub fn new(buf: &'a [T]) -> Self { + let mut len = 0; + + let mut first_non_empty = None; + for (idx, buf) in buf.iter().enumerate() { + len += buf.len(); + if !buf.is_empty() && first_non_empty.is_none() { + first_non_empty = Some(idx); + } + } + + if let Some(idx) = first_non_empty { + let buf = &buf[idx..]; + + unsafe { + assume!(!buf.is_empty()); + } + + let mut slice = Self { + len, + head: &[], + buf, + }; + slice.advance_buf_once(); + slice + } else { + Self { + len: 0, + head: &[], + buf: &[], + } + } + } + + #[inline(always)] + fn advance_buf(&mut self) { + while self.head.is_empty() && !self.buf.is_empty() { + self.advance_buf_once(); + } + } + + #[inline(always)] + fn advance_buf_once(&mut self) { + let (head, tail) = self.buf.split_at(1); + self.head = &head[0][..]; + self.buf = tail; + } + + #[inline] + fn sub_len(&mut self, len: usize) { + unsafe { + assume!(self.len >= len); + } + self.set_len(self.len - len); + } + + #[inline] + fn set_len(&mut self, len: usize) { + if cfg!(debug_assertions) { + let mut computed = self.head.len(); + for buf in self.buf.iter() { + computed += buf.len(); + } + assert_eq!(len, computed); + } + self.len = len; + } +} + +impl<'a, T> bytes::Buf for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + fn remaining(&self) -> usize { + self.len + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.head + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + assert!(cnt <= self.len); + let new_len = self.len - cnt; + + if new_len == 0 { + self.head = &[]; + self.buf = &[]; + self.set_len(new_len); + return; + } + + while cnt > 0 { + let len = self.head.len().min(cnt); + cnt -= len; + + if len >= self.head.len() { + unsafe { + assume!(!self.buf.is_empty()); + } + + self.head = &[]; + self.advance_buf(); + continue; + } + + self.head = &self.head[len..]; + break; + } + + self.set_len(new_len); + } +} + +impl<'a, T> Storage for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(watermark); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + // head can be returned and we need to take the next buf entry + if self.head.len() >= watermark { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + + // we just need to split off the current head and return it + let (head, tail) = self.head.split_at(watermark); + self.head = tail; + self.sub_len(head.len()); + Ok(head.into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + ensure!(dest.has_remaining_capacity(), Ok(Default::default())); + + loop { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(dest.remaining_capacity()); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + match self.head.len().cmp(&dest.remaining_capacity()) { + // head needs to be copied into dest and we need to take the next buf entry + Ordering::Less => { + let len = self.head.len(); + dest.put_slice(self.head); + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(len); + continue; + } + // head can be returned and we need to take the next buf entry + Ordering::Equal => { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + // we just need to split off the current head and return it + Ordering::Greater => { + let (head, tail) = self.head.split_at(dest.remaining_capacity()); + self.head = tail; + self.sub_len(head.len()); + return Ok(head.into()); + } + } + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs new file mode 100644 index 0000000000..7c0bda6793 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs @@ -0,0 +1,53 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; + +macro_rules! impl_slice { + ($ty:ty, $split:ident) => { + impl Storage for $ty { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + let (chunk, remaining) = core::mem::take(self).$split(len); + *self = remaining; + Ok((&*chunk).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let len = self.len().min(dest.remaining_capacity()); + let (chunk, remaining) = core::mem::take(self).$split(len); + *self = remaining; + dest.put_slice(chunk); + Ok(()) + } + } + }; +} + +impl_slice!(&[u8], split_at); +impl_slice!(&mut [u8], split_at_mut); diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs b/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs new file mode 100644 index 0000000000..e52a6ec209 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs @@ -0,0 +1,157 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +/// ensures each implementation returns a trailing chunk correctly +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn trailing_chunk_test() { + let mut dest = vec![]; + let mut source = vec![]; + bolero::check!() + .with_type::<(u16, u16)>() + .for_each(|(dest_len, source_len)| { + source.resize(*source_len as usize, 42); + + let dest_len = (*source_len).min(*dest_len) as usize; + dest.resize(dest_len, 0); + let expected = &source[..dest_len]; + let dest = &mut dest[..]; + + // direct implementation + { + let mut reader: &[u8] = &source[..]; + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // IoSlice implementation + { + let io_slice = [&source[..]]; + let mut reader = IoSlice::new(&io_slice); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // Buf implementation + { + let mut slice = &source[..]; + let mut reader = Buf::new(&mut slice); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // full_copy + { + let mut source = &source[..]; + let mut reader = source.full_copy(); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert!(chunk.is_empty()); + assert_eq!(expected, dest); + dest.fill(0); + } + }); +} + +/// ensures each storage type correctly copies multiple chunks into the destination +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn io_slice_test() { + let mut dest = vec![]; + let mut source: Vec> = vec![]; + let mut pool = vec![]; + let mut expected = vec![]; + bolero::check!() + .with_type::<(u16, Vec)>() + .for_each(|(dest_len, source_lens)| { + while source.len() > source_lens.len() { + pool.push(source.pop().unwrap()); + } + + while source.len() < source_lens.len() { + source.push(pool.pop().unwrap_or_default()); + } + + let mut source_len = 0; + let mut last_chunk_idx = 0; + let mut last_chunk_len = 0; + let mut remaining_len = *dest_len as usize; + for (idx, (len, source)) in source_lens.iter().zip(&mut source).enumerate() { + let fill = (idx + 1) as u8; + let len = *len as usize; + source.resize(len, fill); + source.fill(fill); + if len > 0 && remaining_len > 0 { + last_chunk_idx = idx; + last_chunk_len = len.min(remaining_len); + } + source_len += len; + remaining_len = remaining_len.saturating_sub(len); + } + + let dest_len = source_len.min(*dest_len as usize); + dest.resize(dest_len, 0); + dest.fill(0); + let dest = &mut dest[..]; + + expected.resize(dest_len, 0); + expected.fill(0); + + { + // don't copy the last chunk, since that should be returned + let source = &source[..last_chunk_idx]; + crate::slice::vectored_copy(source, &mut [&mut expected[..]]); + } + + let expected_chunk = source + .get(last_chunk_idx) + .map(|v| &v[..last_chunk_len]) + .unwrap_or(&[]); + + // IoSlice implementation + { + let mut source = IoSlice::new(&source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + // reset the destination + dest.fill(0); + } + + // Buf implementation + { + let mut source = IoSlice::new(&source); + let mut source = Buf::new(&mut source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + } + }); +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer.rs b/quic/s2n-quic-core/src/buffer/reassembler.rs similarity index 72% rename from quic/s2n-quic-core/src/buffer/receive_buffer.rs rename to quic/s2n-quic-core/src/buffer/reassembler.rs index dcb86b5acc..cba2df722a 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler.rs @@ -1,13 +1,15 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -//! This module contains data structures for buffering incoming and outgoing data -//! in Quic streams. +//! This module contains data structures for buffering incoming streams. -use crate::varint::VarInt; +use super::Error; +use crate::{ + buffer::{self, reader::storage::Infallible as _}, + varint::VarInt, +}; use alloc::collections::{vec_deque, VecDeque}; use bytes::BytesMut; -use core::fmt; mod probe; mod request; @@ -19,31 +21,7 @@ mod tests; use request::Request; use slot::Slot; -/// Enumerates error that can occur while inserting data into the Receive Buffer -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum ReceiveBufferError { - /// An invalid data range was provided - OutOfRange, - /// The provided final size was invalid for the buffer's state - InvalidFin, -} - -#[cfg(feature = "std")] -impl std::error::Error for ReceiveBufferError {} - -impl fmt::Display for ReceiveBufferError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), - Self::InvalidFin => write!( - f, - "write modifies the final offset in a non-compliant manner" - ), - } - } -} - -/// The default buffer size for slots that the [`ReceiveBuffer`] uses. +/// The default buffer size for slots that the [`Reassembler`] uses. /// /// This value was picked as it is typically used for the default memory page size. const MIN_BUFFER_ALLOCATION_SIZE: usize = 4096; @@ -58,14 +36,14 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; //# Endpoints MUST be able to deliver stream data to an application as an //# ordered byte-stream. -/// `ReceiveBuffer` is a buffer structure for combining chunks of bytes in an +/// `Reassembler` is a buffer structure for combining chunks of bytes in an /// ordered stream, which might arrive out of order. /// -/// `ReceiveBuffer` will accumulate the bytes, and provide them to its users +/// `Reassembler` will accumulate the bytes, and provide them to its users /// once a contiguous range of bytes at the current position of the stream has /// been accumulated. /// -/// `ReceiveBuffer` is optimized for minimizing memory allocations and for +/// `Reassembler` is optimized for minimizing memory allocations and for /// offering it's users chunks of sizes that minimize call overhead. /// /// If data is received in smaller chunks, only the first chunk will trigger a @@ -80,10 +58,10 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; /// /// ## Usage /// -/// ```rust,ignore -/// use s2n_quic_transport::buffer::ReceiveBuffer; +/// ```rust +/// use s2n_quic_core::buffer::Reassembler; /// -/// let mut buffer = ReceiveBuffer::new(); +/// let mut buffer = Reassembler::new(); /// /// // write a chunk of bytes at offset 4, which can not be consumed yet /// assert!(buffer.write_at(4u32.into(), &[4, 5, 6, 7]).is_ok()); @@ -99,23 +77,23 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; /// assert_eq!(&[0u8, 1, 2, 3, 4, 5, 6, 7], &buffer.pop().unwrap()[..]); /// ``` #[derive(Debug, PartialEq)] -pub struct ReceiveBuffer { +pub struct Reassembler { slots: VecDeque, start_offset: u64, max_recv_offset: u64, final_offset: u64, } -impl Default for ReceiveBuffer { +impl Default for Reassembler { fn default() -> Self { Self::new() } } -impl ReceiveBuffer { - /// Creates a new `ReceiveBuffer` - pub fn new() -> ReceiveBuffer { - ReceiveBuffer { +impl Reassembler { + /// Creates a new `Reassembler` + pub fn new() -> Reassembler { + Reassembler { slots: VecDeque::new(), start_offset: 0, max_recv_offset: 0, @@ -179,7 +157,7 @@ impl ReceiveBuffer { /// Pushes a slice at a certain offset #[inline] - pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), ReceiveBufferError> { + pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> { // create a request let request = Request::new(offset, data)?; self.write_request(request)?; @@ -188,7 +166,7 @@ impl ReceiveBuffer { /// Pushes a slice at a certain offset, which is the end of the buffer #[inline] - pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), ReceiveBufferError> { + pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> { // create a request let request = Request::new(offset, data)?; @@ -203,17 +181,11 @@ impl ReceiveBuffer { //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. if let Some(final_size) = self.final_size() { - ensure!( - final_size == final_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(final_size == final_offset, Err(Error::InvalidFin)); } // make sure that we didn't see any previous chunks greater than the final size - ensure!( - self.max_recv_offset <= final_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(self.max_recv_offset <= final_offset, Err(Error::InvalidFin)); self.final_offset = final_offset; @@ -223,7 +195,7 @@ impl ReceiveBuffer { } #[inline] - fn write_request(&mut self, request: Request) -> Result<(), ReceiveBufferError> { + fn write_request(&mut self, request: Request) -> Result<(), Error> { // trim off any data that we've already read let (_, request) = request.split(self.start_offset); // trim off any data that exceeds our final length @@ -236,7 +208,7 @@ impl ReceiveBuffer { //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. - ensure!(excess.is_empty(), Err(ReceiveBufferError::InvalidFin)); + ensure!(excess.is_empty(), Err(Error::InvalidFin)); // if the request is empty we're done ensure!(!request.is_empty(), Ok(())); @@ -300,20 +272,17 @@ impl ReceiveBuffer { /// This can be used for copy-avoidance applications where a packet is received in order and /// doesn't need to be stored temporarily for future packets to unblock the stream. #[inline] - pub fn skip(&mut self, len: usize) -> Result<(), ReceiveBufferError> { + pub fn skip(&mut self, len: VarInt) -> Result<(), Error> { // zero-length skip is a no-op - ensure!(len > 0, Ok(())); + ensure!(len > VarInt::ZERO, Ok(())); let new_start_offset = self .start_offset - .checked_add(len as u64) - .ok_or(ReceiveBufferError::OutOfRange)?; + .checked_add(len.as_u64()) + .ok_or(Error::OutOfRange)?; if let Some(final_size) = self.final_size() { - ensure!( - final_size >= new_start_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(final_size >= new_start_offset, Err(Error::InvalidFin)); } // record the maximum offset that we've seen @@ -398,48 +367,6 @@ impl ReceiveBuffer { }) } - /// Copies all the available buffered data into the provided `buf`'s remaining capacity. - /// - /// This method is slightly more efficient than [`Self::pop`] when the caller ends up copying - /// the buffered data into another slice, since it avoids a refcount increment/decrement on - /// the contained [`BytesMut`]. - /// - /// The total number of bytes copied is returned. - #[inline] - pub fn copy_into_buf(&mut self, buf: &mut B) -> usize { - use bytes::Buf; - - let mut total = 0; - - loop { - let remaining = buf.remaining_mut(); - // ensure we have enough capacity in the destination buf - ensure!(remaining > 0, total); - - let transform = |buffer: &mut BytesMut, is_final_offset| { - let watermark = buffer.len().min(remaining); - - // copy bytes into the destination buf - buf.put_slice(&buffer[..watermark]); - // advance the chunk rather than splitting to avoid refcount churn - buffer.advance(watermark); - total += watermark; - - (is_final_offset, watermark) - }; - - match self.pop_transform(transform) { - // if we're at the final offset, then no need to keep iterating - Some(true) => break, - Some(false) => continue, - // no more available chunks - None => break, - } - } - - total - } - /// Pops a buffer from the front of the receive queue as long as the `transform` function returns a /// non-empty buffer. #[inline] @@ -667,7 +594,7 @@ pub struct Iter<'a> { impl<'a> Iter<'a> { #[inline] - fn new(buffer: &'a ReceiveBuffer) -> Self { + fn new(buffer: &'a Reassembler) -> Self { Self { prev_end: buffer.start_offset, inner: buffer.slots.iter(), @@ -690,7 +617,7 @@ impl<'a> Iterator for Iter<'a> { } pub struct Drain<'a> { - inner: &'a mut ReceiveBuffer, + inner: &'a mut Reassembler, } impl<'a> Iterator for Drain<'a> { @@ -707,3 +634,203 @@ impl<'a> Iterator for Drain<'a> { (len, Some(len)) } } + +impl buffer::reader::Storage for Reassembler { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_chunk( + &mut self, + watermark: usize, + ) -> Result { + if let Some(chunk) = self.pop_watermarked(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result + where + Dest: buffer::writer::Storage + ?Sized, + { + let mut prev = BytesMut::new(); + + loop { + let remaining = dest.remaining_capacity(); + // ensure we have enough capacity in the destination buf + ensure!(remaining > 0, Ok(Default::default())); + + match self.pop_watermarked(remaining) { + Some(chunk) => { + let mut prev = core::mem::replace(&mut prev, chunk); + if !prev.is_empty() { + prev.infallible_copy_into(dest); + } + } + None if prev.is_empty() => { + return Ok(Default::default()); + } + None => { + return Ok(prev.into()); + } + } + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: buffer::writer::Storage + ?Sized, + { + loop { + let remaining = dest.remaining_capacity(); + // ensure we have enough capacity in the destination buf + ensure!(remaining > 0, Ok(())); + + let transform = |buffer: &mut BytesMut, _is_final_offset| { + let len = buffer.len().min(remaining); + buffer.infallible_copy_into(dest); + ((), len) + }; + + if self.pop_transform(transform).is_none() { + return Ok(()); + } + } + } +} + +impl buffer::reader::Reader for Reassembler { + #[inline] + fn current_offset(&self) -> VarInt { + unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(self.start_offset) + } + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_size().map(|v| unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(v) + }) + } +} + +impl buffer::writer::Writer for Reassembler { + #[inline] + fn copy_from(&mut self, reader: &mut R) -> Result<(), buffer::Error> + where + R: buffer::Reader + ?Sized, + { + use buffer::reader::{Reader as _, Storage as _}; + + // enable checks for the reader + let mut reader = reader.with_checks(); + let reader = &mut reader; + + let final_offset = reader.final_offset(); + + // optimize for the case where the stream consists of a single chunk + if let Some(final_offset) = final_offset { + let mut is_single_chunk_stream = true; + + let offset = reader.current_offset(); + + // the reader is starting at the beginning of the stream + is_single_chunk_stream &= offset == VarInt::ZERO; + // the reader has buffered the final offset + is_single_chunk_stream &= reader.has_buffered_fin(); + // no data has been consumed from the Reassembler + is_single_chunk_stream &= self.consumed_len() == 0; + // we aren't tracking any slots + is_single_chunk_stream &= self.slots.is_empty(); + + if is_single_chunk_stream { + let payload_len = reader.buffered_len(); + let end = final_offset.as_u64(); + + // don't allocate anything if we don't need to + if payload_len == 0 { + let chunk = reader.read_chunk(0)?; + debug_assert!(chunk.is_empty()); + } else { + let mut data = BytesMut::with_capacity(payload_len); + + // copy the whole thing into `data` + reader.copy_into(&mut data)?; + + self.slots.push_back(Slot::new(offset.as_u64(), end, data)); + }; + + // update the final offset after everything was read correctly + self.final_offset = end; + self.invariants(); + + return Ok(()); + } + } + + // TODO add better support for copy avoidance by iterating to the appropriate slot and + // copying into that, if possible + + // fall back to copying individual chunks into the receive buffer + let mut first_write = true; + loop { + let offset = reader.current_offset(); + let chunk = reader.read_chunk(usize::MAX)?; + + // Record the final size before writing to avoid excess allocation. This also needs to + // happen after we read the first chunk in case there are errors. + if first_write { + if let Some(offset) = final_offset { + self.write_at_fin(offset, &[]) + .map_err(buffer::Error::mapped)?; + } + } + + // TODO maybe specialize on BytesMut chunks? - for now we'll just treat them as + // slices + + self.write_at(offset, &chunk) + .map_err(buffer::Error::mapped)?; + + first_write = false; + + if reader.buffer_is_empty() { + break; + } + } + + Ok(()) + } +} + +impl buffer::duplex::Skip for Reassembler { + #[inline] + fn skip(&mut self, len: VarInt, final_offset: Option) -> Result<(), Error> { + if let Some(offset) = final_offset { + self.write_at_fin(offset, &[])?; + } + + (*self).skip(len)?; + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/__fuzz__/buffer__receive_buffer__tests__model/corpus.tar.gz b/quic/s2n-quic-core/src/buffer/reassembler/__fuzz__/buffer__reassembler__tests__model/corpus.tar.gz similarity index 100% rename from quic/s2n-quic-core/src/buffer/receive_buffer/__fuzz__/buffer__receive_buffer__tests__model/corpus.tar.gz rename to quic/s2n-quic-core/src/buffer/reassembler/__fuzz__/buffer__reassembler__tests__model/corpus.tar.gz diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs b/quic/s2n-quic-core/src/buffer/reassembler/probe.rs similarity index 71% rename from quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs rename to quic/s2n-quic-core/src/buffer/reassembler/probe.rs index 542304b6fb..86da449953 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/probe.rs @@ -4,15 +4,15 @@ crate::probe::define!( extern "probe" { /// Emitted when a buffer is allocated for a particular offset - #[link_name = s2n_quic_core__buffer__receive_buffer__alloc] + #[link_name = s2n_quic_core__buffer__reassembler__alloc] pub fn alloc(offset: u64, capacity: usize); /// Emitted when a chunk is read from the beginning of the buffer - #[link_name = s2n_quic_core__buffer__receive_buffer__pop] + #[link_name = s2n_quic_core__buffer__reassembler__pop] pub fn pop(offset: u64, len: usize); /// Emitted when a chunk of data is written at an offset - #[link_name = s2n_quic_core__buffer__receive_buffer__write] + #[link_name = s2n_quic_core__buffer__reassembler__write] pub fn write(offset: u64, len: usize); } ); diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/request.rs b/quic/s2n-quic-core/src/buffer/reassembler/request.rs similarity index 93% rename from quic/s2n-quic-core/src/buffer/receive_buffer/request.rs rename to quic/s2n-quic-core/src/buffer/reassembler/request.rs index bfe2c47a5d..8bbe2b6d20 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/request.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/request.rs @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use super::ReceiveBufferError; +use super::Error; use crate::varint::VarInt; use bytes::{BufMut, BytesMut}; use core::fmt; @@ -23,10 +23,10 @@ impl<'a> fmt::Debug for Request<'a> { impl<'a> Request<'a> { #[inline] - pub fn new(offset: VarInt, data: &'a [u8]) -> Result { + pub fn new(offset: VarInt, data: &'a [u8]) -> Result { offset .checked_add_usize(data.len()) - .ok_or(ReceiveBufferError::OutOfRange)?; + .ok_or(Error::OutOfRange)?; Ok(Self { offset: offset.as_u64(), data, diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs similarity index 99% rename from quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs rename to quic/s2n-quic-core/src/buffer/reassembler/slot.rs index 6f69cd9e35..b087e5d3b1 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs @@ -5,7 +5,7 @@ use super::Request; use bytes::{Buf, BufMut, BytesMut}; use core::fmt; -/// Possible states for slots in the [`ReceiveBuffer`]s queue +/// Possible states for slots in the [`Reassembler`]'s queue #[derive(PartialEq, Eq)] pub struct Slot { start: u64, diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs similarity index 94% rename from quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs rename to quic/s2n-quic-core/src/buffer/reassembler/tests.rs index bef282f0b0..2b5f4a418f 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs @@ -20,7 +20,7 @@ enum Op { watermark: Option, }, Skip { - len: usize, + len: VarInt, }, } @@ -28,7 +28,7 @@ enum Op { #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn model_test() { check!().with_type::>().for_each(|ops| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut recv = Data::new(u64::MAX); for op in ops { match *op { @@ -59,8 +59,8 @@ fn model_test() { let consumed_len = buffer.consumed_len(); if buffer.skip(len).is_ok() { let new_consumed_len = buffer.consumed_len(); - assert_eq!(new_consumed_len, consumed_len + len as u64); - recv.seek_forward(len); + assert_eq!(new_consumed_len, consumed_len + len.as_u64()); + recv.seek_forward(len.as_u64()); } } } @@ -68,14 +68,14 @@ fn model_test() { // make sure a cleared buffer is the same as a new one buffer.reset(); - assert_eq!(buffer, ReceiveBuffer::new()); + assert_eq!(buffer, Reassembler::new()); }) } #[test] #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn write_and_pop() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::default(); let chunk = Data::send_one_at(0, 9000); let mut popped_bytes = 0; @@ -92,22 +92,25 @@ fn write_and_pop() { #[test] #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn write_and_copy_into_buf() { - let mut buffer = ReceiveBuffer::new(); + use crate::buffer::reader::Storage; + + let mut buffer = Reassembler::new(); let mut offset = VarInt::default(); - let mut output = vec![]; + let mut output: Vec = vec![]; for len in 0..10000 { + dbg!(len, offset); let chunk = Data::send_one_at(offset.as_u64(), len); buffer.write_at(offset, &chunk).unwrap(); offset += chunk.len(); - let copied_len = buffer.copy_into_buf(&mut output); - assert_eq!(copied_len, chunk.len()); + buffer.copy_into(&mut output).unwrap(); + assert_eq!(output.len(), chunk.len()); assert_eq!(&output[..], &chunk[..]); output.clear(); } } -fn new_receive_buffer() -> ReceiveBuffer { - let buffer = ReceiveBuffer::new(); +fn new_receive_buffer() -> Reassembler { + let buffer = Reassembler::new(); assert_eq!(buffer.len(), 0); buffer } @@ -400,7 +403,7 @@ fn chunk_partial_larger_after_test() { #[test] #[allow(clippy::cognitive_complexity)] // several operations are needed to get the buffer in the desired state fn write_and_read_buffer() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); assert_eq!(0, buf.len()); assert!(buf.is_empty()); @@ -452,7 +455,7 @@ fn write_and_read_buffer() { #[test] fn fill_preallocated_gaps() { - let mut buf: ReceiveBuffer = ReceiveBuffer::new(); + let mut buf: Reassembler = Reassembler::new(); buf.write_at((MIN_BUFFER_ALLOCATION_SIZE as u32 + 2).into(), &[42, 45]) .unwrap(); @@ -498,7 +501,7 @@ fn fill_preallocated_gaps() { #[test] fn create_and_fill_large_gaps() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); // This creates 3 full buffer gaps of full allocation ranges buf.write_at( (MIN_BUFFER_ALLOCATION_SIZE as u32 * 3 + 2).into(), @@ -573,7 +576,7 @@ fn create_and_fill_large_gaps() { #[test] fn ignore_already_consumed_data() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); assert_eq!(0, buf.consumed_len()); assert_eq!(&[0u8, 1, 2, 3], &*buf.pop().unwrap()); @@ -597,7 +600,7 @@ fn ignore_already_consumed_data() { #[test] fn merge_right() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(4u32.into(), &[4, 5, 6]).unwrap(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); assert_eq!(7, buf.len()); @@ -606,7 +609,7 @@ fn merge_right() { #[test] fn merge_left() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); buf.write_at(4u32.into(), &[4, 5, 6]).unwrap(); assert_eq!(7, buf.len()); @@ -615,7 +618,7 @@ fn merge_left() { #[test] fn merge_both_sides() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); // Create gaps on all sides, and merge them later buf.write_at(4u32.into(), &[4, 5]).unwrap(); buf.write_at(8u32.into(), &[8, 9]).unwrap(); @@ -627,7 +630,7 @@ fn merge_both_sides() { #[test] fn do_not_merge_across_allocations_right() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); let mut data_left = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; let mut data_right = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; @@ -645,7 +648,7 @@ fn do_not_merge_across_allocations_right() { #[test] fn do_not_merge_across_allocations_left() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); let mut data_left = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; let mut data_right = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; @@ -663,7 +666,7 @@ fn do_not_merge_across_allocations_left() { #[test] fn reset_buffer() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(2u32.into(), &[2, 3]).unwrap(); buf.write_at(0u32.into(), &[0, 1]).unwrap(); assert_eq!(4, buf.len()); @@ -701,7 +704,7 @@ fn fail_to_push_out_of_bounds_data() { for nr_bytes in 0..64 * 2 + 1 { let data = vec![0u8; nr_bytes + 1]; assert_eq!( - Err(ReceiveBufferError::OutOfRange), + Err(Error::OutOfRange), buffer.write_at( VarInt::new(MAX_VARINT_VALUE - nr_bytes as u64).unwrap(), &data[..] @@ -714,7 +717,7 @@ fn fail_to_push_out_of_bounds_data() { #[cfg_attr(miri, ignore)] // miri fails because the slice points to invalid memory #[cfg(target_pointer_width = "64")] fn fail_to_push_out_of_bounds_data_with_long_buffer() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); // Overflow the allowed buffers by size 1. This uses an invalid memory // reference, due to not wanting to allocate too much memory. This is @@ -728,7 +731,7 @@ fn fail_to_push_out_of_bounds_data_with_long_buffer() { for _ in 0..64 * 2 + 1 { assert_eq!( - Err(ReceiveBufferError::OutOfRange), + Err(Error::OutOfRange), buffer.write_at(20u32.into(), fake_data) ); } @@ -736,7 +739,7 @@ fn fail_to_push_out_of_bounds_data_with_long_buffer() { #[test] fn pop_watermarked_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); assert_eq!( None, @@ -788,7 +791,7 @@ fn write_start_fin_test() { for size in INTERESTING_CHUNK_SIZES.iter().copied() { for pre_empty_fin in [false, true] { let bytes: Vec = Iterator::map(0..size, |v| v as u8).collect(); - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); // write the fin offset first if pre_empty_fin { @@ -832,11 +835,11 @@ fn write_partial_fin_test() { let partial_bytes: Vec = Iterator::map(0..partial_size, |v| v as u8).collect(); let fin_bytes: Vec = Iterator::map(0..fin_size, |v| v as u8).collect(); - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); assert!(!buffer.is_writing_complete()); assert!(!buffer.is_reading_complete()); - let mut oracle = ReceiveBuffer::new(); + let mut oracle = Reassembler::new(); let mut requests = vec![ (0u32, &partial_bytes, false), @@ -919,13 +922,13 @@ fn write_partial_fin_test() { #[test] fn write_fin_zero_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(0u32.into(), &[]).unwrap(); assert_eq!( buffer.write_at(0u32.into(), &[1]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "no data can be written after a fin" ); } @@ -933,7 +936,7 @@ fn write_fin_zero_test() { #[test] fn fin_pop_take_test() { for write_fin in [false, true] { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at(0u32.into(), &[1]).unwrap(); if write_fin { buffer.write_at_fin(1u32.into(), &[]).unwrap(); @@ -956,38 +959,38 @@ fn fin_pop_take_test() { #[test] fn write_fin_changed_error_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(16u32.into(), &[]).unwrap(); assert_eq!( buffer.write_at_fin(0u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot decrease a previous fin" ); assert_eq!( buffer.write_at_fin(32u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot exceed a previous fin" ); } #[test] fn write_fin_lowered_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at(32u32.into(), &[1]).unwrap(); assert_eq!( buffer.write_at_fin(16u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot be lower than an already existing chunk" ); } #[test] fn write_fin_complete_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(4u32.into(), &[4]).unwrap(); @@ -1018,7 +1021,7 @@ fn allocation_size_test() { for (index, (offset, size)) in received.iter().copied().enumerate() { assert_eq!( - ReceiveBuffer::allocation_size(offset), + Reassembler::allocation_size(offset), size, "offset = {}", offset @@ -1027,7 +1030,7 @@ fn allocation_size_test() { if let Some((offset, _)) = received.get(index + 1) { let offset = offset - 1; assert_eq!( - ReceiveBuffer::allocation_size(offset), + Reassembler::allocation_size(offset), size, "offset = {}", offset diff --git a/quic/s2n-quic-core/src/buffer/writer.rs b/quic/s2n-quic-core/src/buffer/writer.rs new file mode 100644 index 0000000000..4d55d3f186 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer.rs @@ -0,0 +1,12 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod storage; + +pub use storage::Storage; + +pub trait Writer { + fn copy_from(&mut self, reader: &mut R) -> Result<(), super::Error> + where + R: super::Reader + ?Sized; +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage.rs b/quic/s2n-quic-core/src/buffer/writer/storage.rs new file mode 100644 index 0000000000..c181b076cf --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage.rs @@ -0,0 +1,75 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::reader::storage::Chunk; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +mod buf; +mod byte_queue; +mod discard; +mod empty; +mod limit; +mod tracked; +mod uninit_slice; + +pub use buf::BufMut; +pub use discard::Discard; +pub use empty::Empty; +pub use limit::Limit; +pub use tracked::Tracked; + +pub trait Storage { + const SPECIALIZES_BYTES: bool = false; + const SPECIALIZES_BYTES_MUT: bool = false; + + fn put_slice(&mut self, bytes: &[u8]); + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + // we can specialize on an empty payload + ensure!(payload_len == 0, Ok(false)); + + f(UninitSlice::new(&mut []))?; + + Ok(true) + } + + fn remaining_capacity(&self) -> usize; + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity() > 0 + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.put_slice(&bytes); + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + match chunk { + Chunk::Slice(v) => self.put_slice(v), + Chunk::Bytes(v) => self.put_bytes(v), + Chunk::BytesMut(v) => self.put_bytes_mut(v), + } + } + + #[inline] + fn limit(&mut self, max_len: usize) -> Limit { + Limit::new(self, max_len) + } + + #[inline] + fn tracked(&mut self) -> Tracked { + Tracked::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs b/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs new file mode 100644 index 0000000000..704aa9d1fb --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs @@ -0,0 +1,111 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Storage, UninitSlice}; + +pub struct BufMut<'a, T: bytes::BufMut> { + buf_mut: &'a mut T, +} + +impl<'a, T: bytes::BufMut> BufMut<'a, T> { + #[inline] + pub fn new(buf_mut: &'a mut T) -> Self { + Self { buf_mut } + } +} + +impl<'a, T: bytes::BufMut> Storage for BufMut<'a, T> { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.buf_mut.put_slice(bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.buf_mut.remaining_mut() + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let chunk = self.buf_mut.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.buf_mut.advance_mut(payload_len); + } + + Ok(true) + } +} + +macro_rules! impl_buf_mut { + ($ty:ty $(, $reserve:ident)?) => { + impl Storage for $ty { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + bytes::BufMut::put_slice(self, bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + bytes::BufMut::remaining_mut(self) + } + + #[inline] + fn put_uninit_slice( + &mut self, + payload_len: usize, + f: F, + ) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + use bytes::BufMut; + + $( + self.$reserve(payload_len); + )? + + let chunk = self.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.advance_mut(payload_len); + } + + Ok(true) + } + } + }; +} + +impl_buf_mut!(bytes::BytesMut, reserve); +impl_buf_mut!(alloc::vec::Vec, reserve); +impl_buf_mut!(&mut [u8]); +impl_buf_mut!(&mut [core::mem::MaybeUninit]); + +#[cfg(test)] +mod tests { + use crate::buffer::{reader::Storage as _, writer::Storage as _}; + + #[test] + fn vec_test() { + let mut buffer: Vec = vec![]; + assert_eq!(buffer.remaining_capacity(), isize::MAX as usize); + + let expected = &b"hello world!"[..]; + + let mut source = expected; + + source.copy_into(&mut buffer).unwrap(); + + assert_eq!(&buffer, expected); + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs b/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs new file mode 100644 index 0000000000..2d311d5b24 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Bytes, BytesMut, Storage}; +use alloc::{collections::VecDeque, vec::Vec}; + +macro_rules! impl_queue { + ($ty:ident, $push:ident) => { + impl Storage for $ty { + const SPECIALIZES_BYTES: bool = true; + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes(Bytes::copy_from_slice(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.$push(bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes.freeze()); + } + } + + impl Storage for $ty { + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes_mut(BytesMut::from(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + // we can't convert Bytes into BytesMut so we'll need to copy it + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes); + } + } + }; +} + +impl_queue!(Vec, push); +impl_queue!(VecDeque, push_back); diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs b/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs new file mode 100644 index 0000000000..38682d3541 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs @@ -0,0 +1,17 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Clone, Copy, Debug, Default)] +pub struct Discard; + +impl super::Storage for Discard { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + let _ = bytes; + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs b/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs new file mode 100644 index 0000000000..8fae1b8697 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs @@ -0,0 +1,22 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Storage; + +#[derive(Clone, Copy, Debug, Default)] +pub struct Empty; + +impl Storage for Empty { + #[inline] + fn put_slice(&mut self, slice: &[u8]) { + debug_assert!( + slice.is_empty(), + "cannot put a non-empty slice in empty writer chunk" + ); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + 0 + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs b/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs new file mode 100644 index 0000000000..9c6fe209a4 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs @@ -0,0 +1,80 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +pub struct Limit<'a, C: Storage + ?Sized> { + chunk: &'a mut C, + remaining_capacity: usize, +} + +impl<'a, C: Storage + ?Sized> Limit<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C, remaining_capacity: usize) -> Self { + let remaining_capacity = chunk.remaining_capacity().min(remaining_capacity); + Self { + chunk, + remaining_capacity, + } + } +} + +impl<'a, C: Storage + ?Sized> Storage for Limit<'a, C> { + const SPECIALIZES_BYTES: bool = C::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = C::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + debug_assert!(bytes.len() <= self.remaining_capacity); + self.chunk.put_slice(bytes); + self.remaining_capacity -= bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + debug_assert!(payload_len <= self.remaining_capacity); + let did_write = self.chunk.put_uninit_slice(payload_len, f)?; + if did_write { + self.remaining_capacity -= payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.chunk.remaining_capacity().min(self.remaining_capacity) + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity > 0 && self.chunk.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_bytes(bytes); + self.remaining_capacity -= len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_bytes_mut(bytes); + self.remaining_capacity -= len; + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + let len = chunk.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_chunk(chunk); + self.remaining_capacity -= len; + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs b/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs new file mode 100644 index 0000000000..6c457e3bb6 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs @@ -0,0 +1,76 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Storage}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +pub struct Tracked<'a, C: Storage + ?Sized> { + chunk: &'a mut C, + written: usize, +} + +impl<'a, C: Storage + ?Sized> Tracked<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C) -> Self { + Self { chunk, written: 0 } + } + + #[inline] + pub fn written_len(&self) -> usize { + self.written + } +} + +impl<'a, C: Storage + ?Sized> Storage for Tracked<'a, C> { + const SPECIALIZES_BYTES: bool = C::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = C::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.chunk.put_slice(bytes); + self.written += bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let did_write = self.chunk.put_uninit_slice(payload_len, f)?; + if did_write { + self.written += payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.chunk.remaining_capacity() + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.chunk.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + self.chunk.put_bytes(bytes); + self.written += len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + self.chunk.put_bytes_mut(bytes); + self.written += len; + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + let len = chunk.len(); + self.chunk.put_chunk(chunk); + self.written += len; + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs b/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs new file mode 100644 index 0000000000..d3afc07a0f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs @@ -0,0 +1,36 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Storage; +use bytes::buf::UninitSlice; + +impl Storage for &mut UninitSlice { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self[..bytes.len()].copy_from_slice(bytes); + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[bytes.len()..]; + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + ensure!(self.len() >= payload_len, Ok(false)); + + f(&mut self[..payload_len])?; + + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[payload_len..]; + + Ok(true) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.len() + } +} diff --git a/quic/s2n-quic-core/src/stream/testing.rs b/quic/s2n-quic-core/src/stream/testing.rs index d815b9f53f..86d8920d22 100644 --- a/quic/s2n-quic-core/src/stream/testing.rs +++ b/quic/s2n-quic-core/src/stream/testing.rs @@ -3,14 +3,12 @@ //! A model that ensures stream data is correctly sent and received between peers -#[cfg(feature = "generator")] -use bolero_generator::*; - -#[cfg(test)] -use bolero::generator::*; - +use crate::buffer::reader; use bytes::Bytes; +#[cfg(any(test, feature = "generator"))] +use bolero_generator::*; + static DATA: Bytes = { const INNER: [u8; DATA_LEN] = { let mut data = [0; DATA_LEN]; @@ -125,7 +123,7 @@ impl Data { let amount = ((self.len - self.offset) as usize).min(amount); let chunk = Self::send_one_at(self.offset, amount); - self.seek_forward(chunk.len()); + self.seek_forward(chunk.len() as u64); Some(chunk) } @@ -147,8 +145,56 @@ impl Data { } /// Moves the current offset forward by the provided `len` - pub fn seek_forward(&mut self, len: usize) { - self.offset += len as u64; + pub fn seek_forward(&mut self, len: u64) { + self.offset += len; + } +} + +impl reader::Storage for Data { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + (self.len - self.offset).try_into().unwrap() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + if let Some(chunk) = self.send_one(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + while let Some(chunk) = self.send_one(dest.remaining_capacity()) { + // if the chunk matches the destination then return it instead of copying + if chunk.len() == dest.remaining_capacity() || self.is_finished() { + return Ok(chunk.into()); + } + + // otherwise copy the chunk into the destination + dest.put_bytes(chunk); + } + + Ok(Default::default()) + } +} + +impl reader::Reader for Data { + #[inline] + fn current_offset(&self) -> crate::varint::VarInt { + self.offset().try_into().unwrap() + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.len.try_into().unwrap()) } } diff --git a/quic/s2n-quic-transport/src/space/crypto_stream.rs b/quic/s2n-quic-transport/src/space/crypto_stream.rs index d835fc75e3..e455fa61c6 100644 --- a/quic/s2n-quic-transport/src/space/crypto_stream.rs +++ b/quic/s2n-quic-transport/src/space/crypto_stream.rs @@ -6,7 +6,7 @@ use crate::{ transmission, }; use s2n_quic_core::{ - ack, buffer::ReceiveBuffer, frame::crypto::CryptoRef, transport, varint::VarInt, + ack, buffer::Reassembler, frame::crypto::CryptoRef, transport, varint::VarInt, }; pub type TxCryptoStream = DataSender; @@ -33,7 +33,7 @@ impl OutgoingDataFlowController for CryptoFlowController { #[derive(Debug)] pub struct CryptoStream { pub tx: TxCryptoStream, - pub rx: ReceiveBuffer, + pub rx: Reassembler, is_finished: bool, } @@ -49,7 +49,7 @@ impl CryptoStream { pub fn new() -> Self { Self { tx: TxCryptoStream::new(Default::default(), TX_MAX_BUFFER_CAPACITY), - rx: ReceiveBuffer::default(), + rx: Reassembler::default(), is_finished: false, } } diff --git a/quic/s2n-quic-transport/src/stream/receive_stream.rs b/quic/s2n-quic-transport/src/stream/receive_stream.rs index b2a7d911e0..e6168b0cb2 100644 --- a/quic/s2n-quic-transport/src/stream/receive_stream.rs +++ b/quic/s2n-quic-transport/src/stream/receive_stream.rs @@ -18,9 +18,7 @@ use core::{ }; use s2n_quic_core::{ ack, application, - buffer::{ - ReceiveBuffer as StreamReceiveBuffer, ReceiveBufferError as StreamReceiveBufferError, - }, + buffer::{self, Reassembler}, frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked}, packet::number::PacketNumber, stream::{ops, StreamId}, @@ -337,7 +335,7 @@ pub struct ReceiveStream { /// The current state of the stream pub(super) state: ReceiveStreamState, /// Buffer of already received data - pub(super) receive_buffer: StreamReceiveBuffer, + pub(super) receive_buffer: Reassembler, /// The composite flow controller for receiving data pub(super) flow_controller: ReceiveStreamFlowController, /// Synchronizes the `STOP_SENDING` flag towards the peer. @@ -368,7 +366,7 @@ impl ReceiveStream { let mut result = ReceiveStream { state, - receive_buffer: StreamReceiveBuffer::new(), + receive_buffer: Reassembler::new(), flow_controller: ReceiveStreamFlowController::new( connection_flow_controller, initial_window, @@ -447,7 +445,7 @@ impl ReceiveStream { // If this is the last frame then inform the receive_buffer so it can check for any // final size errors. - let write_result = if frame.is_fin { + let write_result: Result<(), buffer::Error> = if frame.is_fin { self.receive_buffer.write_at_fin(frame.offset, frame.data) } else { self.receive_buffer.write_at(frame.offset, frame.data) @@ -460,16 +458,17 @@ impl ReceiveStream { //# FLOW_CONTROL_ERROR if it receives more data than the maximum data //# value that it has sent. This includes violations of remembered //# limits in Early Data; see Section 7.4.1. - StreamReceiveBufferError::OutOfRange => { - transport::Error::FLOW_CONTROL_ERROR - } + buffer::Error::OutOfRange => transport::Error::FLOW_CONTROL_ERROR, //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5 //# Once a final size for a stream is known, it cannot change. If a //# RESET_STREAM or STREAM frame is received indicating a change in the //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. - StreamReceiveBufferError::InvalidFin => transport::Error::FINAL_SIZE_ERROR, + buffer::Error::InvalidFin => transport::Error::FINAL_SIZE_ERROR, + buffer::Error::ReaderError(_) => { + unreachable!("reader is infallible") + } } .with_reason("data reception error") .with_frame_type(frame.tag().into()) diff --git a/scripts/copyright_check b/scripts/copyright_check index e2003e87de..1846e45b8c 100755 --- a/scripts/copyright_check +++ b/scripts/copyright_check @@ -7,7 +7,7 @@ set -e -S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not \( -path "*/s2n-quic/target/*" -o -path "*/s2n-tls-sys/s2n/*" \)) +S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not -path "*/target/*") FAILED=0