Skip to content

Commit

Permalink
feat(s2n-quic-core): add buffer reader
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Jan 17, 2024
1 parent 2d060fe commit ffebc04
Show file tree
Hide file tree
Showing 22 changed files with 1,676 additions and 31 deletions.
25 changes: 25 additions & 0 deletions quic/s2n-quic-core/src/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod reader;
mod receive_buffer;
pub mod writer;

pub use receive_buffer::*;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Error {
/// An invalid data range was provided
OutOfRange,
/// The provided final size was invalid for the buffer's state
InvalidFin,
}

#[cfg(feature = "std")]
impl std::error::Error for Error {}

impl core::fmt::Display for Error {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"),
Self::InvalidFin => write!(
f,
"write modifies the final offset in a non-compliant manner"
),
}
}
}
48 changes: 48 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::varint::VarInt;

pub mod chunk;
mod empty;
pub mod incremental;
mod max_data;
mod slice;

pub use chunk::Chunk;
pub use empty::Empty;
pub use incremental::Incremental;
pub use max_data::MaxData;
pub use slice::Slice;

pub trait Reader: Chunk {
/// Returns the currently read offset for the stream
fn current_offset(&self) -> VarInt;

/// Returns the final offset for the stream
fn final_offset(&self) -> Option<VarInt>;

/// Returns `true` if the reader has the final offset buffered
#[inline]
fn has_buffered_fin(&self) -> bool {
self.final_offset().map_or(false, |fin| {
let buffered_end = self
.current_offset()
.as_u64()
.saturating_add(self.buffered_len() as u64);
fin == buffered_end
})
}

/// Limits the maximum offset that the caller can read from the reader
#[inline]
fn with_max_data(&mut self, max_data: VarInt) -> MaxData<Self> {
MaxData::new(self, max_data)
}

/// Temporarily clears the buffer for the reader, while preserving the offsets
#[inline]
fn with_empty_buffer(&self) -> Empty<Self> {
Empty::new(self)
}
}
51 changes: 51 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader/chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

mod buf;
mod bytes;
mod force_copy;
mod io_slice;
mod slice;
mod trailer;

#[cfg(test)]
mod tests;

pub use buf::Buf;
pub use force_copy::ForceCopy;
pub use io_slice::IoSlice;
pub use trailer::Trailer;

pub trait Chunk {
type Error;

/// Returns the length of the chunk
fn buffered_len(&self) -> usize;

/// Returns if the chunk is empty
#[inline]
fn buffer_is_empty(&self) -> bool {
self.buffered_len() == 0
}

/// Reads the current trailer for the chunk
fn read_trailer(&mut self, watermark: usize) -> Result<Trailer<'_>, Self::Error>;

/// Copies the chunk of bytes into `dest`.
///
/// `dest` must always be less than or equal to the value returned from `buffered_len`.
///
/// The chunk may optionally return a `Trailer`, which can be used by the caller to defer
/// copying the trailing chunk until later.
fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<Trailer<'_>, Self::Error>
where
Dest: crate::buffer::writer::Chunk;

/// Forces the entire chunk to be copied
///
/// The returned `Trailer` will always be empty.
#[inline]
fn force_copy(&mut self) -> ForceCopy<Self> {
ForceCopy::new(self)
}
}
113 changes: 113 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader/chunk/buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{Chunk, Trailer};
use crate::assume;
use core::cmp::Ordering;

pub struct Buf<'a, B: bytes::Buf> {
buf: &'a mut B,
pending: usize,
}

impl<'a, B> Buf<'a, B>
where
B: bytes::Buf,
{
#[inline]
pub fn new(buf: &'a mut B) -> Self {
Self { buf, pending: 0 }
}

#[inline]
fn commit_pending(&mut self) {
if self.pending > 0 {
unsafe {
assume!(self.buf.remaining() >= self.pending);
assume!(self.buf.chunk().len() >= self.pending);
}
self.buf.advance(self.pending);
self.pending = 0;
}
}
}

