Skip to content

Commit

Permalink
test(s2n-quic-platform): drain tx packets before closing host (#2106)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Feb 9, 2024
1 parent f3290b4 commit 9b543eb
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 14 deletions.
5 changes: 5 additions & 0 deletions quic/s2n-quic-platform/src/io/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub use time::now;

pub use bach::task::{self, primary, spawn};

// returns `true` if the caller is being executed in a testing environment
pub fn is_in_env() -> bool {
bach::task::scope::try_borrow_with(|scope| scope.is_some())
}

pub mod rand {
pub use ::bach::rand::*;

Expand Down
58 changes: 44 additions & 14 deletions quic/s2n-quic-platform/src/io/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Buffers {
if !lock.is_open {
return Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"host is closed",
"network is closed",
));
}

Expand Down Expand Up @@ -192,22 +192,23 @@ impl Buffers {
};

let mut queues = vec![];
let mut to_remove = vec![];

// find all of the queues with at least one packet to transmit
for queue in lock.tx.values_mut() {
for (host, queue) in lock.tx.iter_mut() {
if queue.packets.is_empty() {
continue;
}

queues.push(queue);
queues.push((*host, queue));
}

// shuffle the queue so each endpoint has a fair chance of transmitting
super::rand::shuffle(&mut queues);

loop {
'done: loop {
let mut has_result = false;
for queue in &mut queues {
for (host, queue) in &mut queues {
// transmit a single packet at a time per queue so they are fairly
// transmitted
if let Some(packet) = queue.packets.pop_front() {
Expand All @@ -220,16 +221,24 @@ impl Buffers {
}

if result.is_err() {
return;
break 'done;
}
} else if !queue.is_open {
// if the queue is both closed and empty, then remove it
to_remove.push(*host);
}
}

// if all of the queues are empty then just return
if !has_result {
return;
break 'done;
}
}

// clean up any queues that are closed and empty
for host in to_remove {
lock.tx.remove(&host);
}
}

pub fn execute<N: Network>(&self, n: &mut N) {
Expand All @@ -250,13 +259,7 @@ impl Buffers {

pub fn close_host(&mut self, host: HostId) {
if let Ok(mut lock) = self.inner.lock() {
lock.tx.remove(&host);
lock.rx.remove(&host);
if let Some(addrs) = lock.host_to_addr.remove(&host) {
for addr in addrs {
lock.addr_to_host.remove(&addr);
}
}
lock.close_host(host)
}
}

Expand Down Expand Up @@ -301,13 +304,39 @@ impl Default for State {
}
}

impl State {
pub fn close_host(&mut self, host: HostId) {
tracing::trace!(closing = ?host);

if let Some(tx) = self.tx.get_mut(&host) {
// if we don't have any packets remaining, then remove the outgoing packets
// immediately, otherwise mark it closed
if tx.packets.is_empty() {
self.tx.remove(&host);
} else {
tx.is_open = false;
}
}

self.rx.remove(&host);

if let Some(addrs) = self.host_to_addr.remove(&host) {
for addr in addrs {
tracing::trace!(closing = ?addr);
self.addr_to_host.remove(&addr);
}
}
}
}

#[derive(Clone, Debug)]
pub struct Queue {
capacity: usize,
mtu: u16,
packets: VecDeque<Packet>,
local_address: LocalAddress,
waker: Option<Waker>,
is_open: bool,
}

impl Queue {
Expand All @@ -319,6 +348,7 @@ impl Queue {
packets: VecDeque::new(),
local_address,
waker: None,
is_open: true,
}
}

Expand Down

0 comments on commit 9b543eb

Please sign in to comment.