diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs
index 6382f5a434..655c132894 100644
--- a/quic/s2n-quic-core/src/buffer/reader.rs
+++ b/quic/s2n-quic-core/src/buffer/reader.rs
@@ -44,6 +44,27 @@ pub trait Reader: Storage {
             .map_or(false, |fin| fin == self.current_offset())
     }
 
+    /// Skips the data in the reader until `offset` is reached, or the reader storage is exhausted.
+    #[inline]
+    fn skip_until(&mut self, offset: VarInt) -> Result<(), Self::Error> {
+        ensure!(offset > self.current_offset(), Ok(()));
+
+        while let Some(len) = offset.checked_sub(self.current_offset()) {
+            let len = len.as_u64();
+
+            // we don't need to skip anything if the difference is 0
+            ensure!(len > 0, break);
+
+            // clamp the len to usize
+            let len = (usize::MAX as u64).min(len) as usize;
+            let _chunk = self.read_chunk(len)?;
+
+            ensure!(!self.buffer_is_empty(), break);
+        }
+
+        Ok(())
+    }
+
     /// Limits the maximum offset that the caller can read from the reader
     #[inline]
     fn with_max_data(&mut self, max_data: VarInt) -> Limit {
diff --git a/quic/s2n-quic-core/src/buffer/reader/storage.rs b/quic/s2n-quic-core/src/buffer/reader/storage.rs
index 803568f737..304e0556df 100644
--- a/quic/s2n-quic-core/src/buffer/reader/storage.rs
+++ b/quic/s2n-quic-core/src/buffer/reader/storage.rs
@@ -23,7 +23,7 @@ pub use io_slice::IoSlice;
 pub use tracked::Tracked;
 
 pub trait Storage {
-    type Error;
+    type Error: 'static;
 
     /// Returns the length of the chunk
     fn buffered_len(&self) -> usize;
diff --git a/quic/s2n-quic-core/src/buffer/reassembler.rs b/quic/s2n-quic-core/src/buffer/reassembler.rs
index 8a69df1a3e..3c643952ce 100644
--- a/quic/s2n-quic-core/src/buffer/reassembler.rs
+++ b/quic/s2n-quic-core/src/buffer/reassembler.rs
@@ -3,8 +3,10 @@
 
 //! This module contains data structures for buffering incoming streams.
 
-use super::Error;
-use crate::varint::VarInt;
+use crate::{
+    buffer::{Error, Reader},
+    varint::VarInt,
+};
 use alloc::collections::{vec_deque, VecDeque};
 use bytes::BytesMut;
 
@@ -76,29 +78,35 @@ const UNKNOWN_FINAL_SIZE: u64 = u64::MAX;
 /// // they will be returned in combined fashion.
 /// assert_eq!(&[0u8, 1, 2, 3, 4, 5, 6, 7], &buffer.pop().unwrap()[..]);
 /// ```
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct Reassembler {
     slots: VecDeque<Slot>,
+    cursors: Cursors,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+struct Cursors {
     start_offset: u64,
     max_recv_offset: u64,
     final_offset: u64,
 }
 
-impl Default for Reassembler {
+impl Default for Cursors {
+    #[inline]
     fn default() -> Self {
-        Self::new()
+        Self {
+            start_offset: 0,
+            max_recv_offset: 0,
+            final_offset: UNKNOWN_FINAL_SIZE,
+        }
     }
 }
 
 impl Reassembler {
     /// Creates a new `Reassembler`
+    #[inline]
     pub fn new() -> Reassembler {
-        Reassembler {
-            slots: VecDeque::new(),
-            start_offset: 0,
-            max_recv_offset: 0,
-            final_offset: UNKNOWN_FINAL_SIZE,
-        }
+        Self::default()
     }
 
     /// Returns true if the buffer has completely been written to and the final size is known
@@ -112,16 +120,16 @@ impl Reassembler {
     #[inline]
     pub fn is_reading_complete(&self) -> bool {
         self.final_size()
-            .map_or(false, |len| self.start_offset == len)
+            .map_or(false, |len| self.cursors.start_offset == len)
     }
 
     /// Returns the final size of the stream, if known
     #[inline]
     pub fn final_size(&self) -> Option<u64> {
-        if self.final_offset == UNKNOWN_FINAL_SIZE {
+        if self.cursors.final_offset == UNKNOWN_FINAL_SIZE {
             None
         } else {
-            Some(self.final_offset)
+            Some(self.cursors.final_offset)
         }
     }
 
@@ -137,7 +145,7 @@ impl Reassembler {
     #[inline]
     pub fn is_empty(&self) -> bool {
         if let Some(slot) = self.slots.front() {
-            !slot.is_occupied(self.start_offset)
+            !slot.is_occupied(self.cursors.start_offset)
         } else {
             true
         }
@@ -158,115 +166,236 @@ impl Reassembler {
     /// Pushes a slice at a certain offset
     #[inline]
     pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
-        // create a request
-        let request = Request::new(offset, data)?;
-        self.write_request(request)?;
+        let mut request = Request::new(offset, data, false)?;
+        self.write_reader(&mut request)?;
         Ok(())
     }
 
     /// Pushes a slice at a certain offset, which is the end of the buffer
     #[inline]
    pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
-        // create a request
-        let request = Request::new(offset, data)?;
-
-        // compute the final offset for the fin request
-        let final_offset = request.end_exclusive();
+        let mut request = Request::new(offset, data, true)?;
+        self.write_reader(&mut request)?;
+        Ok(())
+    }
 
-        // make sure if we previously saw a final size that they still match
-        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5
         //# Once a final size for a stream is known, it cannot change. If a
         //# RESET_STREAM or STREAM frame is received indicating a change in the
         //# final size for the stream, an endpoint SHOULD respond with an error
         //# of type FINAL_SIZE_ERROR; see Section 11 for details on error
         //# handling.
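Note: with this change `write_at` and `write_at_fin` stop special-casing the fin path; both wrap the slice in a `Request` carrying an `is_fin` flag and hand it to `write_reader` below, where `Request` itself behaves like a `Reader`. A rough standalone model of the bookkeeping such a request carries follows; the `SliceRequest` type and its methods are illustrative stand-ins, not s2n-quic APIs.

```rust
// Minimal model of what Request::new(offset, data, is_fin) captures.
struct SliceRequest<'a> {
    offset: u64,
    data: &'a [u8],
    is_fin: bool,
}

impl<'a> SliceRequest<'a> {
    fn new(offset: u64, data: &'a [u8], is_fin: bool) -> Self {
        Self { offset, data, is_fin }
    }

    /// The stream offset of the first unread byte
    fn current_offset(&self) -> u64 {
        self.offset
    }

    /// The final size of the stream, if this request carries the FIN
    fn final_offset(&self) -> Option<u64> {
        self.is_fin.then(|| self.offset + self.data.len() as u64)
    }
}

fn main() {
    // write_at(offset, data)     maps to Request::new(offset, data, false)
    // write_at_fin(offset, data) maps to Request::new(offset, data, true)
    let fin = SliceRequest::new(8, &[8, 9, 10, 11], true);
    assert_eq!(fin.current_offset(), 8);
    assert_eq!(fin.final_offset(), Some(12));

    let plain = SliceRequest::new(0, &[0, 1, 2, 3], false);
    assert_eq!(plain.final_offset(), None);
}
```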
-        if let Some(final_size) = self.final_size() {
-            ensure!(final_size == final_offset, Err(Error::InvalidFin));
+    #[inline]
+    pub fn write_reader<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
+    where
+        R: Reader + ?Sized,
+    {
+        // Trims off any data that has already been received
+        reader.skip_until(self.current_offset())?;
+
+        // store a snapshot of the cursors in case there's an error
+        let snapshot = self.cursors;
+
+        self.check_reader_fin(reader)?;
+
+        if let Err(err) = self.write_reader_impl(reader) {
+            use core::any::TypeId;
+            if TypeId::of::<R::Error>() != TypeId::of::<core::convert::Infallible>() {
+                self.cursors = snapshot;
+            }
+            return Err(Error::ReaderError(err));
         }
 
-        // make sure that we didn't see any previous chunks greater than the final size
-        ensure!(self.max_recv_offset <= final_offset, Err(Error::InvalidFin));
-
-        self.final_offset = final_offset;
-
-        self.write_request(request)?;
+        self.invariants();
 
         Ok(())
     }
 
+    /// Ensures the final offset doesn't change
     #[inline]
-    fn write_request(&mut self, request: Request) -> Result<(), Error> {
-        // trim off any data that we've already read
-        let (_, request) = request.split(self.start_offset);
-        // trim off any data that exceeds our final length
-        let (mut request, excess) = request.split(self.final_offset);
+    fn check_reader_fin<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
+    where
+        R: Reader + ?Sized,
+    {
+        let buffered_offset = reader
+            .current_offset()
+            .checked_add_usize(reader.buffered_len())
+            .ok_or(Error::OutOfRange)?
+            .as_u64();
 
-        // make sure the request isn't trying to write beyond the final size
         //= https://www.rfc-editor.org/rfc/rfc9000#section-4.5
         //# Once a final size for a stream is known, it cannot change. If a
         //# RESET_STREAM or STREAM frame is received indicating a change in the
         //# final size for the stream, an endpoint SHOULD respond with an error
         //# of type FINAL_SIZE_ERROR; see Section 11 for details on error
         //# handling.
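Note: the `type Error: 'static` bound added to `reader::Storage` above is what allows `write_reader` to probe the reader's error type with `TypeId` and skip the cursor rollback for readers that cannot fail. A minimal sketch of that probe, assuming the comparison is against `core::convert::Infallible` (the error type the slice-backed `Request` uses later in this diff):

```rust
use core::any::TypeId;
use core::convert::Infallible;

// With a 'static bound on the error type, TypeId can tell whether a reader
// can fail at all, so the rollback branch only matters for fallible readers.
fn is_fallible<E: 'static>() -> bool {
    TypeId::of::<E>() != TypeId::of::<Infallible>()
}

fn main() {
    // a slice-backed request can never fail, so no rollback is needed
    assert!(!is_fallible::<Infallible>());

    // an I/O-backed reader can fail, so a cursor snapshot has to be restored
    assert!(is_fallible::<std::io::Error>());
}
```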
- ensure!(excess.is_empty(), Err(Error::InvalidFin)); + match (reader.final_offset(), self.final_size()) { + (Some(actual), Some(expected)) => { + ensure!(actual == expected, Err(Error::InvalidFin)); + } + (Some(final_offset), None) => { + let final_offset = final_offset.as_u64(); - // if the request is empty we're done - ensure!(!request.is_empty(), Ok(())); + // make sure that we didn't see any previous chunks greater than the final size + ensure!( + self.cursors.max_recv_offset <= final_offset, + Err(Error::InvalidFin) + ); + + self.cursors.final_offset = final_offset; + } + (None, Some(expected)) => { + // make sure the reader doesn't exceed a previously known final offset + ensure!(expected >= buffered_offset, Err(Error::InvalidFin)); + } + (None, None) => {} + } // record the maximum offset that we've seen - self.max_recv_offset = self.max_recv_offset.max(request.end_exclusive()); + self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(buffered_offset); + + Ok(()) + } + + #[inline(always)] + fn write_reader_impl(&mut self, reader: &mut R) -> Result<(), R::Error> + where + R: Reader + ?Sized, + { + // if the reader is empty at this point, just make sure it doesn't return an error + if reader.buffer_is_empty() { + let _chunk = reader.read_chunk(0)?; + return Ok(()); + } + + let mut selected = None; // start from the back with the assumption that most data arrives in order - for mut idx in (0..self.slots.len()).rev() { - unsafe { - assume!(self.slots.len() > idx); - } - let slot = &mut self.slots[idx]; + for idx in (0..self.slots.len()).rev() { + let Some(slot) = self.slots.get(idx) else { + debug_assert!(false); + unsafe { + // SAFETY: `idx` should always be in bounds, since it's generated by the range + // `0..slots.len()` + core::hint::unreachable_unchecked() + } + }; - let slot::Outcome { lower, mid, upper } = slot.try_write(request); + // find the first slot that we can write into + ensure!(slot.start() <= reader.current_offset().as_u64(), continue); - // if this slot was completed, we should try and unsplit with the next slot - if slot.is_full() { - let current_block = - Self::align_offset(slot.start(), Self::allocation_size(slot.start())); - let end = slot.end(); + selected = Some(idx); + break; + } - if let Some(next) = self.slots.get(idx + 1) { - let next_block = - Self::align_offset(next.start(), Self::allocation_size(next.start())); + let idx = if let Some(idx) = selected { + idx + } else { + let mut idx = 0; + // set the current request to the upper slot and loop + let mut slot = self.allocate_slot(reader); - if next.start() == end && current_block == next_block { - unsafe { - assume!(self.slots.len() > idx + 1); - } - if let Some(next) = self.slots.remove(idx + 1) { - self.slots[idx].unsplit(next); - } - } - } + // before pushing the slot, make sure the reader doesn't fail + let filled = slot.try_write_reader(reader, &mut true)?; + + if let Some(slot) = filled { + self.slots.push_front(slot); + idx += 1; } + self.slots.push_front(slot); - idx += 1; - self.allocate_request(idx, upper); + ensure!(!reader.buffer_is_empty(), Ok(())); - if let Some(mid) = mid { - self.insert(idx, mid); + idx + }; + + self.write_reader_at(reader, idx)?; + Ok(()) + } + + #[inline(always)] + fn write_reader_at(&mut self, reader: &mut R, mut idx: usize) -> Result<(), R::Error> + where + R: Reader + ?Sized, + { + let initial_idx = idx; + let mut filled_slot = false; + + unsafe { + assume!( + !reader.buffer_is_empty(), + "the first write should always be non-empty" + ); + } + + while 
!reader.buffer_is_empty() { + let Some(slot) = self.slots.get_mut(idx) else { + debug_assert!(false); + unsafe { core::hint::unreachable_unchecked() } + }; + + let filled = slot.try_write_reader(reader, &mut filled_slot)?; + + idx += 1; + if let Some(slot) = filled { + self.insert(idx, slot); + idx += 1; } - request = lower; + ensure!(!reader.buffer_is_empty(), break); - if request.is_empty() { - break; + if let Some(next) = self.slots.get(idx) { + // the next slot is able to handle the reader + if next.start() <= reader.current_offset().as_u64() { + continue; + } } - } - self.allocate_request(0, request); + let slot = self.allocate_slot(reader); + self.insert(idx, slot); + continue; + } - self.invariants(); + // only try unsplitting if we filled at least one spot + if filled_slot { + self.unsplit_range(initial_idx..idx); + } Ok(()) } + #[inline] + fn unsplit_range(&mut self, range: core::ops::Range) { + // try to merge all of the slots that were modified + for idx in range.rev() { + let Some(slot) = self.slots.get(idx) else { + debug_assert!(false); + unsafe { + // SAFETY: `idx` should always be in bounds, since it's provided by a range + // that was bound to `slots.len()` + core::hint::unreachable_unchecked() + } + }; + + // if this slot was completed, we should try and unsplit with the next slot + ensure!(slot.is_full(), continue); + + let start = slot.start(); + let end = slot.end(); + + let Some(next) = self.slots.get(idx + 1) else { + continue; + }; + + ensure!(next.start() == end, continue); + + let current_block = Self::align_offset(start, Self::allocation_size(start)); + let next_block = Self::align_offset(next.start(), Self::allocation_size(next.start())); + ensure!(current_block == next_block, continue); + + if let Some(next) = self.slots.remove(idx + 1) { + self.slots[idx].unsplit(next); + } else { + debug_assert!(false, "idx + 1 was checked above"); + unsafe { core::hint::unreachable_unchecked() } + } + } + } + /// Advances the read and write cursors and discards any held data /// /// This can be used for copy-avoidance applications where a packet is received in order and @@ -277,6 +406,7 @@ impl Reassembler { ensure!(len > VarInt::ZERO, Ok(())); let new_start_offset = self + .cursors .start_offset .checked_add(len.as_u64()) .ok_or(Error::OutOfRange)?; @@ -286,10 +416,10 @@ impl Reassembler { } // record the maximum offset that we've seen - self.max_recv_offset = self.max_recv_offset.max(new_start_offset); + self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset); // update the current start offset - self.start_offset = new_start_offset; + self.cursors.start_offset = new_start_offset; // clear out the slots to the new start offset while let Some(mut slot) = self.slots.pop_front() { @@ -377,9 +507,9 @@ impl Reassembler { let slot = self.slots.front_mut()?; // make sure the slot has some data - ensure!(slot.is_occupied(self.start_offset), None); + ensure!(slot.is_occupied(self.cursors.start_offset), None); - let is_final_offset = self.final_offset == slot.end(); + let is_final_offset = self.cursors.final_offset == slot.end(); let buffer = slot.data_mut(); let (out, len) = transform(buffer, is_final_offset); @@ -394,9 +524,9 @@ impl Reassembler { self.slots.pop_front(); } - probe::pop(self.start_offset, len); + probe::pop(self.cursors.start_offset, len); - self.start_offset += len as u64; + self.cursors.start_offset += len as u64; self.invariants(); @@ -407,7 +537,7 @@ impl Reassembler { /// receive buffer. 
#[inline] pub fn consumed_len(&self) -> u64 { - self.start_offset + self.cursors.start_offset } /// Returns the total amount of contiguous received data. @@ -416,7 +546,7 @@ impl Reassembler { /// buffered and available for consumption. #[inline] pub fn total_received_len(&self) -> u64 { - let mut offset = self.start_offset; + let mut offset = self.cursors.start_offset; for slot in &self.slots { ensure!(slot.is_occupied(offset), offset); @@ -432,9 +562,7 @@ impl Reassembler { #[inline] pub fn reset(&mut self) { self.slots.clear(); - self.start_offset = Default::default(); - self.max_recv_offset = 0; - self.final_offset = u64::MAX; + self.cursors = Default::default(); } #[inline(always)] @@ -447,81 +575,43 @@ impl Reassembler { } } + /// Allocates a slot for a reader #[inline] - fn allocate_request(&mut self, mut idx: usize, mut request: Request) { - ensure!(!request.is_empty()); - - // if this is a fin request and the write is under the allocation size, then no need to - // do a full allocation that doesn't end up getting used. - if request.end_exclusive() == self.final_offset { - while !request.is_empty() { - let start = request.start(); - let mut size = Self::allocation_size(start); - let offset = Self::align_offset(start, size); - - let size_candidate = (start - offset) as usize + request.len(); - if size_candidate < size { - size = size_candidate; - } + fn allocate_slot(&mut self, reader: &R) -> Slot + where + R: Reader + ?Sized, + { + let start = reader.current_offset().as_u64(); + let mut size = Self::allocation_size(start); + let mut offset = Self::align_offset(start, size); - // set the current request to the upper slot and loop - request = self.allocate_slot(&mut idx, request, offset, size); + // don't allocate for data we've already consumed + if let Some(diff) = self.cursors.start_offset.checked_sub(offset) { + if diff > 0 { + debug_assert!( + reader.current_offset().as_u64() >= self.cursors.start_offset, + "requests should be split before allocating slots" + ); + offset = self.cursors.start_offset; + size -= diff as usize; } - return; } - while !request.is_empty() { - let start = request.start(); - let size = Self::allocation_size(start); - let offset = Self::align_offset(start, size); - // set the current request to the upper slot and loop - request = self.allocate_slot(&mut idx, request, offset, size); - } - } - - #[inline] - fn allocate_slot<'a>( - &mut self, - idx: &mut usize, - request: Request<'a>, - mut offset: u64, - mut size: usize, - ) -> Request<'a> { - // don't allocate for data we've already consumed - if let Some(diff) = self.start_offset.checked_sub(offset) { - debug_assert!( - request.start() >= self.start_offset, - "requests should be split before allocating slots" - ); - offset = self.start_offset; - size -= diff as usize; + if self.cursors.final_offset + - reader.current_offset().as_u64() + - reader.buffered_len() as u64 + == 0 + { + let size_candidate = (start - offset) as usize + reader.buffered_len(); + if size_candidate < size { + size = size_candidate; + } } let buffer = BytesMut::with_capacity(size); let end = offset + size as u64; - let mut slot = Slot::new(offset, end, buffer); - - let slot::Outcome { lower, mid, upper } = slot.try_write(request); - - unsafe { - assume!(lower.is_empty(), "lower requests should always be empty"); - } - - // first insert the newly-created Slot - debug_assert!(!slot.should_drop()); - self.insert(*idx, slot); - *idx += 1; - - // check if we have a mid-slot and insert that as well - if let Some(mid) = mid { - 
debug_assert!(!mid.should_drop()); - self.insert(*idx, mid); - *idx += 1; - } - - // return the upper request if we need to allocate more - upper + Slot::new(offset, end, buffer) } /// Aligns an offset to a certain alignment size @@ -576,12 +666,31 @@ impl Reassembler { assert_eq!(actual_len == 0, self.is_empty()); assert_eq!(self.iter().count(), chunks); - let mut prev_end = self.start_offset; + let mut prev_end = self.cursors.start_offset; - for slot in &self.slots { + for (idx, slot) in self.slots.iter().enumerate() { assert!(slot.start() >= prev_end, "{self:#?}"); assert!(!slot.should_drop(), "slot range should be non-empty"); prev_end = slot.end_allocated(); + + // make sure if the slot is full, then it was unsplit into the next slot + if slot.is_full() { + let start = slot.start(); + let end = slot.end(); + + let Some(next) = self.slots.get(idx + 1) else { + continue; + }; + + ensure!(next.start() == end, continue); + + let current_block = Self::align_offset(start, Self::allocation_size(start)); + let next_block = + Self::align_offset(next.start(), Self::allocation_size(next.start())); + ensure!(current_block == next_block, continue); + + panic!("unmerged slots at {idx} and {} {self:#?}", idx + 1); + } } } } @@ -596,7 +705,7 @@ impl<'a> Iter<'a> { #[inline] fn new(buffer: &'a Reassembler) -> Self { Self { - prev_end: buffer.start_offset, + prev_end: buffer.cursors.start_offset, inner: buffer.slots.iter(), } } diff --git a/quic/s2n-quic-core/src/buffer/reassembler/reader.rs b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs index 527703b270..edc77719a8 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/reader.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs @@ -113,7 +113,7 @@ impl Reader for Reassembler { fn current_offset(&self) -> VarInt { unsafe { // SAFETY: offset will always fit into a VarInt - VarInt::new_unchecked(self.start_offset) + VarInt::new_unchecked(self.cursors.start_offset) } } diff --git a/quic/s2n-quic-core/src/buffer/reassembler/request.rs b/quic/s2n-quic-core/src/buffer/reassembler/request.rs index 8bbe2b6d20..426d846931 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/request.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/request.rs @@ -1,15 +1,17 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 -use super::Error; -use crate::varint::VarInt; -use bytes::{BufMut, BytesMut}; +use crate::{ + buffer::{reader, writer, Error, Reader}, + varint::VarInt, +}; use core::fmt; #[derive(PartialEq, Eq)] pub struct Request<'a> { offset: u64, data: &'a [u8], + is_fin: bool, } impl<'a> fmt::Debug for Request<'a> { @@ -17,93 +19,79 @@ impl<'a> fmt::Debug for Request<'a> { f.debug_struct("Request") .field("offset", &self.offset) .field("len", &self.data.len()) + .field("is_fin", &self.is_fin) .finish() } } impl<'a> Request<'a> { #[inline] - pub fn new(offset: VarInt, data: &'a [u8]) -> Result { + pub fn new(offset: VarInt, data: &'a [u8], is_fin: bool) -> Result { offset .checked_add_usize(data.len()) .ok_or(Error::OutOfRange)?; Ok(Self { offset: offset.as_u64(), data, + is_fin, }) } +} +impl<'a> Reader for Request<'a> { #[inline] - pub fn split(self, offset: u64) -> (Self, Self) { - let mid = offset.saturating_sub(self.offset); - let mid = self.data.len().min(mid as _); - unsafe { - assume!(mid <= self.data.len()); - } - let (a, b) = self.data.split_at(mid); - - let (a_offset, b_offset) = if self.offset < offset { - (self.offset, offset) - } else { - (offset, self.offset) - }; - - let a = Self { - offset: a_offset, - data: a, - }; - let b = Self { - offset: b_offset, - data: b, - }; - - (a, b) + fn current_offset(&self) -> VarInt { + unsafe { VarInt::new_unchecked(self.offset) } } #[inline] - pub fn write(self, buffer: &mut BytesMut) { - super::probe::write(self.offset, self.data.len()); - let chunk = buffer.chunk_mut(); - unsafe { - let len = self.data.len(); - assume!(len <= chunk.len(), "{:?} <= {:?}", len, chunk.len()); - - // Safety: `chunk` is always going to be uninitialized memory which is allocated through `BytesMut`. - // Since the receive buffer owns this allocation, it's impossible for the request to overlap - // with this `chunk`. 
- core::ptr::copy_nonoverlapping(self.data.as_ptr(), chunk.as_mut_ptr(), len); - - assume!(buffer.len() + len <= buffer.capacity()); - buffer.advance_mut(len); + fn final_offset(&self) -> Option { + if self.is_fin { + Some(self.current_offset() + self.data.len()) + } else { + None } } +} - #[inline] - pub fn len(&self) -> usize { - self.data.len() - } +impl<'a> reader::Storage for Request<'a> { + type Error = core::convert::Infallible; #[inline] - pub fn is_empty(&self) -> bool { - self.data.is_empty() + fn buffered_len(&self) -> usize { + self.data.len() } #[inline] - pub fn start(&self) -> u64 { - self.offset + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + let chunk = self.data.read_chunk(watermark)?; + self.offset += chunk.len() as u64; + Ok(chunk) } #[inline] - pub fn end_exclusive(&self) -> u64 { - self.offset + self.len() as u64 + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + let chunk = self.data.partial_copy_into(&mut dest)?; + self.offset += chunk.len() as u64; + self.offset += dest.written_len() as u64; + Ok(chunk) } #[inline] - pub fn into_option(self) -> Option { - if self.data.is_empty() { - None - } else { - Some(self) - } + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + self.data.copy_into(&mut dest)?; + self.offset += dest.written_len() as u64; + Ok(()) } } diff --git a/quic/s2n-quic-core/src/buffer/reassembler/slot.rs b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs index b087e5d3b1..da9898cc57 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/slot.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs @@ -1,7 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0 -use super::Request; +use crate::{ + buffer::{writer::Storage as _, Reader}, + varint::VarInt, +}; use bytes::{Buf, BufMut, BytesMut}; use core::fmt; @@ -25,13 +28,6 @@ impl fmt::Debug for Slot { } } -#[derive(Debug, PartialEq, Eq)] -pub struct Outcome<'a> { - pub lower: Request<'a>, - pub mid: Option, - pub upper: Request<'a>, -} - impl Slot { #[inline] pub fn new(start: u64, end: u64, data: BytesMut) -> Self { @@ -42,48 +38,108 @@ impl Slot { } #[inline(always)] - pub fn try_write<'a>(&mut self, request: Request<'a>) -> Outcome<'a> { - // trim off chunks lower than the start - let (lower, request) = request.split(self.start()); - - // trim off chunks we've already copied - let (_, request) = request.split(self.end()); - - // trim off chunks higher than the allocated end - let (to_write, upper) = request.split(self.end_allocated()); - - let mut mid = None; - - if let Some(to_write) = to_write.into_option() { - if to_write.start() > self.end() { - // find the split point between the buffers - let len = to_write.start() - self.start(); - - // create a new mid slot - let start = to_write.start(); - let mut data = unsafe { - assume!(self.data.len() < len as usize); - self.data.split_off(len as usize) - }; - - // copy the data to the buffer - to_write.write(&mut data); - - mid = Some(Self { - start, - end: self.end, - data, - }); - self.end = start; - } else { - // copy the data to the buffer - to_write.write(&mut self.data); - } + pub fn try_write_reader( + &mut self, + reader: &mut R, + filled_slot: &mut bool, + ) -> Result, R::Error> + where + R: Reader + ?Sized, + { + debug_assert!(self.start() <= reader.current_offset().as_u64()); + + let end = self.end(); + + if end < self.end_allocated() { + // trim off chunks we've already copied + reader.skip_until(unsafe { VarInt::new_unchecked(end) })?; + } else { + // we've already filled this slot so skip the entire thing on the reader + reader.skip_until(unsafe { VarInt::new_unchecked(self.end_allocated()) })?; + return Ok(None); + } + + ensure!(!reader.buffer_is_empty(), Ok(None)); + + // make sure this slot owns this range of data + ensure!( + reader.current_offset().as_u64() < self.end_allocated(), + Ok(None) + ); + + // if the current offsets match just do a straight copy + if reader.current_offset().as_u64() == end { + self.write_reader_end(reader, filled_slot)?; + self.invariants(); + return Ok(None); + } + + // split off the unfilled chunk from the filled chunk and return this filled one + + // find the split point between the buffers + let unfilled_len = reader.current_offset().as_u64() - self.start(); + + // create a new mid slot + let start = reader.current_offset().as_u64(); + let data = unsafe { + assume!(self.data.len() < unfilled_len as usize,); + self.data.split_off(unfilled_len as usize) + }; + + let mut filled = Self { + start, + end: self.end, + data, + }; + + // copy the data to the buffer + if let Err(err) = filled.write_reader_end(reader, filled_slot) { + // revert the split since the reader failed + self.data.unsplit(filled.data); + return Err(err); } + self.end = start; + self.invariants(); + filled.invariants(); - Outcome { lower, mid, upper } + Ok(Some(filled)) + } + + #[inline(always)] + fn write_reader_end( + &mut self, + reader: &mut R, + filled_slot: &mut bool, + ) -> Result<(), R::Error> + where + R: Reader + ?Sized, + { + debug_assert_eq!(reader.current_offset().as_u64(), self.end()); + + unsafe { + // SAFETY: the data buffer should have at least one byte of spare capacity if 
we got to + // this point + assume!(self.data.capacity() > self.data.len()); + } + let chunk = self.data.spare_capacity_mut(); + let mut chunk = bytes::buf::UninitSlice::uninit(chunk); + let chunk_len = chunk.len(); + let mut chunk = chunk.track_write(); + reader.copy_into(&mut chunk)?; + let len = chunk.written_len(); + + super::probe::write(self.end(), len); + + unsafe { + // SAFETY: we should not have written more than the spare capacity + assume!(self.data.len() + len <= self.data.capacity()); + self.data.advance_mut(len); + } + *filled_slot |= chunk_len == len; + + Ok(()) } #[inline] @@ -99,51 +155,52 @@ impl Slot { } self.data.unsplit(next.data); self.end = next.end; + self.invariants(); } - #[inline] + #[inline(always)] pub fn is_full(&self) -> bool { self.end() == self.end_allocated() } - #[inline] + #[inline(always)] pub fn is_empty(&self) -> bool { self.data.is_empty() } - #[inline] + #[inline(always)] pub fn is_occupied(&self, prev_offset: u64) -> bool { !self.is_empty() && self.start() == prev_offset } - #[inline] + #[inline(always)] pub fn as_slice(&self) -> &[u8] { &self.data } - #[inline] + #[inline(always)] pub fn data_mut(&mut self) -> &mut BytesMut { &mut self.data } - #[inline] + #[inline(always)] pub fn start(&self) -> u64 { self.start } - #[inline] + #[inline(always)] pub fn add_start(&mut self, len: usize) { self.start += len as u64; self.invariants() } - #[inline] + #[inline(always)] pub fn end(&self) -> u64 { - self.start() + self.data.len() as u64 + self.start + self.data.len() as u64 } - #[inline] + #[inline(always)] pub fn end_allocated(&self) -> u64 { self.end } @@ -173,12 +230,12 @@ impl Slot { } /// Indicates the slot isn't capable of storing any more data and should be dropped - #[inline] + #[inline(always)] pub fn should_drop(&self) -> bool { self.start() == self.end_allocated() } - #[inline] + #[inline(always)] fn invariants(&self) { if cfg!(debug_assertions) { assert!(self.data.capacity() <= 1 << 16, "{:?}", self); @@ -188,110 +245,3 @@ impl Slot { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::varint::VarInt; - - #[test] - fn bytes_assumption() { - let mut data = BytesMut::with_capacity(100); - let half = data.split_off(50); - - // after splitting a BytesMut the capacity should also be split - assert_eq!(data.capacity(), 50); - assert_eq!(half.capacity(), 50); - } - - fn slot(range: core::ops::Range, data: &[u8]) -> Slot { - let mut buffer = BytesMut::with_capacity((range.end - range.start) as usize); - buffer.extend_from_slice(data); - Slot::new(range.start, range.end, buffer) - } - - fn req(offset: u64, data: &[u8]) -> Request { - Request::new(VarInt::new(offset).unwrap(), data).unwrap() - } - - macro_rules! 
assert_write { - ($slot:expr, $request:expr, $expected_slot:expr, $result:expr) => {{ - let mut slot = $slot; - let req = $request; - - let result = slot.try_write(req); - assert_eq!(slot, $expected_slot); - assert_eq!(result, $result); - }}; - } - - #[test] - fn overlap() { - assert_write!( - slot(4..8, &[1]), - req(0, &[42; 12]), - slot(4..8, &[1, 42, 42, 42]), - Outcome { - lower: req(0, &[42; 4]), - mid: None, - upper: req(8, &[42; 4]), - } - ); - } - - #[test] - fn upper() { - assert_write!( - slot(4..8, &[1]), - req(8, &[42; 4]), - slot(4..8, &[1]), - Outcome { - lower: req(4, &[]), - mid: None, - upper: req(8, &[42; 4]), - } - ); - } - - #[test] - fn lower() { - assert_write!( - slot(4..8, &[1]), - req(0, &[42; 4]), - slot(4..8, &[1]), - Outcome { - lower: req(0, &[42; 4]), - mid: None, - upper: req(8, &[]), - } - ); - } - - #[test] - fn mid() { - assert_write!( - slot(4..8, &[1]), - req(6, &[42; 1]), - slot(4..6, &[1]), - Outcome { - lower: req(4, &[]), - mid: Some(slot(6..8, &[42])), - upper: req(8, &[]), - } - ); - } - - #[test] - fn mid_upper() { - assert_write!( - slot(4..8, &[1]), - req(6, &[42; 4]), - slot(4..6, &[1]), - Outcome { - lower: req(4, &[]), - mid: Some(slot(6..8, &[42; 2])), - upper: req(8, &[42; 2]), - } - ); - } -} diff --git a/quic/s2n-quic-core/src/buffer/reassembler/tests.rs b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs index 2b5f4a418f..c968ee66e7 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/tests.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs @@ -284,7 +284,7 @@ fn chunk_duplicate_test() { let mut buffer = new_receive_buffer(); assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok()); - assert_eq!(4, buffer.len()); + assert_eq!(4, buffer.len(), "{buffer:#?}"); assert!(buffer.write_at(0u32.into(), &[10, 11, 12, 13]).is_ok()); // exact match assert!(buffer.write_at(0u32.into(), &[20, 21]).is_ok()); // beginning @@ -790,6 +790,8 @@ const INTERESTING_CHUNK_SIZES: &[u32] = &[4, 4095, 4096, 4097]; fn write_start_fin_test() { for size in INTERESTING_CHUNK_SIZES.iter().copied() { for pre_empty_fin in [false, true] { + dbg!(size, pre_empty_fin); + let bytes: Vec = Iterator::map(0..size, |v| v as u8).collect(); let mut buffer = Reassembler::new(); diff --git a/quic/s2n-quic-core/src/buffer/reassembler/writer.rs b/quic/s2n-quic-core/src/buffer/reassembler/writer.rs index 5659b700ab..21700b3a5b 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/writer.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/writer.rs @@ -1,12 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
 // SPDX-License-Identifier: Apache-2.0
 
-use super::{Reassembler, Slot};
-use crate::{
-    buffer::{reader::Storage as _, Error, Reader, Writer},
-    varint::VarInt,
-};
-use bytes::BytesMut;
+use super::Reassembler;
+use crate::buffer::{Error, Reader, Writer};
 
 impl Writer for Reassembler {
     #[inline]
@@ -14,81 +10,6 @@
     where
         R: Reader + ?Sized,
     {
-        // enable checks for the reader
-        let mut reader = reader.with_checks();
-        let reader = &mut reader;
-
-        let final_offset = reader.final_offset();
-
-        // optimize for the case where the stream consists of a single chunk
-        if let Some(final_offset) = final_offset {
-            let mut is_single_chunk_stream = true;
-
-            let offset = reader.current_offset();
-
-            // the reader is starting at the beginning of the stream
-            is_single_chunk_stream &= offset == VarInt::ZERO;
-            // the reader has buffered the final offset
-            is_single_chunk_stream &= reader.has_buffered_fin();
-            // no data has been consumed from the Reassembler
-            is_single_chunk_stream &= self.consumed_len() == 0;
-            // we aren't tracking any slots
-            is_single_chunk_stream &= self.slots.is_empty();
-
-            if is_single_chunk_stream {
-                let payload_len = reader.buffered_len();
-                let end = final_offset.as_u64();
-
-                // don't allocate anything if we don't need to
-                if payload_len == 0 {
-                    let chunk = reader.read_chunk(0)?;
-                    debug_assert!(chunk.is_empty());
-                } else {
-                    let mut data = BytesMut::with_capacity(payload_len);
-
-                    // copy the whole thing into `data`
-                    reader.copy_into(&mut data)?;
-
-                    self.slots.push_back(Slot::new(offset.as_u64(), end, data));
-                };
-
-                // update the final offset after everything was read correctly
-                self.final_offset = end;
-                self.invariants();
-
-                return Ok(());
-            }
-        }
-
-        // TODO add better support for copy avoidance by iterating to the appropriate slot and
-        // copying into that, if possible
-
-        // fall back to copying individual chunks into the receive buffer
-        let mut first_write = true;
-        loop {
-            let offset = reader.current_offset();
-            let chunk = reader.read_chunk(usize::MAX)?;
-
-            // Record the final size before writing to avoid excess allocation. This also needs to
-            // happen after we read the first chunk in case there are errors.
-            if first_write {
-                if let Some(offset) = final_offset {
-                    self.write_at_fin(offset, &[]).map_err(Error::mapped)?;
-                }
-            }
-
-            // TODO maybe specialize on BytesMut chunks? - for now we'll just treat them as
-            // slices
-
-            self.write_at(offset, &chunk).map_err(Error::mapped)?;
-
-            first_write = false;
-
-            if reader.buffer_is_empty() {
-                break;
-            }
-        }
-
-        Ok(())
+        self.write_reader(reader)
     }
 }
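Note: with `Writer::copy_from` now delegating to `write_reader`, the slice-based entry points and reader-based writes share a single code path. A hedged usage sketch of the public API exercised by the module docs and tests in this diff; it assumes the crate is consumed as `s2n_quic_core` and relies only on methods shown above (`write_at`, `write_at_fin`, `pop`, `is_reading_complete`).

```rust
use s2n_quic_core::buffer::Reassembler;

fn main() {
    let mut buffer = Reassembler::new();

    // out-of-order data is buffered until the gap before it is filled
    assert!(buffer.write_at(4u32.into(), &[4, 5, 6, 7]).is_ok());
    assert!(buffer.pop().is_none());

    // once the gap is filled, both chunks come back in combined fashion
    assert!(buffer.write_at(0u32.into(), &[0, 1, 2, 3]).is_ok());
    assert_eq!(&buffer.pop().unwrap()[..], &[0u8, 1, 2, 3, 4, 5, 6, 7]);

    // an empty write with fin pins the final size; everything is now consumed
    assert!(buffer.write_at_fin(8u32.into(), &[]).is_ok());
    assert!(buffer.is_reading_complete());
}
```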