diff --git a/quic/s2n-quic-core/src/connection/limits.rs b/quic/s2n-quic-core/src/connection/limits.rs
index bdd877a8c..233c890e5 100644
--- a/quic/s2n-quic-core/src/connection/limits.rs
+++ b/quic/s2n-quic-core/src/connection/limits.rs
@@ -35,7 +35,7 @@ const MAX_KEEP_ALIVE_PERIOD_DEFAULT: Duration = Duration::from_secs(30);
 //# received.
 pub const ANTI_AMPLIFICATION_MULTIPLIER: u8 = 3;
 
-pub const DEFAULT_STREAM_BURST_SIZE: u8 = 1;
+pub const DEFAULT_STREAM_BATCH_SIZE: u8 = 1;
 
 #[non_exhaustive]
 #[derive(Debug)]
@@ -76,7 +76,7 @@ pub struct Limits {
     pub(crate) initial_round_trip_time: Duration,
     pub(crate) migration_support: MigrationSupport,
     pub(crate) anti_amplification_multiplier: u8,
-    pub(crate) stream_burst_size: u8,
+    pub(crate) stream_batch_size: u8,
 }
 
 impl Default for Limits {
@@ -123,7 +123,7 @@ impl Limits {
             initial_round_trip_time: recovery::DEFAULT_INITIAL_RTT,
             migration_support: MigrationSupport::RECOMMENDED,
             anti_amplification_multiplier: ANTI_AMPLIFICATION_MULTIPLIER,
-            stream_burst_size: DEFAULT_STREAM_BURST_SIZE,
+            stream_batch_size: DEFAULT_STREAM_BATCH_SIZE,
         }
     }
 
@@ -226,7 +226,7 @@ impl Limits {
         max_active_connection_ids,
         u64
    );
-    setter!(with_stream_burst_size, stream_burst_size, u8);
+    setter!(with_stream_batch_size, stream_batch_size, u8);
     setter!(with_ack_elicitation_interval, ack_elicitation_interval, u8);
     setter!(with_max_ack_ranges, ack_ranges_limit, u8);
     setter!(
@@ -392,8 +392,8 @@ impl Limits {
 
     #[doc(hidden)]
     #[inline]
-    pub fn stream_burst_size(&self) -> u8 {
-        self.stream_burst_size
+    pub fn stream_batch_size(&self) -> u8 {
+        self.stream_batch_size
     }
 }
 
diff --git a/quic/s2n-quic-transport/src/stream/manager/tests.rs b/quic/s2n-quic-transport/src/stream/manager/tests.rs
index 37294e091..cedf8c26e 100644
--- a/quic/s2n-quic-transport/src/stream/manager/tests.rs
+++ b/quic/s2n-quic-transport/src/stream/manager/tests.rs
@@ -3257,80 +3257,81 @@ fn stream_transmission_fairness_test() {
 
 #[test]
 fn stream_batching_test() {
-    // Create stream manager with a burst size of 5
-    let batch_size = 5;
-    let limits = ConnectionLimits::default()
-        .with_stream_burst_size(batch_size)
-        .unwrap();
+    for batch_size in 1..=10 {
+        dbg!(batch_size);
+        let limits = ConnectionLimits::default()
+            .with_stream_batch_size(batch_size)
+            .unwrap();
 
-    let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
-        &limits,
-        endpoint::Type::Server,
-        create_default_initial_flow_control_limits(),
-        create_default_initial_flow_control_limits(),
-        DEFAULT_INITIAL_RTT,
-    );
+        let mut manager = AbstractStreamManager::<stream::StreamImpl>::new(
+            &limits,
+            endpoint::Type::Server,
+            create_default_initial_flow_control_limits(),
+            create_default_initial_flow_control_limits(),
+            DEFAULT_INITIAL_RTT,
+        );
 
-    // Create some open Streams
-    let mut stream_ids: VecDeque<StreamId> = (0..4)
-        .map(|_| {
-            let (accept_waker, _accept_wake_counter) = new_count_waker();
-            let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
-            let mut token = connection::OpenToken::new();
+        // Create some open Streams
+        let mut stream_ids: VecDeque<StreamId> = (0..4)
+            .map(|_| {
+                let (accept_waker, _accept_wake_counter) = new_count_waker();
+                let (_wakeup_queue, wakeup_handle) = create_wakeup_queue_and_handle();
+                let mut token = connection::OpenToken::new();
 
-            let result = match manager.poll_open_local_stream(
-                StreamType::Bidirectional,
-                &mut token,
-                &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
-                &Context::from_waker(&accept_waker),
-            ) {
-                Poll::Ready(res) => res,
-                Poll::Pending => Err(connection::Error::unspecified()),
-            };
-            result.unwrap()
-        })
-        .collect();
+                let result = match manager.poll_open_local_stream(
+                    StreamType::Bidirectional,
+                    &mut token,
+                    &mut ConnectionApiCallContext::from_wakeup_handle(&wakeup_handle),
+                    &Context::from_waker(&accept_waker),
+                ) {
+                    Poll::Ready(res) => res,
+                    Poll::Pending => Err(connection::Error::unspecified()),
+                };
+                result.unwrap()
+            })
+            .collect();
 
-    // Create a context that can only fit packets of size 50
-    let mut frame_buffer = OutgoingFrameBuffer::new();
-    let max_packet_size = 50;
-    frame_buffer.set_max_packet_size(Some(max_packet_size));
-    let mut write_context = MockWriteContext::new(
-        time::now(),
-        &mut frame_buffer,
-        transmission::Constraint::None,
-        transmission::Mode::Normal,
-        endpoint::Type::Server,
-    );
+        // Create a context that can only fit packets of size 50
+        let mut frame_buffer = OutgoingFrameBuffer::new();
+        let max_packet_size = 50;
+        frame_buffer.set_max_packet_size(Some(max_packet_size));
+        let mut write_context = MockWriteContext::new(
+            time::now(),
+            &mut frame_buffer,
+            transmission::Constraint::None,
+            transmission::Mode::Normal,
+            endpoint::Type::Server,
+        );
 
-    const DATA_SIZE: usize = 2000;
-    let array: [u8; DATA_SIZE] = [1; DATA_SIZE];
+        const DATA_SIZE: usize = 2000;
+        let array: [u8; DATA_SIZE] = [1; DATA_SIZE];
 
-    // Set up each stream to have much more data to send than can fit in our test packet
-    for stream_id in &stream_ids {
-        manager
-            .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
-                let data_to_send = bytes::Bytes::copy_from_slice(&array);
-                stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
-            })
-            .unwrap();
-    }
-    // make sure the order matches creation order
-    assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
+        // Set up each stream to have much more data to send than can fit in our test packet
+        for stream_id in &stream_ids {
+            manager
+                .with_asserted_stream(*stream_id, |stream: &mut stream::StreamImpl| {
+                    let data_to_send = bytes::Bytes::copy_from_slice(&array);
+                    stream.poll_request(ops::Request::default().send(&mut [data_to_send]), None)
+                })
+                .unwrap();
+        }
+        // make sure the order matches creation order
+        assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
 
-    // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
-    // Then the stream gets sent to the back of the transmission list.
-    for idx in 1..=40 {
-        dbg!(idx);
-        let _ = manager.on_transmit(&mut write_context);
+        // Send 40 packets. Each stream gets to be the first to fill up a packet "batch_size" times.
+        // Then the stream gets sent to the back of the transmission list.
+        for idx in 1..=40 {
+            dbg!(idx);
+            let _ = manager.on_transmit(&mut write_context);
 
-        assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
+            assert_eq!(stream_ids, manager.streams_waiting_for_transmission());
 
-        write_context.frame_buffer.flush();
+            write_context.frame_buffer.flush();
 
-        if idx % batch_size == 0 {
-            // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
-            stream_ids.rotate_left(1);
+            if idx % batch_size == 0 {
+                // The first stream gets sent to the back of the transmission list once we have sent "batch_size" packets
+                stream_ids.rotate_left(1);
+            }
         }
     }
 }
diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs
index f059df698..2bf2510e4 100644
--- a/quic/s2n-quic-transport/src/stream/stream_container.rs
+++ b/quic/s2n-quic-transport/src/stream/stream_container.rs
@@ -145,8 +145,8 @@ struct InterestLists<S> {
     /// stream flow control window to increase
     waiting_for_stream_flow_control_credits:
         LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
-    counter: u8,
-    limit: u8,
+    transmission_counter: u8,
+    transmission_limit: u8,
 }
 
 impl<S: StreamTrait> InterestLists<S> {
@@ -162,8 +162,8 @@ impl<S: StreamTrait> InterestLists<S> {
             waiting_for_stream_flow_control_credits: LinkedList::new(
                 WaitingForStreamFlowControlCreditsAdapter::new(),
             ),
-            counter: 0,
-            limit: connection_limits.stream_burst_size(),
+            transmission_counter: 0,
+            transmission_limit: connection_limits.stream_batch_size(),
         }
     }
 
@@ -543,10 +543,10 @@ impl<S: StreamTrait> StreamContainer<S> {
         F: FnMut(&mut S) -> StreamContainerIterationResult,
     {
         // Head node gets pushed to the back of the list if it has run out of sending credits
-        if self.interest_lists.counter >= self.interest_lists.limit {
+        if self.interest_lists.transmission_counter >= self.interest_lists.transmission_limit {
             if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() {
                 self.interest_lists.waiting_for_transmission.push_back(node);
-                self.interest_lists.counter = 0;
+                self.interest_lists.transmission_counter = 0;
             }
         }
 
@@ -568,11 +568,11 @@ impl<S: StreamTrait> StreamContainer<S> {
 
             if head_node {
                 if matches!(result, StreamContainerIterationResult::Continue) {
-                    self.interest_lists.counter += 1;
+                    self.interest_lists.transmission_counter += 1;
                 }
 
                 if !matches!(interests.transmission, transmission::Interest::NewData) {
-                    self.interest_lists.counter = 0;
+                    self.interest_lists.transmission_counter = 0;
                 }
                 head_node = false;
             }
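
A minimal usage sketch (not part of this diff): with the rename, an application opts into larger per-stream batches through the renamed setter. Only the `with_stream_batch_size` name comes from the change above; the `s2n_quic::provider::limits::Limits` re-export, the server builder calls, and the address follow the usual s2n-quic server setup and are assumptions here.

use s2n_quic::{provider::limits::Limits, Server};

// Build a server whose streams may each fill up to 5 packets in a row before
// the round-robin scheduler rotates them to the back of the transmission list
// (the default batch size stays 1).
// NOTE: assumed setup; must be called from within a tokio runtime, as in the
// s2n-quic examples.
fn start_server() -> Result<Server, Box<dyn std::error::Error>> {
    let limits = Limits::default().with_stream_batch_size(5).unwrap();

    let server = Server::builder()
        .with_limits(limits)?
        .with_io("127.0.0.1:4433")?
        .start()?;

    Ok(server)
}

A batch size of 1 keeps the previous per-packet round-robin behavior, while larger values trade a little short-term fairness for fewer scheduler rotations between packets.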