Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(s2n-quic-platform): drain tx packets before closing host #2106

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions quic/s2n-quic-platform/src/io/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub use time::now;

pub use bach::task::{self, primary, spawn};

// returns `true` if the caller is being executed in a testing environment
pub fn is_in_env() -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand the name, why not something like is_testing_env?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was thinking testing was already in the module path name (io::testing::is_in_env()) so it seemed a bit redundant. But I don't have any strong opinions on the name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yeah thats fine then

bach::task::scope::try_borrow_with(|scope| scope.is_some())
}

pub mod rand {
pub use ::bach::rand::*;

Expand Down
58 changes: 44 additions & 14 deletions quic/s2n-quic-platform/src/io/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Buffers {
if !lock.is_open {
return Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"host is closed",
"network is closed",
));
}

Expand Down Expand Up @@ -192,22 +192,23 @@ impl Buffers {
};

let mut queues = vec![];
let mut to_remove = vec![];

// find all of the queues with at least one packet to transmit
for queue in lock.tx.values_mut() {
for (host, queue) in lock.tx.iter_mut() {
if queue.packets.is_empty() {
continue;
}

queues.push(queue);
queues.push((*host, queue));
}

// shuffle the queue so each endpoint has a fair chance of transmitting
super::rand::shuffle(&mut queues);

loop {
'done: loop {
let mut has_result = false;
for queue in &mut queues {
for (host, queue) in &mut queues {
// transmit a single packet at a time per queue so they are fairly
// transmitted
if let Some(packet) = queue.packets.pop_front() {
Expand All @@ -220,16 +221,24 @@ impl Buffers {
}

if result.is_err() {
return;
break 'done;
}
} else if !queue.is_open {
// if the queue is both closed and empty, then remove it
to_remove.push(*host);
}
}

// if all of the queues are empty then just return
if !has_result {
return;
break 'done;
}
}

// clean up any queues that are closed and empty
for host in to_remove {
lock.tx.remove(&host);
}
}

pub fn execute<N: Network>(&self, n: &mut N) {
Expand All @@ -250,13 +259,7 @@ impl Buffers {

pub fn close_host(&mut self, host: HostId) {
if let Ok(mut lock) = self.inner.lock() {
lock.tx.remove(&host);
lock.rx.remove(&host);
if let Some(addrs) = lock.host_to_addr.remove(&host) {
for addr in addrs {
lock.addr_to_host.remove(&addr);
}
}
lock.close_host(host)
}
}

Expand Down Expand Up @@ -301,13 +304,39 @@ impl Default for State {
}
}

impl State {
pub fn close_host(&mut self, host: HostId) {
tracing::trace!(closing = ?host);

if let Some(tx) = self.tx.get_mut(&host) {
// if we don't have any packets remaining, then remove the outgoing packets
// immediately, otherwise mark it closed
if tx.packets.is_empty() {
self.tx.remove(&host);
} else {
tx.is_open = false;
}
}

self.rx.remove(&host);

if let Some(addrs) = self.host_to_addr.remove(&host) {
for addr in addrs {
tracing::trace!(closing = ?addr);
self.addr_to_host.remove(&addr);
}
}
}
}

#[derive(Clone, Debug)]
pub struct Queue {
capacity: usize,
mtu: u16,
packets: VecDeque<Packet>,
local_address: LocalAddress,
waker: Option<Waker>,
is_open: bool,
}

impl Queue {
Expand All @@ -319,6 +348,7 @@ impl Queue {
packets: VecDeque::new(),
local_address,
waker: None,
is_open: true,
}
}

Expand Down
Loading