feat(s2n-quic-transport): Adds stream batching functionality #2433

Merged: 6 commits, merged on Jan 7, 2025

Changes from 2 commits
11 changes: 11 additions & 0 deletions quic/s2n-quic-core/src/connection/limits.rs
@@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
//# received.
pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;

pub const DEFAULT_STREAM_BURST_SIZE: u8 = 1;

#[non_exhaustive]
#[derive(Debug)]
pub struct ConnectionInfo<'a> {
@@ -74,6 +76,7 @@ pub struct Limits {
pub(crate) initial_round_trip_time: Duration,
pub(crate) migration_support: MigrationSupport,
pub(crate) anti_amplification_multiplier: u8,
pub(crate) stream_burst_size: u8,
}

impl Default for Limits {
@@ -120,6 +123,7 @@ impl Limits {
initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
migration_support: MigrationSupport::RECOMMENDED,
anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
stream_burst_size: DEFAULT_STREAM_BURST_SIZE,
}
}

@@ -222,6 +226,7 @@ impl Limits {
max_active_connection_ids,
u64
);
setter!(with_stream_burst_size, stream_burst_size, u8);
setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
setter!(with_max_ack_ranges, ack_ranges_limit, u8);
setter!(
@@ -384,6 +389,12 @@ impl Limits {
pub fn anti_amplification_multiplier(&self) -> u8 {
self.anti_amplification_multiplier
}

#[doc(hidden)]
#[inline]
pub fn stream_burst_size(&self) -> u8 {
self.stream_burst_size
}
}

/// Creates limits for a given connection
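The new with_stream_burst_size setter and stream_burst_size accessor slot into the existing Limits builder. As a minimal sketch (not part of this diff), an application could raise the burst size like this, using the Limits type shown above; wiring the limits into an endpoint goes through the usual s2n-quic limits provider and is unchanged by this PR:

```rust
use s2n_quic_core::connection::limits::Limits;

fn configured_limits() -> Limits {
    // Let each stream lead packet transmission for up to 5 packets in a row
    // before it is rotated to the back of the transmission list (default: 1).
    Limits::default()
        .with_stream_burst_size(5)
        .expect("5 is a valid burst size")
}
```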
4 changes: 2 additions & 2 deletions quic/s2n-quic-transport/src/stream/manager.rs
@@ -593,7 +593,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
connection_limits.stream_limits(),
min_rtt,
),
streams: StreamContainer::new(),
streams: StreamContainer::new(connection_limits),
next_stream_ids: StreamIdSet::initial(),
local_endpoint_type,
initial_local_limits,
@@ -866,7 +866,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
}

if context.transmission_constraint().can_transmit() {
self.inner.streams.iterate_transmission_list(
self.inner.streams.send_on_transmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(context);
85 changes: 85 additions & 0 deletions quic/s2n-quic-transport/src/stream/manager/tests.rs
@@ -2530,6 +2530,11 @@ fn on_transmit_queries_streams_for_data() {
endpoint::Type::Server,
);

assert_eq!(
[stream_1, stream_2, stream_3, stream_4],
*manager.streams_waiting_for_transmission()
);

assert_eq!(
Err(OnTransmitError::CouldNotWriteFrame),
manager.on_transmit(&mut write_context)
@@ -3249,3 +3254,83 @@ fn stream_transmission_fairness_test() {
}
}
}

#[test]
fn stream_batching_test() {
// Create stream manager with a burst size of 5
let batch_size = 5;
let limits = ConnectionLimits::default()
.with_stream_burst_size(batch_size)
.unwrap();

let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
&limits,
endpoint::Type::Server,
create_default_initial_flow_control_limits(),
create_default_initial_flow_control_limits(),
DEFAULT_INITIAL_RTT,
);

// Create some open Streams
let mut stream_ids: VecDeque<StreamId> = (0..4)
.map(|_| {
let (accept_waker, _accept_wake_counter) = new_count_waker();
let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
let mut token = connection::OpenToken::new();

let result = match manager.poll_open_local_stream(
StreamType::Bidirectional,
&mut token,
&mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
&Context::from_waker(&accept_waker),
) {
Poll::Ready(res) => res,
Poll::Pending => Err(connection::Error::unspecified()),
};
result.unwrap()
})
.collect();

// Create a context that can only fit packets of size 50
let mut frame_buffer = OutgoingFrameBuffer::new();
let max_packet_size = 50;
frame_buffer.set_max_packet_size(Some(max_packet_size));
let mut write_context = MockWriteContext::new(
time::now(),
&mut frame_buffer,
transmission::Constraint::None,
transmission::Mode::Normal,
endpoint::Type::Server,
);

const DATA_SIZE: usize = 2000;
let array: [u8; DATA_SIZE] = [1; DATA_SIZE];

// Set up each stream to have much more data to send than can fit in our test packet
for stream_id in &stream_ids {
manager
.with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
let data_to_send = bytes::Bytes::copy_from_slice(&array);
stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
})
.unwrap();
}
// make sure the order matches creation order
assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

// Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
// Then the stream gets sent to the back of the transmission list.
for idx in 1..=40 {
dbg!(idx);
let _ = manager.on_transmit(&mut write_context);

assert_eq!(stream_ids, manager.streams_waiting_for_transmission());

write_context.frame_buffer.flush();

if idx % batch_size == 0 {
// The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
stream_ids.rotate_left(1);
}
}
}
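
To make the expected scheduling easier to follow outside of the intrusive-list machinery, here is a simplified, self-contained model of the rotation that the test above asserts. It is an illustration only: the real send_on_transmission_list (in the stream_container.rs diff below) additionally resets the counter when the head stream stops requesting to send new data and only counts rounds in which the head stream actually transmitted.

```rust
use std::collections::VecDeque;

/// Simplified model: the stream at the front of the transmission list leads
/// packet filling until it has done so `burst_size` times, after which it is
/// rotated to the back of the list.
fn transmit_round(streams: &mut VecDeque<u32>, counter: &mut u8, burst_size: u8) {
    // Rotate the head once it has used up its burst credits.
    if *counter >= burst_size {
        if let Some(head) = streams.pop_front() {
            streams.push_back(head);
            *counter = 0;
        }
    }
    // The (possibly new) head stream fills this packet first.
    *counter += 1;
}

fn main() {
    let mut streams: VecDeque<u32> = (0..4).collect();
    let mut counter = 0u8;
    let burst_size = 5;
    for packet in 1..=40 {
        transmit_round(&mut streams, &mut counter, burst_size);
        println!("packet {packet:2}: transmission order {streams:?}");
    }
}
```

With burst_size = 5 and four streams, the printed order stays fixed for five packets and then rotates left by one stream, which is exactly the pattern stream_batching_test asserts.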
122 changes: 79 additions & 43 deletions quic/s2n-quic-transport/src/stream/stream_container.rs
@@ -8,8 +8,8 @@
#![allow(unknown_lints, clippy::non_send_fields_in_send_ty)]

use crate::{
stream,
stream::{stream_impl::StreamTrait, stream_interests::StreamInterests},
connection,
stream::{self, stream_impl::StreamTrait, stream_interests::StreamInterests},
transmission,
};
use alloc::rc::Rc;
@@ -145,10 +145,12 @@ struct InterestLists<S> {
/// stream flow control window to increase
waiting_for_stream_flow_control_credits:
LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
counter: u8,
limit: u8,
}

impl<S: StreamTrait> InterestLists<S> {
fn new() -> Self {
fn new(connection_limits: &connection::Limits) -> Self {
Self {
done_streams: LinkedList::new(DoneStreamsAdapter::new()),
waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()),
@@ -160,16 +162,13 @@ impl<S: StreamTrait> InterestLists<S> {
waiting_for_stream_flow_control_credits: LinkedList::new(
WaitingForStreamFlowControlCreditsAdapter::new(),
),
counter: 0,
limit: connection_limits.stream_burst_size(),
}
}

/// Update all interest lists based on latest interest reported by a Node
fn update_interests(
&mut self,
node: &Rc<StreamNode<S>>,
interests: StreamInterests,
result: StreamContainerIterationResult,
) -> bool {
fn update_interests(&mut self, node: &Rc<StreamNode<S>>, interests: StreamInterests) -> bool {
// Note that all comparisons start by checking whether the stream is
// already part of the given list. This is required in order for the
// following operation to be safe. Inserting an element in a list while
@@ -181,11 +180,7 @@ impl<S: StreamTrait> InterestLists<S> {
($interest:expr, $link_name:ident, $list_name:ident) => {
if $interest != node.$link_name.is_linked() {
if $interest {
if matches!(result, StreamContainerIterationResult::Continue) {
self.$list_name.push_back(node.clone());
} else {
self.$list_name.push_front(node.clone());
}
self.$list_name.push_back(node.clone());
} else {
// Safety: We know that the node is only ever part of this list.
// While elements are in temporary lists, they always get unlinked
@@ -280,11 +275,7 @@ macro_rules! iterate_uninterruptible {
mut_stream.get_stream_interests()
};

$sel.interest_lists.update_interests(
&stream,
interests,
StreamContainerIterationResult::Continue,
);
$sel.interest_lists.update_interests(&stream, interests);
}

if !$sel.interest_lists.done_streams.is_empty() {
@@ -308,8 +299,7 @@ macro_rules! iterate_interruptible {

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
$sel.interest_lists
.update_interests(&stream, interests, result);
$sel.interest_lists.update_interests(&stream, interests);

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
@@ -331,11 +321,11 @@ macro_rules! iterate_interruptible {

impl<S: StreamTrait> StreamContainer<S> {
/// Creates a new `StreamContainer`
pub fn new() -> Self {
pub fn new(connection_limits: &connection::Limits) -> Self {
Self {
stream_map: RBTree::new(StreamTreeAdapter::new()),
nr_active_streams: 0,
interest_lists: InterestLists::new(),
interest_lists: InterestLists::new(connection_limits),
}
}

@@ -347,11 +337,7 @@ impl<S: StreamTrait> StreamContainer<S> {

let new_stream = Rc::new(StreamNode::new(stream));

self.interest_lists.update_interests(
&new_stream,
interests,
StreamContainerIterationResult::Continue,
);
self.interest_lists.update_interests(&new_stream, interests);

self.stream_map.insert(new_stream);
self.nr_active_streams += 1;
@@ -414,11 +400,7 @@ impl<S: StreamTrait> StreamContainer<S> {

// Update the interest lists after the interactions and then remove
// all finalized streams
if self.interest_lists.update_interests(
&node_ptr,
interests,
StreamContainerIterationResult::Continue,
) {
if self.interest_lists.update_interests(&node_ptr, interests) {
self.finalize_done_streams(controller);
}

@@ -537,11 +519,7 @@ impl<S: StreamTrait> StreamContainer<S> {
);
}

/// Iterates over all `Stream`s which are waiting for transmission,
/// and executes the given function on each `Stream`
///
/// The `stream::Controller` will be notified of streams that have been
/// closed to allow for further streams to be opened.
#[cfg(test)]
pub fn iterate_transmission_list<F>(&mut self, controller: &mut stream::Controller, mut func: F)
Review comment (Contributor): Is there a reason we need to keep this and not just merge the two?

Reply (Contributor, Author): There are two reasons to iterate over the transmission list currently. One is to send on each stream node in the transmission list; the other is to read the order of the transmission list (used only in testing). I now need to distinguish these two code paths because, when we're reading over the list, we don't want to increment the transmission counter, whereas we do want that behavior when sending on the list. So the two separate code paths are now iterate_transmission_list, which is only used in tests, and send_on_transmission_list, which contains the counting logic.

where
F: FnMut(&mut S) -> StreamContainerIterationResult,
@@ -555,6 +533,67 @@ impl<S: StreamTrait> StreamContainer<S> {
);
}

/// Iterates over all `Stream`s which are waiting for transmission,
/// and executes the given function on each `Stream`
///
/// The `stream::Controller` will be notified of streams that have been
/// closed to allow for further streams to be opened.
pub fn send_on_transmission_list<F>(&mut self, controller: &mut stream::Controller, mut func: F)
where
F: FnMut(&mut S) -> StreamContainerIterationResult,
{
// Head node gets pushed to the back of the list if it has run out of sending credits
if self.interest_lists.counter >= self.interest_lists.limit {
if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() {
self.interest_lists.waiting_for_transmission.push_back(node);
self.interest_lists.counter = 0;
}
}

let mut extracted_list = self.interest_lists.waiting_for_transmission.take();
let mut cursor = extracted_list.front_mut();

let mut head_node = true;
while let Some(stream) = cursor.remove() {
// Note that while we iterate over the intrusive lists here
// `stream` is part of no list anymore, since it also got dropped
// from list that is described by the `cursor`.
debug_assert!(!stream.waiting_for_transmission_link.is_linked());
let mut mut_stream = stream.inner.borrow_mut();
let result = func(&mut *mut_stream);

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
self.interest_lists.update_interests(&stream, interests);

if head_node {
if matches!(result, StreamContainerIterationResult::Continue) {
self.interest_lists.counter += 1;
}

if !matches!(interests.transmission, transmission::Interest::NewData) {
self.interest_lists.counter = 0;
}
head_node = false;
}

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
self.interest_lists
.waiting_for_transmission
.back_mut()
.splice_after(extracted_list);
break;
}
StreamContainerIterationResult::Continue => {}
}
}

if !self.interest_lists.done_streams.is_empty() {
self.finalize_done_streams(controller);
}
}

/// Iterates over all `Stream`s which are waiting for retransmission,
/// and executes the given function on each `Stream`
///
@@ -601,11 +640,8 @@ impl<S: StreamTrait> StreamContainer<S> {
// Safety: The stream reference is obtained from the RBTree, which
// stores it's nodes as `Rc`
let stream_node_rc = unsafe { stream_node_rc_from_ref(stream) };
self.interest_lists.update_interests(
&stream_node_rc,
interests,
StreamContainerIterationResult::Continue,
);
self.interest_lists
.update_interests(&stream_node_rc, interests);
}

if !self.interest_lists.done_streams.is_empty() {