Skip to content

Commit

Permalink
feat(s2n-quic-core): add buffer reader/writer traits
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Jan 19, 2024
1 parent 2d060fe commit 0902985
Show file tree
Hide file tree
Showing 40 changed files with 2,785 additions and 211 deletions.
42 changes: 12 additions & 30 deletions quic/s2n-quic-bench/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use criterion::{black_box, BenchmarkId, Criterion, Throughput};
use s2n_quic_core::{buffer::ReceiveBuffer, varint::VarInt};
use s2n_quic_core::{
buffer::{reader::Storage as _, writer, Reassembler},
varint::VarInt,
};

pub fn benchmarks(c: &mut Criterion) {
let mut group = c.benchmark_group("buffer");
Expand All @@ -13,19 +16,21 @@ pub fn benchmarks(c: &mut Criterion) {
group.throughput(Throughput::Bytes(input.len() as _));

group.bench_with_input(BenchmarkId::new("skip", size), &input, |b, _input| {
let mut buffer = ReceiveBuffer::new();
let mut buffer = Reassembler::new();
let size = VarInt::try_from(size).unwrap();
b.iter(move || {
buffer.skip(black_box(size)).unwrap();
});
});

group.bench_with_input(BenchmarkId::new("write_at", size), &input, |b, input| {
let mut buffer = ReceiveBuffer::new();
let mut buffer = Reassembler::new();
let mut offset = VarInt::from_u8(0);
let len = VarInt::new(input.len() as _).unwrap();
b.iter(move || {
buffer.write_at(offset, input).unwrap();
buffer.copy_into_buf(&mut NoOpBuf);
// Avoid oversampling the `pop` implementation
buffer.copy_into(&mut writer::storage::Discard).unwrap();
offset += len;
});
});
Expand All @@ -36,42 +41,19 @@ pub fn benchmarks(c: &mut Criterion) {
BenchmarkId::new("write_at_fragmented", size),
&input,
|b, input| {
let mut buffer = ReceiveBuffer::new();
let mut buffer = Reassembler::new();
let mut offset = VarInt::from_u8(0);
let len = VarInt::new(input.len() as _).unwrap();
b.iter(move || {
let first_offset = offset + len;
buffer.write_at(first_offset, input).unwrap();
let second_offset = offset;
buffer.write_at(second_offset, input).unwrap();
buffer.copy_into_buf(&mut NoOpBuf);
// Avoid oversampling the `pop` implementation
buffer.copy_into(&mut writer::storage::Discard).unwrap();
offset = first_offset + len;
});
},
);
}
}

/// A BufMut implementation that doesn't actually copy data into it
///
/// This is used to avoid oversampling the `pop` implementation for
/// `write_at` benchmarks.
struct NoOpBuf;

unsafe impl bytes::BufMut for NoOpBuf {
#[inline]
fn remaining_mut(&self) -> usize {
usize::MAX
}

#[inline]
unsafe fn advance_mut(&mut self, _cnt: usize) {}

#[inline]
fn put_slice(&mut self, _slice: &[u8]) {}

#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
unimplemented!()
}
}
14 changes: 14 additions & 0 deletions quic/s2n-quic-core/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod duplex;
mod error;
pub mod reader;
pub mod reassembler;
pub mod writer;

pub use duplex::Duplex;
pub use error::Error;
pub use reader::Reader;
pub use reassembler::Reassembler;
pub use writer::Writer;
17 changes: 17 additions & 0 deletions quic/s2n-quic-core/src/buffer/duplex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{Error, Reader, Writer};
use crate::varint::VarInt;

mod split;

pub use split::Split;

pub trait Duplex: Reader + Writer {}

impl<T: Reader + Writer> Duplex for T {}

pub trait Skip: Duplex {
fn skip(&mut self, len: VarInt, final_offset: Option<VarInt>) -> Result<(), Error>;
}
164 changes: 164 additions & 0 deletions quic/s2n-quic-core/src/buffer/duplex/split.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
buffer::{
duplex,
reader::{self, storage::Infallible as _, Reader, Storage as _},
writer::{self, Writer},
Error,
},
varint::VarInt,
};
use core::convert::Infallible;

pub struct Split<'a, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
chunk: &'a mut C,
duplex: &'a mut D,
}

