Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't delay ACKs for significant window updates #935

Merged
merged 2 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interfac
name = "loopback"
required-features = ["log", "medium-ethernet", "proto-ipv4", "socket-tcp"]

[[example]]
name = "loopback_benchmark"
required-features = ["std", "log", "medium-ethernet", "proto-ipv4", "socket-tcp"]

[[example]]
name = "multicast"
required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "proto-igmp", "socket-udp"]
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,30 @@ is possible; otherwise, nothing at all will be displayed and no options are acce

[wireshark]: https://wireshark.org

### examples/loopback\_benchmark.rs

_examples/loopback_benchmark.rs_ is another simple throughput benchmark.

Read its [source code](/examples/loopback_benchmark.rs), then run it as:

```sh
cargo run --release --example loopback_benchmark
```

It establishes a connection to itself via a loopback interface and transfers a large amount
of data in one direction.

A typical result (achieved on an Intel Core i5-13500H CPU and a Linux 6.9.9 x86_64 kernel running
on a LENOVO XiaoXinPro 14 IRH8 laptop) is as follows:

```
$ cargo run --release --example loopback_benchmark
done in 0.558 s, bandwidth is 15.395083 Gbps
```

Note: Although the loopback interface can be used in bare-metal environments,
this benchmark _does_ rely on `std` to be able to measure the time cost.

## License

_smoltcp_ is distributed under the terms of 0-clause BSD license.
Expand Down
101 changes: 101 additions & 0 deletions examples/loopback_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
mod utils;

use log::debug;

use smoltcp::iface::{Config, Interface, SocketSet};
use smoltcp::phy::{Device, Loopback, Medium};
use smoltcp::socket::tcp;
use smoltcp::time::Instant;
use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr};

