diff --git a/quic/s2n-quic-transport/src/stream/manager.rs b/quic/s2n-quic-transport/src/stream/manager.rs index 49575d99f..191f97e75 100644 --- a/quic/s2n-quic-transport/src/stream/manager.rs +++ b/quic/s2n-quic-transport/src/stream/manager.rs @@ -849,7 +849,7 @@ impl stream::Manager for AbstractStreamManager { transmission::context::RetransmissionContext::new(context); // Prioritize retransmitting lost data - self.inner.streams.iterate_retransmission_list( + self.inner.streams.send_on_retransmission_list( &mut self.inner.stream_controller, |stream: &mut S| { transmit_result = stream.on_transmit(&mut retransmission_context); diff --git a/quic/s2n-quic-transport/src/stream/stream_container.rs b/quic/s2n-quic-transport/src/stream/stream_container.rs index 2bf2510e4..9ef4df9f4 100644 --- a/quic/s2n-quic-transport/src/stream/stream_container.rs +++ b/quic/s2n-quic-transport/src/stream/stream_container.rs @@ -319,6 +319,63 @@ macro_rules! iterate_interruptible { }; } +macro_rules! send_on_transmission_list { + ($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => { + // Head node gets pushed to the back of the list if it has run out of sending credits + if $sel.interest_lists.transmission_counter >= $sel.interest_lists.transmission_limit { + if let Some(node) = $sel.interest_lists.$list_name.pop_front() { + $sel.interest_lists.$list_name.push_back(node); + $sel.interest_lists.transmission_counter = 0; + } + } + + let mut extracted_list = $sel.interest_lists.$list_name.take(); + let mut cursor = extracted_list.front_mut(); + + let mut head_node = true; + while let Some(stream) = cursor.remove() { + // Note that while we iterate over the intrusive lists here + // `stream` is part of no list anymore, since it also got dropped + // from list that is described by the `cursor`. + debug_assert!(!stream.$link_name.is_linked()); + let mut mut_stream = stream.inner.borrow_mut(); + let result = $func(&mut *mut_stream); + + // Update the interests after the interaction + let interests = mut_stream.get_stream_interests(); + $sel.interest_lists.update_interests(&stream, interests); + + if head_node { + if matches!(result, StreamContainerIterationResult::Continue) { + $sel.interest_lists.transmission_counter += 1; + } + + if !matches!(interests.transmission, transmission::Interest::NewData) + && !matches!(interests.transmission, transmission::Interest::LostData) + { + $sel.interest_lists.transmission_counter = 0; + } + head_node = false; + } + + match result { + StreamContainerIterationResult::BreakAndInsertAtBack => { + $sel.interest_lists + .$list_name + .back_mut() + .splice_after(extracted_list); + break; + } + StreamContainerIterationResult::Continue => {} + } + } + + if !$sel.interest_lists.done_streams.is_empty() { + $sel.finalize_done_streams($controller); + } + }; +} + impl StreamContainer { /// Creates a new `StreamContainer` pub fn new(connection_limits: &connection::Limits) -> Self { @@ -542,56 +599,30 @@ impl StreamContainer { where F: FnMut(&mut S) -> StreamContainerIterationResult, { - // Head node gets pushed to the back of the list if it has run out of sending credits - if self.interest_lists.transmission_counter >= self.interest_lists.transmission_limit { - if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() { - self.interest_lists.waiting_for_transmission.push_back(node); - self.interest_lists.transmission_counter = 0; - } - } - - let mut extracted_list = self.interest_lists.waiting_for_transmission.take(); - let mut cursor = extracted_list.front_mut(); - - let mut head_node = true; - while let Some(stream) = cursor.remove() { - // Note that while we iterate over the intrusive lists here - // `stream` is part of no list anymore, since it also got dropped - // from list that is described by the `cursor`. - debug_assert!(!stream.waiting_for_transmission_link.is_linked()); - let mut mut_stream = stream.inner.borrow_mut(); - let result = func(&mut *mut_stream); - - // Update the interests after the interaction - let interests = mut_stream.get_stream_interests(); - self.interest_lists.update_interests(&stream, interests); - - if head_node { - if matches!(result, StreamContainerIterationResult::Continue) { - self.interest_lists.transmission_counter += 1; - } - - if !matches!(interests.transmission, transmission::Interest::NewData) { - self.interest_lists.transmission_counter = 0; - } - head_node = false; - } - - match result { - StreamContainerIterationResult::BreakAndInsertAtBack => { - self.interest_lists - .waiting_for_transmission - .back_mut() - .splice_after(extracted_list); - break; - } - StreamContainerIterationResult::Continue => {} - } - } + send_on_transmission_list!( + self, + waiting_for_transmission, + waiting_for_transmission_link, + controller, + func + ); + } - if !self.interest_lists.done_streams.is_empty() { - self.finalize_done_streams(controller); - } + #[cfg(test)] + pub fn iterate_retransmission_list( + &mut self, + controller: &mut stream::Controller, + mut func: F, + ) where + F: FnMut(&mut S) -> StreamContainerIterationResult, + { + iterate_interruptible!( + self, + waiting_for_retransmission, + waiting_for_retransmission_link, + controller, + func + ); } /// Iterates over all `Stream`s which are waiting for retransmission, @@ -599,14 +630,14 @@ impl StreamContainer { /// /// The `stream::Controller` will be notified of streams that have been /// closed to allow for further streams to be opened. - pub fn iterate_retransmission_list( + pub fn send_on_retransmission_list( &mut self, controller: &mut stream::Controller, mut func: F, ) where F: FnMut(&mut S) -> StreamContainerIterationResult, { - iterate_interruptible!( + send_on_transmission_list!( self, waiting_for_retransmission, waiting_for_retransmission_link,