From fe179c918415d6c251384b24fdd6204060265431 Mon Sep 17 00:00:00 2001 From: Ruihan Li Date: Fri, 24 May 2024 11:41:40 +0800 Subject: [PATCH 1/2] Don't delay ACKs for significant window updates --- src/socket/tcp.rs | 101 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 5 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 267f10729..e53cd6101 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -656,7 +656,6 @@ impl<'a> Socket<'a> { /// Return the current window field value, including scaling according to RFC 1323. /// /// Used in internal calculations as well as packet generation. - /// #[inline] fn scaled_window(&self) -> u16 { cmp::min( @@ -665,6 +664,25 @@ impl<'a> Socket<'a> { ) as u16 } + /// Return the last window field value, including scaling according to RFC 1323. + /// + /// Used in internal calculations as well as packet generation. + /// + /// Unlike `remote_last_win`, we take into account new packets received (but not acknowledged) + /// since the last window update and adjust the window length accordingly. This ensures a fair + /// comparison between the last window length and the new window length we're going to + /// advertise. + #[inline] + fn last_scaled_window(&self) -> Option { + let last_ack = self.remote_last_ack?; + let next_ack = self.remote_seq_no + self.rx_buffer.len(); + + let last_win = (self.remote_last_win as usize) << self.remote_win_shift; + let last_win_adjusted = last_ack + last_win - next_ack; + + Some(cmp::min(last_win_adjusted >> self.remote_win_shift, (1 << 16) - 1) as u16) + } + /// Set the timeout duration. /// /// A socket with a timeout duration set will abort the connection if either of the following @@ -2130,13 +2148,26 @@ impl<'a> Socket<'a> { } } + /// Return whether we should send ACK immediately due to significant window updates. + /// + /// ACKs with significant window updates should be sent immediately to let the sender know that + /// more data can be sent. According to the Linux kernel implementation, "significant" means + /// doubling the receive window. The Linux kernel implementation can be found at + /// . fn window_to_update(&self) -> bool { match self.state { State::SynSent | State::SynReceived | State::Established | State::FinWait1 - | State::FinWait2 => self.scaled_window() > self.remote_last_win, + | State::FinWait2 => { + let new_win = self.scaled_window(); + if let Some(last_win) = self.last_scaled_window() { + new_win > 0 && new_win / 2 >= last_win + } else { + false + } + } _ => false, } } @@ -2202,7 +2233,7 @@ impl<'a> Socket<'a> { } else if self.ack_to_transmit() && self.delayed_ack_expired(cx.now()) { // If we have data to acknowledge, do it. tcp_trace!("outgoing segment will acknowledge"); - } else if self.window_to_update() && self.delayed_ack_expired(cx.now()) { + } else if self.window_to_update() { // If we have window length increase to advertise, do it. tcp_trace!("outgoing segment will update window"); } else if self.state == State::Closed { @@ -2452,8 +2483,11 @@ impl<'a> Socket<'a> { } else if self.seq_to_transmit(cx) { // We have a data or flag packet to transmit. PollAt::Now + } else if self.window_to_update() { + // The receive window has been raised significantly. + PollAt::Now } else { - let want_ack = self.ack_to_transmit() || self.window_to_update(); + let want_ack = self.ack_to_transmit(); let delayed_ack_poll_at = match (want_ack, self.ack_delay_timer) { (false, _) => PollAt::Ingress, @@ -2785,7 +2819,7 @@ mod test { s.local_seq_no = LOCAL_SEQ + 1; s.remote_last_seq = LOCAL_SEQ + 1; s.remote_last_ack = Some(REMOTE_SEQ + 1); - s.remote_last_win = 64; + s.remote_last_win = s.scaled_window(); s } @@ -6325,6 +6359,63 @@ mod test { })); } + #[test] + fn test_window_update_with_delay_ack() { + let mut s = socket_established_with_buffer_sizes(6, 6); + s.ack_delay = Some(Duration::from_millis(10)); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"abcdef"[..], + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 5); + + s.recv(|buffer| { + assert_eq!(&buffer[..2], b"ab"); + (2, ()) + }) + .unwrap(); + recv!( + s, + time 5, + Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 6), + window_len: 2, + ..RECV_TEMPL + }) + ); + + s.recv(|buffer| { + assert_eq!(&buffer[..1], b"c"); + (1, ()) + }) + .unwrap(); + recv_nothing!(s, time 5); + + s.recv(|buffer| { + assert_eq!(&buffer[..1], b"d"); + (1, ()) + }) + .unwrap(); + recv!( + s, + time 5, + Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 6), + window_len: 4, + ..RECV_TEMPL + }) + ); + } + #[test] fn test_fill_peer_window() { let mut s = socket_established(); From bfbff600ae57a48cf77e4d215a8e2edb037d2fff Mon Sep 17 00:00:00 2001 From: Ruihan Li Date: Sat, 13 Jul 2024 02:10:40 +0800 Subject: [PATCH 2/2] Add the `loopback_benchmark` --- Cargo.toml | 4 ++ README.md | 24 ++++++++ examples/loopback_benchmark.rs | 101 +++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+) create mode 100644 examples/loopback_benchmark.rs diff --git a/Cargo.toml b/Cargo.toml index 12fe1f305..99bbdcee8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -292,6 +292,10 @@ required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interfac name = "loopback" required-features = ["log", "medium-ethernet", "proto-ipv4", "socket-tcp"] +[[example]] +name = "loopback_benchmark" +required-features = ["std", "log", "medium-ethernet", "proto-ipv4", "socket-tcp"] + [[example]] name = "multicast" required-features = ["std", "medium-ethernet", "medium-ip", "phy-tuntap_interface", "proto-ipv4", "proto-igmp", "socket-udp"] diff --git a/README.md b/README.md index b34eef9fd..20c1a301a 100644 --- a/README.md +++ b/README.md @@ -553,6 +553,30 @@ is possible; otherwise, nothing at all will be displayed and no options are acce [wireshark]: https://wireshark.org +### examples/loopback\_benchmark.rs + +_examples/loopback_benchmark.rs_ is another simple throughput benchmark. + +Read its [source code](/examples/loopback_benchmark.rs), then run it as: + +```sh +cargo run --release --example loopback_benchmark +``` + +It establishes a connection to itself via a loopback interface and transfers a large amount +of data in one direction. + +A typical result (achieved on a Intel Core i5-13500H CPU and a Linux 6.9.9 x86_64 kernel running +on a LENOVO XiaoXinPro 14 IRH8 laptop) is as follows: + +``` +$ cargo run --release --example loopback_benchmark +done in 0.558 s, bandwidth is 15.395083 Gbps +``` + +Note: Although the loopback interface can be used in bare-metal environments, +this benchmark _does_ rely on `std` to be able to measure the time cost. + ## License _smoltcp_ is distributed under the terms of 0-clause BSD license. diff --git a/examples/loopback_benchmark.rs b/examples/loopback_benchmark.rs new file mode 100644 index 000000000..f49602ffc --- /dev/null +++ b/examples/loopback_benchmark.rs @@ -0,0 +1,101 @@ +mod utils; + +use log::debug; + +use smoltcp::iface::{Config, Interface, SocketSet}; +use smoltcp::phy::{Device, Loopback, Medium}; +use smoltcp::socket::tcp; +use smoltcp::time::Instant; +use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr}; + +fn main() { + let device = Loopback::new(Medium::Ethernet); + + let mut device = { + utils::setup_logging("info"); + + let (mut opts, mut free) = utils::create_options(); + utils::add_middleware_options(&mut opts, &mut free); + + let mut matches = utils::parse_options(&opts, free); + utils::parse_middleware_options(&mut matches, device, /*loopback=*/ true) + }; + + // Create interface + let config = match device.capabilities().medium { + Medium::Ethernet => { + Config::new(EthernetAddress([0x02, 0x00, 0x00, 0x00, 0x00, 0x01]).into()) + } + Medium::Ip => Config::new(smoltcp::wire::HardwareAddress::Ip), + Medium::Ieee802154 => todo!(), + }; + + let mut iface = Interface::new(config, &mut device, Instant::now()); + iface.update_ip_addrs(|ip_addrs| { + ip_addrs + .push(IpCidr::new(IpAddress::v4(127, 0, 0, 1), 8)) + .unwrap(); + }); + + // Create sockets + let server_socket = { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65536]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65536]); + tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer) + }; + + let client_socket = { + let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 65536]); + let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 65536]); + tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer) + }; + + let mut sockets: [_; 2] = Default::default(); + let mut sockets = SocketSet::new(&mut sockets[..]); + let server_handle = sockets.add(server_socket); + let client_handle = sockets.add(client_socket); + + let start_time = Instant::now(); + + let mut did_listen = false; + let mut did_connect = false; + let mut processed = 0; + while processed < 1024 * 1024 * 1024 { + iface.poll(Instant::now(), &mut device, &mut sockets); + + let socket = sockets.get_mut::(server_handle); + if !socket.is_active() && !socket.is_listening() && !did_listen { + debug!("listening"); + socket.listen(1234).unwrap(); + did_listen = true; + } + + while socket.can_recv() { + let received = socket.recv(|buffer| (buffer.len(), buffer.len())).unwrap(); + debug!("got {:?}", received,); + processed += received; + } + + let socket = sockets.get_mut::(client_handle); + let cx = iface.context(); + if !socket.is_open() && !did_connect { + debug!("connecting"); + socket + .connect(cx, (IpAddress::v4(127, 0, 0, 1), 1234), 65000) + .unwrap(); + did_connect = true; + } + + while socket.can_send() { + debug!("sending"); + socket.send(|buffer| (buffer.len(), ())).unwrap(); + } + } + + let duration = Instant::now() - start_time; + println!( + "done in {} s, bandwidth is {} Gbps", + duration.total_millis() as f64 / 1000.0, + (processed as u64 * 8 / duration.total_millis()) as f64 / 1000000.0 + ); +}