/// Loopback throughput benchmark.
///
/// Opens a TCP connection from a client socket to a server socket on the same
/// loopback interface, pushes data from the client to the server until 1 GiB
/// has been received, and prints the elapsed time and the resulting bandwidth.
fn main() {
    // Number of bytes the server has to receive before the benchmark stops.
    const TOTAL_BYTES: usize = 1024 * 1024 * 1024;

    let loopback = Loopback::new(Medium::Ethernet);

    // Initialise logging and wrap the loopback device in whatever middleware
    // was requested on the command line.
    utils::setup_logging("info");
    let (mut opts, mut free) = utils::create_options();
    utils::add_middleware_options(&mut opts, &mut free);
    let mut matches = utils::parse_options(&opts, free);
    let mut device =
        utils::parse_middleware_options(&mut matches, loopback, /*loopback=*/ true);

    // Create interface
    let medium = device.capabilities().medium;
    let config = match medium {
        Medium::Ethernet => {
            let hardware_addr = EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]);
            Config::new(hardware_addr.into())
        }
        Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip),
        Medium::Ieee802154 => todo!(),
    };

    let mut iface = Interface::new(config, &mut device, Instant::now());
    iface.update_ip_addrs(|ip_addrs| {
        let local_cidr = IpCidr::new(IpAddress::v4(127, 0, 0, 1), 8);
        ip_addrs.push(local_cidr).unwrap();
    });

    // Create sockets; each one gets 64 KiB receive and transmit buffers.
    let server_socket = tcp::Socket::new(
        tcp::SocketBuffer::new(vec![0; 65536]),
        tcp::SocketBuffer::new(vec![0; 65536]),
    );
    let client_socket = tcp::Socket::new(
        tcp::SocketBuffer::new(vec![0; 65536]),
        tcp::SocketBuffer::new(vec![0; 65536]),
    );

    let mut storage: [_; 2] = Default::default();
    let mut sockets = SocketSet::new(&mut storage[..]);
    let server_handle = sockets.add(server_socket);
    let client_handle = sockets.add(client_socket);

    let start_time = Instant::now();

    // `listen` and `connect` must each be issued exactly once.
    let mut did_listen = false;
    let mut did_connect = false;
    let mut processed = 0;
    while processed < TOTAL_BYTES {
        iface.poll(Instant::now(), &mut device, &mut sockets);

        // Server side: start listening once, then drain everything received.
        let server = sockets.get_mut::<tcp::Socket>(server_handle);
        if !(server.is_active() || server.is_listening() || did_listen) {
            debug!("listening");
            server.listen(1234).unwrap();
            did_listen = true;
        }

        while server.can_recv() {
            // Dequeue the whole readable slice and report its length.
            let received = server
                .recv(|buffer| {
                    let len = buffer.len();
                    (len, len)
                })
                .unwrap();
            debug!("got {:?}", received);
            processed += received;
        }

        // Client side: connect once, then enqueue as much as the socket accepts.
        let client = sockets.get_mut::<tcp::Socket>(client_handle);
        let cx = iface.context();
        if !(client.is_open() || did_connect) {
            debug!("connecting");
            let remote = (IpAddress::v4(127, 0, 0, 1), 1234);
            client.connect(cx, remote, 65000).unwrap();
            did_connect = true;
        }

        while client.can_send() {
            debug!("sending");
            // The payload content is irrelevant; claim the whole writable slice.
            client.send(|buffer| (buffer.len(), ())).unwrap();
        }
    }

    let duration = Instant::now() - start_time;
    let elapsed_ms = duration.total_millis();
    let seconds = elapsed_ms as f64 / 1000.0;
    // bits per millisecond == kbit/s; dividing by 10^6 yields Gbit/s.
    let gbps = (processed as u64 * 8 / elapsed_ms) as f64 / 1000000.0;
    println!("done in {} s, bandwidth is {} Gbps", seconds, gbps);
}
101 changes: 96 additions & 5 deletions src/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,6 @@ impl<'a> Socket<'a> {
/// Return the current window field value, including scaling according to RFC 1323.
///
/// Used in internal calculations as well as packet generation.
///
#[inline]
fn scaled_window(&self) -> u16 {
cmp::min(
Expand All @@ -665,6 +664,25 @@ impl<'a> Socket<'a> {
) as u16
}

/// Return the previously advertised window field value, including scaling according to
/// RFC 1323, adjusted for data that has arrived since that advertisement.
///
/// Used in internal calculations as well as packet generation.
///
/// `remote_last_win` alone describes the window as it was when it was last sent. Here the
/// bytes received since then (but not yet acknowledged) are subtracted, so the result can be
/// compared fairly against the window we are about to advertise.
///
/// Returns `None` when `remote_last_ack` is `None`.
#[inline]
fn last_scaled_window(&self) -> Option<u16> {
    self.remote_last_ack.map(|last_ack| {
        // Acknowledgement point implied by the data currently buffered.
        let next_ack = self.remote_seq_no + self.rx_buffer.len();

        // Undo the window scaling to get the last advertised window in bytes.
        let last_win = (self.remote_last_win as usize) << self.remote_win_shift;

        // Right edge of the last advertised window, measured from `next_ack`.
        let last_win_adjusted = last_ack + last_win - next_ack;

        // Re-apply the scaling and clamp to what fits in the 16-bit window field.
        let rescaled = last_win_adjusted >> self.remote_win_shift;
        cmp::min(rescaled, (1 << 16) - 1) as u16
    })
}

/// Set the timeout duration.
///
/// A socket with a timeout duration set will abort the connection if either of the following
Expand Down Expand Up @@ -2130,13 +2148,26 @@ impl<'a> Socket<'a> {
}
}

/// Return whether we should send ACK immediately due to significant window updates.
///
/// ACKs with significant window updates should be sent immediately to let the sender know that
/// more data can be sent. According to the Linux kernel implementation, "significant" means
/// doubling the receive window. The Linux kernel implementation can be found at
/// <https://elixir.bootlin.com/linux/v6.9.9/source/net/ipv4/tcp.c#L1472>.
///
/// Only the states listed in the `match` below can yield `true`; every other state
/// returns `false`.
fn window_to_update(&self) -> bool {
match self.state {
State::SynSent
| State::SynReceived
| State::Established
| State::FinWait1
| State::FinWait2 => self.scaled_window() > self.remote_last_win,
| State::FinWait2 => {
let new_win = self.scaled_window();
if let Some(last_win) = self.last_scaled_window() {
// "Significant" means the window we can advertise now (`new_win`) is at
// least twice the previously advertised window as adjusted by
// `last_scaled_window()` (`last_win`). The `new_win > 0` guard keeps a zero
// window from trivially satisfying `0 / 2 >= 0`.
new_win > 0 && new_win / 2 >= last_win
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here explaining the calculation (the definition of "significantly" from the PR body)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
// `last_scaled_window()` returned `None`, so there is no earlier
// advertisement to compare against.
false
}
}
_ => false,
}
}
Expand Down Expand Up @@ -2202,7 +2233,7 @@ impl<'a> Socket<'a> {
} else if self.ack_to_transmit() && self.delayed_ack_expired(cx.now()) {
// If we have data to acknowledge, do it.
tcp_trace!("outgoing segment will acknowledge");
} else if self.window_to_update() && self.delayed_ack_expired(cx.now()) {
} else if self.window_to_update() {
// If we have window length increase to advertise, do it.
tcp_trace!("outgoing segment will update window");
} else if self.state == State::Closed {
Expand Down Expand Up @@ -2452,8 +2483,11 @@ impl<'a> Socket<'a> {
} else if self.seq_to_transmit(cx) {
// We have a data or flag packet to transmit.
PollAt::Now
} else if self.window_to_update() {
// The receive window has been raised significantly.
PollAt::Now
} else {
let want_ack = self.ack_to_transmit() || self.window_to_update();
let want_ack = self.ack_to_transmit();

let delayed_ack_poll_at = match (want_ack, self.ack_delay_timer) {
(false, _) => PollAt::Ingress,
Expand Down Expand Up @@ -2785,7 +2819,7 @@ mod test {
s.local_seq_no = LOCAL_SEQ + 1;
s.remote_last_seq = LOCAL_SEQ + 1;
s.remote_last_ack = Some(REMOTE_SEQ + 1);
s.remote_last_win = 64;
s.remote_last_win = s.scaled_window();
s
}

Expand Down Expand Up @@ -6325,6 +6359,63 @@ mod test {
}));
}

// Checks that window updates interact with delayed ACKs as intended: an update that
// at least doubles the advertised window is expected immediately, while a smaller
// one is expected to produce no segment.
#[test]
fn test_window_update_with_delay_ack() {
// Socket created with buffer sizes (6, 6) and a 10 ms ACK delay.
let mut s = socket_established_with_buffer_sizes(6, 6);
s.ack_delay = Some(Duration::from_millis(10));

// The peer sends six bytes of payload.
send!(
s,
TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abcdef"[..],
..SEND_TEMPL
}
);

// Nothing is expected at time 5, i.e. before the 10 ms ACK delay would elapse
// (assuming the `time` argument is in milliseconds -- TODO confirm).
recv_nothing!(s, time 5);

// The application consumes two bytes ("ab").
s.recv(|buffer| {
assert_eq!(&buffer[..2], b"ab");
(2, ())
})
.unwrap();
// An ACK advertising a window of 2 is expected right away, still at time 5.
recv!(
s,
time 5,
Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 6),
window_len: 2,
..RECV_TEMPL
})
);

// Consuming one more byte ("c") would grow the window from 2 to 3, which is less
// than double the last advertised value, so no segment is expected.
s.recv(|buffer| {
assert_eq!(&buffer[..1], b"c");
(1, ())
})
.unwrap();
recv_nothing!(s, time 5);

// Consuming "d" brings the window to 4, double the last advertised 2, so an
// immediate update advertising 4 is expected.
s.recv(|buffer| {
assert_eq!(&buffer[..1], b"d");
(1, ())
})
.unwrap();
recv!(
s,
time 5,
Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 6),
window_len: 4,
..RECV_TEMPL
})
);
}

#[test]
fn test_fill_peer_window() {
let mut s = socket_established();
Expand Down
Loading