From d57a55fe5041c77245b1a7045f5bac00bcba1fcc Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Tue, 16 Jan 2024 18:31:06 -0700 Subject: [PATCH] feat(s2n-quic-core): add buffer reader --- quic/s2n-quic-bench/src/buffer.rs | 36 +-- quic/s2n-quic-core/src/buffer/duplex.rs | 13 + quic/s2n-quic-core/src/buffer/duplex/split.rs | 70 +++++ quic/s2n-quic-core/src/buffer/error.rs | 46 +++ quic/s2n-quic-core/src/buffer/mod.rs | 8 + quic/s2n-quic-core/src/buffer/reader.rs | 63 +++++ quic/s2n-quic-core/src/buffer/reader/chunk.rs | 64 +++++ .../src/buffer/reader/chunk/buf.rs | 110 ++++++++ .../src/buffer/reader/chunk/bytes.rs | 94 +++++++ .../src/buffer/reader/chunk/full_copy.rs | 50 ++++ .../src/buffer/reader/chunk/io_slice.rs | 228 +++++++++++++++ .../src/buffer/reader/chunk/slice.rs | 53 ++++ .../src/buffer/reader/chunk/tests.rs | 157 +++++++++++ .../src/buffer/reader/chunk/trailer.rs | 94 +++++++ quic/s2n-quic-core/src/buffer/reader/empty.rs | 77 +++++ .../src/buffer/reader/incremental.rs | 168 +++++++++++ quic/s2n-quic-core/src/buffer/reader/limit.rs | 101 +++++++ quic/s2n-quic-core/src/buffer/reader/slice.rs | 103 +++++++ .../src/buffer/receive_buffer.rs | 263 +++++++++++++----- .../src/buffer/receive_buffer/tests.rs | 15 +- quic/s2n-quic-core/src/buffer/writer.rs | 11 + quic/s2n-quic-core/src/buffer/writer/chunk.rs | 75 +++++ .../src/buffer/writer/chunk/buf.rs | 111 ++++++++ .../src/buffer/writer/chunk/byte_queue.rs | 72 +++++ .../src/buffer/writer/chunk/discard.rs | 17 ++ .../src/buffer/writer/chunk/empty.rs | 22 ++ .../src/buffer/writer/chunk/limit.rs | 80 ++++++ .../src/buffer/writer/chunk/tracked.rs | 76 +++++ .../src/buffer/writer/chunk/uninit_slice.rs | 36 +++ quic/s2n-quic-core/src/stream/testing.rs | 64 ++++- .../src/stream/receive_stream.rs | 13 +- scripts/copyright_check | 2 +- 32 files changed, 2274 insertions(+), 118 deletions(-) create mode 100644 quic/s2n-quic-core/src/buffer/duplex.rs create mode 100644 quic/s2n-quic-core/src/buffer/duplex/split.rs create mode 100644 quic/s2n-quic-core/src/buffer/error.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/full_copy.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/empty.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/incremental.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/limit.rs create mode 100644 quic/s2n-quic-core/src/buffer/reader/slice.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/discard.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/empty.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/limit.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/tracked.rs create mode 100644 quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs diff --git a/quic/s2n-quic-bench/src/buffer.rs b/quic/s2n-quic-bench/src/buffer.rs index a3835f22fd..f3608c71b1 100644 --- a/quic/s2n-quic-bench/src/buffer.rs +++ b/quic/s2n-quic-bench/src/buffer.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use criterion::{black_box, BenchmarkId, Criterion, Throughput}; -use s2n_quic_core::{buffer::ReceiveBuffer, varint::VarInt}; +use s2n_quic_core::{ + buffer::{reader::Chunk as _, writer, ReceiveBuffer}, + varint::VarInt, +}; pub fn benchmarks(c: &mut Criterion) { let mut group = c.benchmark_group("buffer"); @@ -14,6 +17,7 @@ pub fn benchmarks(c: &mut Criterion) { group.bench_with_input(BenchmarkId::new("skip", size), &input, |b, _input| { let mut buffer = ReceiveBuffer::new(); + let size = VarInt::try_from(size).unwrap(); b.iter(move || { buffer.skip(black_box(size)).unwrap(); }); @@ -25,7 +29,8 @@ pub fn benchmarks(c: &mut Criterion) { let len = VarInt::new(input.len() as _).unwrap(); b.iter(move || { buffer.write_at(offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + buffer.force_copy_into(&mut writer::chunk::Discard).unwrap(); offset += len; }); }); @@ -44,34 +49,11 @@ pub fn benchmarks(c: &mut Criterion) { buffer.write_at(first_offset, input).unwrap(); let second_offset = offset; buffer.write_at(second_offset, input).unwrap(); - buffer.copy_into_buf(&mut NoOpBuf); + // Avoid oversampling the `pop` implementation + buffer.force_copy_into(&mut writer::chunk::Discard).unwrap(); offset = first_offset + len; }); }, ); } } - -/// A BufMut implementation that doesn't actually copy data into it -/// -/// This is used to avoid oversampling the `pop` implementation for -/// `write_at` benchmarks. -struct NoOpBuf; - -unsafe impl bytes::BufMut for NoOpBuf { - #[inline] - fn remaining_mut(&self) -> usize { - usize::MAX - } - - #[inline] - unsafe fn advance_mut(&mut self, _cnt: usize) {} - - #[inline] - fn put_slice(&mut self, _slice: &[u8]) {} - - #[inline] - fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { - unimplemented!() - } -} diff --git a/quic/s2n-quic-core/src/buffer/duplex.rs b/quic/s2n-quic-core/src/buffer/duplex.rs new file mode 100644 index 0000000000..b26311006d --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex.rs @@ -0,0 +1,13 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Error, Reader, Writer}; +use crate::varint::VarInt; + +mod split; + +pub use split::Split; + +pub trait Duplex: Reader + Writer { + fn skip(&mut self, len: VarInt, final_offset: Option) -> Result<(), Error>; +} diff --git a/quic/s2n-quic-core/src/buffer/duplex/split.rs b/quic/s2n-quic-core/src/buffer/duplex/split.rs new file mode 100644 index 0000000000..9e15fda387 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/duplex/split.rs @@ -0,0 +1,70 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::{ + reader::Reader, + writer::{Chunk, Writer}, + Duplex, Error, +}; + +pub struct Split<'a, C: Chunk, D: Duplex + ?Sized> { + chunk: &'a mut C, + duplex: &'a mut D, +} + +impl<'a, C: Chunk, D: Duplex + ?Sized> Split<'a, C, D> { + #[inline] + pub fn new(chunk: &'a mut C, duplex: &'a mut D) -> Self { + Self { chunk, duplex } + } +} + +impl<'a, C: Chunk, D: Duplex + ?Sized> Writer + for Split<'a, C, D> +{ + #[inline] + fn copy_from(&mut self, reader: &mut R) -> Result<(), Error> { + let initial_offset = reader.current_offset(); + let final_offset = reader.final_offset(); + let is_contiguous = initial_offset == self.duplex.current_offset(); + + { + // if the chunk specializes writing zero-copy Bytes/BytesMut, then just write to the + // receive buffer, since that's what it stores + let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT; + + // if this packet is non-contiguous, then delegate to the wrapped writer + should_delegate |= !is_contiguous; + + // if the chunk doesn't have any remaining capacity, then delegate + should_delegate |= !self.chunk.has_remaining_capacity(); + + if should_delegate { + self.duplex.copy_from(reader)?; + + if !self.duplex.buffer_is_empty() && self.chunk.has_remaining_capacity() { + self.duplex + .copy_into(self.chunk) + .expect("duplex error is infallible"); + } + + return Ok(()); + } + } + + debug_assert!(self.chunk.has_remaining_capacity()); + + reader.copy_into(self.chunk)?; + let write_len = initial_offset - reader.current_offset(); + + self.duplex + .skip(write_len, final_offset) + .map_err(Error::mapped)?; + + if !reader.buffer_is_empty() { + self.duplex.copy_from(reader)?; + } + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/error.rs b/quic/s2n-quic-core/src/buffer/error.rs new file mode 100644 index 0000000000..1c321d2118 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/error.rs @@ -0,0 +1,46 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Error { + /// An invalid data range was provided + OutOfRange, + /// The provided final size was invalid for the buffer's state + InvalidFin, + /// The provided reader failed + ReaderError(Reader), +} + +impl From for Error { + #[inline] + fn from(reader: Reader) -> Self { + Self::ReaderError(reader) + } +} + +impl Error { + #[inline] + pub fn mapped(error: Error) -> Error { + match error { + Error::OutOfRange => Error::OutOfRange, + Error::InvalidFin => Error::InvalidFin, + Error::ReaderError(_) => unreachable!(), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), + Self::InvalidFin => write!( + f, + "write modifies the final offset in a non-compliant manner" + ), + Self::ReaderError(reader) => write!(f, "the provided reader failed with: {reader}"), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/mod.rs b/quic/s2n-quic-core/src/buffer/mod.rs index d252d8b391..297b757f31 100644 --- a/quic/s2n-quic-core/src/buffer/mod.rs +++ b/quic/s2n-quic-core/src/buffer/mod.rs @@ -1,6 +1,14 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +pub mod duplex; +mod error; +pub mod reader; mod receive_buffer; +pub mod writer; +pub use duplex::Duplex; +pub use error::Error; +pub use reader::Reader; pub use receive_buffer::*; +pub use writer::Writer; diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs new file mode 100644 index 0000000000..00c73c78df --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader.rs @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::varint::VarInt; + +pub mod chunk; +mod empty; +pub mod incremental; +mod limit; +mod slice; + +pub use chunk::Chunk; +pub use empty::Empty; +pub use incremental::Incremental; +pub use limit::Limit; +pub use slice::Slice; + +pub trait Reader: Chunk { + /// Returns the currently read offset for the stream + fn current_offset(&self) -> VarInt; + + /// Returns the final offset for the stream + fn final_offset(&self) -> Option; + + /// Returns `true` if the reader has the final offset buffered + #[inline] + fn has_buffered_fin(&self) -> bool { + self.final_offset().map_or(false, |fin| { + let buffered_end = self + .current_offset() + .as_u64() + .saturating_add(self.buffered_len() as u64); + fin == buffered_end + }) + } + + /// Returns `true` if the reader is finished producing data + #[inline] + fn is_consumed(&self) -> bool { + self.final_offset() + .map_or(false, |fin| fin == self.current_offset()) + } + + /// Limits the maximum offset that the caller can read from the reader + #[inline] + fn with_max_data(&mut self, max_data: VarInt) -> Limit { + let max_buffered_len = max_data.saturating_sub(self.current_offset()); + let max_buffered_len = max_buffered_len.as_u64().min(self.buffered_len() as u64) as usize; + self.with_limit(max_buffered_len) + } + + /// Limits the maximum amount of data that the caller can read from the reader + #[inline] + fn with_limit(&mut self, max_buffered_len: usize) -> Limit { + Limit::new(self, max_buffered_len) + } + + /// Temporarily clears the buffer for the reader, while preserving the offsets + #[inline] + fn with_empty_buffer(&self) -> Empty { + Empty::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk.rs b/quic/s2n-quic-core/src/buffer/reader/chunk.rs new file mode 100644 index 0000000000..e14fa58d01 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk.rs @@ -0,0 +1,64 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod buf; +mod bytes; +mod full_copy; +mod io_slice; +mod slice; +mod trailer; + +#[cfg(test)] +mod tests; + +pub use buf::Buf; +pub use full_copy::FullCopy; +pub use io_slice::IoSlice; +pub use trailer::Trailer; + +pub trait Chunk { + type Error; + + /// Returns the length of the chunk + fn buffered_len(&self) -> usize; + + /// Returns if the chunk is empty + #[inline] + fn buffer_is_empty(&self) -> bool { + self.buffered_len() == 0 + } + + /// Reads the current trailer for the chunk + fn read_trailer(&mut self, watermark: usize) -> Result, Self::Error>; + + /// Copies the chunk of bytes into `dest`, with a trailing set of bytes. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + /// + /// The chunk may optionally return a `Trailer`, which can be used by the caller to defer + /// copying the trailing chunk until later. + fn partial_copy_into(&mut self, dest: &mut Dest) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Chunk; + + /// Forces the entire chunk to be copied + /// + /// The returned `Trailer` will always be empty. + #[inline] + fn full_copy(&mut self) -> FullCopy { + FullCopy::new(self) + } + + /// Copies the chunk of bytes into `dest`. + /// + /// Implementations should either fill the `dest` completely or exhaust the buffered data. + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Chunk, + { + let mut trailer = self.partial_copy_into(dest)?; + let _: Result<(), core::convert::Infallible> = trailer.copy_into(dest); + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs new file mode 100644 index 0000000000..1f6bb062da --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs @@ -0,0 +1,110 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use crate::assume; +use core::cmp::Ordering; + +pub struct Buf<'a, B: bytes::Buf> { + buf: &'a mut B, + pending: usize, +} + +impl<'a, B> Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + pub fn new(buf: &'a mut B) -> Self { + Self { buf, pending: 0 } + } + + #[inline] + fn commit_pending(&mut self) { + if self.pending > 0 { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.advance(self.pending); + self.pending = 0; + } + } +} + +impl<'a, B> Chunk for Buf<'a, B> +where + B: bytes::Buf, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + unsafe { + assume!(self.buf.remaining() >= self.pending); + assume!(self.buf.chunk().len() >= self.pending); + } + self.buf.remaining() - self.pending + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + self.commit_pending(); + let chunk = self.buf.chunk(); + let len = chunk.len().min(watermark); + self.pending = len; + Ok(chunk[..len].into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.commit_pending(); + + ensure!(dest.has_remaining_capacity(), Ok(Trailer::empty())); + + loop { + let chunk_len = self.buf.chunk().len(); + + if chunk_len == 0 { + debug_assert_eq!( + self.buf.remaining(), + 0, + "buf returned empty chunk with remaining bytes" + ); + return Ok(Trailer::empty()); + } + + match chunk_len.cmp(&dest.remaining_capacity()) { + Ordering::Less if self.buf.has_remaining() => { + dest.put_slice(self.buf.chunk()); + self.buf.advance(chunk_len); + continue; + } + Ordering::Less | Ordering::Equal => { + let chunk = self.buf.chunk(); + self.pending = chunk.len(); + return Ok(chunk.into()); + } + Ordering::Greater => { + let len = dest.remaining_capacity(); + let chunk = &self.buf.chunk()[..len]; + self.pending = len; + return Ok(chunk.into()); + } + } + } + } +} + +impl<'a, B> Drop for Buf<'a, B> +where + B: bytes::Buf, +{ + #[inline] + fn drop(&mut self) { + self.commit_pending(); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs new file mode 100644 index 0000000000..307274963d --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs @@ -0,0 +1,94 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use bytes::{Bytes, BytesMut}; + +impl Chunk for BytesMut { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.read_trailer(dest.remaining_capacity()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result<(), Self::Error> { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES_MUT { + let buffer = self.split_to(watermark); + dest.put_bytes_mut(buffer); + } else if Dest::SPECIALIZES_BYTES { + let buffer = self.split_to(watermark); + dest.put_bytes(buffer.freeze()); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} + +impl Chunk for Bytes { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + Ok(self.split_to(len).into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.read_trailer(dest.remaining_capacity()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result<(), Self::Error> { + let watermark = self.len().min(dest.remaining_capacity()); + + if Dest::SPECIALIZES_BYTES { + let buffer = self.split_to(watermark); + dest.put_bytes(buffer); + } else { + // copy bytes into the destination buf + dest.put_slice(&self[..watermark]); + // advance the chunk rather than splitting to avoid refcount churn + bytes::Buf::advance(self, watermark) + } + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/full_copy.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/full_copy.rs new file mode 100644 index 0000000000..b0a97a19f1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/full_copy.rs @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; + +#[derive(Debug)] +pub struct FullCopy<'a, C: Chunk + ?Sized>(&'a mut C); + +impl<'a, C: Chunk + ?Sized> FullCopy<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C) -> Self { + Self(chunk) + } +} + +impl<'a, C: Chunk + ?Sized> Chunk for FullCopy<'a, C> { + type Error = C::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.0.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.0.buffer_is_empty() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + self.0.read_trailer(watermark) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.0.copy_into(dest)?; + Ok(Trailer::empty()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result<(), Self::Error> { + self.0.copy_into(dest) + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs new file mode 100644 index 0000000000..ab2f27da2a --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/io_slice.rs @@ -0,0 +1,228 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use crate::assume; +use core::cmp::Ordering; + +pub struct IoSlice<'a, T> { + len: usize, + head: &'a [u8], + buf: &'a [T], +} + +impl<'a, T> IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + pub fn new(buf: &'a [T]) -> Self { + let mut len = 0; + + let mut first_non_empty = None; + for (idx, buf) in buf.iter().enumerate() { + len += buf.len(); + if !buf.is_empty() && first_non_empty.is_none() { + first_non_empty = Some(idx); + } + } + + if let Some(idx) = first_non_empty { + let buf = &buf[idx..]; + + unsafe { + assume!(!buf.is_empty()); + } + + let mut slice = Self { + len, + head: &[], + buf, + }; + slice.advance_buf_once(); + slice + } else { + Self { + len: 0, + head: &[], + buf: &[], + } + } + } + + #[inline(always)] + fn advance_buf(&mut self) { + while self.head.is_empty() && !self.buf.is_empty() { + self.advance_buf_once(); + } + } + + #[inline(always)] + fn advance_buf_once(&mut self) { + let (head, tail) = self.buf.split_at(1); + self.head = &head[0][..]; + self.buf = tail; + } + + #[inline] + fn sub_len(&mut self, len: usize) { + unsafe { + assume!(self.len >= len); + } + self.set_len(self.len - len); + } + + #[inline] + fn set_len(&mut self, len: usize) { + if cfg!(debug_assertions) { + let mut computed = self.head.len(); + for buf in self.buf.iter() { + computed += buf.len(); + } + assert_eq!(len, computed); + } + self.len = len; + } +} + +impl<'a, T> bytes::Buf for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + #[inline] + fn remaining(&self) -> usize { + self.len + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.head + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + assert!(cnt <= self.len); + let new_len = self.len - cnt; + + if new_len == 0 { + self.head = &[]; + self.buf = &[]; + self.set_len(new_len); + return; + } + + while cnt > 0 { + let len = self.head.len().min(cnt); + cnt -= len; + + if len >= self.head.len() { + unsafe { + assume!(!self.buf.is_empty()); + } + + self.head = &[]; + self.advance_buf(); + continue; + } + + self.head = &self.head[len..]; + break; + } + + self.set_len(new_len); + } +} + +impl<'a, T> Chunk for IoSlice<'a, T> +where + T: core::ops::Deref, +{ + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(watermark); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + // head can be returned and we need to take the next buf entry + if self.head.len() >= watermark { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + + // we just need to split off the current head and return it + let (head, tail) = self.head.split_at(watermark); + self.head = tail; + self.sub_len(head.len()); + Ok(head.into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + ensure!(dest.has_remaining_capacity(), Ok(Default::default())); + + loop { + // we only have one chunk left so do the happy path + if self.buf.is_empty() { + let len = self.head.len().min(dest.remaining_capacity()); + let (head, tail) = self.head.split_at(len); + self.head = tail; + self.set_len(tail.len()); + return Ok(head.into()); + } + + match self.head.len().cmp(&dest.remaining_capacity()) { + // head needs to be copied into dest and we need to take the next buf entry + Ordering::Less => { + let len = self.head.len(); + dest.put_slice(self.head); + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(len); + continue; + } + // head can be returned and we need to take the next buf entry + Ordering::Equal => { + let head = self.head; + self.head = &[]; + unsafe { + assume!(!self.buf.is_empty()); + } + self.advance_buf(); + self.sub_len(head.len()); + return Ok(head.into()); + } + // we just need to split off the current head and return it + Ordering::Greater => { + let (head, tail) = self.head.split_at(dest.remaining_capacity()); + self.head = tail; + self.sub_len(head.len()); + return Ok(head.into()); + } + } + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs new file mode 100644 index 0000000000..6818dbab83 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/slice.rs @@ -0,0 +1,53 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; + +macro_rules! impl_slice { + ($ty:ty, $split:ident) => { + impl Chunk for $ty { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let len = self.len().min(watermark); + let (trailer, remaining) = core::mem::take(self).$split(len); + *self = remaining; + Ok((&*trailer).into()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + self.read_trailer(dest.remaining_capacity()) + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result<(), Self::Error> { + let len = self.len().min(dest.remaining_capacity()); + let (trailer, remaining) = core::mem::take(self).$split(len); + *self = remaining; + dest.put_slice(trailer); + Ok(()) + } + } + }; +} + +impl_slice!(&[u8], split_at); +impl_slice!(&mut [u8], split_at_mut); diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs new file mode 100644 index 0000000000..fc36660204 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/tests.rs @@ -0,0 +1,157 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +/// ensures each implementation returns a trailing chunk correctly +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn trailer_test() { + let mut dest = vec![]; + let mut source = vec![]; + bolero::check!() + .with_type::<(u16, u16)>() + .for_each(|(dest_len, source_len)| { + source.resize(*source_len as usize, 42); + + let dest_len = (*source_len).min(*dest_len) as usize; + dest.resize(dest_len, 0); + let expected = &source[..dest_len]; + let dest = &mut dest[..]; + + // direct implementation + { + let mut chunk = &source[..]; + let mut target = &mut dest[..]; + + let trailer = chunk.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // IoSlice implementation + { + let io_slice = [&source[..]]; + let mut chunk = IoSlice::new(&io_slice); + let mut target = &mut dest[..]; + + let trailer = chunk.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // Buf implementation + { + let mut slice = &source[..]; + let mut chunk = Buf::new(&mut slice); + let mut target = &mut dest[..]; + + let trailer = chunk.partial_copy_into(&mut target).unwrap(); + assert_eq!(expected, &*trailer); + assert!( + dest.iter().all(|b| *b == 0), + "no bytes should be copied into dest" + ); + } + + // full_copy + { + let mut source = &source[..]; + let mut chunk = source.full_copy(); + let mut target = &mut dest[..]; + + let trailer = chunk.partial_copy_into(&mut target).unwrap(); + assert!(trailer.is_empty()); + assert_eq!(expected, dest); + dest.fill(0); + } + }); +} + +/// ensures each chunk type correctly copies multiple chunks into the destination +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn io_slice_test() { + let mut dest = vec![]; + let mut source: Vec> = vec![]; + let mut pool = vec![]; + let mut expected = vec![]; + bolero::check!() + .with_type::<(u16, Vec)>() + .for_each(|(dest_len, source_lens)| { + while source.len() > source_lens.len() { + pool.push(source.pop().unwrap()); + } + + while source.len() < source_lens.len() { + source.push(pool.pop().unwrap_or_default()); + } + + let mut source_len = 0; + let mut last_chunk_idx = 0; + let mut last_chunk_len = 0; + let mut remaining_len = *dest_len as usize; + for (idx, (len, source)) in source_lens.iter().zip(&mut source).enumerate() { + let fill = (idx + 1) as u8; + let len = *len as usize; + source.resize(len, fill); + source.fill(fill); + if len > 0 && remaining_len > 0 { + last_chunk_idx = idx; + last_chunk_len = len.min(remaining_len); + } + source_len += len; + remaining_len = remaining_len.saturating_sub(len); + } + + let dest_len = source_len.min(*dest_len as usize); + dest.resize(dest_len, 0); + dest.fill(0); + let dest = &mut dest[..]; + + expected.resize(dest_len, 0); + expected.fill(0); + + { + // don't copy the last chunk, since that should be returned + let source = &source[..last_chunk_idx]; + crate::slice::vectored_copy(source, &mut [&mut expected[..]]); + } + + let expected_chunk = source + .get(last_chunk_idx) + .map(|v| &v[..last_chunk_len]) + .unwrap_or(&[]); + + // IoSlice implementation + { + let mut source = IoSlice::new(&source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + // reset the destination + dest.fill(0); + } + + // Buf implementation + { + let mut source = IoSlice::new(&source); + let mut source = Buf::new(&mut source); + let mut target = &mut dest[..]; + + let chunk = source.partial_copy_into(&mut target).unwrap(); + + assert_eq!(expected, dest); + assert_eq!(expected_chunk, &*chunk); + } + }); +} diff --git a/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs b/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs new file mode 100644 index 0000000000..992b0949b6 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/chunk/trailer.rs @@ -0,0 +1,94 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::{Bytes, BytesMut}; + +/// Trailing chunk from a copy operation +/// +/// This is returned from [`Chunk::copy_into_slice`] to allow the caller to defer copying the final +/// chunk until later. +#[derive(Clone, Debug)] +#[must_use = "trailing data cannot be discarded"] +pub enum Trailer<'a> { + Slice(&'a [u8]), + Bytes(Bytes), + BytesMut(BytesMut), +} + +impl<'a> Default for Trailer<'a> { + #[inline] + fn default() -> Self { + Self::empty() + } +} + +impl<'a> Trailer<'a> { + #[inline] + pub fn empty() -> Self { + Self::Slice(&[]) + } +} + +impl<'a> From<&'a [u8]> for Trailer<'a> { + #[inline] + fn from(chunk: &'a [u8]) -> Self { + Self::Slice(chunk) + } +} + +impl<'a> From for Trailer<'a> { + #[inline] + fn from(chunk: Bytes) -> Self { + Self::Bytes(chunk) + } +} + +impl<'a> From for Trailer<'a> { + #[inline] + fn from(chunk: BytesMut) -> Self { + Self::BytesMut(chunk) + } +} + +impl<'a> core::ops::Deref for Trailer<'a> { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &Self::Target { + match self { + Self::Slice(t) => t, + Self::Bytes(t) => t, + Self::BytesMut(t) => t, + } + } +} + +impl<'a> super::Chunk for Trailer<'a> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + match self { + Self::Slice(v) => v.read_trailer(watermark), + Self::Bytes(v) => v.read_trailer(watermark), + Self::BytesMut(v) => v.read_trailer(watermark), + } + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + match self { + Self::Slice(v) => v.partial_copy_into(dest), + Self::Bytes(v) => v.partial_copy_into(dest), + Self::BytesMut(v) => v.partial_copy_into(dest), + } + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/empty.rs b/quic/s2n-quic-core/src/buffer/reader/empty.rs new file mode 100644 index 0000000000..6770b50383 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/empty.rs @@ -0,0 +1,77 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; + +/// Returns an empty buffer for the current offset of an inner reader +#[derive(Debug)] +pub struct Empty<'a, R: Reader + ?Sized>(&'a R); + +impl<'a, R: Reader + ?Sized> Empty<'a, R> { + #[inline] + pub fn new(reader: &'a R) -> Self { + Self(reader) + } +} + +impl<'a, R: Reader + ?Sized> Chunk for Empty<'a, R> { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } + + #[inline] + fn read_trailer(&mut self, _watermark: usize) -> Result { + Ok(chunk::Trailer::empty()) + } + + #[inline] + fn partial_copy_into( + &mut self, + _dest: &mut Dest, + ) -> Result { + Ok(chunk::Trailer::empty()) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Empty<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.0.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.0.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn empty_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + { + assert_eq!(reader.with_empty_buffer().buffered_len(), 0); + } + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + + let mut reader = reader.with_empty_buffer(); + + assert_eq!(reader.buffered_len(), 0); + assert!(reader.buffer_is_empty()); + + let trailer = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/incremental.rs b/quic/s2n-quic-core/src/buffer/reader/incremental.rs new file mode 100644 index 0000000000..2d39df3dd9 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/incremental.rs @@ -0,0 +1,168 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; +use crate::{buffer, ensure}; + +/// Implements an incremental reader that handles temporary chunks as the stream data +/// +/// This is useful for scenarios where the the stream isn't completely buffered in memory and +/// chunks come in gradually. +#[derive(Debug, Default)] +pub struct Incremental { + current_offset: VarInt, + final_offset: Option, +} + +impl Incremental { + #[inline] + pub fn with_chunk<'a, C: Chunk>( + &'a mut self, + chunk: &'a mut C, + is_fin: bool, + ) -> Result, buffer::Error> { + let mut chunk = WithChunk { + incremental: self, + chunk, + }; + + if is_fin { + chunk.set_fin()?; + } else { + ensure!( + chunk.incremental.final_offset.is_none(), + Err(buffer::Error::InvalidFin) + ); + } + + Ok(chunk) + } +} + +impl Chunk for Incremental { + type Error = core::convert::Infallible; + + #[inline] + fn read_trailer(&mut self, _watermark: usize) -> Result { + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + _dest: &mut Dest, + ) -> Result { + Ok(Default::default()) + } + + #[inline] + fn buffered_len(&self) -> usize { + 0 + } +} + +impl Reader for Incremental { + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_offset + } +} + +pub struct WithChunk<'a, C: Chunk> { + incremental: &'a mut Incremental, + chunk: &'a mut C, +} + +impl<'a, C: Chunk> WithChunk<'a, C> { + #[inline] + pub fn set_fin(&mut self) -> Result<&mut Self, buffer::Error> { + let final_offset = self + .incremental + .current_offset + .checked_add_usize(self.buffered_len()) + .ok_or(buffer::Error::OutOfRange)?; + + // make sure the final length doesn't change + if let Some(current) = self.incremental.final_offset { + ensure!(final_offset == current, Err(buffer::Error::InvalidFin)); + } + + self.incremental.final_offset = Some(final_offset); + + Ok(self) + } +} + +impl<'a, C: Chunk> Chunk for WithChunk<'a, C> { + type Error = C::Error; + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let trailer = self.chunk.read_trailer(watermark)?; + self.incremental.current_offset += trailer.len(); + Ok(trailer) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let len = dest.remaining_capacity(); + let trailer = self.chunk.partial_copy_into(dest)?; + self.incremental.current_offset += len; + Ok(trailer) + } + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } +} + +impl<'a, C: Chunk> Reader for WithChunk<'a, C> { + #[inline] + fn current_offset(&self) -> VarInt { + self.incremental.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.incremental.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn incremental_test() { + let mut incremental = Incremental::default(); + + assert_eq!(incremental.current_offset(), VarInt::ZERO); + assert_eq!(incremental.final_offset, None); + assert_eq!(incremental.buffered_len(), 0); + + { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut with_chunk = incremental.with_chunk(&mut chunk, false).unwrap(); + + assert_eq!(with_chunk.buffered_len(), 4); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailer = with_chunk.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*trailer, &[1, 2, 3, 4]); + + assert_eq!(with_chunk.buffered_len(), 0); + } + + assert_eq!(incremental.current_offset(), VarInt::from_u8(4)); + assert_eq!(incremental.buffered_len(), 0); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/limit.rs b/quic/s2n-quic-core/src/buffer/reader/limit.rs new file mode 100644 index 0000000000..abf16b63a0 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/limit.rs @@ -0,0 +1,101 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; +use crate::buffer::writer::Chunk as _; + +/// Wraps a reader and limits the amount of that can be read from it +/// +/// This can be used for applying back pressure to the reader with flow control. +pub struct Limit<'a, R: Reader + ?Sized> { + buffered_len: usize, + reader: &'a mut R, +} + +impl<'a, R: Reader + ?Sized> Limit<'a, R> { + #[inline] + pub fn new(reader: &'a mut R, max_buffered_len: usize) -> Self { + let buffered_len = max_buffered_len.min(reader.buffered_len()); + + Self { + buffered_len, + reader, + } + } +} + +impl<'a, R: Reader + ?Sized> Chunk for Limit<'a, R> { + type Error = R::Error; + + #[inline] + fn buffered_len(&self) -> usize { + self.buffered_len + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let watermark = self.buffered_len.min(watermark); + let trailer = self.reader.read_trailer(watermark)?; + unsafe { + assume!(trailer.len() <= self.buffered_len); + } + self.buffered_len -= trailer.len(); + Ok(trailer) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let mut dest = dest.limit(self.buffered_len); + let mut dest = dest.tracked(); + let trailer = self.reader.partial_copy_into(&mut dest)?; + let len = dest.written_len() + trailer.len(); + unsafe { + assume!(len <= self.buffered_len); + } + self.buffered_len -= len; + Ok(trailer) + } +} + +impl<'a, R: Reader + ?Sized> Reader for Limit<'a, R> { + #[inline] + fn current_offset(&self) -> VarInt { + self.reader.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.reader.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::testing::Data; + + #[test] + fn max_data_test() { + let mut reader = Data::new(1000); + assert_eq!(reader.buffered_len(), 1000); + + let max_data = 32usize; + + let mut reader = reader.with_max_data(VarInt::from_u8(max_data as _)); + assert_eq!(reader.buffered_len(), max_data); + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + + assert_eq!(reader.buffered_len(), max_data - 16); + + let mut dest = &mut [0u8; 16][..]; + let trailer = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(trailer.len(), 16); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reader/slice.rs b/quic/s2n-quic-core/src/buffer/reader/slice.rs new file mode 100644 index 0000000000..99d9a8d600 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/slice.rs @@ -0,0 +1,103 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{chunk, Chunk, Reader, VarInt}; +use crate::buffer; + +/// Wraps a single slice as a reader. +/// +/// This can be used for scenarios where the entire stream is buffered and known up-front. +#[derive(Debug)] +pub struct Slice<'a, C> { + chunk: &'a mut C, + current_offset: VarInt, + final_offset: VarInt, +} + +impl<'a, C> Slice<'a, C> +where + C: Chunk, +{ + #[inline] + pub fn new(chunk: &'a mut C) -> Result { + let final_offset = VarInt::try_from(chunk.buffered_len()) + .ok() + .ok_or(buffer::Error::OutOfRange)?; + Ok(Self { + chunk, + current_offset: VarInt::ZERO, + final_offset, + }) + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.chunk.buffer_is_empty() + } +} + +impl<'a, C> Chunk for Slice<'a, C> +where + C: Chunk, +{ + type Error = C::Error; + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + let trailer = self.chunk.read_trailer(watermark)?; + self.current_offset += trailer.len(); + Ok(trailer) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let len = dest.remaining_capacity(); + let trailer = self.chunk.partial_copy_into(dest)?; + self.current_offset += len; + Ok(trailer) + } + + #[inline] + fn buffered_len(&self) -> usize { + self.chunk.buffered_len() + } +} + +impl<'a, C> Reader for Slice<'a, C> +where + C: Chunk, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.current_offset + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.final_offset) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn slice_test() { + let mut chunk: &[u8] = &[1, 2, 3, 4]; + let mut reader = Slice::new(&mut chunk).unwrap(); + + assert_eq!(reader.current_offset(), VarInt::ZERO); + assert_eq!(reader.final_offset(), Some(VarInt::from_u8(4))); + + let mut dest: &mut [u8] = &mut [0; 4]; + let trailer = reader.partial_copy_into(&mut dest).unwrap(); + assert_eq!(&*trailer, &[1, 2, 3, 4]); + + assert_eq!(reader.current_offset(), VarInt::from_u8(4)); + assert!(reader.buffer_is_empty()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer.rs b/quic/s2n-quic-core/src/buffer/receive_buffer.rs index dcb86b5acc..9ce048402f 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer.rs +++ b/quic/s2n-quic-core/src/buffer/receive_buffer.rs @@ -4,10 +4,9 @@ //! This module contains data structures for buffering incoming and outgoing data //! in Quic streams. -use crate::varint::VarInt; +use crate::{buffer, varint::VarInt}; use alloc::collections::{vec_deque, VecDeque}; use bytes::BytesMut; -use core::fmt; mod probe; mod request; @@ -19,29 +18,7 @@ mod tests; use request::Request; use slot::Slot; -/// Enumerates error that can occur while inserting data into the Receive Buffer -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum ReceiveBufferError { - /// An invalid data range was provided - OutOfRange, - /// The provided final size was invalid for the buffer's state - InvalidFin, -} - -#[cfg(feature = "std")] -impl std::error::Error for ReceiveBufferError {} - -impl fmt::Display for ReceiveBufferError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"), - Self::InvalidFin => write!( - f, - "write modifies the final offset in a non-compliant manner" - ), - } - } -} +pub use super::Error as ReceiveBufferError; /// The default buffer size for slots that the [`ReceiveBuffer`] uses. /// @@ -300,13 +277,13 @@ impl ReceiveBuffer { /// This can be used for copy-avoidance applications where a packet is received in order and /// doesn't need to be stored temporarily for future packets to unblock the stream. #[inline] - pub fn skip(&mut self, len: usize) -> Result<(), ReceiveBufferError> { + pub fn skip(&mut self, len: VarInt) -> Result<(), ReceiveBufferError> { // zero-length skip is a no-op - ensure!(len > 0, Ok(())); + ensure!(len > VarInt::ZERO, Ok(())); let new_start_offset = self .start_offset - .checked_add(len as u64) + .checked_add(len.as_u64()) .ok_or(ReceiveBufferError::OutOfRange)?; if let Some(final_size) = self.final_size() { @@ -398,48 +375,6 @@ impl ReceiveBuffer { }) } - /// Copies all the available buffered data into the provided `buf`'s remaining capacity. - /// - /// This method is slightly more efficient than [`Self::pop`] when the caller ends up copying - /// the buffered data into another slice, since it avoids a refcount increment/decrement on - /// the contained [`BytesMut`]. - /// - /// The total number of bytes copied is returned. - #[inline] - pub fn copy_into_buf(&mut self, buf: &mut B) -> usize { - use bytes::Buf; - - let mut total = 0; - - loop { - let remaining = buf.remaining_mut(); - // ensure we have enough capacity in the destination buf - ensure!(remaining > 0, total); - - let transform = |buffer: &mut BytesMut, is_final_offset| { - let watermark = buffer.len().min(remaining); - - // copy bytes into the destination buf - buf.put_slice(&buffer[..watermark]); - // advance the chunk rather than splitting to avoid refcount churn - buffer.advance(watermark); - total += watermark; - - (is_final_offset, watermark) - }; - - match self.pop_transform(transform) { - // if we're at the final offset, then no need to keep iterating - Some(true) => break, - Some(false) => continue, - // no more available chunks - None => break, - } - } - - total - } - /// Pops a buffer from the front of the receive queue as long as the `transform` function returns a /// non-empty buffer. #[inline] @@ -707,3 +642,191 @@ impl<'a> Iterator for Drain<'a> { (len, Some(len)) } } + +impl buffer::reader::Chunk for ReceiveBuffer { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.is_empty() + } + + #[inline] + fn read_trailer( + &mut self, + watermark: usize, + ) -> Result { + if let Some(chunk) = self.pop_watermarked(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + let mut prev = BytesMut::new(); + + loop { + let remaining = dest.remaining_capacity(); + // ensure we have enough capacity in the destination buf + ensure!(remaining > 0, Ok(Default::default())); + + match self.pop_watermarked(remaining) { + Some(chunk) => { + let mut prev = core::mem::replace(&mut prev, chunk); + if !prev.is_empty() { + let _: Result<(), core::convert::Infallible> = prev.copy_into(dest); + } + } + None if prev.is_empty() => { + return Ok(Default::default()); + } + None => { + return Ok(prev.into()); + } + } + } + } + + #[inline] + fn copy_into( + &mut self, + dest: &mut Dest, + ) -> Result<(), Self::Error> { + loop { + let remaining = dest.remaining_capacity(); + // ensure we have enough capacity in the destination buf + ensure!(remaining > 0, Ok(())); + + let transform = |buffer: &mut BytesMut, _is_final_offset| { + let len = buffer.len().min(remaining); + let _: Result<(), core::convert::Infallible> = buffer.copy_into(dest); + ((), len) + }; + + if self.pop_transform(transform).is_none() { + return Ok(()); + } + } + } +} + +impl buffer::reader::Reader for ReceiveBuffer { + #[inline] + fn current_offset(&self) -> VarInt { + unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(self.start_offset) + } + } + + #[inline] + fn final_offset(&self) -> Option { + self.final_size().map(|v| unsafe { + // SAFETY: offset will always fit into a VarInt + VarInt::new_unchecked(v) + }) + } +} + +impl buffer::writer::Writer for ReceiveBuffer { + #[inline] + fn copy_from( + &mut self, + reader: &mut R, + ) -> Result<(), buffer::Error> { + let offset = reader.current_offset(); + let final_offset = reader.final_offset(); + let is_fin = reader.has_buffered_fin(); + + // optimize for the case where the stream consists of a single chunk + { + let mut is_single_chunk_stream = true; + + is_single_chunk_stream &= offset == VarInt::ZERO; + is_single_chunk_stream &= is_fin; + is_single_chunk_stream &= self.consumed_len() == 0; + is_single_chunk_stream &= self.slots.is_empty(); + + if is_single_chunk_stream { + let payload_len = reader.buffered_len(); + let final_offset = final_offset.expect("mis-reporting is_fin"); + let end = final_offset.as_u64(); + + // don't allocate anything if we don't need to + if payload_len == 0 { + let trailer = reader.read_trailer(0)?; + debug_assert!(trailer.is_empty()); + } else { + let mut data = BytesMut::with_capacity(payload_len); + + // copy the whole thing into `data` + reader.copy_into(&mut data)?; + + self.slots.push_back(Slot::new(offset.as_u64(), end, data)); + }; + + // update the final offset after everything was read correctly + self.final_offset = end; + self.invariants(); + + return Ok(()); + } + } + + // TODO add better support for copy avoidance by iterating to the appropriate slot and + // copying into that, if possible + + // fall back to copying the trailers into the receive buffer + let mut first_write = true; + loop { + let offset = reader.current_offset(); + let trailer = reader.read_trailer(usize::MAX)?; + + if let Some(offset) = final_offset.filter(|_| first_write) { + // record the final size before writing to avoid excess allocation + self.write_at_fin(offset, &[]) + .map_err(buffer::Error::mapped)?; + } + + // TODO maybe specialize on BytesMut trailers? - for now we'll just treat them as + // slices + self.write_at(offset, &trailer) + .map_err(buffer::Error::mapped)?; + + first_write = false; + + if reader.buffer_is_empty() { + break; + } + } + + Ok(()) + } +} + +impl buffer::Duplex for ReceiveBuffer { + #[inline] + fn skip( + &mut self, + len: VarInt, + final_offset: Option, + ) -> Result<(), ReceiveBufferError> { + if let Some(offset) = final_offset { + self.write_at_fin(offset, &[])?; + } + + (*self).skip(len)?; + + Ok(()) + } +} diff --git a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs b/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs index bef282f0b0..4dda92fbbf 100644 --- a/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs +++ b/quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs @@ -20,7 +20,7 @@ enum Op { watermark: Option, }, Skip { - len: usize, + len: VarInt, }, } @@ -59,8 +59,8 @@ fn model_test() { let consumed_len = buffer.consumed_len(); if buffer.skip(len).is_ok() { let new_consumed_len = buffer.consumed_len(); - assert_eq!(new_consumed_len, consumed_len + len as u64); - recv.seek_forward(len); + assert_eq!(new_consumed_len, consumed_len + len.as_u64()); + recv.seek_forward(len.as_u64()); } } } @@ -92,15 +92,18 @@ fn write_and_pop() { #[test] #[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time fn write_and_copy_into_buf() { + use crate::buffer::reader::Chunk; + let mut buffer = ReceiveBuffer::new(); let mut offset = VarInt::default(); - let mut output = vec![]; + let mut output: Vec = vec![]; for len in 0..10000 { + dbg!(len, offset); let chunk = Data::send_one_at(offset.as_u64(), len); buffer.write_at(offset, &chunk).unwrap(); offset += chunk.len(); - let copied_len = buffer.copy_into_buf(&mut output); - assert_eq!(copied_len, chunk.len()); + buffer.copy_into(&mut output).unwrap(); + assert_eq!(output.len(), chunk.len()); assert_eq!(&output[..], &chunk[..]); output.clear(); } diff --git a/quic/s2n-quic-core/src/buffer/writer.rs b/quic/s2n-quic-core/src/buffer/writer.rs new file mode 100644 index 0000000000..f7a481df7f --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer.rs @@ -0,0 +1,11 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub mod chunk; + +pub use chunk::Chunk; + +pub trait Writer { + fn copy_from(&mut self, reader: &mut R) + -> Result<(), super::Error>; +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk.rs b/quic/s2n-quic-core/src/buffer/writer/chunk.rs new file mode 100644 index 0000000000..b9fddb9b25 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk.rs @@ -0,0 +1,75 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::buffer::reader::chunk::Trailer; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +mod buf; +mod byte_queue; +mod discard; +mod empty; +mod limit; +mod tracked; +mod uninit_slice; + +pub use buf::BufMut; +pub use discard::Discard; +pub use empty::Empty; +pub use limit::Limit; +pub use tracked::Tracked; + +pub trait Chunk { + const SPECIALIZES_BYTES: bool = false; + const SPECIALIZES_BYTES_MUT: bool = false; + + fn put_slice(&mut self, bytes: &[u8]); + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + // we can specialize on an empty payload + ensure!(payload_len == 0, Ok(false)); + + f(UninitSlice::new(&mut []))?; + + Ok(true) + } + + fn remaining_capacity(&self) -> usize; + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity() > 0 + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.put_slice(&bytes); + } + + #[inline] + fn put_trailer(&mut self, trailer: Trailer) { + match trailer { + Trailer::Slice(v) => self.put_slice(v), + Trailer::Bytes(v) => self.put_bytes(v), + Trailer::BytesMut(v) => self.put_bytes_mut(v), + } + } + + #[inline] + fn limit(&mut self, max_len: usize) -> Limit { + Limit::new(self, max_len) + } + + #[inline] + fn tracked(&mut self) -> Tracked { + Tracked::new(self) + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs new file mode 100644 index 0000000000..8757c236a8 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/buf.rs @@ -0,0 +1,111 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, UninitSlice}; + +pub struct BufMut<'a, T: bytes::BufMut> { + buf_mut: &'a mut T, +} + +impl<'a, T: bytes::BufMut> BufMut<'a, T> { + #[inline] + pub fn new(buf_mut: &'a mut T) -> Self { + Self { buf_mut } + } +} + +impl<'a, T: bytes::BufMut> Chunk for BufMut<'a, T> { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.buf_mut.put_slice(bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.buf_mut.remaining_mut() + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let chunk = self.buf_mut.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.buf_mut.advance_mut(payload_len); + } + + Ok(true) + } +} + +macro_rules! impl_buf_mut { + ($ty:ty $(, $reserve:ident)?) => { + impl Chunk for $ty { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + bytes::BufMut::put_slice(self, bytes); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + bytes::BufMut::remaining_mut(self) + } + + #[inline] + fn put_uninit_slice( + &mut self, + payload_len: usize, + f: F, + ) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + use bytes::BufMut; + + $( + self.$reserve(payload_len); + )? + + let chunk = self.chunk_mut(); + ensure!(chunk.len() >= payload_len, Ok(false)); + + f(&mut chunk[..payload_len])?; + + unsafe { + self.advance_mut(payload_len); + } + + Ok(true) + } + } + }; +} + +impl_buf_mut!(bytes::BytesMut, reserve); +impl_buf_mut!(alloc::vec::Vec, reserve); +impl_buf_mut!(&mut [u8]); +impl_buf_mut!(&mut [core::mem::MaybeUninit]); + +#[cfg(test)] +mod tests { + use crate::buffer::{reader::Chunk as _, writer::Chunk as _}; + + #[test] + fn vec_test() { + let mut buffer: Vec = vec![]; + assert_eq!(buffer.remaining_capacity(), isize::MAX as usize); + + let expected = &b"hello world!"[..]; + + let mut source = expected; + + source.copy_into(&mut buffer).unwrap(); + + assert_eq!(&buffer, expected); + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs new file mode 100644 index 0000000000..0a8d6f21b1 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/byte_queue.rs @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Bytes, BytesMut, Chunk}; +use alloc::{collections::VecDeque, vec::Vec}; + +macro_rules! impl_queue { + ($ty:ident, $push:ident) => { + impl Chunk for $ty { + const SPECIALIZES_BYTES: bool = true; + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes(Bytes::copy_from_slice(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + self.$push(bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes.freeze()); + } + } + + impl Chunk for $ty { + const SPECIALIZES_BYTES_MUT: bool = true; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.put_bytes_mut(BytesMut::from(bytes)); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + true + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + // we can't convert Bytes into BytesMut so we'll need to copy it + self.put_slice(&bytes); + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + self.$push(bytes); + } + } + }; +} + +impl_queue!(Vec, push); +impl_queue!(VecDeque, push_back); diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/discard.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/discard.rs new file mode 100644 index 0000000000..44cfc79a41 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/discard.rs @@ -0,0 +1,17 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Clone, Copy, Debug, Default)] +pub struct Discard; + +impl super::Chunk for Discard { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + let _ = bytes; + } + + #[inline] + fn remaining_capacity(&self) -> usize { + usize::MAX + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/empty.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/empty.rs new file mode 100644 index 0000000000..f86d673f83 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/empty.rs @@ -0,0 +1,22 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Chunk; + +#[derive(Clone, Copy, Debug, Default)] +pub struct Empty; + +impl Chunk for Empty { + #[inline] + fn put_slice(&mut self, slice: &[u8]) { + debug_assert!( + slice.is_empty(), + "cannot put a non-empty slice in empty writer chunk" + ); + } + + #[inline] + fn remaining_capacity(&self) -> usize { + 0 + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/limit.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/limit.rs new file mode 100644 index 0000000000..e035354cfa --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/limit.rs @@ -0,0 +1,80 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +pub struct Limit<'a, C: Chunk + ?Sized> { + chunk: &'a mut C, + remaining_capacity: usize, +} + +impl<'a, C: Chunk + ?Sized> Limit<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C, remaining_capacity: usize) -> Self { + let remaining_capacity = chunk.remaining_capacity().min(remaining_capacity); + Self { + chunk, + remaining_capacity, + } + } +} + +impl<'a, C: Chunk + ?Sized> Chunk for Limit<'a, C> { + const SPECIALIZES_BYTES: bool = C::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = C::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + debug_assert!(bytes.len() <= self.remaining_capacity); + self.chunk.put_slice(bytes); + self.remaining_capacity -= bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + debug_assert!(payload_len <= self.remaining_capacity); + let did_write = self.chunk.put_uninit_slice(payload_len, f)?; + if did_write { + self.remaining_capacity -= payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.chunk.remaining_capacity().min(self.remaining_capacity) + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.remaining_capacity > 0 && self.chunk.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_bytes(bytes); + self.remaining_capacity -= len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_bytes_mut(bytes); + self.remaining_capacity -= len; + } + + #[inline] + fn put_trailer(&mut self, trailer: Trailer) { + let len = trailer.len(); + debug_assert!(len <= self.remaining_capacity); + self.chunk.put_trailer(trailer); + self.remaining_capacity -= len; + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/tracked.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/tracked.rs new file mode 100644 index 0000000000..c0a3f2b188 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/tracked.rs @@ -0,0 +1,76 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{Chunk, Trailer}; +use bytes::{buf::UninitSlice, Bytes, BytesMut}; + +pub struct Tracked<'a, C: Chunk + ?Sized> { + chunk: &'a mut C, + written: usize, +} + +impl<'a, C: Chunk + ?Sized> Tracked<'a, C> { + #[inline] + pub fn new(chunk: &'a mut C) -> Self { + Self { chunk, written: 0 } + } + + #[inline] + pub fn written_len(&self) -> usize { + self.written + } +} + +impl<'a, C: Chunk + ?Sized> Chunk for Tracked<'a, C> { + const SPECIALIZES_BYTES: bool = C::SPECIALIZES_BYTES; + const SPECIALIZES_BYTES_MUT: bool = C::SPECIALIZES_BYTES_MUT; + + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self.chunk.put_slice(bytes); + self.written += bytes.len(); + } + + #[inline(always)] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + let did_write = self.chunk.put_uninit_slice(payload_len, f)?; + if did_write { + self.written += payload_len; + } + Ok(did_write) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.chunk.remaining_capacity() + } + + #[inline] + fn has_remaining_capacity(&self) -> bool { + self.chunk.has_remaining_capacity() + } + + #[inline] + fn put_bytes(&mut self, bytes: Bytes) { + let len = bytes.len(); + self.chunk.put_bytes(bytes); + self.written += len; + } + + #[inline] + fn put_bytes_mut(&mut self, bytes: BytesMut) { + let len = bytes.len(); + self.chunk.put_bytes_mut(bytes); + self.written += len; + } + + #[inline] + fn put_trailer(&mut self, trailer: Trailer) { + let len = trailer.len(); + self.chunk.put_trailer(trailer); + self.written += len; + } +} diff --git a/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs b/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs new file mode 100644 index 0000000000..66b08fea76 --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/writer/chunk/uninit_slice.rs @@ -0,0 +1,36 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Chunk; +use bytes::buf::UninitSlice; + +impl Chunk for &mut UninitSlice { + #[inline] + fn put_slice(&mut self, bytes: &[u8]) { + self[..bytes.len()].copy_from_slice(bytes); + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[bytes.len()..]; + } + + #[inline] + fn put_uninit_slice(&mut self, payload_len: usize, f: F) -> Result + where + F: FnOnce(&mut UninitSlice) -> Result<(), Error>, + { + ensure!(self.len() >= payload_len, Ok(false)); + + f(&mut self[..payload_len])?; + + let empty = UninitSlice::new(&mut []); + let next = core::mem::replace(self, empty); + *self = &mut next[payload_len..]; + + Ok(true) + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.len() + } +} diff --git a/quic/s2n-quic-core/src/stream/testing.rs b/quic/s2n-quic-core/src/stream/testing.rs index d815b9f53f..e5e5e412eb 100644 --- a/quic/s2n-quic-core/src/stream/testing.rs +++ b/quic/s2n-quic-core/src/stream/testing.rs @@ -3,14 +3,12 @@ //! A model that ensures stream data is correctly sent and received between peers -#[cfg(feature = "generator")] -use bolero_generator::*; - -#[cfg(test)] -use bolero::generator::*; - +use crate::buffer::reader; use bytes::Bytes; +#[cfg(any(test, feature = "generator"))] +use bolero_generator::*; + static DATA: Bytes = { const INNER: [u8; DATA_LEN] = { let mut data = [0; DATA_LEN]; @@ -125,7 +123,7 @@ impl Data { let amount = ((self.len - self.offset) as usize).min(amount); let chunk = Self::send_one_at(self.offset, amount); - self.seek_forward(chunk.len()); + self.seek_forward(chunk.len() as u64); Some(chunk) } @@ -147,8 +145,56 @@ impl Data { } /// Moves the current offset forward by the provided `len` - pub fn seek_forward(&mut self, len: usize) { - self.offset += len as u64; + pub fn seek_forward(&mut self, len: u64) { + self.offset += len; + } +} + +impl reader::Chunk for Data { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + (self.len - self.offset).try_into().unwrap() + } + + #[inline] + fn read_trailer(&mut self, watermark: usize) -> Result { + if let Some(chunk) = self.send_one(watermark) { + return Ok(chunk.into()); + } + + Ok(Default::default()) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result { + while let Some(chunk) = self.send_one(dest.remaining_capacity()) { + // if the chunk matches the destination then it's a trailer + if chunk.len() == dest.remaining_capacity() || self.is_finished() { + return Ok(chunk.into()); + } + + // otherwise copy the chunk into the destination + dest.put_bytes(chunk); + } + + Ok(Default::default()) + } +} + +impl reader::Reader for Data { + #[inline] + fn current_offset(&self) -> crate::varint::VarInt { + self.offset().try_into().unwrap() + } + + #[inline] + fn final_offset(&self) -> Option { + Some(self.len.try_into().unwrap()) } } diff --git a/quic/s2n-quic-transport/src/stream/receive_stream.rs b/quic/s2n-quic-transport/src/stream/receive_stream.rs index b2a7d911e0..6fec7f3287 100644 --- a/quic/s2n-quic-transport/src/stream/receive_stream.rs +++ b/quic/s2n-quic-transport/src/stream/receive_stream.rs @@ -447,11 +447,12 @@ impl ReceiveStream { // If this is the last frame then inform the receive_buffer so it can check for any // final size errors. - let write_result = if frame.is_fin { - self.receive_buffer.write_at_fin(frame.offset, frame.data) - } else { - self.receive_buffer.write_at(frame.offset, frame.data) - }; + let write_result: Result<(), StreamReceiveBufferError> = + if frame.is_fin { + self.receive_buffer.write_at_fin(frame.offset, frame.data) + } else { + self.receive_buffer.write_at(frame.offset, frame.data) + }; write_result.map_err(|error| { match error { @@ -470,6 +471,8 @@ impl ReceiveStream { //# of type FINAL_SIZE_ERROR; see Section 11 for details on error //# handling. StreamReceiveBufferError::InvalidFin => transport::Error::FINAL_SIZE_ERROR, + // the reader is infallible + StreamReceiveBufferError::ReaderError(_) => unreachable!(), } .with_reason("data reception error") .with_frame_type(frame.tag().into()) diff --git a/scripts/copyright_check b/scripts/copyright_check index e2003e87de..1846e45b8c 100755 --- a/scripts/copyright_check +++ b/scripts/copyright_check @@ -7,7 +7,7 @@ set -e -S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not \( -path "*/s2n-quic/target/*" -o -path "*/s2n-tls-sys/s2n/*" \)) +S2N_QUIC_FILES=$(find "$PWD" -type f \( -name "*.rs" -o -name "*.py" \) -not -path "*/target/*") FAILED=0