diff --git a/quic/s2n-quic-bench/src/buffer.rs b/quic/s2n-quic-bench/src/buffer.rs index a3835f22fd..c40c942dd6 100644 --- a/quic/s2n-quic-bench/src/buffer.rs +++ b/quic/s2n-quic-bench/src/buffer.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use criterion::{black_box, BenchmarkId, Criterion, Throughput}; -use s2n_quic_core::{buffer::ReceiveBuffer, varint::VarInt}; +use s2n_quic_core::{ + buffer::{reader::Storage as _, writer, Reassembler}, + varint::VarInt, +}; pub fn benchmarks(c: &mut Criterion) { let mut group = c.benchmark_group("buffer"); @@ -13,19 +16,21 @@ pub fn benchmarks(c: &mut Criterion) { group.throughput(Throughput::Bytes(input.len() as _)); group.bench_with_input(BenchmarkId::new("skip", size), &input, |b, _input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); + let size = VarInt::try_from(size).unwrap(); b.iter(move || { buffer.skip(black_box(size)).unwrap(); }); }); group.bench_with_input(BenchmarkId::new("write_at", size), &input, |b, input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::from_u8(0); let len = VarInt::new(input.len() as _).unwrap(); b.iter(move || { buffer.write_at(offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + buffer.copy_into(&mut writer::storage::Discard).unwrap(); offset += len; }); }); @@ -36,7 +41,7 @@ pub fn benchmarks(c: &mut Criterion) { BenchmarkId::new("write_at_fragmented", size), &input, |b, input| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::from_u8(0); let len = VarInt::new(input.len() as _).unwrap(); b.iter(move || { @@ -44,34 +49,11 @@ pub fn benchmarks(c: &mut Criterion) { buffer.write_at(first_offset, input).unwrap(); let second_offset = offset; buffer.write_at(second_offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + 
buffer.copy_into(&mut writer::storage::Discard).unwrap(); offset = first_offset + len; }); }, ); } } - -/// A BufMut implementation that doesn't actually copy data into it -/// -/// This is used to avoid oversampling the `pop` implementation for -/// `write_at` benchmarks. -struct NoOpBuf; - -unsafe impl bytes::BufMut for NoOpBuf { - #[inline] - fn remaining_mut(&self) -> usize { - usize::MAX - } - - #[inline] - unsafe fn advance_mut(&mut self, _cnt: usize) {} - - #[inline] - fn put_slice(&mut self, _slice: &[u8]) {} - - #[inline] - fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { - unimplemented!() - } -} diff --git a/quic/s2n-quic-core/src/buffer.rs b/quic/s2n-quic-core/src/buffer.rs new file mode 100644 index 0000000000..3ae9be32d3 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer.rs @@ -0,0 +1,14 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod duplex; +mod error; +pub mod reader; +pub mod reassembler; +pub mod writer; + +pub use duplex::Duplex; +pub use error::Error; +pub use reader::Reader; +pub use reassembler::Reassembler; +pub use writer::Writer; diff --git a/quic/s2n-quic-core/src/buffer/duplex.rs b/quic/s2n-quic-core/src/buffer/duplex.rs new file mode 100644 index 0000000000..c4f73caf40 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex.rs @@ -0,0 +1,23 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Error, Reader, Writer}; +use crate::varint::VarInt; + +mod interposer; + +pub use interposer::Interposer; + +/// A buffer that is capable of both reading and writing +pub trait Duplex: Reader + Writer {} + +impl Duplex for T {} + +/// A buffer which can be advanced forward without reading or writing payloads. This +/// is essentially a forward-only [`std::io::Seek`]. +/// +/// This can be used for scenarios where the buffer was written somewhere else but still needed to +/// be tracked. 
+pub trait Skip: Duplex { + fn skip(&mut self, len: VarInt, final_offset: Option) -> Result<(), Error>; +} diff --git a/quic/s2n-quic-core/src/buffer/duplex/interposer.rs b/quic/s2n-quic-core/src/buffer/duplex/interposer.rs new file mode 100644 index 0000000000..aa70deb2b4 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex/interposer.rs @@ -0,0 +1,332 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + duplex, + reader::{self, Reader, Storage as _}, + writer::{self, Writer}, + Error, + }, + varint::VarInt, +}; +use core::convert::Infallible; + +/// A wrapper around an underlying buffer (`duplex`) which will prefer to read/write from a +/// user-provided temporary buffer (`storage`). The underlying buffer (`duplex`)'s current +/// position and total length are updated if needed. +pub struct Interposer<'a, S, D> +where + S: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + storage: &'a mut S, + duplex: &'a mut D, +} + +impl<'a, S, D> Interposer<'a, S, D> +where + S: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + pub fn new(storage: &'a mut S, duplex: &'a mut D) -> Self { + debug_assert!( + !storage.has_remaining_capacity() || duplex.buffer_is_empty(), + "`duplex` should be drained into `storage` before constructing an Interposer" + ); + + Self { storage, duplex } + } +} + +/// Delegates to the inner Duplex +impl<'a, S, D> reader::Storage for Interposer<'a, S, D> +where + S: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + type Error = D::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.duplex.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.duplex.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + self.duplex.read_chunk(watermark) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, 
Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.duplex.partial_copy_into(dest) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.duplex.copy_into(dest) + } +} + +/// Delegates to the inner Duplex +impl<'a, C, D> Reader for Interposer<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.duplex.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.duplex.final_offset() + } + + #[inline] + fn has_buffered_fin(&self) -> bool { + self.duplex.has_buffered_fin() + } + + #[inline] + fn is_consumed(&self) -> bool { + self.duplex.is_consumed() + } +} + +impl<'a, C, D> Writer for Interposer<'a, C, D> +where + C: writer::Storage + ?Sized, + D: duplex::Skip + ?Sized, +{ + #[inline] + fn read_from(&mut self, reader: &mut R) -> Result<(), Error> + where + R: Reader + ?Sized, + { + let final_offset = reader.final_offset(); + + { + // if the storage specializes writing zero-copy Bytes/BytesMut, then just write to the + // receive buffer, since that's what it stores + let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT; + + // if the storage has no space left then write into the duplex + should_delegate |= !self.storage.has_remaining_capacity(); + + // if this packet is non-contiguous, then delegate to the wrapped writer + should_delegate |= reader.current_offset() != self.duplex.current_offset(); + + // if the storage has less than half of the payload, then delegate + should_delegate |= self.storage.remaining_capacity() < (reader.buffered_len() / 2); + + if should_delegate { + self.duplex.read_from(reader)?; + + // don't copy into `storage` here - let the caller do that later since it can be + // more efficient to pull from `duplex` all in one go. 
+ + return Ok(()); + } + } + + debug_assert!( + self.storage.has_remaining_capacity(), + "this code should only be executed if the storage has capacity" + ); + + { + // track the number of consumed bytes + let mut reader = reader.track_read(); + + reader.copy_into(self.storage)?; + + let write_len = reader.consumed_len(); + let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?; + + // notify the duplex that we bypassed it and should skip + self.duplex + .skip(write_len, final_offset) + .map_err(Error::mapped)?; + } + + // if we still have some remaining bytes consume the rest in the duplex + if !reader.buffer_is_empty() { + self.duplex.read_from(reader)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + buffer::{ + reader::Reader, + writer::{Storage as _, Writer}, + Reassembler, + }, + stream::testing::Data, + }; + + #[test] + fn undersized_storage_test() { + let mut duplex = Reassembler::default(); + let mut reader = Data::new(10); + let mut checker = reader; + + let mut storage: Vec = vec![]; + { + // limit the storage capacity so we force writing into the duplex + let mut storage = storage.with_write_limit(1); + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut reader).unwrap(); + } + + // the storage was too small so we delegated to duplex + assert!(storage.is_empty()); + assert_eq!(duplex.buffered_len(), 10); + + // move the reassembled bytes into the checker + checker.read_from(&mut duplex).unwrap(); + assert_eq!(duplex.current_offset().as_u64(), 10); + assert!(duplex.is_consumed()); + } + + #[test] + fn out_of_order_test() { + let mut duplex = Reassembler::default(); + + // first write 5 bytes at offset 5 + { + let mut reader = Data::new(10); + + // advance the reader by 5 bytes + let _ = reader.send_one(5); + + let mut storage: Vec = vec![]; + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut 
reader).unwrap(); + + // make sure we consumed the reader + assert_eq!(reader.current_offset().as_u64(), 10); + + assert_eq!(interposer.current_offset().as_u64(), 0); + assert_eq!(interposer.buffered_len(), 0); + + // make sure we didn't write to the storage, even if we had capacity, since the + // current_offset doesn't match + assert!(storage.is_empty()); + } + + // then write 10 bytes at offset 0 + { + let mut reader = Data::new(10); + + let mut storage: Vec = vec![]; + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut reader).unwrap(); + + // make sure we consumed the reader + assert_eq!(reader.current_offset().as_u64(), 10); + + assert_eq!(interposer.current_offset().as_u64(), 10); + assert_eq!(interposer.buffered_len(), 0); + + // make sure we copied the entire reader + assert_eq!(storage.len(), 10); + assert!(duplex.is_consumed()); + } + } + + #[test] + fn skip_test() { + let mut duplex = Reassembler::default(); + let mut reader = Data::new(10); + let mut checker = reader; + + let mut storage: Vec = vec![]; + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut reader).unwrap(); + + assert_eq!(storage.len(), 10); + assert_eq!(duplex.current_offset().as_u64(), 10); + + checker.receive(&[&storage[..]]); + } + + #[test] + fn empty_storage_test() { + let mut duplex = Reassembler::default(); + let mut reader = Data::new(10); + let mut checker = reader; + + let mut storage = writer::storage::Empty; + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut reader).unwrap(); + + assert_eq!(interposer.current_offset().as_u64(), 0); + assert_eq!(interposer.buffered_len(), 10); + + checker.read_from(&mut interposer).unwrap(); + + assert_eq!(interposer.current_offset().as_u64(), 10); + assert!(interposer.buffer_is_empty()); + assert_eq!(interposer.buffered_len(), 0); + assert!(interposer.is_consumed()); + } + + #[test] + fn partial_test() { 
+ let mut duplex = Reassembler::default(); + let mut reader = Data::new(10); + let mut checker = reader; + + let mut storage: Vec = vec![]; + { + let mut storage = storage.with_write_limit(9); + + let mut interposer = Interposer::new(&mut storage, &mut duplex); + + interposer.read_from(&mut reader).unwrap(); + } + + // the storage was at least half the reader + assert_eq!(storage.len(), 9); + assert_eq!(duplex.buffered_len(), 1); + + // move the reassembled bytes into the checker + checker.receive(&[&storage]); + checker.read_from(&mut duplex).unwrap(); + assert_eq!(duplex.current_offset().as_u64(), 10); + assert!(duplex.is_consumed()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/error.rs b/quic/s2n-quic-core/src/buffer/error.rs new file mode 100644 index 0000000000..e9d6201d7f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/error.rs @@ -0,0 +1,60 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Error { + /// An invalid data range was provided + OutOfRange, + /// The provided final size was invalid for the buffer's state + InvalidFin, + /// The provided reader failed + ReaderError(E), +} + +impl From for Error { + #[inline] + fn from(reader: E) -> Self { + Self::ReaderError(reader) + } +} + +impl Error { + /// Maps from an infallible error into a more specific error + #[inline] + pub fn mapped(error: Error) -> Error { + match error { + Error::OutOfRange => Error::OutOfRange, + Error::InvalidFin => Error::InvalidFin, + Error::ReaderError(_) => unreachable!(), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), + Self::InvalidFin => write!( + f, + "write modifies the final offset in a non-compliant manner" + ), 
+ Self::ReaderError(reader) => write!(f, "the provided reader failed with: {reader}"), + } + } +} + +#[cfg(feature = "std")] +impl From> for std::io::Error { + #[inline] + fn from(error: Error) -> Self { + let kind = match &error { + Error::OutOfRange => std::io::ErrorKind::InvalidData, + Error::InvalidFin => std::io::ErrorKind::InvalidData, + Error::ReaderError(_) => std::io::ErrorKind::Other, + }; + Self::new(kind, error) + } +} diff --git a/quic/s2n-quic-core/src/buffer/mod.rs b/quic/s2n-quic-core/src/buffer/mod.rs deleted file mode 100644 index d252d8b391..0000000000 --- a/quic/s2n-quic-core/src/buffer/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -mod receive_buffer; - -pub use receive_buffer::*; diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs new file mode 100644 index 0000000000..6382f5a434 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader.rs @@ -0,0 +1,77 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::varint::VarInt; + +pub mod checked; +mod complete; +mod empty; +pub mod incremental; +mod limit; +pub mod storage; + +pub use checked::Checked; +pub use complete::Complete; +pub use empty::Empty; +pub use incremental::Incremental; +pub use limit::Limit; +pub use storage::Storage; + +/// A buffer that can be read with a tracked offset and final position. 
+pub trait Reader: Storage { + /// Returns the currently read offset for the stream + fn current_offset(&self) -> VarInt; + + /// Returns the final offset for the stream + fn final_offset(&self) -> Option; + + /// Returns `true` if the reader has the final offset buffered + #[inline] + fn has_buffered_fin(&self) -> bool { + self.final_offset().map_or(false, |fin| { + let buffered_end = self + .current_offset() + .as_u64() + .saturating_add(self.buffered_len() as u64); + fin == buffered_end + }) + } + + /// Returns `true` if the reader is finished producing data + #[inline] + fn is_consumed(&self) -> bool { + self.final_offset() + .map_or(false, |fin| fin == self.current_offset()) + } + + /// Limits the maximum offset that the caller can read from the reader + #[inline] + fn with_max_data(&mut self, max_data: VarInt) -> Limit { + let max_buffered_len = max_data.saturating_sub(self.current_offset()); + let max_buffered_len = max_buffered_len.as_u64().min(self.buffered_len() as u64) as usize; + self.with_read_limit(max_buffered_len) + } + + /// Limits the maximum amount of data that the caller can read from the reader + #[inline] + fn with_read_limit(&mut self, max_buffered_len: usize) -> Limit { + Limit::new(self, max_buffered_len) + } + + /// Return an empty view onto the reader, with no change in current offset + #[inline] + fn with_empty_buffer(&self) -> Empty { + Empty::new(self) + } + + /// Enables checking the reader for correctness invariants + /// + /// # Note + /// + /// `debug_assertions` must be enabled for these checks to be performed. Otherwise, the reader + /// methods will simply be forwarded to `Self`. 
+ #[inline] + fn with_checks(&mut self) -> Checked { + Checked::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/checked.rs b/quic/s2n-quic-core/src/buffer/reader/checked.rs new file mode 100644 index 0000000000..361c457e08 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/checked.rs @@ -0,0 +1,227 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{Reader, Storage}, + writer, + }, + varint::VarInt, +}; + +#[cfg(debug_assertions)] +use crate::buffer::reader::storage::Infallible; + +/// Ensures [`Reader`] invariants are held as each trait function is called +pub struct Checked<'a, R> +where + R: Reader + ?Sized, +{ + inner: &'a mut R, + #[cfg(debug_assertions)] + chunk: alloc::vec::Vec, +} + +impl<'a, R> Checked<'a, R> +where + R: Reader + ?Sized, +{ + #[inline(always)] + pub fn new(inner: &'a mut R) -> Self { + Self { + inner, + #[cfg(debug_assertions)] + chunk: Default::default(), + } + } +} + +/// Forward on to the inner reader when debug_assertions are disabled +#[cfg(not(debug_assertions))] +impl<'a, R> Storage for Checked<'a, R> +where + R: Reader + ?Sized, +{ + type Error = R::Error; + + #[inline(always)] + fn buffered_len(&self) -> usize { + self.inner.buffered_len() + } + + #[inline(always)] + fn buffer_is_empty(&self) -> bool { + self.inner.buffer_is_empty() + } + + #[inline(always)] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + self.inner.read_chunk(watermark) + } + + #[inline(always)] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.inner.partial_copy_into(dest) + } + + #[inline(always)] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.inner.copy_into(dest) + } +} + +#[cfg(debug_assertions)] +impl<'a, R> Storage for Checked<'a, R> +where + R: 
Reader + ?Sized, +{ + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.inner.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.inner.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + let snapshot = Snapshot::new(self.inner, watermark); + + let mut chunk = self.inner.read_chunk(watermark)?; + + // copy the returned chunk into another buffer so we can read the `inner` state + self.chunk.clear(); + chunk.infallible_copy_into(&mut self.chunk); + + snapshot.check(self.inner, 0, self.chunk.len()); + + Ok(self.chunk[..].into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let snapshot = Snapshot::new(self.inner, dest.remaining_capacity()); + let mut dest = dest.track_write(); + + let mut chunk = self.inner.partial_copy_into(&mut dest)?; + + // copy the returned chunk into another buffer so we can read the `inner` state + self.chunk.clear(); + chunk.infallible_copy_into(&mut self.chunk); + + snapshot.check(self.inner, dest.written_len(), self.chunk.len()); + + Ok(self.chunk[..].into()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let snapshot = Snapshot::new(self.inner, dest.remaining_capacity()); + let mut dest = dest.track_write(); + + self.inner.copy_into(&mut dest)?; + + snapshot.check(self.inner, dest.written_len(), 0); + + Ok(()) + } +} + +impl<'a, R> Reader for Checked<'a, R> +where + R: Reader + ?Sized, +{ + #[inline(always)] + fn current_offset(&self) -> VarInt { + self.inner.current_offset() + } + + #[inline(always)] + fn final_offset(&self) -> Option { + self.inner.final_offset() + } + + #[inline(always)] + fn has_buffered_fin(&self) -> bool { + self.inner.has_buffered_fin() + } + + #[inline(always)] + fn is_consumed(&self) -> bool { + 
self.inner.is_consumed() + } +} + +#[cfg(debug_assertions)] +struct Snapshot { + current_offset: VarInt, + final_offset: Option, + buffered_len: usize, + dest_capacity: usize, +} + +#[cfg(debug_assertions)] +impl Snapshot { + #[inline] + fn new(reader: &R, dest_capacity: usize) -> Self { + let current_offset = reader.current_offset(); + let final_offset = reader.final_offset(); + let buffered_len = reader.buffered_len(); + Self { + current_offset, + final_offset, + buffered_len, + dest_capacity, + } + } + + #[inline] + fn check(&self, reader: &R, dest_written_len: usize, chunk_len: usize) { + assert!( + chunk_len <= self.dest_capacity, + "chunk exceeded destination" + ); + + let write_len = reader.current_offset() - self.current_offset; + + assert_eq!( + dest_written_len as u64 + chunk_len as u64, + write_len.as_u64(), + "{} reader misreporting offsets", + core::any::type_name::(), + ); + + assert!(write_len <= self.buffered_len as u64); + + if self.final_offset.is_some() { + assert_eq!( + reader.final_offset(), + self.final_offset, + "{} reader changed final offset", + core::any::type_name::(), + ) + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/complete.rs b/quic/s2n-quic-core/src/buffer/reader/complete.rs new file mode 100644 index 0000000000..4dca4b8e3a --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/complete.rs @@ -0,0 +1,157 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{storage::Chunk, Reader, Storage}, + writer, Error, + }, + varint::VarInt, +}; + +/// Wraps a single [`Storage`] instance as a [`Reader`]. +/// +/// This can be used for scenarios where the entire stream is buffered and known up-front. 
+#[derive(Debug)] +pub struct Complete<'a, S> { + storage: &'a mut S, + current_offset: VarInt, + final_offset: VarInt, +} + +impl<'a, S> Complete<'a, S> +where + S: Storage, +{ + #[inline] + pub fn new(storage: &'a mut S) -> Result { + let final_offset = VarInt::try_from(storage.buffered_len()) + .ok() + .ok_or(Error::OutOfRange)?; + Ok(Self { + storage, + current_offset: VarInt::ZERO, + final_offset, + }) + } +} + +impl<'a, S> Storage for Complete<'a, S> +where + S: Storage, +{ + type Error = S::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.storage.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.storage.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let chunk = self.storage.read_chunk(watermark)?; + self.current_offset += chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + let chunk = self.storage.partial_copy_into(&mut dest)?; + self.current_offset += chunk.len(); + self.current_offset += dest.written_len(); + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + self.storage.copy_into(&mut dest)?; + self.current_offset += dest.written_len(); + Ok(()) + } +} + +impl<'a, C> Reader for Complete<'a, C> +where + C: Storage, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.final_offset) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn read_chunk_test() { + let mut storage: &[u8] = &[1, 2, 3, 4]; + let mut reader = Complete::new(&mut storage).unwrap(); + let mut reader = reader.with_checks(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), 
Some(VarInt::from_u8(4))); + + let chunk = reader.read_chunk(usize::MAX).unwrap(); + assert_eq!(&*chunk, &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } + + #[test] + fn partial_copy_test() { + let mut storage: &[u8] = &[1, 2, 3, 4]; + let mut reader = Complete::new(&mut storage).unwrap(); + let mut reader = reader.with_checks(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), Some(VarInt::from_u8(4))); + + let mut dest: &mut [u8] = &mut [0; 4]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*chunk, &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } + + #[test] + fn copy_test() { + let mut storage: &[u8] = &[1, 2, 3, 4]; + let mut reader = Complete::new(&mut storage).unwrap(); + let mut reader = reader.with_checks(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), Some(VarInt::from_u8(4))); + + let mut dest = [0; 4]; + { + let mut dest = &mut dest[..]; + reader.copy_into(&mut dest).unwrap(); + } + assert_eq!(&dest[..], &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/empty.rs b/quic/s2n-quic-core/src/buffer/reader/empty.rs new file mode 100644 index 0000000000..26d927914f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/empty.rs @@ -0,0 +1,84 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{storage::Chunk, Reader, Storage}, + writer, + }, + varint::VarInt, +}; + +/// Returns an empty buffer for the current offset of an inner reader +#[derive(Debug)] +pub struct Empty<'a, R: Reader + ?Sized>(&'a R); + +impl<'a, R: Reader + ?Sized> Empty<'a, R> { + #[inline] + pub fn new(reader: &'a R) -> Self { + Self(reader) + } +} + +impl<'a, R: Reader + ?Sized> Storage for Empty<'a, R> { + type Error = core::convert::Infallible; + + #[inline(always)] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline(always)] + fn read_chunk(&mut self, _watermark: usize) -> Result { + Ok(Chunk::empty()) + } + + #[inline(always)] + fn partial_copy_into(&mut self, _dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + Ok(Chunk::empty()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Empty<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.0.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.0.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn empty_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + let mut reader = reader.with_checks(); + + { + assert_eq!(reader.with_empty_buffer().buffered_len(), 0); + } + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + + let mut reader = reader.with_empty_buffer(); + + assert_eq!(reader.buffered_len(), 0); + assert!(reader.buffer_is_empty()); + + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/incremental.rs b/quic/s2n-quic-core/src/buffer/reader/incremental.rs new file mode 100644 index 0000000000..78a4b4ff0a --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/incremental.rs @@ -0,0 +1,176 @@ +// Copyright 
Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{storage::Chunk, Reader, Storage}, + writer, Error, + }, + ensure, + varint::VarInt, +}; + +/// Implements an incremental [`Reader`] that joins to temporary [`Storage`] as the stream data +/// +/// This is useful for scenarios where the the stream isn't completely buffered in memory and +/// data come in gradually. +#[derive(Debug, Default)] +pub struct Incremental { + current_offset: VarInt, + final_offset: Option, +} + +impl Incremental { + #[inline] + pub fn with_storage<'a, C: Storage>( + &'a mut self, + storage: &'a mut C, + is_fin: bool, + ) -> Result, Error> { + let mut storage = WithStorage { + incremental: self, + storage, + }; + + if is_fin { + storage.set_fin()?; + } else { + ensure!( + storage.incremental.final_offset.is_none(), + Err(Error::InvalidFin) + ); + } + + Ok(storage) + } + + #[inline] + pub fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + pub fn final_offset(&self) -> Option { + self.final_offset + } +} + +pub struct WithStorage<'a, C: Storage> { + incremental: &'a mut Incremental, + storage: &'a mut C, +} + +impl<'a, C: Storage> WithStorage<'a, C> { + #[inline] + pub fn set_fin(&mut self) -> Result<&mut Self, Error> { + let final_offset = self + .incremental + .current_offset + .checked_add_usize(self.buffered_len()) + .ok_or(Error::OutOfRange)?; + + // make sure the final length doesn't change + if let Some(current) = self.incremental.final_offset { + ensure!(final_offset == current, Err(Error::InvalidFin)); + } + + self.incremental.final_offset = Some(final_offset); + + Ok(self) + } +} + +impl<'a, C: Storage> Storage for WithStorage<'a, C> { + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.storage.buffered_len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let chunk = self.storage.read_chunk(watermark)?; + 
self.incremental.current_offset += chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + let chunk = self.storage.partial_copy_into(&mut dest)?; + self.incremental.current_offset += chunk.len(); + self.incremental.current_offset += dest.written_len(); + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + self.storage.copy_into(&mut dest)?; + self.incremental.current_offset += dest.written_len(); + Ok(()) + } +} + +impl<'a, C: Storage> Reader for WithStorage<'a, C> { + #[inline] + fn current_offset(&self) -> VarInt { + self.incremental.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.incremental.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn incremental_test() { + let mut incremental = Incremental::default(); + + assert_eq!(incremental.current_offset(), VarInt::ZERO); + assert_eq!(incremental.final_offset(), None); + + { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut reader = incremental.with_storage(&mut chunk, false).unwrap(); + let mut reader = reader.with_checks(); + + assert_eq!(reader.buffered_len(), 4); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailing_chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*trailing_chunk, &[1, 2, 3, 4]); + + assert_eq!(reader.buffered_len(), 0); + } + + assert_eq!(incremental.current_offset(), VarInt::from_u8(4)); + + { + let mut chunk: &[u8] = &[5, 6, 7, 8]; + let mut reader = incremental.with_storage(&mut chunk, true).unwrap(); + let mut reader = reader.with_checks(); + + assert_eq!(reader.buffered_len(), 4); + + let trailing_chunk = reader.read_chunk(usize::MAX).unwrap(); + assert_eq!(&*trailing_chunk, &[5, 6, 7, 8]); + + assert_eq!(reader.buffered_len(), 0); + 
assert!(reader.buffer_is_empty()); + assert!(reader.is_consumed()); + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/limit.rs b/quic/s2n-quic-core/src/buffer/reader/limit.rs new file mode 100644 index 0000000000..0c8736c3c1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/limit.rs @@ -0,0 +1,120 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{storage::Chunk, Reader, Storage}, + writer::{self, Storage as _}, + }, + varint::VarInt, +}; + +/// Wraps a reader and limits the amount of data that can be read from it +/// +/// This can be used for applying back pressure to the reader with flow control. +pub struct Limit<'a, R: Reader + ?Sized> { + len: usize, + reader: &'a mut R, +} + +impl<'a, R: Reader + ?Sized> Limit<'a, R> { + #[inline] + pub fn new(reader: &'a mut R, max_buffered_len: usize) -> Self { + let len = max_buffered_len.min(reader.buffered_len()); + + Self { len, reader } + } +} + +impl<'a, R: Reader + ?Sized> Storage for Limit<'a, R> { + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.len + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let watermark = self.len.min(watermark); + let chunk = self.reader.read_chunk(watermark)?; + unsafe { + assume!(chunk.len() <= self.len); + } + self.len -= chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.with_write_limit(self.len); + let mut dest = dest.track_write(); + let chunk = self.reader.partial_copy_into(&mut dest)?; + let len = dest.written_len() + chunk.len(); + unsafe { + assume!(len <= self.len); + } + self.len -= len; + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.with_write_limit(self.len); 
+ let mut dest = dest.track_write(); + self.reader.copy_into(&mut dest)?; + let len = dest.written_len(); + unsafe { + assume!(len <= self.len); + } + self.len -= len; + Ok(()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Limit<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.reader.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.reader.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn max_data_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + let mut reader = reader.with_checks(); + + let max_data = 32usize; + + let mut reader = reader.with_max_data(VarInt::from_u8(max_data as _)); + assert_eq!(reader.buffered_len(), max_data); + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + + assert_eq!(reader.buffered_len(), max_data - 16); + + let mut dest = &mut [0u8; 16][..]; + let chunk = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(chunk.len(), 16); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage.rs b/quic/s2n-quic-core/src/buffer/reader/storage.rs new file mode 100644 index 0000000000..803568f737 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage.rs @@ -0,0 +1,78 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +mod buf; +mod bytes; +mod chunk; +mod empty; +mod full_copy; +mod infallible; +mod io_slice; +mod slice; +mod tracked; + +#[cfg(test)] +mod tests; + +pub use buf::Buf; +pub use chunk::Chunk; +pub use empty::Empty; +pub use full_copy::FullCopy; +pub use infallible::Infallible; +pub use io_slice::IoSlice; +pub use tracked::Tracked; + +pub trait Storage { + type Error; + + /// Returns the length of the chunk + fn buffered_len(&self) -> usize; + + /// Returns if the chunk is empty + #[inline] + fn buffer_is_empty(&self) -> bool { + self.buffered_len() == 0 + } + + /// Reads the current contiguous chunk + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error>; + + /// Copies the reader into `dest`, with a trailing chunk of bytes. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + /// + /// The storage also returns a `Chunk`, which can be used by the caller to defer + /// copying the trailing chunk until later. The returned chunk must fit into the target + /// destination. The caller must eventually copy the chunk into the destination, otherwise this + /// data will be discarded. + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized; + + /// Copies the reader into `dest`. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut chunk = self.partial_copy_into(dest)?; + chunk.infallible_copy_into(dest); + Ok(()) + } + + /// Forces the entire reader to be copied, even when calling `partial_copy_into`. + /// + /// The returned `Chunk` from `partial_copy_into` will always be empty. 
+ #[inline] + fn full_copy(&mut self) -> FullCopy { + FullCopy::new(self) + } + + /// Tracks the number of bytes read from the storage + #[inline] + fn track_read(&mut self) -> Tracked { + Tracked::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs b/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs new file mode 100644 index 0000000000..3078d80e57 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/buf.rs @@ -0,0 +1,148 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + assume, + buffer::{ + reader::{storage::Chunk, Storage}, + writer, + }, + ensure, +}; +use core::cmp::Ordering; + +/// Implementation of [`Storage`] that delegates to a [`bytes::Buf`] implementation. +pub struct Buf<'a, B: bytes::Buf> { + buf: &'a mut B, + /// tracks the number of bytes that need to be advanced in the Buf + pending: usize, +} + +impl<'a, B> Buf<'a, B> +where + B: bytes::Buf, +{ + /// Creates a reader [`Storage`] over `buf` with no bytes pending + #[inline] + pub fn new(buf: &'a mut B) -> Self { + Self { buf, pending: 0 } + } + + /// Advances the underlying Buf past any pending bytes that have already been read + /// + /// Returns early when nothing is pending. + #[inline] + fn commit_pending(&mut self) { + ensure!(self.pending > 0); + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.advance(self.pending); + self.pending = 0; + } +} + +impl<'a, B> Storage for Buf<'a, B> +where + B: bytes::Buf, +{ + type Error = core::convert::Infallible; + + /// Bytes remaining in the Buf, excluding those handed out but not yet advanced + #[inline] + fn buffered_len(&self) -> usize { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.remaining() - self.pending + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + self.commit_pending(); + let chunk = self.buf.chunk(); + let len = chunk.len().min(watermark); + // remember how much was handed out so the next call can advance past it + self.pending = len; + Ok(chunk[..len].into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: 
&mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + self.commit_pending(); + + ensure!(dest.has_remaining_capacity(), Ok(Chunk::empty())); + + loop { + let chunk_len = self.buf.chunk().len(); + + if chunk_len == 0 { + debug_assert_eq!( + self.buf.remaining(), + 0, + "buf returned empty chunk with remaining bytes" + ); + return Ok(Chunk::empty()); + } + + match chunk_len.cmp(&dest.remaining_capacity()) { + // if there's more chunks left, then copy this one out and keep going + Ordering::Less if self.buf.remaining() > chunk_len => { + if Dest::SPECIALIZES_BYTES { + let chunk = self.buf.copy_to_bytes(chunk_len); + dest.put_bytes(chunk); + } else { + dest.put_slice(self.buf.chunk()); + self.buf.advance(chunk_len); + } + continue; + } + Ordering::Less | Ordering::Equal => { + let chunk = self.buf.chunk(); + self.pending = chunk.len(); + return Ok(chunk.into()); + } + Ordering::Greater => { + let len = dest.remaining_capacity(); + let chunk = &self.buf.chunk()[..len]; + self.pending = len; + return Ok(chunk.into()); + } + } + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.commit_pending(); + + loop { + let chunk = self.buf.chunk(); + let len = chunk.len().min(dest.remaining_capacity()); + + ensure!(len > 0, Ok(())); + + if Dest::SPECIALIZES_BYTES { + let chunk = self.buf.copy_to_bytes(len); + dest.put_bytes(chunk); + } else { + dest.put_slice(&chunk[..len]); + self.buf.advance(len); + } + } + } +} + +impl<'a, B> Drop for Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + fn drop(&mut self) { + // make sure we advance the consumed bytes on drop + self.commit_pending(); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs new file mode 100644 index 0000000000..fe3e88f1a0 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs @@ -0,0 +1,123 @@ +// Copyright 
Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{storage::Chunk, Storage}, + writer, +}; +use bytes::{Bytes, BytesMut}; + +impl Storage for BytesMut { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES_MUT { + let buffer = self.split_to(watermark); + dest.put_bytes_mut(buffer); + } else if Dest::SPECIALIZES_BYTES { + let buffer = self.split_to(watermark); + dest.put_bytes(buffer.freeze()); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} + +impl Storage for Bytes { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES { + let buffer = 
self.split_to(watermark); + dest.put_bytes(buffer); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use writer::Storage as _; + + #[test] + fn bytes_into_queue_test() { + let mut reader = Bytes::from_static(b"hello world"); + + let mut writer: Vec = vec![]; + { + let mut writer = writer.with_write_limit(5); + let chunk = reader.partial_copy_into(&mut writer).unwrap(); + assert_eq!(&chunk[..], b"hello"); + } + + assert!(writer.is_empty()); + assert_eq!(&reader[..], b" world"); + + reader.copy_into(&mut writer).unwrap(); + + assert_eq!(writer.len(), 1); + assert_eq!(&writer.pop().unwrap()[..], b" world"); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs new file mode 100644 index 0000000000..3fb7b04c81 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/chunk.rs @@ -0,0 +1,106 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{reader::Storage, writer}; +use bytes::{Bytes, BytesMut}; + +/// Concrete chunk of bytes +/// +/// This can be returned to allow the caller to defer copying the data until later. 
+#[derive(Clone, Debug)] +#[must_use = "Chunk should not be discarded"] +pub enum Chunk<'a> { + Slice(&'a [u8]), + Bytes(Bytes), + BytesMut(BytesMut), +} + +impl<'a> Default for Chunk<'a> { + #[inline] + fn default() -> Self { + Self::empty() + } +} + +impl<'a> Chunk<'a> { + #[inline] + pub fn empty() -> Self { + Self::Slice(&[]) + } +} + +impl<'a> From<&'a [u8]> for Chunk<'a> { + #[inline] + fn from(chunk: &'a [u8]) -> Self { + Self::Slice(chunk) + } +} + +impl<'a> From for Chunk<'a> { + #[inline] + fn from(chunk: Bytes) -> Self { + Self::Bytes(chunk) + } +} + +impl<'a> From for Chunk<'a> { + #[inline] + fn from(chunk: BytesMut) -> Self { + Self::BytesMut(chunk) + } +} + +impl<'a> core::ops::Deref for Chunk<'a> { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &Self::Target { + match self { + Self::Slice(t) => t, + Self::Bytes(t) => t, + Self::BytesMut(t) => t, + } + } +} + +impl<'a> Storage for Chunk<'a> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + match self { + Self::Slice(v) => v.read_chunk(watermark), + Self::Bytes(v) => v.read_chunk(watermark), + Self::BytesMut(v) => v.read_chunk(watermark), + } + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + match self { + Self::Slice(v) => v.partial_copy_into(dest), + Self::Bytes(v) => v.partial_copy_into(dest), + Self::BytesMut(v) => v.partial_copy_into(dest), + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + match self { + Self::Slice(v) => v.copy_into(dest), + Self::Bytes(v) => v.copy_into(dest), + Self::BytesMut(v) => v.copy_into(dest), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/empty.rs b/quic/s2n-quic-core/src/buffer/reader/storage/empty.rs new file mode 100644 index 
0000000000..78b03880ec --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/empty.rs @@ -0,0 +1,65 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{storage::Chunk, Storage}, + writer, +}; + +/// An empty reader [`Storage`] +#[derive(Clone, Copy, Debug, Default)] +pub struct Empty; + +impl Storage for Empty { + type Error = core::convert::Infallible; + + #[inline(always)] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline(always)] + fn buffer_is_empty(&self) -> bool { + true + } + + #[inline(always)] + fn read_chunk(&mut self, _watermark: usize) -> Result, Self::Error> { + Ok(Chunk::empty()) + } + + #[inline(always)] + fn partial_copy_into(&mut self, _dest: &mut Dest) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + Ok(Chunk::empty()) + } + + #[inline] + fn copy_into(&mut self, _dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_test() { + let mut reader = Empty; + let mut writer: Vec = vec![]; + + let chunk = reader.partial_copy_into(&mut writer).unwrap(); + assert_eq!(chunk.len(), 0); + assert_eq!(writer.len(), 0); + + reader.copy_into(&mut writer).unwrap(); + + assert_eq!(writer.len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs b/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs new file mode 100644 index 0000000000..224abb42d2 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/full_copy.rs @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{storage::Chunk, Storage}, + writer, +}; + +/// Forces a full copy, even when `partial_copy_into` is called +/// +/// Wraps a mutable reference to another [`Storage`]. Every method delegates to the wrapped +/// storage, except that `partial_copy_into` performs a complete `copy_into` and returns an +/// empty [`Chunk`]. +#[derive(Debug)] +pub struct FullCopy<'a, S: Storage + ?Sized>(&'a mut S); + +impl<'a, S: Storage + ?Sized> FullCopy<'a, S> { + /// Wraps `storage` + #[inline] + pub fn new(storage: &'a mut S) -> Self { + Self(storage) + } +} + +impl<'a, S: Storage + ?Sized> Storage for FullCopy<'a, S> { + type Error = S::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.0.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.0.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + self.0.read_chunk(watermark) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + // force the full copy so that no trailing chunk is left for the caller + self.0.copy_into(dest)?; + Ok(Chunk::empty()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + self.0.copy_into(dest) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// `partial_copy_into` on a `FullCopy` writes everything and returns an empty chunk + #[test] + fn full_copy_test() { + let mut reader = &b"hello world"[..]; + let len = reader.len(); + let mut reader = reader.full_copy(); + let mut writer: Vec = vec![]; + + let chunk = reader.partial_copy_into(&mut writer).unwrap(); + assert_eq!(chunk.len(), 0); + assert_eq!(writer.len(), len); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs b/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs new file mode 100644 index 0000000000..e652807d35 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/infallible.rs @@ -0,0 +1,36 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{storage::Chunk, Storage}, + writer, +}; + +// unwrapping an infallible error doesn't panic +// https://godbolt.org/z/7v5MWdvGa + +/// [`Storage`] implementation that cannot fail +/// +/// Each method calls the matching [`Storage`] method and unwraps the `Result`. +// NOTE(review): presumably bounded to storages whose `Error` is `core::convert::Infallible`; +// the associated-type constraint is not visible in this copy - confirm against the original. +pub trait Infallible: Storage { + /// Same as `read_chunk`, with the result unwrapped + #[inline(always)] + fn infallible_read_chunk(&mut self, watermark: usize) -> Chunk<'_> { + self.read_chunk(watermark).unwrap() + } + + /// Same as `partial_copy_into`, with the result unwrapped + #[inline(always)] + fn infallible_partial_copy_into(&mut self, dest: &mut Dest) -> Chunk<'_> + where + Dest: writer::Storage + ?Sized, + { + self.partial_copy_into(dest).unwrap() + } + + /// Same as `copy_into`, with the result unwrapped + #[inline] + fn infallible_copy_into(&mut self, dest: &mut Dest) + where + Dest: writer::Storage + ?Sized, + { + self.copy_into(dest).unwrap() + } +} + +// blanket implementation: the helpers are available on every matching storage +impl Infallible for T where T: Storage + ?Sized {} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs b/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs new file mode 100644 index 0000000000..ec2513694b --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/io_slice.rs @@ -0,0 +1,369 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + assume, + buffer::{ + reader::{storage::Chunk, Storage}, + writer, + }, +}; +use core::{cmp::Ordering, ops::ControlFlow}; + +/// A vectored reader [`Storage`] +pub struct IoSlice<'a, T> { + len: usize, + head: &'a [u8], + buf: &'a [T], +} + +impl<'a, T> IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + pub fn new(buf: &'a [T]) -> Self { + let mut len = 0; + let mut first_non_empty = usize::MAX; + let mut last_non_empty = 0; + + // find the total length and the first non-empty slice + for (idx, buf) in buf.iter().enumerate() { + len += buf.len(); + if !buf.is_empty() { + last_non_empty = idx; + first_non_empty = first_non_empty.min(idx); + } + } + + // if there are no filled slices then return the base case + if len == 0 { + return Self { + len: 0, + head: &[], + buf: &[], + }; + } + + let buf = unsafe { + // Safety: we checked above that this range is at least 1 element and is in-bounds + buf.get_unchecked(first_non_empty..=last_non_empty) + }; + + let mut slice = Self { + len, + head: &[], + buf, + }; + slice.advance_buf_once(); + slice.invariants(); + slice + } + + #[inline(always)] + fn advance_buf(&mut self) { + // keep advancing the buffer until we get a non-empty slice + while self.head.is_empty() && !self.buf.is_empty() { + self.advance_buf_once(); + } + } + + #[inline(always)] + fn advance_buf_once(&mut self) { + unsafe { + assume!(!self.buf.is_empty()); + } + let (head, tail) = self.buf.split_at(1); + self.head = &head[0][..]; + self.buf = tail; + } + + #[inline] + fn sub_len(&mut self, len: usize) { + unsafe { + assume!(self.len >= len); + } + self.set_len(self.len - len); + } + + #[inline] + fn set_len(&mut self, len: usize) { + self.len = len; + self.invariants(); + } + + #[inline] + fn read_chunk_control_flow(&mut self, watermark: usize) -> ControlFlow, Chunk<'a>> { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = 
self.head.len().min(watermark); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return ControlFlow::Break(head.into()); + } + + match self.head.len().cmp(&watermark) { + // head can be consumed and the watermark still has capacity + Ordering::Less => { + let head = core::mem::take(&mut self.head); + self.advance_buf(); + self.sub_len(head.len()); + ControlFlow::Continue(head.into()) + } + // head can be consumed and the watermark is filled + Ordering::Equal => { + let head = core::mem::take(&mut self.head); + self.advance_buf(); + self.sub_len(head.len()); + ControlFlow::Break(head.into()) + } + // head is partially consumed and the watermark is filled + Ordering::Greater => { + unsafe { + assume!(self.head.len() >= watermark); + } + let (head, tail) = self.head.split_at(watermark); + self.head = tail; + self.sub_len(head.len()); + ControlFlow::Break(head.into()) + } + } + } + + #[inline(always)] + fn invariants(&self) { + #[cfg(debug_assertions)] + { + // make sure the computed len matches the actual remaining len + let mut computed = self.head.len(); + for buf in self.buf.iter() { + computed += buf.len(); + } + assert_eq!(self.len, computed); + + if self.head.is_empty() { + assert!(self.buf.is_empty()); + assert_eq!(self.len, 0); + } + } + } +} + +impl<'a, T> bytes::Buf for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + fn remaining(&self) -> usize { + self.len + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.head + } + + /// Advances through the vectored slices by `cnt` bytes + #[inline] + fn advance(&mut self, mut cnt: usize) { + assert!(cnt <= self.len); + let new_len = self.len - cnt; + + // special-case for when we read the entire thing + if new_len == 0 { + self.head = &[]; + self.buf = &[]; + self.set_len(new_len); + return; + } + + while cnt > 0 { + let len = self.head.len().min(cnt); + cnt -= len; + + if len >= self.head.len() { + unsafe { + assume!(!self.buf.is_empty()); + } + + 
self.head = &[]; + self.advance_buf(); + continue; + } + + self.head = &self.head[len..]; + break; + } + + self.set_len(new_len); + } +} + +impl<'a, T> Storage for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + Ok(match self.read_chunk_control_flow(watermark) { + ControlFlow::Continue(chunk) => chunk, + ControlFlow::Break(chunk) => chunk, + }) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + ensure!(dest.has_remaining_capacity(), Ok(Chunk::empty())); + + loop { + match self.read_chunk_control_flow(dest.remaining_capacity()) { + ControlFlow::Continue(chunk) => { + dest.put_slice(&chunk); + continue; + } + ControlFlow::Break(chunk) => return Ok(chunk), + } + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + ensure!(dest.has_remaining_capacity(), Ok(())); + + loop { + match self.read_chunk_control_flow(dest.remaining_capacity()) { + ControlFlow::Continue(chunk) => { + dest.put_slice(&chunk); + continue; + } + ControlFlow::Break(chunk) => { + if !chunk.is_empty() { + dest.put_slice(&chunk); + } + return Ok(()); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::buffer::reader::storage::Buf; + + /// ensures each storage type correctly copies multiple chunks into the destination + #[test] + #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time + fn io_slice_test() { + let mut dest = vec![]; + let mut source: Vec> = vec![]; + let mut pool = vec![]; + let mut expected = vec![]; + bolero::check!() + .with_type::<(u16, Vec)>() + .for_each(|(max_dest_len, source_lens)| { + while source.len() > source_lens.len() { + pool.push(source.pop().unwrap()); + } + + 
while source.len() < source_lens.len() { + source.push(pool.pop().unwrap_or_default()); + } + + let mut source_len = 0; + let mut last_chunk_idx = 0; + let mut last_chunk_len = 0; + let mut remaining_len = *max_dest_len as usize; + for (idx, (len, source)) in source_lens.iter().zip(&mut source).enumerate() { + let fill = (idx + 1) as u8; + let len = *len as usize; + source.resize(len, fill); + source.fill(fill); + if len > 0 && remaining_len > 0 { + last_chunk_idx = idx; + last_chunk_len = len.min(remaining_len); + } + source_len += len; + remaining_len = remaining_len.saturating_sub(len); + } + + let dest_len = source_len.min(*max_dest_len as usize); + dest.resize(dest_len, 0); + dest.fill(0); + let dest = &mut dest[..]; + + expected.resize(dest_len, 0); + expected.fill(0); + + { + // don't copy the last chunk, since that should be returned + let source = &source[..last_chunk_idx]; + crate::slice::vectored_copy(source, &mut [&mut expected[..]]); + } + + let expected_chunk = source + .get(last_chunk_idx) + .map(|v| &v[..last_chunk_len]) + .unwrap_or(&[]); + + // IoSlice implementation + { + let mut source = IoSlice::new(&source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + // reset the destination + dest.fill(0); + } + + // Buf implementation + { + let mut source = IoSlice::new(&source); + let mut source = Buf::new(&mut source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + // reset the destination + dest.fill(0); + } + + // IoSlice read_chunk + { + let mut reader = IoSlice::new(&source); + let max_dest_len = *max_dest_len as usize; + + let expected_chunk_len = source_lens + .iter() + .find(|len| **len > 0) + .copied() + .unwrap_or(0) as usize; + let expected_chunk_len = expected_chunk_len.min(max_dest_len); + + let 
chunk = reader.read_chunk(max_dest_len).unwrap(); + + assert_eq!(chunk.len(), expected_chunk_len); + } + }); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs new file mode 100644 index 0000000000..4c7b29ce32 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/slice.rs @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{ + reader::{storage::Chunk, Storage}, + writer, + }, + ensure, +}; + +macro_rules! impl_slice { + ($ty:ty, $split:ident) => { + impl Storage for $ty { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + ensure!(!self.is_empty(), Ok(Chunk::empty())); + let len = self.len().min(watermark); + // use `take` to work around borrowing rules + let (chunk, remaining) = core::mem::take(self).$split(len); + *self = remaining; + Ok((&*chunk).into()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + self.read_chunk(dest.remaining_capacity()) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + ensure!(!self.is_empty(), Ok(())); + let len = self.len().min(dest.remaining_capacity()); + // use `take` to work around borrowing rules + let (chunk, remaining) = core::mem::take(self).$split(len); + *self = remaining; + dest.put_slice(chunk); + Ok(()) + } + } + }; +} + +impl_slice!(&[u8], split_at); +impl_slice!(&mut [u8], split_at_mut); diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs b/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs new file mode 100644 index 0000000000..57954ceb18 --- /dev/null 
+++ b/quic/s2n-quic-core/src/buffer/reader/storage/tests.rs @@ -0,0 +1,75 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +/// ensures each implementation returns a trailing chunk correctly +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn trailing_chunk_test() { + let mut dest = vec![]; + let mut source = vec![]; + bolero::check!() + .with_type::<(u16, u16)>() + .for_each(|(dest_len, source_len)| { + source.resize(*source_len as usize, 42); + + let dest_len = (*source_len).min(*dest_len) as usize; + dest.resize(dest_len, 0); + let expected = &source[..dest_len]; + let dest = &mut dest[..]; + + // direct implementation + { + let mut reader: &[u8] = &source[..]; + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // IoSlice implementation + { + let io_slice = [&source[..]]; + let mut reader = IoSlice::new(&io_slice); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // Buf implementation + { + let mut slice = &source[..]; + let mut reader = Buf::new(&mut slice); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*chunk); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // full_copy + { + let mut source = &source[..]; + let mut reader = source.full_copy(); + let mut target = &mut dest[..]; + + let chunk = reader.partial_copy_into(&mut target).unwrap(); + assert!(chunk.is_empty()); + assert_eq!(expected, dest); + dest.fill(0); + } + }); +} diff --git 
a/quic/s2n-quic-core/src/buffer/reader/storage/tracked.rs b/quic/s2n-quic-core/src/buffer/reader/storage/tracked.rs new file mode 100644 index 0000000000..d9ca7cb393 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/storage/tracked.rs @@ -0,0 +1,127 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::{Reader, Storage}, + writer, +}; + +/// Wraps a [`Storage`] and counts the number of bytes consumed through the wrapper +pub struct Tracked<'a, S: Storage + ?Sized> { + /// running total of bytes read since the wrapper was created + consumed: usize, + storage: &'a mut S, +} + +impl<'a, S: Storage + ?Sized> Tracked<'a, S> { + /// Wraps `storage` with the consumed count starting at zero + #[inline] + pub fn new(storage: &'a mut S) -> Self { + Self { + consumed: 0, + storage, + } + } + + /// Returns the number of bytes consumed through this wrapper so far + #[inline] + pub fn consumed_len(&self) -> usize { + self.consumed + } +} + +impl<'a, S: Storage + ?Sized> Storage for Tracked<'a, S> { + type Error = S::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.storage.buffered_len() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + let chunk = self.storage.read_chunk(watermark)?; + self.consumed += chunk.len(); + Ok(chunk) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + let chunk = self.storage.partial_copy_into(&mut dest)?; + // both the bytes written into `dest` and the returned trailing chunk count as consumed + self.consumed += dest.written_len(); + self.consumed += chunk.len(); + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + // measure what the wrapped storage actually wrote into `dest` + let mut dest = dest.track_write(); + self.storage.copy_into(&mut dest)?; + self.consumed += dest.written_len(); + Ok(()) + } +} + +// the reader offsets are forwarded unchanged to the wrapped storage +impl<'a, S: Reader + ?Sized> Reader for Tracked<'a, S> { + #[inline] + fn current_offset(&self) -> crate::varint::VarInt { + self.storage.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.storage.final_offset() + } + + #[inline] + fn has_buffered_fin(&self) -> bool { + 
self.storage.has_buffered_fin() + } + + #[inline] + fn is_consumed(&self) -> bool { + self.storage.is_consumed() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use writer::Storage as _; + + #[test] + fn tracked_test() { + let mut reader = &b"hello world"[..]; + let mut writer: Vec = vec![]; + + { + let mut reader = reader.track_read(); + let chunk = reader.read_chunk(1).unwrap(); + assert_eq!(&chunk[..], b"h"); + assert_eq!(reader.consumed_len(), 1); + } + + { + let mut reader = reader.track_read(); + let mut writer = writer.with_write_limit(5); + + let chunk = reader.partial_copy_into(&mut writer).unwrap(); + assert_eq!(&chunk[..], b"ello "); + assert_eq!(reader.consumed_len(), 5); + } + + assert_eq!(writer.len(), 0); + + { + let mut reader = reader.track_read(); + reader.copy_into(&mut writer).unwrap(); + assert_eq!(reader.consumed_len(), 5); + assert_eq!(&writer[..], b"world"); + } + + assert_eq!(reader.len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer.rs b/quic/s2n-quic-core/src/buffer/reassembler.rs similarity index 83% rename from quic/s2n-quic-core/src/buffer/receive_buffer.rs rename to quic/s2n-quic-core/src/buffer/reassembler.rs index dcb86b5acc..8a69df1a3e 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler.rs @@ -1,17 +1,19 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -//! This module contains data structures for buffering incoming and outgoing data -//! in Quic streams. +//! This module contains data structures for buffering incoming streams. 
+use super::Error; use crate::varint::VarInt; use alloc::collections::{vec_deque, VecDeque}; use bytes::BytesMut; -use core::fmt; +mod duplex; mod probe; +mod reader; mod request; mod slot; +mod writer; #[cfg(test)] mod tests; @@ -19,31 +21,7 @@ mod tests; use request::Request; use slot::Slot; -/// Enumerates error that can occur while inserting data into the Receive Buffer -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum ReceiveBufferError { - /// An invalid data range was provided - OutOfRange, - /// The provided final size was invalid for the buffer's state - InvalidFin, -} - -#[cfg(feature = "std")] -impl std::error::Error for ReceiveBufferError {} - -impl fmt::Display for ReceiveBufferError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), - Self::InvalidFin => write!( - f, - "write modifies the final offset in a non-compliant manner" - ), - } - } -} - -/// The default buffer size for slots that the [`ReceiveBuffer`] uses. +/// The default buffer size for slots that the [`Reassembler`] uses. /// /// This value was picked as it is typically used for the default memory page size. const MIN_BUFFER_ALLOCATION_SIZE: usize = 4096; @@ -58,14 +36,14 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; //# Endpoints MUST be able to deliver stream data to an application as an //# ordered byte-stream. -/// `ReceiveBuffer` is a buffer structure for combining chunks of bytes in an +/// `Reassembler` is a buffer structure for combining chunks of bytes in an /// ordered stream, which might arrive out of order. /// -/// `ReceiveBuffer` will accumulate the bytes, and provide them to its users +/// `Reassembler` will accumulate the bytes, and provide them to its users /// once a contiguous range of bytes at the current position of the stream has /// been accumulated. 
/// -/// `ReceiveBuffer` is optimized for minimizing memory allocations and for +/// `Reassembler` is optimized for minimizing memory allocations and for /// offering it's users chunks of sizes that minimize call overhead. /// /// If data is received in smaller chunks, only the first chunk will trigger a @@ -80,10 +58,10 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; /// /// ## Usage /// -/// ```rust,ignore -/// use s2n_quic_transport::buffer::ReceiveBuffer; +/// ```rust +/// use s2n_quic_core::buffer::Reassembler; /// -/// let mut buffer = ReceiveBuffer::new(); +/// let mut buffer = Reassembler::new(); /// /// // write a chunk of bytes at offset 4, which can not be consumed yet /// assert!(buffer.write_at(4u32.into(), &[4, 5, 6, 7]).is_ok()); @@ -99,23 +77,23 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX; /// assert_eq!(&[0u8, 1, 2, 3, 4, 5, 6, 7], &buffer.pop().unwrap()[..]); /// ``` #[derive(Debug, PartialEq)] -pub struct ReceiveBuffer { +pub struct Reassembler { slots: VecDeque, start_offset: u64, max_recv_offset: u64, final_offset: u64, } -impl Default for ReceiveBuffer { +impl Default for Reassembler { fn default() -> Self { Self::new() } } -impl ReceiveBuffer { - /// Creates a new `ReceiveBuffer` - pub fn new() -> ReceiveBuffer { - ReceiveBuffer { +impl Reassembler { + /// Creates a new `Reassembler` + pub fn new() -> Reassembler { + Reassembler { slots: VecDeque::new(), start_offset: 0, max_recv_offset: 0, @@ -179,7 +157,7 @@ impl ReceiveBuffer { /// Pushes a slice at a certain offset #[inline] - pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), ReceiveBufferError> { + pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> { // create a request let request = Request::new(offset, data)?; self.write_request(request)?; @@ -188,7 +166,7 @@ impl ReceiveBuffer { /// Pushes a slice at a certain offset, which is the end of the buffer #[inline] - pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), 
ReceiveBufferError> { + pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> { // create a request let request = Request::new(offset, data)?; @@ -203,17 +181,11 @@ impl ReceiveBuffer { //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. if let Some(final_size) = self.final_size() { - ensure!( - final_size == final_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(final_size == final_offset, Err(Error::InvalidFin)); } // make sure that we didn't see any previous chunks greater than the final size - ensure!( - self.max_recv_offset <= final_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(self.max_recv_offset <= final_offset, Err(Error::InvalidFin)); self.final_offset = final_offset; @@ -223,7 +195,7 @@ impl ReceiveBuffer { } #[inline] - fn write_request(&mut self, request: Request) -> Result<(), ReceiveBufferError> { + fn write_request(&mut self, request: Request) -> Result<(), Error> { // trim off any data that we've already read let (_, request) = request.split(self.start_offset); // trim off any data that exceeds our final length @@ -236,7 +208,7 @@ impl ReceiveBuffer { //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. - ensure!(excess.is_empty(), Err(ReceiveBufferError::InvalidFin)); + ensure!(excess.is_empty(), Err(Error::InvalidFin)); // if the request is empty we're done ensure!(!request.is_empty(), Ok(())); @@ -300,20 +272,17 @@ impl ReceiveBuffer { /// This can be used for copy-avoidance applications where a packet is received in order and /// doesn't need to be stored temporarily for future packets to unblock the stream. 
#[inline] - pub fn skip(&mut self, len: usize) -> Result<(), ReceiveBufferError> { + pub fn skip(&mut self, len: VarInt) -> Result<(), Error> { // zero-length skip is a no-op - ensure!(len > 0, Ok(())); + ensure!(len > VarInt::ZERO, Ok(())); let new_start_offset = self .start_offset - .checked_add(len as u64) - .ok_or(ReceiveBufferError::OutOfRange)?; + .checked_add(len.as_u64()) + .ok_or(Error::OutOfRange)?; if let Some(final_size) = self.final_size() { - ensure!( - final_size >= new_start_offset, - Err(ReceiveBufferError::InvalidFin) - ); + ensure!(final_size >= new_start_offset, Err(Error::InvalidFin)); } // record the maximum offset that we've seen @@ -398,48 +367,6 @@ impl ReceiveBuffer { }) } - /// Copies all the available buffered data into the provided `buf`'s remaining capacity. - /// - /// This method is slightly more efficient than [`Self::pop`] when the caller ends up copying - /// the buffered data into another slice, since it avoids a refcount increment/decrement on - /// the contained [`BytesMut`]. - /// - /// The total number of bytes copied is returned. 
- #[inline] - pub fn copy_into_buf(&mut self, buf: &mut B) -> usize { - use bytes::Buf; - - let mut total = 0; - - loop { - let remaining = buf.remaining_mut(); - // ensure we have enough capacity in the destination buf - ensure!(remaining > 0, total); - - let transform = |buffer: &mut BytesMut, is_final_offset| { - let watermark = buffer.len().min(remaining); - - // copy bytes into the destination buf - buf.put_slice(&buffer[..watermark]); - // advance the chunk rather than splitting to avoid refcount churn - buffer.advance(watermark); - total += watermark; - - (is_final_offset, watermark) - }; - - match self.pop_transform(transform) { - // if we're at the final offset, then no need to keep iterating - Some(true) => break, - Some(false) => continue, - // no more available chunks - None => break, - } - } - - total - } - /// Pops a buffer from the front of the receive queue as long as the `transform` function returns a /// non-empty buffer. #[inline] @@ -667,7 +594,7 @@ pub struct Iter<'a> { impl<'a> Iter<'a> { #[inline] - fn new(buffer: &'a ReceiveBuffer) -> Self { + fn new(buffer: &'a Reassembler) -> Self { Self { prev_end: buffer.start_offset, inner: buffer.slots.iter(), @@ -690,7 +617,7 @@ impl<'a> Iterator for Iter<'a> { } pub struct Drain<'a> { - inner: &'a mut ReceiveBuffer, + inner: &'a mut Reassembler, } impl<'a> Iterator for Drain<'a> { diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/__fuzz__/buffer__receive_buffer__tests__model/corpus.tar.gz b/quic/s2n-quic-core/src/buffer/reassembler/__fuzz__/buffer__reassembler__tests__model/corpus.tar.gz similarity index 100% rename from quic/s2n-quic-core/src/buffer/receive_buffer/__fuzz__/buffer__receive_buffer__tests__model/corpus.tar.gz rename to quic/s2n-quic-core/src/buffer/reassembler/__fuzz__/buffer__reassembler__tests__model/corpus.tar.gz diff --git a/quic/s2n-quic-core/src/buffer/reassembler/duplex.rs b/quic/s2n-quic-core/src/buffer/reassembler/duplex.rs new file mode 100644 index 
0000000000..6aab27bda7 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reassembler/duplex.rs @@ -0,0 +1,23 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Reassembler; +use crate::{ + buffer::{duplex::Skip, Error}, + varint::VarInt, +}; + +impl Skip for Reassembler { + #[inline] + fn skip(&mut self, len: VarInt, final_offset: Option) -> Result<(), Error> { + // write the final offset first, if possible + if let Some(offset) = final_offset { + self.write_at_fin(offset, &[])?; + } + + // then skip the bytes + (*self).skip(len)?; + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs b/quic/s2n-quic-core/src/buffer/reassembler/probe.rs similarity index 71% rename from quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs rename to quic/s2n-quic-core/src/buffer/reassembler/probe.rs index 542304b6fb..86da449953 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/probe.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/probe.rs @@ -4,15 +4,15 @@ crate::probe::define!( extern "probe" { /// Emitted when a buffer is allocated for a particular offset - #[link_name = s2n_quic_core__buffer__receive_buffer__alloc] + #[link_name = s2n_quic_core__buffer__reassembler__alloc] pub fn alloc(offset: u64, capacity: usize); /// Emitted when a chunk is read from the beginning of the buffer - #[link_name = s2n_quic_core__buffer__receive_buffer__pop] + #[link_name = s2n_quic_core__buffer__reassembler__pop] pub fn pop(offset: u64, len: usize); /// Emitted when a chunk of data is written at an offset - #[link_name = s2n_quic_core__buffer__receive_buffer__write] + #[link_name = s2n_quic_core__buffer__reassembler__write] pub fn write(offset: u64, len: usize); } ); diff --git a/quic/s2n-quic-core/src/buffer/reassembler/reader.rs b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs new file mode 100644 index 0000000000..527703b270 --- /dev/null +++ 
b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs @@ -0,0 +1,191 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Reassembler; +use crate::{ + buffer::{ + reader::{ + storage::{Chunk, Infallible}, + Reader, Storage, + }, + writer, + }, + varint::VarInt, +}; +use bytes::BytesMut; + +impl Storage for Reassembler { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + if let Some(chunk) = self.pop_watermarked(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result + where + Dest: writer::Storage + ?Sized, + { + // ensure we have enough capacity in the destination buf + ensure!(dest.has_remaining_capacity(), Ok(Default::default())); + + let mut prev = BytesMut::new(); + + loop { + let remaining = dest.remaining_capacity(); + unsafe { + assume!(prev.len() <= remaining); + } + let watermark = remaining - prev.len(); + + debug_assert!(remaining > 0); + + match self.pop_watermarked(watermark) { + Some(chunk) => { + debug_assert!(!chunk.is_empty(), "pop should never return an empty chunk"); + debug_assert!( + chunk.len() <= watermark, + "chunk should never exceed watermark" + ); + + // flush the previous chunk if needed + if !prev.is_empty() { + dest.put_bytes_mut(prev); + } + + // if the chunk is exactly the same size as the watermark, then return it + if chunk.len() == watermark { + return Ok(chunk.into()); + } + + // store the chunk for another iteration, in case we can pull more + prev = chunk; + } + None if prev.is_empty() => { + return Ok(Default::default()); + } + None => { + return Ok(prev.into()); + } + } + } + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), 
Self::Error> + where + Dest: writer::Storage + ?Sized, + { + loop { + // ensure we have enough capacity in the destination buf + ensure!(dest.has_remaining_capacity(), Ok(())); + + let transform = |buffer: &mut BytesMut, _is_final_offset| { + let mut dest = dest.track_write(); + buffer.infallible_copy_into(&mut dest); + ((), dest.written_len()) + }; + + if self.pop_transform(transform).is_none() { + return Ok(()); + } + } + } +} + +impl Reader for Reassembler { + #[inline] + fn current_offset(&self) -> VarInt { + unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(self.start_offset) + } + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_size().map(|v| unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(v) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn undersized_dest_partial_copy_into_test() { + let mut reassembler = Reassembler::default(); + + reassembler.write_at(VarInt::ZERO, b"hello").unwrap(); + + let mut dest = &mut [0u8; 1][..]; + let chunk = reassembler.infallible_partial_copy_into(&mut dest); + assert_eq!(dest.len(), 1, "the destination should not be written into"); + assert_eq!(&chunk[..], b"h"); + + assert_eq!(reassembler.current_offset().as_u64(), 1); + } + + #[test] + fn oversized_dest_partial_copy_into_test() { + let mut reassembler = Reassembler::default(); + + reassembler.write_at(VarInt::ZERO, b"hello").unwrap(); + + let mut reader = reassembler.with_checks(); + + let mut dest = &mut [0u8; 10][..]; + let chunk = reader.infallible_partial_copy_into(&mut dest); + assert_eq!(dest.len(), 10, "the destination should not be written into"); + assert_eq!(&chunk[..], b"hello"); + + assert_eq!(reader.current_offset().as_u64(), 5); + } + + #[test] + fn multiple_chunk_dest_partial_copy_into_test() { + let mut reassembler = Reassembler::default(); + + // align the cursor to just before a slot boundary + let offset: VarInt = 
(super::super::MIN_BUFFER_ALLOCATION_SIZE - 1) + .try_into() + .unwrap(); + reassembler.skip(offset).unwrap(); + reassembler.write_at(offset, b"hello").unwrap(); + + let mut reader = reassembler.with_checks(); + let mut dest = [0u8; 10]; + + let chunk = { + let mut dest = &mut dest[..]; + let chunk = reader.infallible_partial_copy_into(&mut dest); + assert_eq!( + dest.len(), + 9, + "the destination should have a single byte written to it" + ); + chunk + }; + + assert_eq!(&dest[..1], b"h"); + assert_eq!(&chunk[..], b"ello"); + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/request.rs b/quic/s2n-quic-core/src/buffer/reassembler/request.rs similarity index 93% rename from quic/s2n-quic-core/src/buffer/receive_buffer/request.rs rename to quic/s2n-quic-core/src/buffer/reassembler/request.rs index bfe2c47a5d..8bbe2b6d20 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/request.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/request.rs @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 -use super::ReceiveBufferError; +use super::Error; use crate::varint::VarInt; use bytes::{BufMut, BytesMut}; use core::fmt; @@ -23,10 +23,10 @@ impl<'a> fmt::Debug for Request<'a> { impl<'a> Request<'a> { #[inline] - pub fn new(offset: VarInt, data: &'a [u8]) -> Result { + pub fn new(offset: VarInt, data: &'a [u8]) -> Result { offset .checked_add_usize(data.len()) - .ok_or(ReceiveBufferError::OutOfRange)?; + .ok_or(Error::OutOfRange)?; Ok(Self { offset: offset.as_u64(), data, diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs similarity index 99% rename from quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs rename to quic/s2n-quic-core/src/buffer/reassembler/slot.rs index 6f69cd9e35..b087e5d3b1 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/slot.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs @@ -5,7 +5,7 @@ use super::Request; use bytes::{Buf, BufMut, BytesMut}; use core::fmt; -/// Possible states for slots in the [`ReceiveBuffer`]s queue +/// Possible states for slots in the [`Reassembler`]'s queue #[derive(PartialEq, Eq)] pub struct Slot { start: u64, diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs similarity index 94% rename from quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs rename to quic/s2n-quic-core/src/buffer/reassembler/tests.rs index bef282f0b0..2b5f4a418f 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs @@ -20,7 +20,7 @@ enum Op { watermark: Option, }, Skip { - len: usize, + len: VarInt, }, } @@ -28,7 +28,7 @@ enum Op { #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn model_test() { check!().with_type::>().for_each(|ops| { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut 
recv = Data::new(u64::MAX); for op in ops { match *op { @@ -59,8 +59,8 @@ fn model_test() { let consumed_len = buffer.consumed_len(); if buffer.skip(len).is_ok() { let new_consumed_len = buffer.consumed_len(); - assert_eq!(new_consumed_len, consumed_len + len as u64); - recv.seek_forward(len); + assert_eq!(new_consumed_len, consumed_len + len.as_u64()); + recv.seek_forward(len.as_u64()); } } } @@ -68,14 +68,14 @@ fn model_test() { // make sure a cleared buffer is the same as a new one buffer.reset(); - assert_eq!(buffer, ReceiveBuffer::new()); + assert_eq!(buffer, Reassembler::new()); }) } #[test] #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn write_and_pop() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); let mut offset = VarInt::default(); let chunk = Data::send_one_at(0, 9000); let mut popped_bytes = 0; @@ -92,22 +92,25 @@ fn write_and_pop() { #[test] #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn write_and_copy_into_buf() { - let mut buffer = ReceiveBuffer::new(); + use crate::buffer::reader::Storage; + + let mut buffer = Reassembler::new(); let mut offset = VarInt::default(); - let mut output = vec![]; + let mut output: Vec = vec![]; for len in 0..10000 { + dbg!(len, offset); let chunk = Data::send_one_at(offset.as_u64(), len); buffer.write_at(offset, &chunk).unwrap(); offset += chunk.len(); - let copied_len = buffer.copy_into_buf(&mut output); - assert_eq!(copied_len, chunk.len()); + buffer.copy_into(&mut output).unwrap(); + assert_eq!(output.len(), chunk.len()); assert_eq!(&output[..], &chunk[..]); output.clear(); } } -fn new_receive_buffer() -> ReceiveBuffer { - let buffer = ReceiveBuffer::new(); +fn new_receive_buffer() -> Reassembler { + let buffer = Reassembler::new(); assert_eq!(buffer.len(), 0); buffer } @@ -400,7 +403,7 @@ fn chunk_partial_larger_after_test() { #[test] 
#[allow(clippy::cognitive_complexity)] // several operations are needed to get the buffer in the desired state fn write_and_read_buffer() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); assert_eq!(0, buf.len()); assert!(buf.is_empty()); @@ -452,7 +455,7 @@ fn write_and_read_buffer() { #[test] fn fill_preallocated_gaps() { - let mut buf: ReceiveBuffer = ReceiveBuffer::new(); + let mut buf: Reassembler = Reassembler::new(); buf.write_at((MIN_BUFFER_ALLOCATION_SIZE as u32 + 2).into(), &[42, 45]) .unwrap(); @@ -498,7 +501,7 @@ fn fill_preallocated_gaps() { #[test] fn create_and_fill_large_gaps() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); // This creates 3 full buffer gaps of full allocation ranges buf.write_at( (MIN_BUFFER_ALLOCATION_SIZE as u32 * 3 + 2).into(), @@ -573,7 +576,7 @@ fn create_and_fill_large_gaps() { #[test] fn ignore_already_consumed_data() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); assert_eq!(0, buf.consumed_len()); assert_eq!(&[0u8, 1, 2, 3], &*buf.pop().unwrap()); @@ -597,7 +600,7 @@ fn ignore_already_consumed_data() { #[test] fn merge_right() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(4u32.into(), &[4, 5, 6]).unwrap(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); assert_eq!(7, buf.len()); @@ -606,7 +609,7 @@ fn merge_right() { #[test] fn merge_left() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(0u32.into(), &[0, 1, 2, 3]).unwrap(); buf.write_at(4u32.into(), &[4, 5, 6]).unwrap(); assert_eq!(7, buf.len()); @@ -615,7 +618,7 @@ fn merge_left() { #[test] fn merge_both_sides() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); // Create gaps on all sides, and merge them later buf.write_at(4u32.into(), &[4, 5]).unwrap(); buf.write_at(8u32.into(), &[8, 9]).unwrap(); @@ -627,7 +630,7 @@ fn 
merge_both_sides() { #[test] fn do_not_merge_across_allocations_right() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); let mut data_left = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; let mut data_right = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; @@ -645,7 +648,7 @@ fn do_not_merge_across_allocations_right() { #[test] fn do_not_merge_across_allocations_left() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); let mut data_left = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; let mut data_right = [0u8; MIN_BUFFER_ALLOCATION_SIZE]; @@ -663,7 +666,7 @@ fn do_not_merge_across_allocations_left() { #[test] fn reset_buffer() { - let mut buf = ReceiveBuffer::new(); + let mut buf = Reassembler::new(); buf.write_at(2u32.into(), &[2, 3]).unwrap(); buf.write_at(0u32.into(), &[0, 1]).unwrap(); assert_eq!(4, buf.len()); @@ -701,7 +704,7 @@ fn fail_to_push_out_of_bounds_data() { for nr_bytes in 0..64 * 2 + 1 { let data = vec![0u8; nr_bytes + 1]; assert_eq!( - Err(ReceiveBufferError::OutOfRange), + Err(Error::OutOfRange), buffer.write_at( VarInt::new(MAX_VARINT_VALUE - nr_bytes as u64).unwrap(), &data[..] @@ -714,7 +717,7 @@ fn fail_to_push_out_of_bounds_data() { #[cfg_attr(miri, ignore)] // miri fails because the slice points to invalid memory #[cfg(target_pointer_width = "64")] fn fail_to_push_out_of_bounds_data_with_long_buffer() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); // Overflow the allowed buffers by size 1. This uses an invalid memory // reference, due to not wanting to allocate too much memory. 
This is @@ -728,7 +731,7 @@ fn fail_to_push_out_of_bounds_data_with_long_buffer() { for _ in 0..64 * 2 + 1 { assert_eq!( - Err(ReceiveBufferError::OutOfRange), + Err(Error::OutOfRange), buffer.write_at(20u32.into(), fake_data) ); } @@ -736,7 +739,7 @@ fn fail_to_push_out_of_bounds_data_with_long_buffer() { #[test] fn pop_watermarked_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); assert_eq!( None, @@ -788,7 +791,7 @@ fn write_start_fin_test() { for size in INTERESTING_CHUNK_SIZES.iter().copied() { for pre_empty_fin in [false, true] { let bytes: Vec = Iterator::map(0..size, |v| v as u8).collect(); - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); // write the fin offset first if pre_empty_fin { @@ -832,11 +835,11 @@ fn write_partial_fin_test() { let partial_bytes: Vec = Iterator::map(0..partial_size, |v| v as u8).collect(); let fin_bytes: Vec = Iterator::map(0..fin_size, |v| v as u8).collect(); - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); assert!(!buffer.is_writing_complete()); assert!(!buffer.is_reading_complete()); - let mut oracle = ReceiveBuffer::new(); + let mut oracle = Reassembler::new(); let mut requests = vec![ (0u32, &partial_bytes, false), @@ -919,13 +922,13 @@ fn write_partial_fin_test() { #[test] fn write_fin_zero_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(0u32.into(), &[]).unwrap(); assert_eq!( buffer.write_at(0u32.into(), &[1]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "no data can be written after a fin" ); } @@ -933,7 +936,7 @@ fn write_fin_zero_test() { #[test] fn fin_pop_take_test() { for write_fin in [false, true] { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at(0u32.into(), &[1]).unwrap(); if write_fin { buffer.write_at_fin(1u32.into(), &[]).unwrap(); @@ -956,38 +959,38 @@ fn fin_pop_take_test() { #[test] 
fn write_fin_changed_error_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(16u32.into(), &[]).unwrap(); assert_eq!( buffer.write_at_fin(0u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot decrease a previous fin" ); assert_eq!( buffer.write_at_fin(32u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot exceed a previous fin" ); } #[test] fn write_fin_lowered_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at(32u32.into(), &[1]).unwrap(); assert_eq!( buffer.write_at_fin(16u32.into(), &[]), - Err(ReceiveBufferError::InvalidFin), + Err(Error::InvalidFin), "the fin cannot be lower than an already existing chunk" ); } #[test] fn write_fin_complete_test() { - let mut buffer = ReceiveBuffer::new(); + let mut buffer = Reassembler::new(); buffer.write_at_fin(4u32.into(), &[4]).unwrap(); @@ -1018,7 +1021,7 @@ fn allocation_size_test() { for (index, (offset, size)) in received.iter().copied().enumerate() { assert_eq!( - ReceiveBuffer::allocation_size(offset), + Reassembler::allocation_size(offset), size, "offset = {}", offset @@ -1027,7 +1030,7 @@ fn allocation_size_test() { if let Some((offset, _)) = received.get(index + 1) { let offset = offset - 1; assert_eq!( - ReceiveBuffer::allocation_size(offset), + Reassembler::allocation_size(offset), size, "offset = {}", offset diff --git a/quic/s2n-quic-core/src/buffer/reassembler/writer.rs b/quic/s2n-quic-core/src/buffer/reassembler/writer.rs new file mode 100644 index 0000000000..5659b700ab --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reassembler/writer.rs @@ -0,0 +1,94 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use super::{Reassembler, Slot}; +use crate::{ + buffer::{reader::Storage as _, Error, Reader, Writer}, + varint::VarInt, +}; +use bytes::BytesMut; + +impl Writer for Reassembler { + #[inline] + fn read_from(&mut self, reader: &mut R) -> Result<(), Error> + where + R: Reader + ?Sized, + { + // enable checks for the reader + let mut reader = reader.with_checks(); + let reader = &mut reader; + + let final_offset = reader.final_offset(); + + // optimize for the case where the stream consists of a single chunk + if let Some(final_offset) = final_offset { + let mut is_single_chunk_stream = true; + + let offset = reader.current_offset(); + + // the reader is starting at the beginning of the stream + is_single_chunk_stream &= offset == VarInt::ZERO; + // the reader has buffered the final offset + is_single_chunk_stream &= reader.has_buffered_fin(); + // no data has been consumed from the Reassembler + is_single_chunk_stream &= self.consumed_len() == 0; + // we aren't tracking any slots + is_single_chunk_stream &= self.slots.is_empty(); + + if is_single_chunk_stream { + let payload_len = reader.buffered_len(); + let end = final_offset.as_u64(); + + // don't allocate anything if we don't need to + if payload_len == 0 { + let chunk = reader.read_chunk(0)?; + debug_assert!(chunk.is_empty()); + } else { + let mut data = BytesMut::with_capacity(payload_len); + + // copy the whole thing into `data` + reader.copy_into(&mut data)?; + + self.slots.push_back(Slot::new(offset.as_u64(), end, data)); + }; + + // update the final offset after everything was read correctly + self.final_offset = end; + self.invariants(); + + return Ok(()); + } + } + + // TODO add better support for copy avoidance by iterating to the appropriate slot and + // copying into that, if possible + + // fall back to copying individual chunks into the receive buffer + let mut first_write = true; + loop { + let offset = reader.current_offset(); + let chunk = 
reader.read_chunk(usize::MAX)?; + + // Record the final size before writing to avoid excess allocation. This also needs to + // happen after we read the first chunk in case there are errors. + if first_write { + if let Some(offset) = final_offset { + self.write_at_fin(offset, &[]).map_err(Error::mapped)?; + } + } + + // TODO maybe specialize on BytesMut chunks? - for now we'll just treat them as + // slices + + self.write_at(offset, &chunk).map_err(Error::mapped)?; + + first_write = false; + + if reader.buffer_is_empty() { + break; + } + } + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer.rs b/quic/s2n-quic-core/src/buffer/writer.rs new file mode 100644 index 0000000000..8949ac00b5 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer.rs @@ -0,0 +1,13 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod storage; + +pub use storage::Storage; + +/// A buffer capable of being written into by a reader +pub trait Writer { + fn read_from(&mut self, reader: &mut R) -> Result<(), super::Error> + where + R: super::Reader + ?Sized; +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage.rs b/quic/s2n-quic-core/src/buffer/writer/storage.rs new file mode 100644 index 0000000000..47b155aa0b --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage.rs @@ -0,0 +1,108 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::reader::storage::Chunk; +use bytes::{Bytes, BytesMut}; + +mod buf; +mod byte_queue; +mod discard; +mod empty; +mod limit; +mod tracked; +mod uninit_slice; +mod write_once; + +pub use buf::BufMut; +pub use bytes::buf::UninitSlice; +pub use discard::Discard; +pub use empty::Empty; +pub use limit::Limit; +pub use tracked::Tracked; +pub use write_once::WriteOnce; + +/// An implementation that accepts concrete types of chunked writes +pub trait Storage { + const SPECIALIZES_BYTES: bool = false; + const SPECIALIZES_BYTES_MUT: bool = false; + + /// Writes a slice of bytes into the storage + /// + /// The bytes MUST always be less than `remaining_capacity`. + fn put_slice(&mut self, bytes: &[u8]); + + /// Tries to write into a uninit slice for the current storage + /// + /// If `false` is returned, the storage wasn't capable of this operation and a regular `put_*` + /// call should be used instead. + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + // we can specialize on an empty payload + ensure!(payload_len == 0, Ok(false)); + + f(UninitSlice::new(&mut []))?; + + Ok(true) + } + + /// Returns the additional number of bytes that can be written to the storage + fn remaining_capacity(&self) -> usize; + + /// Returns `true` if the storage will accept any additional bytes + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity() > 0 + } + + /// Writes [`Bytes`] into the storage + /// + /// Callers should check `SPECIALIZES_BYTES` before deciding to use this method. Otherwise, it + /// might be cheaper to copy a slice into the storage and then increment the offset. + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.put_slice(&bytes); + } + + /// Writes [`BytesMut`] into the storage + /// + /// Callers should check `SPECIALIZES_BYTES_MUT` before deciding to use this method. 
Otherwise, it + /// might be cheaper to copy a slice into the storage and then increment the offset. + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.put_slice(&bytes); + } + + /// Writes a reader [`Chunk`] into the storage + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + match chunk { + Chunk::Slice(v) => self.put_slice(v), + Chunk::Bytes(v) => self.put_bytes(v), + Chunk::BytesMut(v) => self.put_bytes_mut(v), + } + } + + /// Limits the number of bytes that can be written to the storage + #[inline] + fn with_write_limit(&mut self, max_len: usize) -> Limit { + Limit::new(self, max_len) + } + + /// Tracks the number of bytes written to the storage + #[inline] + fn track_write(&mut self) -> Tracked { + Tracked::new(self) + } + + /// Only allows a single write into the storage. After that, no more writes are allowed. + /// + /// This can be used for very low latency scenarios where processing the single read is more + /// important than filling the entire storage with as much data as possible. + #[inline] + fn write_once(&mut self) -> WriteOnce { + WriteOnce::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs b/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs new file mode 100644 index 0000000000..b495809668 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/buf.rs @@ -0,0 +1,137 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::writer::{storage::UninitSlice, Storage}; + +/// Delegates storage operations into a [`bytes::BufMut`] implementation. 
+pub struct BufMut<'a, T: bytes::BufMut> { + buf_mut: &'a mut T, +} + +impl<'a, T: bytes::BufMut> BufMut<'a, T> { + #[inline] + pub fn new(buf_mut: &'a mut T) -> Self { + Self { buf_mut } + } +} + +impl<'a, T: bytes::BufMut> Storage for BufMut<'a, T> { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.buf_mut.put_slice(bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.buf_mut.remaining_mut() + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let chunk = self.buf_mut.chunk_mut(); + + // make sure the current chunk is capable of reading the entire slice + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.buf_mut.advance_mut(payload_len); + } + + Ok(true) + } +} + +/// Delegates standard types to their BufMut implementations +macro_rules! impl_buf_mut { + ($ty:ty $(, $reserve:ident)?) => { + impl Storage for $ty { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + bytes::BufMut::put_slice(self, bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + bytes::BufMut::remaining_mut(self) + } + + #[inline] + fn put_uninit_slice( + &mut self, + payload_len: usize, + f: F, + ) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + use bytes::BufMut; + + // try to reserve additional capacity for the write, if possible + $( + self.$reserve(payload_len); + )? 
+ + let chunk = self.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.advance_mut(payload_len); + } + + Ok(true) + } + } + }; +} + +impl_buf_mut!(bytes::BytesMut, reserve); +impl_buf_mut!(alloc::vec::Vec, reserve); +impl_buf_mut!(&mut [u8]); +impl_buf_mut!(&mut [core::mem::MaybeUninit]); + +#[cfg(test)] +mod tests { + use crate::buffer::{reader::Storage as _, writer::Storage as _}; + + #[test] + fn vec_test() { + let mut buffer: Vec = vec![]; + let expected = vec![42; 1000]; + let expected = &expected[..]; + + { + assert_eq!(buffer.remaining_capacity(), isize::MAX as usize); + + let mut source = expected; + + source.copy_into(&mut buffer).unwrap(); + } + + assert_eq!(&buffer, expected); + } + + #[test] + fn vec_buf_test() { + let mut buffer: Vec = vec![]; + let expected = vec![42; 1000]; + let expected = &expected[..]; + + { + let mut buffer = super::BufMut::new(&mut buffer); + assert_eq!(buffer.remaining_capacity(), isize::MAX as usize); + + let mut source = expected; + + source.copy_into(&mut buffer).unwrap(); + } + + assert_eq!(&buffer, expected); + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs b/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs new file mode 100644 index 0000000000..d39978c556 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/byte_queue.rs @@ -0,0 +1,97 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::writer::Storage; +use alloc::{collections::VecDeque, vec::Vec}; +use bytes::{Bytes, BytesMut}; + +/// Implements a queue of bytes, capable of zero-copy transfer of data +macro_rules! 
impl_queue { + ($ty:ident, $push:ident) => { + impl Storage for $ty { + const SPECIALIZES_BYTES: bool = true; + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes(Bytes::copy_from_slice(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.$push(bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes.freeze()); + } + } + + impl Storage for $ty { + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes_mut(BytesMut::from(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + // we can't convert Bytes into BytesMut so we'll need to copy it + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes); + } + } + }; +} + +impl_queue!(Vec, push); +impl_queue!(VecDeque, push_back); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn byte_queue_test() { + let mut writer: Vec = vec![]; + + writer.put_slice(b"hello"); + writer.put_bytes(Bytes::from_static(b" ")); + writer.put_bytes_mut(BytesMut::from(&b"world"[..])); + + assert_eq!( + writer, + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b" "[..]), + BytesMut::from(&b"world"[..]) + ] + ); + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs b/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs new file mode 100644 index 0000000000..62ab2cc2b1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/discard.rs @@ -0,0 +1,22 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::writer::Storage; + +/// Immediately discards any write operations +/// +/// This implementation can be used for benchmarking operations outside of copies. +#[derive(Clone, Copy, Debug, Default)] +pub struct Discard; + +impl Storage for Discard { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + let _ = bytes; + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs b/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs new file mode 100644 index 0000000000..4586edba90 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/empty.rs @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Storage; + +/// A writer incapable of being written to +#[derive(Clone, Copy, Debug, Default)] +pub struct Empty; + +impl Storage for Empty { + #[inline] + fn put_slice(&mut self, slice: &[u8]) { + debug_assert!( + slice.is_empty(), + "cannot put a non-empty slice in empty writer storage" + ); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + 0 + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + false + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs b/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs new file mode 100644 index 0000000000..e05dd61e71 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/limit.rs @@ -0,0 +1,137 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + assume, + buffer::{reader::storage::Chunk, writer::Storage}, +}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +/// An implementation that limits the number of bytes that can be written to the underlying storage +pub struct Limit<'a, S: Storage + ?Sized> { + storage: &'a mut S, + remaining_capacity: usize, +} + +impl<'a, S: Storage + ?Sized> Limit<'a, S> { + #[inline] + pub fn new(storage: &'a mut S, remaining_capacity: usize) -> Self { + let remaining_capacity = storage.remaining_capacity().min(remaining_capacity); + Self { + storage, + remaining_capacity, + } + } +} + +impl<'a, S: Storage + ?Sized> Storage for Limit<'a, S> { + const SPECIALIZES_BYTES: bool = S::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = S::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + debug_assert!(bytes.len() <= self.remaining_capacity); + self.storage.put_slice(bytes); + unsafe { + assume!(self.remaining_capacity >= bytes.len()); + } + self.remaining_capacity -= bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + debug_assert!(payload_len <= self.remaining_capacity); + let did_write = self.storage.put_uninit_slice(payload_len, f)?; + if did_write { + unsafe { + assume!(self.remaining_capacity >= payload_len); + } + self.remaining_capacity -= payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.storage + .remaining_capacity() + .min(self.remaining_capacity) + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity > 0 && self.storage.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.storage.put_bytes(bytes); + unsafe { + assume!(self.remaining_capacity >= len); + } + 
self.remaining_capacity -= len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.storage.put_bytes_mut(bytes); + unsafe { + assume!(self.remaining_capacity >= len); + } + self.remaining_capacity -= len; + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + let len = chunk.len(); + debug_assert!(len <= self.remaining_capacity); + self.storage.put_chunk(chunk); + unsafe { + assume!(self.remaining_capacity >= len); + } + self.remaining_capacity -= len; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn limit_test() { + let mut writer: Vec = vec![]; + + { + let mut writer = writer.with_write_limit(5); + assert_eq!(writer.remaining_capacity(), 5); + writer.put_slice(b"hello"); + assert_eq!(writer.remaining_capacity(), 0); + } + + { + let mut writer = writer.with_write_limit(5); + assert_eq!(writer.remaining_capacity(), 5); + writer.put_bytes(Bytes::from_static(b"hello")); + assert_eq!(writer.remaining_capacity(), 0); + } + + { + let mut writer = writer.with_write_limit(5); + assert_eq!(writer.remaining_capacity(), 5); + writer.put_bytes_mut(BytesMut::from(&b"hello"[..])); + assert_eq!(writer.remaining_capacity(), 0); + } + + { + let writer = writer.with_write_limit(0); + assert!(!writer.has_remaining_capacity()); + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs b/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs new file mode 100644 index 0000000000..0b24e090c1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/tracked.rs @@ -0,0 +1,112 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{reader::storage::Chunk, writer::Storage}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +/// Tracks the number of bytes written to the underlying storage +pub struct Tracked<'a, S: Storage + ?Sized> { + storage: &'a mut S, + written: usize, +} + +impl<'a, S: Storage + ?Sized> Tracked<'a, S> { + #[inline] + pub fn new(storage: &'a mut S) -> Self { + Self { + storage, + written: 0, + } + } + + /// Returns the number of bytes written to the underlying storage + #[inline] + pub fn written_len(&self) -> usize { + self.written + } +} + +impl<'a, S: Storage + ?Sized> Storage for Tracked<'a, S> { + const SPECIALIZES_BYTES: bool = S::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = S::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.storage.put_slice(bytes); + self.written += bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let did_write = self.storage.put_uninit_slice(payload_len, f)?; + if did_write { + self.written += payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.storage.remaining_capacity() + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.storage.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + self.storage.put_bytes(bytes); + self.written += len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + self.storage.put_bytes_mut(bytes); + self.written += len; + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + let len = chunk.len(); + self.storage.put_chunk(chunk); + self.written += len; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tracked_test() { + let mut writer: Vec = vec![]; + + { + let mut writer = writer.track_write(); + 
assert_eq!(writer.written_len(), 0); + writer.put_slice(b"hello"); + assert_eq!(writer.written_len(), 5); + } + + { + let mut writer = writer.track_write(); + assert_eq!(writer.written_len(), 0); + writer.put_bytes(Bytes::from_static(b"hello")); + assert_eq!(writer.written_len(), 5); + } + + { + let mut writer = writer.track_write(); + assert_eq!(writer.written_len(), 0); + writer.put_bytes_mut(BytesMut::from(&b"hello"[..])); + assert_eq!(writer.written_len(), 5); + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs b/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs new file mode 100644 index 0000000000..c4f0453048 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/uninit_slice.rs @@ -0,0 +1,69 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{assume, buffer::writer::Storage}; +use bytes::buf::UninitSlice; + +impl Storage for &mut UninitSlice { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + unsafe { + assume!(self.len() >= bytes.len()); + } + self[..bytes.len()].copy_from_slice(bytes); + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[bytes.len()..]; + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + ensure!(self.len() >= payload_len, Ok(false)); + + f(&mut self[..payload_len])?; + + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[payload_len..]; + + Ok(true) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn uninit_slice_test() { + let mut storage = [0; 8]; + + { + let mut writer = UninitSlice::new(&mut storage[..]); + assert_eq!(writer.remaining_capacity(), 8); + writer.put_slice(b"hello"); + 
assert_eq!(writer.remaining_capacity(), 3); + } + + { + let mut writer = UninitSlice::new(&mut storage[..]); + assert_eq!(writer.remaining_capacity(), 8); + let did_write = writer + .put_uninit_slice(5, |slice| { + slice.copy_from_slice(b"hello"); + >::Ok(()) + }) + .unwrap(); + assert!(did_write); + assert_eq!(writer.remaining_capacity(), 3); + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/storage/write_once.rs b/quic/s2n-quic-core/src/buffer/writer/storage/write_once.rs new file mode 100644 index 0000000000..cfd24c3771 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/storage/write_once.rs @@ -0,0 +1,155 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + buffer::{reader::storage::Chunk, writer::Storage}, + ensure, +}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +/// Only allows a single write into the storage. After that, no more writes are allowed. +/// +/// This can be used for very low latency scenarios where processing the single read is more +/// important than filling the entire storage with as much data as possible. 
+pub struct WriteOnce<'a, S: Storage + ?Sized> { + storage: &'a mut S, + did_write: bool, +} + +impl<'a, S: Storage + ?Sized> WriteOnce<'a, S> { + #[inline] + pub fn new(storage: &'a mut S) -> Self { + Self { + storage, + did_write: false, + } + } +} + +impl<'a, S: Storage + ?Sized> Storage for WriteOnce<'a, S> { + const SPECIALIZES_BYTES: bool = S::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = S::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + let did_write = !bytes.is_empty(); + self.storage.put_slice(bytes); + self.did_write |= did_write; + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let did_write = self.storage.put_uninit_slice(payload_len, f)?; + self.did_write |= did_write && payload_len > 0; + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + ensure!(!self.did_write, 0); + self.storage.remaining_capacity() + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + ensure!(!self.did_write, false); + self.storage.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let did_write = !bytes.is_empty(); + self.storage.put_bytes(bytes); + self.did_write |= did_write; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let did_write = !bytes.is_empty(); + self.storage.put_bytes_mut(bytes); + self.did_write |= did_write; + } + + #[inline] + fn put_chunk(&mut self, chunk: Chunk) { + let did_write = !chunk.is_empty(); + self.storage.put_chunk(chunk); + self.did_write |= did_write; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn write_once_test() { + let mut writer: Vec = vec![]; + + { + let mut writer = writer.write_once(); + assert!(writer.has_remaining_capacity()); + writer.put_slice(b"hello"); + assert_eq!(writer.remaining_capacity(), 0); + assert!(!writer.has_remaining_capacity()); + } + + { + let mut 
writer = writer.write_once(); + assert!(writer.has_remaining_capacity()); + writer.put_chunk(b"hello"[..].into()); + assert_eq!(writer.remaining_capacity(), 0); + assert!(!writer.has_remaining_capacity()); + } + + { + let mut writer = writer.write_once(); + assert!(writer.has_remaining_capacity()); + let did_write = writer + .put_uninit_slice(5, |slice| { + slice.copy_from_slice(b"hello"); + >::Ok(()) + }) + .unwrap(); + assert!(did_write); + assert_eq!(writer.remaining_capacity(), 0); + assert!(!writer.has_remaining_capacity()); + } + + { + let mut writer = writer.write_once(); + assert!(writer.has_remaining_capacity()); + writer.put_bytes(Bytes::from_static(b"hello")); + assert_eq!(writer.remaining_capacity(), 0); + assert!(!writer.has_remaining_capacity()); + } + + { + let mut writer = writer.write_once(); + assert!(writer.has_remaining_capacity()); + writer.put_bytes_mut(BytesMut::from(&b"hello"[..])); + assert_eq!(writer.remaining_capacity(), 0); + assert!(!writer.has_remaining_capacity()); + } + } + + // ensures a reader that only reads capacity at the beginning can still write multiple chunks + #[test] + fn copy_into_multi_chunks() { + let mut writer: Vec = vec![]; + { + let mut writer = writer.write_once(); + + assert!(writer.has_remaining_capacity()); + writer.put_slice(b"hello"); + assert!(!writer.has_remaining_capacity()); + writer.put_slice(b"world"); + assert!(!writer.has_remaining_capacity()); + } + + assert_eq!(&writer[..], b"helloworld"); + } +} diff --git a/quic/s2n-quic-core/src/stream/testing.rs b/quic/s2n-quic-core/src/stream/testing.rs index d815b9f53f..3f5c824d87 100644 --- a/quic/s2n-quic-core/src/stream/testing.rs +++ b/quic/s2n-quic-core/src/stream/testing.rs @@ -3,14 +3,12 @@ //! 
A model that ensures stream data is correctly sent and received between peers -#[cfg(feature = "generator")] -use bolero_generator::*; - -#[cfg(test)] -use bolero::generator::*; - +use crate::buffer::{reader, writer}; use bytes::Bytes; +#[cfg(any(test, feature = "generator"))] +use bolero_generator::*; + static DATA: Bytes = { const INNER: [u8; DATA_LEN] = { let mut data = [0; DATA_LEN]; @@ -125,7 +123,7 @@ impl Data { let amount = ((self.len - self.offset) as usize).min(amount); let chunk = Self::send_one_at(self.offset, amount); - self.seek_forward(chunk.len()); + self.seek_forward(chunk.len() as u64); Some(chunk) } @@ -147,8 +145,82 @@ impl Data { } /// Moves the current offset forward by the provided `len` - pub fn seek_forward(&mut self, len: usize) { - self.offset += len as u64; + pub fn seek_forward(&mut self, len: u64) { + self.offset += len; + } +} + +impl reader::Storage for Data { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + (self.len - self.offset).try_into().unwrap() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result { + if let Some(chunk) = self.send_one(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + while let Some(chunk) = self.send_one(dest.remaining_capacity()) { + // if the chunk matches the destination then return it instead of copying + if chunk.len() == dest.remaining_capacity() || self.is_finished() { + return Ok(chunk.into()); + } + + // otherwise copy the chunk into the destination + dest.put_bytes(chunk); + } + + Ok(Default::default()) + } +} + +impl reader::Reader for Data { + #[inline] + fn current_offset(&self) -> crate::varint::VarInt { + self.offset().try_into().unwrap() + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.len.try_into().unwrap()) + } +} + +impl writer::Storage for Data { + #[inline] + fn put_slice(&mut self, slice: 
&[u8]) { + self.receive(&[slice]); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + // return max so readers don't know where the stream is expected to end and we can make an + // assertion when they write + usize::MAX + } +} + +impl writer::Writer for Data { + #[inline] + fn read_from(&mut self, reader: &mut R) -> Result<(), crate::buffer::Error> + where + R: reader::Reader + ?Sized, + { + // no need to specialize on anything here + reader.copy_into(self)?; + Ok(()) } } @@ -234,4 +306,17 @@ mod tests { assert!(receiver.is_finished()); }) } + + #[test] + fn buffer_trait_test() { + use writer::Writer as _; + + let mut reader = Data::new(10); + let mut writer = reader; + + writer.read_from(&mut reader).unwrap(); + + assert!(reader.is_finished()); + assert!(writer.is_finished()); + } } diff --git a/quic/s2n-quic-transport/src/space/crypto_stream.rs b/quic/s2n-quic-transport/src/space/crypto_stream.rs index d835fc75e3..e455fa61c6 100644 --- a/quic/s2n-quic-transport/src/space/crypto_stream.rs +++ b/quic/s2n-quic-transport/src/space/crypto_stream.rs @@ -6,7 +6,7 @@ use crate::{ transmission, }; use s2n_quic_core::{ - ack, buffer::ReceiveBuffer, frame::crypto::CryptoRef, transport, varint::VarInt, + ack, buffer::Reassembler, frame::crypto::CryptoRef, transport, varint::VarInt, }; pub type TxCryptoStream = DataSender; @@ -33,7 +33,7 @@ impl OutgoingDataFlowController for CryptoFlowController { #[derive(Debug)] pub struct CryptoStream { pub tx: TxCryptoStream, - pub rx: ReceiveBuffer, + pub rx: Reassembler, is_finished: bool, } @@ -49,7 +49,7 @@ impl CryptoStream { pub fn new() -> Self { Self { tx: TxCryptoStream::new(Default::default(), TX_MAX_BUFFER_CAPACITY), - rx: ReceiveBuffer::default(), + rx: Reassembler::default(), is_finished: false, } } diff --git a/quic/s2n-quic-transport/src/stream/receive_stream.rs b/quic/s2n-quic-transport/src/stream/receive_stream.rs index b2a7d911e0..e6168b0cb2 100644 --- a/quic/s2n-quic-transport/src/stream/receive_stream.rs 
+++ b/quic/s2n-quic-transport/src/stream/receive_stream.rs @@ -18,9 +18,7 @@ use core::{ }; use s2n_quic_core::{ ack, application, - buffer::{ - ReceiveBuffer as StreamReceiveBuffer, ReceiveBufferError as StreamReceiveBufferError, - }, + buffer::{self, Reassembler}, frame::{stream::StreamRef, MaxStreamData, ResetStream, StopSending, StreamDataBlocked}, packet::number::PacketNumber, stream::{ops, StreamId}, @@ -337,7 +335,7 @@ pub struct ReceiveStream { /// The current state of the stream pub(super) state: ReceiveStreamState, /// Buffer of already received data - pub(super) receive_buffer: StreamReceiveBuffer, + pub(super) receive_buffer: Reassembler, /// The composite flow controller for receiving data pub(super) flow_controller: ReceiveStreamFlowController, /// Synchronizes the `STOP_SENDING` flag towards the peer. @@ -368,7 +366,7 @@ impl ReceiveStream { let mut result = ReceiveStream { state, - receive_buffer: StreamReceiveBuffer::new(), + receive_buffer: Reassembler::new(), flow_controller: ReceiveStreamFlowController::new( connection_flow_controller, initial_window, @@ -447,7 +445,7 @@ impl ReceiveStream { // If this is the last frame then inform the receive_buffer so it can check for any // final size errors. - let write_result = if frame.is_fin { + let write_result: Result<(), buffer::Error> = if frame.is_fin { self.receive_buffer.write_at_fin(frame.offset, frame.data) } else { self.receive_buffer.write_at(frame.offset, frame.data) @@ -460,16 +458,17 @@ impl ReceiveStream { //# FLOW_CONTROL_ERROR if it receives more data than the maximum data //# value that it has sent. This includes violations of remembered //# limits in Early Data; see Section 7.4.1. - StreamReceiveBufferError::OutOfRange => { - transport::Error::FLOW_CONTROL_ERROR - } + buffer::Error::OutOfRange => transport::Error::FLOW_CONTROL_ERROR, //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5 //# Once a final size for a stream is known, it cannot change. 
If a //# RESET_STREAM or STREAM frame is received indicating a change in the //# final size for the stream, an endpoint SHOULD respond with an error //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. - StreamReceiveBufferError::InvalidFin => transport::Error::FINAL_SIZE_ERROR, + buffer::Error::InvalidFin => transport::Error::FINAL_SIZE_ERROR, + buffer::Error::ReaderError(_) => { + unreachable!("reader is infallible") + } } .with_reason("data reception error") .with_frame_type(frame.tag().into()) diff --git a/scripts/copyright_check b/scripts/copyright_check index e2003e87de..1846e45b8c 100755 --- a/scripts/copyright_check +++ b/scripts/copyright_check @@ -7,7 +7,7 @@ set -e -S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not \( -path "*/s2n-quic/target/*" -o -path "*/s2n-tls-sys/s2n/*" \)) +S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not -path "*/target/*") FAILED=0