diff --git a/quic/s2n-quic-platform/src/io/testing.rs b/quic/s2n-quic-platform/src/io/testing.rs index 8e6ae5e0a9..2ef87829f5 100644 --- a/quic/s2n-quic-platform/src/io/testing.rs +++ b/quic/s2n-quic-platform/src/io/testing.rs @@ -23,6 +23,11 @@ pub use time::now; pub use bach::task::{self, primary, spawn}; +// returns `true` if the caller is being executed in a testing environment +pub fn is_in_env() -> bool { + bach::task::scope::try_borrow_with(|scope| scope.is_some()) +} + pub mod rand { pub use ::bach::rand::*; diff --git a/quic/s2n-quic-platform/src/io/testing/network.rs b/quic/s2n-quic-platform/src/io/testing/network.rs index b37aa33029..ae70f29216 100644 --- a/quic/s2n-quic-platform/src/io/testing/network.rs +++ b/quic/s2n-quic-platform/src/io/testing/network.rs @@ -130,7 +130,7 @@ impl Buffers { if !lock.is_open { return Err(io::Error::new( io::ErrorKind::ConnectionReset, - "host is closed", + "network is closed", )); } @@ -192,22 +192,23 @@ impl Buffers { }; let mut queues = vec![]; + let mut to_remove = vec![]; // find all of the queues with at least one packet to transmit - for queue in lock.tx.values_mut() { + for (host, queue) in lock.tx.iter_mut() { if queue.packets.is_empty() { continue; } - queues.push(queue); + queues.push((*host, queue)); } // shuffle the queue so each endpoint has a fair chance of transmitting super::rand::shuffle(&mut queues); - loop { + 'done: loop { let mut has_result = false; - for queue in &mut queues { + for (host, queue) in &mut queues { // transmit a single packet at a time per queue so they are fairly // transmitted if let Some(packet) = queue.packets.pop_front() { @@ -220,16 +221,24 @@ impl Buffers { } if result.is_err() { - return; + break 'done; } + } else if !queue.is_open { + // if the queue is both closed and empty, then remove it + to_remove.push(*host); } } // if all of the queues are empty then just return if !has_result { - return; + break 'done; } } + + // clean up any queues that are closed and empty + for host in to_remove { + lock.tx.remove(&host); + } } pub fn execute(&self, n: &mut N) { @@ -250,13 +259,7 @@ impl Buffers { pub fn close_host(&mut self, host: HostId) { if let Ok(mut lock) = self.inner.lock() { - lock.tx.remove(&host); - lock.rx.remove(&host); - if let Some(addrs) = lock.host_to_addr.remove(&host) { - for addr in addrs { - lock.addr_to_host.remove(&addr); - } - } + lock.close_host(host) } } @@ -301,6 +304,31 @@ impl Default for State { } } +impl State { + pub fn close_host(&mut self, host: HostId) { + tracing::trace!(closing = ?host); + + if let Some(tx) = self.tx.get_mut(&host) { + // if we don't have any packets remaining, then remove the outgoing packets + // immediately, otherwise mark it closed + if tx.packets.is_empty() { + self.tx.remove(&host); + } else { + tx.is_open = false; + } + } + + self.rx.remove(&host); + + if let Some(addrs) = self.host_to_addr.remove(&host) { + for addr in addrs { + tracing::trace!(closing = ?addr); + self.addr_to_host.remove(&addr); + } + } + } +} + #[derive(Clone, Debug)] pub struct Queue { capacity: usize, @@ -308,6 +336,7 @@ pub struct Queue { packets: VecDeque, local_address: LocalAddress, waker: Option, + is_open: bool, } impl Queue { @@ -319,6 +348,7 @@ impl Queue { packets: VecDeque::new(), local_address, waker: None, + is_open: true, } }