impl<'a, B> Chunk for Buf<'a, B>
where
B: bytes::Buf,
{
type Error = core::convert::Infallible;

#[inline]
fn buffered_len(&self) -> usize {
unsafe {
assume!(self.buf.remaining() >= self.pending);
assume!(self.buf.chunk().len() >= self.pending);
}
self.buf.remaining() - self.pending
}

#[inline]
fn read_trailer(&mut self, watermark: usize) -> Result<Trailer, Self::Error> {
self.commit_pending();
let chunk = self.buf.chunk();
let len = chunk.len().min(watermark);
if len > 0 {
self.pending = len;
}
Ok(chunk[..len].into())
}

#[inline]
fn copy_into<Dest: crate::buffer::writer::Chunk>(
&mut self,
dest: &mut Dest,
) -> Result<Trailer, Self::Error> {
unsafe {
assume!(dest.remaining_capacity() <= self.buffered_len());
}

self.commit_pending();

loop {
let chunk_len = self.buf.chunk().len();

if chunk_len == 0 {
debug_assert_eq!(
self.buf.remaining(),
0,
"buf returned empty chunk with remaining bytes"
);
}

match chunk_len.cmp(&dest.remaining_capacity()) {
Ordering::Less => {
dest.put_slice(self.buf.chunk());
self.buf.advance(chunk_len);
continue;
}
Ordering::Equal => {
let chunk = self.buf.chunk();
self.pending = chunk.len();
return Ok(chunk.into());
}
Ordering::Greater => {
let len = dest.remaining_capacity();
let chunk = &self.buf.chunk()[..len];
self.pending = len;
return Ok(chunk.into());
}
}
}
}
}

impl<'a, B> Drop for Buf<'a, B>
where
B: bytes::Buf,
{
#[inline]
fn drop(&mut self) {
self.commit_pending();
}
}
38 changes: 38 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader/chunk/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{Chunk, Trailer};

macro_rules! impl_bytes {
($ty:ty) => {
impl Chunk for $ty {
type Error = core::convert::Infallible;

#[inline]
fn buffered_len(&self) -> usize {
self.len()
}

#[inline]
fn read_trailer(&mut self, watermark: usize) -> Result<Trailer, Self::Error> {
let len = self.len().min(watermark);
if self.len() == len {
Ok(core::mem::replace(self, Self::new()).into())
} else {
Ok(self.split_to(len).into())
}
}

#[inline]
fn copy_into<Dest: crate::buffer::writer::Chunk>(
&mut self,
dest: &mut Dest,
) -> Result<Trailer, Self::Error> {
self.read_trailer(dest.remaining_capacity())
}
}
};
}

impl_bytes!(bytes::Bytes);
impl_bytes!(bytes::BytesMut);
43 changes: 43 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader/chunk/force_copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{Chunk, Trailer};

#[derive(Debug)]
pub struct ForceCopy<'a, C: Chunk + ?Sized>(&'a mut C);

impl<'a, C: Chunk + ?Sized> ForceCopy<'a, C> {
#[inline]
pub fn new(chunk: &'a mut C) -> Self {
Self(chunk)
}
}

impl<'a, C: Chunk + ?Sized> Chunk for ForceCopy<'a, C> {
type Error = C::Error;

#[inline]
fn buffered_len(&self) -> usize {
self.0.buffered_len()
}

#[inline]
fn buffer_is_empty(&self) -> bool {
self.0.buffer_is_empty()
}

#[inline]
fn read_trailer(&mut self, watermark: usize) -> Result<Trailer, Self::Error> {
self.0.read_trailer(watermark)
}

#[inline]
fn copy_into<Dest: crate::buffer::writer::Chunk>(
&mut self,
dest: &mut Dest,
) -> Result<Trailer, Self::Error> {
let trailer = self.0.copy_into(dest)?;
dest.put_trailer(trailer);
Ok(Trailer::empty())
}
}
Loading

0 comments on commit ffebc04

Please sign in to comment.