Skip to content

Commit

Permalink
Expanded batching logic to retransmissions
Browse files Browse the repository at this point in the history
  • Loading branch information
maddeleine committed Jan 3, 2025
1 parent fb6f2c1 commit aa5a2fa
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 52 deletions.
2 changes: 1 addition & 1 deletion quic/s2n-quic-transport/src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ impl<S: 'static + StreamTrait> stream::Manager for AbstractStreamManager<S> {
transmission::context::RetransmissionContext::new(context);

// Prioritize retransmitting lost data
self.inner.streams.iterate_retransmission_list(
self.inner.streams.send_on_retransmission_list(
&mut self.inner.stream_controller,
|stream: &mut S| {
transmit_result = stream.on_transmit(&mut retransmission_context);
Expand Down
133 changes: 82 additions & 51 deletions quic/s2n-quic-transport/src/stream/stream_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,63 @@ macro_rules! iterate_interruptible {
};
}

macro_rules! send_on_transmission_list {
($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => {
// Head node gets pushed to the back of the list if it has run out of sending credits
if $sel.interest_lists.transmission_counter >= $sel.interest_lists.transmission_limit {
if let Some(node) = $sel.interest_lists.$list_name.pop_front() {
$sel.interest_lists.$list_name.push_back(node);
$sel.interest_lists.transmission_counter = 0;
}
}

let mut extracted_list = $sel.interest_lists.$list_name.take();
let mut cursor = extracted_list.front_mut();

let mut head_node = true;
while let Some(stream) = cursor.remove() {
// Note that while we iterate over the intrusive lists here
// `stream` is part of no list anymore, since it also got dropped
// from list that is described by the `cursor`.
debug_assert!(!stream.$link_name.is_linked());
let mut mut_stream = stream.inner.borrow_mut();
let result = $func(&mut *mut_stream);

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
$sel.interest_lists.update_interests(&stream, interests);

if head_node {
if matches!(result, StreamContainerIterationResult::Continue) {
$sel.interest_lists.transmission_counter += 1;
}

if !matches!(interests.transmission, transmission::Interest::NewData)
&& !matches!(interests.transmission, transmission::Interest::LostData)
{
$sel.interest_lists.transmission_counter = 0;
}
head_node = false;
}

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
$sel.interest_lists
.$list_name
.back_mut()
.splice_after(extracted_list);
break;
}
StreamContainerIterationResult::Continue => {}
}
}

if !$sel.interest_lists.done_streams.is_empty() {
$sel.finalize_done_streams($controller);
}
};
}

impl<S: StreamTrait> StreamContainer<S> {
/// Creates a new `StreamContainer`
pub fn new(connection_limits: &connection::Limits) -> Self {
Expand Down Expand Up @@ -542,71 +599,45 @@ impl<S: StreamTrait> StreamContainer<S> {
where
F: FnMut(&mut S) -> StreamContainerIterationResult,
{
// Head node gets pushed to the back of the list if it has run out of sending credits
if self.interest_lists.transmission_counter >= self.interest_lists.transmission_limit {
if let Some(node) = self.interest_lists.waiting_for_transmission.pop_front() {
self.interest_lists.waiting_for_transmission.push_back(node);
self.interest_lists.transmission_counter = 0;
}
}

let mut extracted_list = self.interest_lists.waiting_for_transmission.take();
let mut cursor = extracted_list.front_mut();

let mut head_node = true;
while let Some(stream) = cursor.remove() {
// Note that while we iterate over the intrusive lists here
// `stream` is part of no list anymore, since it also got dropped
// from list that is described by the `cursor`.
debug_assert!(!stream.waiting_for_transmission_link.is_linked());
let mut mut_stream = stream.inner.borrow_mut();
let result = func(&mut *mut_stream);

// Update the interests after the interaction
let interests = mut_stream.get_stream_interests();
self.interest_lists.update_interests(&stream, interests);

if head_node {
if matches!(result, StreamContainerIterationResult::Continue) {
self.interest_lists.transmission_counter += 1;
}

if !matches!(interests.transmission, transmission::Interest::NewData) {
self.interest_lists.transmission_counter = 0;
}
head_node = false;
}

match result {
StreamContainerIterationResult::BreakAndInsertAtBack => {
self.interest_lists
.waiting_for_transmission
.back_mut()
.splice_after(extracted_list);
break;
}
StreamContainerIterationResult::Continue => {}
}
}
send_on_transmission_list!(
self,
waiting_for_transmission,
waiting_for_transmission_link,
controller,
func
);
}

if !self.interest_lists.done_streams.is_empty() {
self.finalize_done_streams(controller);
}
#[cfg(test)]
pub fn iterate_retransmission_list<F>(
&mut self,
controller: &mut stream::Controller,
mut func: F,
) where
F: FnMut(&mut S) -> StreamContainerIterationResult,
{
iterate_interruptible!(
self,
waiting_for_retransmission,
waiting_for_retransmission_link,
controller,
func
);
}

/// Iterates over all `Stream`s which are waiting for retransmission,
/// and executes the given function on each `Stream`
///
/// The `stream::Controller` will be notified of streams that have been
/// closed to allow for further streams to be opened.
pub fn iterate_retransmission_list<F>(
pub fn send_on_retransmission_list<F>(
&mut self,
controller: &mut stream::Controller,
mut func: F,
) where
F: FnMut(&mut S) -> StreamContainerIterationResult,
{
iterate_interruptible!(
send_on_transmission_list!(
self,
waiting_for_retransmission,
waiting_for_retransmission_link,
Expand Down

0 comments on commit aa5a2fa

Please sign in to comment.