impl<'a, C, D> Split<'a, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
pub fn new(chunk: &'a mut C, duplex: &'a mut D) -> Self {
Self { chunk, duplex }
}
}

/// Delegates to the inner Duplex
impl<'a, C, D> reader::Storage for Split<'a, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
type Error = D::Error;

#[inline]
fn buffered_len(&self) -> usize {
self.duplex.buffered_len()
}

#[inline]
fn buffer_is_empty(&self) -> bool {
self.duplex.buffer_is_empty()
}

#[inline]
fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
self.duplex.read_chunk(watermark)
}

#[inline]
fn partial_copy_into<Dest>(
&mut self,
dest: &mut Dest,
) -> Result<reader::storage::Chunk<'_>, Self::Error>
where
Dest: crate::buffer::writer::Storage + ?Sized,
{
self.duplex.partial_copy_into(dest)
}

#[inline]
fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
where
Dest: crate::buffer::writer::Storage + ?Sized,
{
self.duplex.copy_into(dest)
}
}

/// Delegates to the inner Duplex
impl<'a, C, D> Reader for Split<'a, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
fn current_offset(&self) -> VarInt {
self.duplex.current_offset()
}

#[inline]
fn final_offset(&self) -> Option<VarInt> {
self.duplex.final_offset()
}

#[inline]
fn has_buffered_fin(&self) -> bool {
self.duplex.has_buffered_fin()
}

#[inline]
fn is_consumed(&self) -> bool {
self.duplex.is_consumed()
}
}

impl<'a, C, D> Writer for Split<'a, C, D>
where
C: writer::Storage + ?Sized,
D: duplex::Skip<Error = Infallible> + ?Sized,
{
#[inline]
fn copy_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
where
R: Reader + ?Sized,
{
// enable reader checks
let mut reader = reader.with_checks();
let reader = &mut reader;

let initial_offset = reader.current_offset();
let final_offset = reader.final_offset();
let is_contiguous = initial_offset == self.duplex.current_offset();

{
// if the chunk specializes writing zero-copy Bytes/BytesMut, then just write to the
// receive buffer, since that's what it stores
let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;

// if this packet is non-contiguous, then delegate to the wrapped writer
should_delegate |= !is_contiguous;

// if the chunk doesn't have any remaining capacity, then delegate
should_delegate |= !self.chunk.has_remaining_capacity();

if should_delegate {
self.duplex.copy_from(reader)?;

if !self.duplex.buffer_is_empty() && self.chunk.has_remaining_capacity() {
self.duplex.infallible_copy_into(self.chunk);
}

return Ok(());
}
}

debug_assert!(
self.chunk.has_remaining_capacity(),
"this code should only be executed if the chunk is empty"
);

reader.copy_into(self.chunk)?;

let write_len = reader.current_offset() - initial_offset;

self.duplex
.skip(write_len, final_offset)
.map_err(Error::mapped)?;

if !reader.buffer_is_empty() {
self.duplex.copy_from(reader)?;
}

Ok(())
}
}
46 changes: 46 additions & 0 deletions quic/s2n-quic-core/src/buffer/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Error<Reader = core::convert::Infallible> {
/// An invalid data range was provided
OutOfRange,
/// The provided final size was invalid for the buffer's state
InvalidFin,
/// The provided reader failed
ReaderError(Reader),
}

impl<Reader> From<Reader> for Error<Reader> {
#[inline]
fn from(reader: Reader) -> Self {
Self::ReaderError(reader)
}
}

impl Error {
#[inline]
pub fn mapped<Reader>(error: Error) -> Error<Reader> {
match error {
Error::OutOfRange => Error::OutOfRange,
Error::InvalidFin => Error::InvalidFin,
Error::ReaderError(_) => unreachable!(),
}
}
}

#[cfg(feature = "std")]
impl<Reader: std::error::Error> std::error::Error for Error<Reader> {}

impl<Reader: core::fmt::Display> core::fmt::Display for Error<Reader> {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"),
Self::InvalidFin => write!(
f,
"write modifies the final offset in a non-compliant manner"
),
Self::ReaderError(reader) => write!(f, "the provided reader failed with: {reader}"),
}
}
}
6 changes: 0 additions & 6 deletions quic/s2n-quic-core/src/buffer/mod.rs

This file was deleted.

Loading

0 comments on commit 0902985

Please sign in to comment.