Another try at stream batching

maddeleine committed Jan 2, 2025
1 parent bf217ba commit a7497fc

Showing 4 changed files with 172 additions and 18 deletions.
11 changes: 11 additions & 0 deletions quic/s2n-quic-core/src/connection/limits.rs
@@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
//# received.
pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;

pub const DEFAULT_STREAM_BURST_SIZE: u8 = 1;

#[non_exhaustive]
#[derive(Debug)]
pub struct ConnectionInfo<'a> {
@@ -74,6 +76,7 @@ pub struct Limits {
pub(crate) initial_round_trip_time: Duration,
pub(crate) migration_support: MigrationSupport,
pub(crate) anti_amplification_multiplier: u8,
pub(crate) stream_burst_size: u8,
}

impl Default for Limits {
@@ -120,6 +123,7 @@ impl Limits {
initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
migration_support: MigrationSupport::RECOMMENDED,
anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
stream_burst_size: DEFAULT_STREAM_BURST_SIZE,
}
}

@@ -222,6 +226,7 @@ impl Limits {
max_active_connection_ids,
u64
);
setter!(with_stream_burst_size, stream_burst_size, u8);
setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
setter!(with_max_ack_ranges, ack_ranges_limit, u8);
setter!(
@@ -384,6 +389,12 @@ impl Limits {
pub fn anti_amplification_multiplier(&self) -> u8 {
self.anti_amplification_multiplier
}

#[doc(hidden)]
#[inline]
pub fn stream_burst_size(&self) -> u8 {
self.stream_burst_size
}
}

/// Creates limits for a given connection
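For reference, a minimal usage sketch of the new limit, mirroring the unit test added in manager/tests.rs below. The crate path is assumed from the file location above, and the setter's error type is simply unwrapped because it is not shown in this diff:

use s2n_quic_core::connection::limits::Limits;

fn main() {
    // Allow each stream to fill up to 5 packets in a row before the
    // transmission list rotates to the next stream; the default burst is 1
    // (DEFAULT_STREAM_BURST_SIZE), i.e. per-packet round-robin.
    let limits = Limits::default().with_stream_burst_size(5).unwrap();

    // The getter is #[doc(hidden)] and intended for internal use; it is only
    // read here to illustrate the stored value.
    assert_eq!(limits.stream_burst_size(), 5);
}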
4 changes: 2 additions & 2 deletions quic/s2n-quic-transport/src/stream/manager.rs
@@ -593,7 +593,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
connection_limits.stream_limits(),
min_rtt,
),
streams: StreamContainer::new(),
streams: StreamContainer::new(connection_limits),
next_stream_ids: StreamIdSet::initial(),
local_endpoint_type,
initial_local_limits,
@@ -866,7 +866,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
}

if context.transmission_constraint().can_transmit() {
self.inner.streams.iterate_transmission_list(
self.inner.streams.send_on_transmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(context);
85 changes: 85 additions & 0 deletions quic/s2n-quic-transport/src/stream/manager/tests.rs
@@ -2530,6 +2530,11 @@ fn on_transmit_queries_streams_for_data() {
endpoint::Type::Server,
);

assert_eq!(
[stream_1, stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);

assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
@@ -3249,3 +3254,83 @@ fn stream_transmission_fairness_test() {
}
}
}

#[test]
fn stream_batching_test() {
// Create stream manager with a burst size of 5
let batch_size = 5;
let limits = ConnectionLimits::default()
.with_stream_burst_size(batch_size)
.unwrap();

let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
&limits,
endpoint::Type::Server,
create_default_initial_flow_control_limits(),
create_default_initial_flow_control_limits(),
DEFAULT_INITIAL_RTT,
);

// Create some open Streams
let mut stream_ids: VecDeque<StreamId> = (0..4)
.map(|_| {
let (accept_waker, _accept_wake_counter) = new_count_waker();
let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
let mut token = connection::OpenToken::new();

let result = match manager.poll_open_local_stream(
StreamType::Bidirectional,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&accept_waker),
) {
Poll::Ready(res) => res,
Poll::Pending => Err(connection::Error::unspecified()),
};
result.unwrap()
})
.collect();

// Create a context that can only fit packets of size 50
let mut frame_buffer = OutgoingFrameBuffer::new();
let max_packet_size = 50;
frame_buffer.set_max_packet_size(Some(max_packet_size));
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);

const DATA_SIZE: usize = 2000;
let array: [u8; DATA_SIZE] = [1; DATA_SIZE];

// Set up each stream to have much more data to send than can fit in our test packet
for stream_id in &stream_ids {
manager
.with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
let data_to_send = bytes::Bytes::copy_from_slice(&array);
stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
})
.unwrap();
}
// make sure the order matches creation order
assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

// Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
// Then the stream gets sent to the back of the transmission list.
for idx in 1..=40 {
dbg!(idx);
let _ = manager.on_transmit(&mut write_context);

assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

write_context.frame_buffer.flush();

if idx % batch_size == 0 {
// The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
stream_ids.rotate_left(1);
}
}
}
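The rotation pattern the test asserts can be reproduced with a small, self-contained model. Plain integers and a VecDeque stand in for streams and the intrusive transmission list, so this is an illustration of the scheduling policy rather than the production data structure:

use std::collections::VecDeque;

fn main() {
    let batch_size = 5;
    // Four streams, each with far more data than fits in one 50-byte packet.
    let mut order: VecDeque<u32> = (0..4).collect();
    let mut packets_sent_by_head = 0;

    for _packet in 1..=40 {
        // The stream at the front of the transmission list fills this packet.
        packets_sent_by_head += 1;
        if packets_sent_by_head == batch_size {
            // Burst exhausted: the head stream moves to the back of the list,
            // which the test tracks as `stream_ids.rotate_left(1)`.
            let head = order.pop_front().unwrap();
            order.push_back(head);
            packets_sent_by_head = 0;
        }
    }

    // 40 packets = 8 full bursts across 4 streams, so the order ends up back
    // where it started, matching the test's final expected state.
    assert_eq!(order, VecDeque::from([0, 1, 2, 3]));
}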
90 changes: 74 additions & 16 deletions quic/s2n-quic-transport/src/stream/stream_container.rs
@@ -8,8 +8,8 @@
#![allow(unknown_lints, clippy::non_send_fields_in_send_ty)]

use crate::{
stream,
stream::{stream_impl::StreamTrait, stream_interests::StreamInterests},
connection,
stream::{self, stream_impl::StreamTrait, stream_interests::StreamInterests},
transmission,
};
use alloc::rc::Rc;
@@ -145,10 +145,12 @@ struct InterestLists<S> {
/// stream flow control window to increase
waiting_for_stream_flow_control_credits:
LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
counter: u8,
limit: u8,
}

impl<S: StreamTrait> InterestLists<S> {
fn new() -> Self {
fn new(connection_limits: &connection::Limits) -> Self {
Self {
done_streams: LinkedList::new(DoneStreamsAdapter::new()),
waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()),
@@ -160,6 +162,8 @@ impl<S: StreamTrait> InterestLists<S> {
waiting_for_stream_flow_control_credits: LinkedList::new(
WaitingForStreamFlowControlCreditsAdapter::new(),
),
counter: 0,
limit: connection_limits.stream_burst_size(),
}
}

@@ -168,7 +172,7 @@
&mut self,
node: &Rc<StreamNode<S>>,
interests: StreamInterests,
result: StreamContainerIterationResult,
_result: StreamContainerIterationResult,
) -> bool {
// Note that all comparisons start by checking whether the stream is
// already part of the given list. This is required in order for the
@@ -181,11 +185,7 @@
($interest:expr, $link_name:ident, $list_name:ident) => {
if $interest != node.$link_name.is_linked() {
if $interest {
if matches!(result, StreamContainerIterationResult::Continue) {
self.$list_name.push_back(node.clone());
} else {
self.$list_name.push_front(node.clone());
}
self.$list_name.push_back(node.clone());
} else {
// Safety: We know that the node is only ever part of this list.
// While elements are in temporary lists, they always get unlinked
@@ -331,11 +331,11 @@ macro_rules! iterate_interruptible {

impl<S: StreamTrait> StreamContainer<S> {
/// Creates a new `StreamContainer`
pub fn new() -> Self {
pub fn new(connection_limits: &connection::Limits) -> Self {
Self {
stream_map: RBTree::new(StreamTreeAdapter::new()),
nr_active_streams: 0,
interest_lists: InterestLists::new(),
interest_lists: InterestLists::new(connection_limits),
}
}

Expand Down Expand Up @@ -537,11 +537,7 @@ impl<S: StreamTrait> StreamContainer<S> {
);
}

/// Iterates over all `Stream`s which are waiting for transmission,
/// and executes the given function on each `Stream`
///
/// The `stream::Controller` will be notified of streams that have been
/// closed to allow for further streams to be opened.
#[cfg(test)]
pub fn iterate_transmission_list<F>(&mut self, controller: &mut stream::Controller, mut func: F)
where
F: FnMut(&mut S) -> StreamContainerIterationResult,
@@ -555,6 +551,68 @@
);
}

/// Iterates over all `Stream`s which are waiting for transmission,
/// and executes the given function on each `Stream`
///
/// The `stream::Controller` will be notified of streams that have been
/// closed to allow for further streams to be opened.
pub fn send_on_transmission_list<F>(&mut self, controller: &mut stream::Controller, mut func: F)
where
F: FnMut(&mut S) -> StreamContainerIterationResult,
{
// Head node gets pushed to the back of the list if it has run out of sending credits
if self.interest_lists.counter >= self.interest_lists.limit {
if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() {
self.interest_lists.waiting_for_transmission.push_back(node);
self.interest_lists.counter = 0;
}
}

let mut extracted_list = self.interest_lists.waiting_for_transmission.take();
let mut cursor = extracted_list.front_mut();

let mut head_node = true;
while let Some(stream) = cursor.remove() {
// Note that while we iterate over the intrusive lists here
// `stream` is part of no list anymore, since it also got dropped
// from the list that is described by the `cursor`.
debug_assert!(!stream.waiting_for_transmission_link.is_linked());
let mut mut_stream = stream.inner.borrow_mut();
let result = func(&mut *mut_stream);

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
self.interest_lists
.update_interests(&stream, interests, result);

if head_node {
if matches!(result, StreamContainerIterationResult::Continue) {
self.interest_lists.counter += 1;
}

if !matches!(interests.transmission, transmission::Interest::NewData) {
self.interest_lists.counter = 0;
}
head_node = false;
}

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
self.interest_lists
.waiting_for_transmission
.back_mut()
.splice_after(extracted_list);
break;
}
StreamContainerIterationResult::Continue => {}
}
}

if !self.interest_lists.done_streams.is_empty() {
self.finalize_done_streams(controller);
}
}

/// Iterates over all `Stream`s which are waiting for retransmission,
/// and executes the given function on each `Stream`
///

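One subtle point in `send_on_transmission_list` above is that relative stream order is preserved when the iteration breaks: every processed stream is re-linked at the back of the emptied list (`update_interests` now always uses `push_back`), and the untouched remainder is spliced in after it. A simplified, self-contained model of a single call, with a VecDeque standing in for the intrusive list and hypothetical stream names:

use std::collections::VecDeque;

fn main() {
    // Model of one call where "s1" (the head) fills the packet and "s2" then
    // returns BreakAndInsertAtBack because nothing more fits.
    let mut waiting: VecDeque<&str> = VecDeque::from(["s1", "s2", "s3", "s4"]);

    // take(): the whole transmission list is moved out for iteration.
    let mut extracted: VecDeque<&str> = waiting.drain(..).collect();

    // s1 transmits, still has data to send, and is re-linked at the back of
    // the now-empty list.
    waiting.push_back(extracted.pop_front().unwrap());
    // s2 cannot fit a frame, is re-linked at the back as well, and the
    // untouched remainder is attached after it (splice_after(extracted_list)).
    waiting.push_back(extracted.pop_front().unwrap());
    waiting.extend(extracted.drain(..));

    // Relative order is unchanged, so s1 stays at the head for the next packet
    // until the burst counter reaches stream_burst_size.
    assert_eq!(waiting, VecDeque::from(["s1", "s2", "s3", "s4"]));
}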