From a7497fc0946c47e4c1e46b001cf15c06bd068f2a Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 20 Dec 2024 10:09:43 -0800 Subject: [PATCH 1/6] Another try at stream batching --- quic/s2n-quic-core/src/connection/limits.rs | 11 +++ quic/s2n-quic-transport/src/stream/manager.rs | 4 +- .../src/stream/manager/tests.rs | 85 ++++++++++++++++++ .../src/stream/stream_container.rs | 90 +++++++++++++++---- 4 files changed, 172 insertions(+), 18 deletions(-) diff --git a/quic/s2n-quic-core/src/connection/limits.rs b/quic/s2n-quic-core/src/connection/limits.rs index 565a59d2ed..bdd877a8c5 100644 --- a/quic/s2n-quic-core/src/connection/limits.rs +++ b/quic/s2n-quic-core/src/connection/limits.rs @@ -35,6 +35,8 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30); //# received. pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3; +pub const DEFAULT_STREAM_BURST_SIZE: u8 = 1; + #[non_exhaustive] #[derive(Debug)] pub struct ConnectionInfo<'a> { @@ -74,6 +76,7 @@ pub struct Limits { pub(crate) initial_round_trip_time: Duration, pub(crate) migration_support: MigrationSupport, pub(crate) anti_amplification_multiplier: u8, + pub(crate) stream_burst_size: u8, } impl Default for Limits { @@ -120,6 +123,7 @@ impl Limits { initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT, migration_support: MigrationSupport::RECOMMENDED, anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER, + stream_burst_size: DEFAULT_STREAM_BURST_SIZE, } } @@ -222,6 +226,7 @@ impl Limits { max_active_connection_ids, u64 ); + setter!(with_stream_burst_size, stream_burst_size, u8); setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8); setter!(with_max_ack_ranges, ack_ranges_limit, u8); setter!( @@ -384,6 +389,12 @@ impl Limits { pub fn anti_amplification_multiplier(&self) -> u8 { self.anti_amplification_multiplier } + + #[doc(hidden)] + #[inline] + pub fn stream_burst_size(&self) -> u8 { + self.stream_burst_size + } } /// Creates limits for a given connection diff --git a/quic/s2n-quic-transport/src/stream/manager.rs b/quic/s2n-quic-transport/src/stream/manager.rs index efa396e920..49575d99fd 100644 --- a/quic/s2n-quic-transport/src/stream/manager.rs +++ b/quic/s2n-quic-transport/src/stream/manager.rs @@ -593,7 +593,7 @@ impl stream::Manager for AbstractStreamManager { connection_limits.stream_limits(), min_rtt, ), - streams: StreamContainer::new(), + streams: StreamContainer::new(connection_limits), next_stream_ids: StreamIdSet::initial(), local_endpoint_type, initial_local_limits, @@ -866,7 +866,7 @@ impl stream::Manager for AbstractStreamManager { } if context.transmission_constraint().can_transmit() { - self.inner.streams.iterate_transmission_list( + self.inner.streams.send_on_transmission_list( &mut self.inner.stream_controller, |stream: &mut S| { transmit_result = stream.on_transmit(context); diff --git a/quic/s2n-quic-transport/src/stream/manager/tests.rs b/quic/s2n-quic-transport/src/stream/manager/tests.rs index 1e078b98d6..37294e0910 100644 --- a/quic/s2n-quic-transport/src/stream/manager/tests.rs +++ b/quic/s2n-quic-transport/src/stream/manager/tests.rs @@ -2530,6 +2530,11 @@ fn on_transmit_queries_streams_for_data() { endpoint::Type::Server, ); + assert_eq!( + [stream_1, stream_2, stream_3, stream_4], + *manager.streams_waiting_for_transmission() + ); + assert_eq!( Err(OnTransmitError::CouldNotWriteFrame), manager.on_transmit(&mut write_context) @@ -3249,3 +3254,83 @@ fn stream_transmission_fairness_test() { } } } + +#[test] +fn stream_batching_test() { + // Create stream manager with a burst size of 5 + let batch_size = 5; + let limits = ConnectionLimits::default() + .with_stream_burst_size(batch_size) + .unwrap(); + + let mut manager = AbstractStreamManager::::new( + &limits, + endpoint::Type::Server, + create_default_initial_flow_control_limits(), + create_default_initial_flow_control_limits(), + DEFAULT_INITIAL_RTT, + ); + + // Create some open Streams + let mut stream_ids: VecDeque = (0..4) + .map(|_| { + let (accept_waker, _accept_wake_counter) = new_count_waker(); + let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle(); + let mut token = connection::OpenToken::new(); + + let result = match manager.poll_open_local_stream( + StreamType::Bidirectional, + &mut token, + &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle), + &Context::from_waker(&accept_waker), + ) { + Poll::Ready(res) => res, + Poll::Pending => Err(connection::Error::unspecified()), + }; + result.unwrap() + }) + .collect(); + + // Create a context that can only fit packets of size 50 + let mut frame_buffer = OutgoingFrameBuffer::new(); + let max_packet_size = 50; + frame_buffer.set_max_packet_size(Some(max_packet_size)); + let mut write_context = MockWriteContext::new( + time::now(), + &mut frame_buffer, + transmission::Constraint::None, + transmission::Mode::Normal, + endpoint::Type::Server, + ); + + const DATA_SIZE: usize = 2000; + let array: [u8; DATA_SIZE] = [1; DATA_SIZE]; + + // Set up each stream to have much more data to send than can fit in our test packet + for stream_id in &stream_ids { + manager + .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| { + let data_to_send = bytes::Bytes::copy_from_slice(&array); + stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None) + }) + .unwrap(); + } + // make sure the order matches creation order + assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); + + // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times. + // Then the stream gets sent to the back of the transmission list. + for idx in 1..=40 { + dbg!(idx); + let _ = manager.on_transmit(&mut write_context); + + assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); + + write_context.frame_buffer.flush(); + + if idx % batch_size == 0 { + // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets + stream_ids.rotate_left(1); + } + } +} diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index f9b3ad10bb..c831ae4784 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -8,8 +8,8 @@ #![allow(unknown_lints, clippy::non_send_fields_in_send_ty)] use crate::{ - stream, - stream::{stream_impl::StreamTrait, stream_interests::StreamInterests}, + connection, + stream::{self, stream_impl::StreamTrait, stream_interests::StreamInterests}, transmission, }; use alloc::rc::Rc; @@ -145,10 +145,12 @@ struct InterestLists { /// stream flow control window to increase waiting_for_stream_flow_control_credits: LinkedList>, + counter: u8, + limit: u8, } impl InterestLists { - fn new() -> Self { + fn new(connection_limits: &connection::Limits) -> Self { Self { done_streams: LinkedList::new(DoneStreamsAdapter::new()), waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()), @@ -160,6 +162,8 @@ impl InterestLists { waiting_for_stream_flow_control_credits: LinkedList::new( WaitingForStreamFlowControlCreditsAdapter::new(), ), + counter: 0, + limit: connection_limits.stream_burst_size(), } } @@ -168,7 +172,7 @@ impl InterestLists { &mut self, node: &Rc>, interests: StreamInterests, - result: StreamContainerIterationResult, + _result: StreamContainerIterationResult, ) -> bool { // Note that all comparisons start by checking whether the stream is // already part of the given list. This is required in order for the @@ -181,11 +185,7 @@ impl InterestLists { ($interest:expr, $link_name:ident, $list_name:ident) => { if $interest != node.$link_name.is_linked() { if $interest { - if matches!(result, StreamContainerIterationResult::Continue) { - self.$list_name.push_back(node.clone()); - } else { - self.$list_name.push_front(node.clone()); - } + self.$list_name.push_back(node.clone()); } else { // Safety: We know that the node is only ever part of this list. // While elements are in temporary lists, they always get unlinked @@ -331,11 +331,11 @@ macro_rules! iterate_interruptible { impl StreamContainer { /// Creates a new `StreamContainer` - pub fn new() -> Self { + pub fn new(connection_limits: &connection::Limits) -> Self { Self { stream_map: RBTree::new(StreamTreeAdapter::new()), nr_active_streams: 0, - interest_lists: InterestLists::new(), + interest_lists: InterestLists::new(connection_limits), } } @@ -537,11 +537,7 @@ impl StreamContainer { ); } - /// Iterates over all `Stream`s which are waiting for transmission, - /// and executes the given function on each `Stream` - /// - /// The `stream::Controller` will be notified of streams that have been - /// closed to allow for further streams to be opened. + #[cfg(test)] pub fn iterate_transmission_list(&mut self, controller: &mut stream::Controller, mut func: F) where F: FnMut(&mut S) -> StreamContainerIterationResult, @@ -555,6 +551,68 @@ impl StreamContainer { ); } + /// Iterates over all `Stream`s which are waiting for transmission, + /// and executes the given function on each `Stream` + /// + /// The `stream::Controller` will be notified of streams that have been + /// closed to allow for further streams to be opened. + pub fn send_on_transmission_list(&mut self, controller: &mut stream::Controller, mut func: F) + where + F: FnMut(&mut S) -> StreamContainerIterationResult, + { + // Head node gets pushed to the back of the list if it has run out of sending credits + if self.interest_lists.counter >= self.interest_lists.limit { + if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() { + self.interest_lists.waiting_for_transmission.push_back(node); + self.interest_lists.counter = 0; + } + } + + let mut extracted_list = self.interest_lists.waiting_for_transmission.take(); + let mut cursor = extracted_list.front_mut(); + + let mut head_node = true; + while let Some(stream) = cursor.remove() { + // Note that while we iterate over the intrusive lists here + // `stream` is part of no list anymore, since it also got dropped + // from list that is described by the `cursor`. + debug_assert!(!stream.waiting_for_transmission_link.is_linked()); + let mut mut_stream = stream.inner.borrow_mut(); + let result = func(&mut *mut_stream); + + // Update the interests after the interaction + let interests = mut_stream.get_stream_interests(); + self.interest_lists + .update_interests(&stream, interests, result); + + if head_node { + if matches!(result, StreamContainerIterationResult::Continue) { + self.interest_lists.counter += 1; + } + + if !matches!(interests.transmission, transmission::Interest::NewData) { + self.interest_lists.counter = 0; + } + head_node = false; + } + + match result { + StreamContainerIterationResult::BreakAndInsertAtBack => { + self.interest_lists + .waiting_for_transmission + .back_mut() + .splice_after(extracted_list); + break; + } + StreamContainerIterationResult::Continue => {} + } + } + + if !self.interest_lists.done_streams.is_empty() { + self.finalize_done_streams(controller); + } + } + /// Iterates over all `Stream`s which are waiting for retransmission, /// and executes the given function on each `Stream` /// From 379fcc76fb044077b637a819066722f51911abf4 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 2 Jan 2025 11:32:24 -0800 Subject: [PATCH 2/6] Removes unused result --- .../src/stream/stream_container.rs | 38 ++++--------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index c831ae4784..f059df6982 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -168,12 +168,7 @@ impl InterestLists { } /// Update all interest lists based on latest interest reported by a Node - fn update_interests( - &mut self, - node: &Rc>, - interests: StreamInterests, - _result: StreamContainerIterationResult, - ) -> bool { + fn update_interests(&mut self, node: &Rc>, interests: StreamInterests) -> bool { // Note that all comparisons start by checking whether the stream is // already part of the given list. This is required in order for the // following operation to be safe. Inserting an element in a list while @@ -280,11 +275,7 @@ macro_rules! iterate_uninterruptible { mut_stream.get_stream_interests() }; - $sel.interest_lists.update_interests( - &stream, - interests, - StreamContainerIterationResult::Continue, - ); + $sel.interest_lists.update_interests(&stream, interests); } if !$sel.interest_lists.done_streams.is_empty() { @@ -308,8 +299,7 @@ macro_rules! iterate_interruptible { // Update the interests after the interaction let interests = mut_stream.get_stream_interests(); - $sel.interest_lists - .update_interests(&stream, interests, result); + $sel.interest_lists.update_interests(&stream, interests); match result { StreamContainerIterationResult::BreakAndInsertAtBack => { @@ -347,11 +337,7 @@ impl StreamContainer { let new_stream = Rc::new(StreamNode::new(stream)); - self.interest_lists.update_interests( - &new_stream, - interests, - StreamContainerIterationResult::Continue, - ); + self.interest_lists.update_interests(&new_stream, interests); self.stream_map.insert(new_stream); self.nr_active_streams += 1; @@ -414,11 +400,7 @@ impl StreamContainer { // Update the interest lists after the interactions and then remove // all finalized streams - if self.interest_lists.update_interests( - &node_ptr, - interests, - StreamContainerIterationResult::Continue, - ) { + if self.interest_lists.update_interests(&node_ptr, interests) { self.finalize_done_streams(controller); } @@ -582,8 +564,7 @@ impl StreamContainer { // Update the interests after the interaction let interests = mut_stream.get_stream_interests(); - self.interest_lists - .update_interests(&stream, interests, result); + self.interest_lists.update_interests(&stream, interests); if head_node { if matches!(result, StreamContainerIterationResult::Continue) { @@ -659,11 +640,8 @@ impl StreamContainer { // Safety: The stream reference is obtained from the RBTree, which // stores it's nodes as `Rc` let stream_node_rc = unsafe { stream_node_rc_from_ref(stream) }; - self.interest_lists.update_interests( - &stream_node_rc, - interests, - StreamContainerIterationResult::Continue, - ); + self.interest_lists + .update_interests(&stream_node_rc, interests); } if !self.interest_lists.done_streams.is_empty() { From fb6f2c150d64da573facfe0b363cd61a2ec8cfa4 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 2 Jan 2025 12:46:46 -0800 Subject: [PATCH 3/6] Some PR feedback --- quic/s2n-quic-core/src/connection/limits.rs | 12 +- .../src/stream/manager/tests.rs | 129 +++++++++--------- .../src/stream/stream_container.rs | 16 +-- 3 files changed, 79 insertions(+), 78 deletions(-) diff --git a/quic/s2n-quic-core/src/connection/limits.rs b/quic/s2n-quic-core/src/connection/limits.rs index bdd877a8c5..233c890e5e 100644 --- a/quic/s2n-quic-core/src/connection/limits.rs +++ b/quic/s2n-quic-core/src/connection/limits.rs @@ -35,7 +35,7 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30); //# received. pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3; -pub const DEFAULT_STREAM_BURST_SIZE: u8 = 1; +pub const DEFAULT_STREAM_BATCH_SIZE: u8 = 1; #[non_exhaustive] #[derive(Debug)] @@ -76,7 +76,7 @@ pub struct Limits { pub(crate) initial_round_trip_time: Duration, pub(crate) migration_support: MigrationSupport, pub(crate) anti_amplification_multiplier: u8, - pub(crate) stream_burst_size: u8, + pub(crate) stream_batch_size: u8, } impl Default for Limits { @@ -123,7 +123,7 @@ impl Limits { initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT, migration_support: MigrationSupport::RECOMMENDED, anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER, - stream_burst_size: DEFAULT_STREAM_BURST_SIZE, + stream_batch_size: DEFAULT_STREAM_BATCH_SIZE, } } @@ -226,7 +226,7 @@ impl Limits { max_active_connection_ids, u64 ); - setter!(with_stream_burst_size, stream_burst_size, u8); + setter!(with_stream_batch_size, stream_batch_size, u8); setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8); setter!(with_max_ack_ranges, ack_ranges_limit, u8); setter!( @@ -392,8 +392,8 @@ impl Limits { #[doc(hidden)] #[inline] - pub fn stream_burst_size(&self) -> u8 { - self.stream_burst_size + pub fn stream_batch_size(&self) -> u8 { + self.stream_batch_size } } diff --git a/quic/s2n-quic-transport/src/stream/manager/tests.rs b/quic/s2n-quic-transport/src/stream/manager/tests.rs index 37294e0910..cedf8c26e9 100644 --- a/quic/s2n-quic-transport/src/stream/manager/tests.rs +++ b/quic/s2n-quic-transport/src/stream/manager/tests.rs @@ -3257,80 +3257,81 @@ fn stream_transmission_fairness_test() { #[test] fn stream_batching_test() { - // Create stream manager with a burst size of 5 - let batch_size = 5; - let limits = ConnectionLimits::default() - .with_stream_burst_size(batch_size) - .unwrap(); + for batch_size in 1..=10 { + dbg!(batch_size); + let limits = ConnectionLimits::default() + .with_stream_batch_size(batch_size) + .unwrap(); - let mut manager = AbstractStreamManager::::new( - &limits, - endpoint::Type::Server, - create_default_initial_flow_control_limits(), - create_default_initial_flow_control_limits(), - DEFAULT_INITIAL_RTT, - ); + let mut manager = AbstractStreamManager::::new( + &limits, + endpoint::Type::Server, + create_default_initial_flow_control_limits(), + create_default_initial_flow_control_limits(), + DEFAULT_INITIAL_RTT, + ); - // Create some open Streams - let mut stream_ids: VecDeque = (0..4) - .map(|_| { - let (accept_waker, _accept_wake_counter) = new_count_waker(); - let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle(); - let mut token = connection::OpenToken::new(); + // Create some open Streams + let mut stream_ids: VecDeque = (0..4) + .map(|_| { + let (accept_waker, _accept_wake_counter) = new_count_waker(); + let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle(); + let mut token = connection::OpenToken::new(); - let result = match manager.poll_open_local_stream( - StreamType::Bidirectional, - &mut token, - &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle), - &Context::from_waker(&accept_waker), - ) { - Poll::Ready(res) => res, - Poll::Pending => Err(connection::Error::unspecified()), - }; - result.unwrap() - }) - .collect(); + let result = match manager.poll_open_local_stream( + StreamType::Bidirectional, + &mut token, + &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle), + &Context::from_waker(&accept_waker), + ) { + Poll::Ready(res) => res, + Poll::Pending => Err(connection::Error::unspecified()), + }; + result.unwrap() + }) + .collect(); - // Create a context that can only fit packets of size 50 - let mut frame_buffer = OutgoingFrameBuffer::new(); - let max_packet_size = 50; - frame_buffer.set_max_packet_size(Some(max_packet_size)); - let mut write_context = MockWriteContext::new( - time::now(), - &mut frame_buffer, - transmission::Constraint::None, - transmission::Mode::Normal, - endpoint::Type::Server, - ); + // Create a context that can only fit packets of size 50 + let mut frame_buffer = OutgoingFrameBuffer::new(); + let max_packet_size = 50; + frame_buffer.set_max_packet_size(Some(max_packet_size)); + let mut write_context = MockWriteContext::new( + time::now(), + &mut frame_buffer, + transmission::Constraint::None, + transmission::Mode::Normal, + endpoint::Type::Server, + ); - const DATA_SIZE: usize = 2000; - let array: [u8; DATA_SIZE] = [1; DATA_SIZE]; + const DATA_SIZE: usize = 2000; + let array: [u8; DATA_SIZE] = [1; DATA_SIZE]; - // Set up each stream to have much more data to send than can fit in our test packet - for stream_id in &stream_ids { - manager - .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| { - let data_to_send = bytes::Bytes::copy_from_slice(&array); - stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None) - }) - .unwrap(); - } - // make sure the order matches creation order - assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); + // Set up each stream to have much more data to send than can fit in our test packet + for stream_id in &stream_ids { + manager + .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| { + let data_to_send = bytes::Bytes::copy_from_slice(&array); + stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None) + }) + .unwrap(); + } + // make sure the order matches creation order + assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); - // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times. - // Then the stream gets sent to the back of the transmission list. - for idx in 1..=40 { - dbg!(idx); - let _ = manager.on_transmit(&mut write_context); + // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times. + // Then the stream gets sent to the back of the transmission list. + for idx in 1..=40 { + dbg!(idx); + let _ = manager.on_transmit(&mut write_context); - assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); + assert_eq!(stream_ids, manager.streams_waiting_for_transmission()); - write_context.frame_buffer.flush(); + write_context.frame_buffer.flush(); - if idx % batch_size == 0 { - // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets - stream_ids.rotate_left(1); + if idx % batch_size == 0 { + // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets + stream_ids.rotate_left(1); + } } } } diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index f059df6982..2bf2510e41 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -145,8 +145,8 @@ struct InterestLists { /// stream flow control window to increase waiting_for_stream_flow_control_credits: LinkedList>, - counter: u8, - limit: u8, + transmission_counter: u8, + transmission_limit: u8, } impl InterestLists { @@ -162,8 +162,8 @@ impl InterestLists { waiting_for_stream_flow_control_credits: LinkedList::new( WaitingForStreamFlowControlCreditsAdapter::new(), ), - counter: 0, - limit: connection_limits.stream_burst_size(), + transmission_counter: 0, + transmission_limit: connection_limits.stream_batch_size(), } } @@ -543,10 +543,10 @@ impl StreamContainer { F: FnMut(&mut S) -> StreamContainerIterationResult, { // Head node gets pushed to the back of the list if it has run out of sending credits - if self.interest_lists.counter >= self.interest_lists.limit { + if self.interest_lists.transmission_counter >= self.interest_lists.transmission_limit { if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() { self.interest_lists.waiting_for_transmission.push_back(node); - self.interest_lists.counter = 0; + self.interest_lists.transmission_counter = 0; } } @@ -568,11 +568,11 @@ impl StreamContainer { if head_node { if matches!(result, StreamContainerIterationResult::Continue) { - self.interest_lists.counter += 1; + self.interest_lists.transmission_counter += 1; } if !matches!(interests.transmission, transmission::Interest::NewData) { - self.interest_lists.counter = 0; + self.interest_lists.transmission_counter = 0; } head_node = false; } From aa5a2fa5138f0dea89a9d59537cbe5d96443741c Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 3 Jan 2025 14:48:46 -0800 Subject: [PATCH 4/6] Expanded batching logic to retransmissions --- quic/s2n-quic-transport/src/stream/manager.rs | 2 +- .../src/stream/stream_container.rs | 133 +++++++++++------- 2 files changed, 83 insertions(+), 52 deletions(-) diff --git a/quic/s2n-quic-transport/src/stream/manager.rs b/quic/s2n-quic-transport/src/stream/manager.rs index 49575d99fd..191f97e753 100644 --- a/quic/s2n-quic-transport/src/stream/manager.rs +++ b/quic/s2n-quic-transport/src/stream/manager.rs @@ -849,7 +849,7 @@ impl stream::Manager for AbstractStreamManager { transmission::context::RetransmissionContext::new(context); // Prioritize retransmitting lost data - self.inner.streams.iterate_retransmission_list( + self.inner.streams.send_on_retransmission_list( &mut self.inner.stream_controller, |stream: &mut S| { transmit_result = stream.on_transmit(&mut retransmission_context); diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index 2bf2510e41..9ef4df9f46 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -319,6 +319,63 @@ macro_rules! iterate_interruptible { }; } +macro_rules! send_on_transmission_list { + ($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => { + // Head node gets pushed to the back of the list if it has run out of sending credits + if $sel.interest_lists.transmission_counter >= $sel.interest_lists.transmission_limit { + if let Some(node) = $sel.interest_lists.$list_name.pop_front() { + $sel.interest_lists.$list_name.push_back(node); + $sel.interest_lists.transmission_counter = 0; + } + } + + let mut extracted_list = $sel.interest_lists.$list_name.take(); + let mut cursor = extracted_list.front_mut(); + + let mut head_node = true; + while let Some(stream) = cursor.remove() { + // Note that while we iterate over the intrusive lists here + // `stream` is part of no list anymore, since it also got dropped + // from list that is described by the `cursor`. + debug_assert!(!stream.$link_name.is_linked()); + let mut mut_stream = stream.inner.borrow_mut(); + let result = $func(&mut *mut_stream); + + // Update the interests after the interaction + let interests = mut_stream.get_stream_interests(); + $sel.interest_lists.update_interests(&stream, interests); + + if head_node { + if matches!(result, StreamContainerIterationResult::Continue) { + $sel.interest_lists.transmission_counter += 1; + } + + if !matches!(interests.transmission, transmission::Interest::NewData) + && !matches!(interests.transmission, transmission::Interest::LostData) + { + $sel.interest_lists.transmission_counter = 0; + } + head_node = false; + } + + match result { + StreamContainerIterationResult::BreakAndInsertAtBack => { + $sel.interest_lists + .$list_name + .back_mut() + .splice_after(extracted_list); + break; + } + StreamContainerIterationResult::Continue => {} + } + } + + if !$sel.interest_lists.done_streams.is_empty() { + $sel.finalize_done_streams($controller); + } + }; +} + impl StreamContainer { /// Creates a new `StreamContainer` pub fn new(connection_limits: &connection::Limits) -> Self { @@ -542,56 +599,30 @@ impl StreamContainer { where F: FnMut(&mut S) -> StreamContainerIterationResult, { - // Head node gets pushed to the back of the list if it has run out of sending credits - if self.interest_lists.transmission_counter >= self.interest_lists.transmission_limit { - if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() { - self.interest_lists.waiting_for_transmission.push_back(node); - self.interest_lists.transmission_counter = 0; - } - } - - let mut extracted_list = self.interest_lists.waiting_for_transmission.take(); - let mut cursor = extracted_list.front_mut(); - - let mut head_node = true; - while let Some(stream) = cursor.remove() { - // Note that while we iterate over the intrusive lists here - // `stream` is part of no list anymore, since it also got dropped - // from list that is described by the `cursor`. - debug_assert!(!stream.waiting_for_transmission_link.is_linked()); - let mut mut_stream = stream.inner.borrow_mut(); - let result = func(&mut *mut_stream); - - // Update the interests after the interaction - let interests = mut_stream.get_stream_interests(); - self.interest_lists.update_interests(&stream, interests); - - if head_node { - if matches!(result, StreamContainerIterationResult::Continue) { - self.interest_lists.transmission_counter += 1; - } - - if !matches!(interests.transmission, transmission::Interest::NewData) { - self.interest_lists.transmission_counter = 0; - } - head_node = false; - } - - match result { - StreamContainerIterationResult::BreakAndInsertAtBack => { - self.interest_lists - .waiting_for_transmission - .back_mut() - .splice_after(extracted_list); - break; - } - StreamContainerIterationResult::Continue => {} - } - } + send_on_transmission_list!( + self, + waiting_for_transmission, + waiting_for_transmission_link, + controller, + func + ); + } - if !self.interest_lists.done_streams.is_empty() { - self.finalize_done_streams(controller); - } + #[cfg(test)] + pub fn iterate_retransmission_list( + &mut self, + controller: &mut stream::Controller, + mut func: F, + ) where + F: FnMut(&mut S) -> StreamContainerIterationResult, + { + iterate_interruptible!( + self, + waiting_for_retransmission, + waiting_for_retransmission_link, + controller, + func + ); } /// Iterates over all `Stream`s which are waiting for retransmission, @@ -599,14 +630,14 @@ impl StreamContainer { /// /// The `stream::Controller` will be notified of streams that have been /// closed to allow for further streams to be opened. - pub fn iterate_retransmission_list( + pub fn send_on_retransmission_list( &mut self, controller: &mut stream::Controller, mut func: F, ) where F: FnMut(&mut S) -> StreamContainerIterationResult, { - iterate_interruptible!( + send_on_transmission_list!( self, waiting_for_retransmission, waiting_for_retransmission_link, From df3e674c5a370c0a916a43034607db842468e79f Mon Sep 17 00:00:00 2001 From: Appelmans Date: Fri, 3 Jan 2025 15:15:02 -0800 Subject: [PATCH 5/6] typo --- quic/s2n-quic-core/src/packet/encoding.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quic/s2n-quic-core/src/packet/encoding.rs b/quic/s2n-quic-core/src/packet/encoding.rs index 992255da20..14d7609f88 100644 --- a/quic/s2n-quic-core/src/packet/encoding.rs +++ b/quic/s2n-quic-core/src/packet/encoding.rs @@ -254,7 +254,7 @@ pub trait PacketEncoder= minimum_payload_len`. However, the packet + // Ideally we would check that the `payload_len >= minimum_payload_len`. However, the packet // interceptor may rewrite the packet into something smaller. Instead of preventing that // here, we will rely on the `crate::transmission::Transmission` logic to ensure the // padding is initially written to ensure the minimum is met before interception is applied. From ffd59db62f576d64f28b248dbe048acaaedaadaa Mon Sep 17 00:00:00 2001 From: Appelmans Date: Mon, 6 Jan 2025 16:43:30 -0800 Subject: [PATCH 6/6] Separated retrans/trans logic --- .../src/stream/stream_container.rs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index 9ef4df9f46..23cbd5e24f 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -146,6 +146,7 @@ struct InterestLists { waiting_for_stream_flow_control_credits: LinkedList>, transmission_counter: u8, + retransmission_counter: u8, transmission_limit: u8, } @@ -163,6 +164,7 @@ impl InterestLists { WaitingForStreamFlowControlCreditsAdapter::new(), ), transmission_counter: 0, + retransmission_counter: 0, transmission_limit: connection_limits.stream_batch_size(), } } @@ -320,12 +322,20 @@ macro_rules! iterate_interruptible { } macro_rules! send_on_transmission_list { - ($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => { + ( + $sel:ident, + $list_name:ident, + $link_name:ident, + $controller:ident, + $func:ident, + $counter:ident, + $interest_type:pat, + ) => { // Head node gets pushed to the back of the list if it has run out of sending credits - if $sel.interest_lists.transmission_counter >= $sel.interest_lists.transmission_limit { + if $sel.interest_lists.$counter >= $sel.interest_lists.transmission_limit { if let Some(node) = $sel.interest_lists.$list_name.pop_front() { $sel.interest_lists.$list_name.push_back(node); - $sel.interest_lists.transmission_counter = 0; + $sel.interest_lists.$counter = 0; } } @@ -347,13 +357,11 @@ macro_rules! send_on_transmission_list { if head_node { if matches!(result, StreamContainerIterationResult::Continue) { - $sel.interest_lists.transmission_counter += 1; + $sel.interest_lists.$counter += 1; } - if !matches!(interests.transmission, transmission::Interest::NewData) - && !matches!(interests.transmission, transmission::Interest::LostData) - { - $sel.interest_lists.transmission_counter = 0; + if !matches!(interests.transmission, $interest_type) { + $sel.interest_lists.$counter = 0; } head_node = false; } @@ -604,7 +612,9 @@ impl StreamContainer { waiting_for_transmission, waiting_for_transmission_link, controller, - func + func, + transmission_counter, + transmission::Interest::NewData, ); } @@ -642,7 +652,9 @@ impl StreamContainer { waiting_for_retransmission, waiting_for_retransmission_link, controller, - func + func, + retransmission_counter, + transmission::Interest::LostData, ); }