diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 6636a8485..e33c31216 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -277,10 +277,20 @@ impl RttEstimator { #[derive(Debug, Clone, Copy, PartialEq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] enum Timer { - Idle { keep_alive_at: Option }, - Retransmit { expires_at: Instant }, + Idle { + keep_alive_at: Option, + }, + Retransmit { + expires_at: Instant, + }, FastRetransmit, - Close { expires_at: Instant }, + ZeroWindowProbe { + expires_at: Instant, + delay: Duration, + }, + Close { + expires_at: Instant, + }, } const ACK_DELAY_DEFAULT: Duration = Duration::from_millis(10); @@ -317,6 +327,13 @@ impl Timer { } } + fn should_zero_window_probe(&self, timestamp: Instant) -> bool { + match *self { + Timer::ZeroWindowProbe { expires_at, .. } if timestamp >= expires_at => true, + _ => false, + } + } + fn poll_at(&self) -> PollAt { match *self { Timer::Idle { @@ -325,6 +342,7 @@ impl Timer { Timer::Idle { keep_alive_at: None, } => PollAt::Ingress, + Timer::ZeroWindowProbe { expires_at, .. } => PollAt::Time(expires_at), Timer::Retransmit { expires_at, .. } => PollAt::Time(expires_at), Timer::FastRetransmit => PollAt::Now, Timer::Close { expires_at } => PollAt::Time(expires_at), @@ -353,7 +371,10 @@ impl Timer { fn set_for_retransmit(&mut self, timestamp: Instant, delay: Duration) { match *self { - Timer::Idle { .. } | Timer::FastRetransmit { .. } | Timer::Retransmit { .. } => { + Timer::Idle { .. } + | Timer::FastRetransmit { .. } + | Timer::Retransmit { .. } + | Timer::ZeroWindowProbe { .. } => { *self = Timer::Retransmit { expires_at: timestamp + delay, } @@ -372,12 +393,34 @@ impl Timer { } } - fn is_retransmit(&self) -> bool { - match *self { - Timer::Retransmit { .. } | Timer::FastRetransmit => true, - _ => false, + fn set_for_zero_window_probe(&mut self, timestamp: Instant, delay: Duration) { + *self = Timer::ZeroWindowProbe { + expires_at: timestamp + delay, + delay, + } + } + + fn rewind_zero_window_probe(&mut self, timestamp: Instant) { + if let Timer::ZeroWindowProbe { mut delay, .. } = *self { + delay = (delay * 2).min(Duration::from_millis(RTTE_MAX_RTO as _)); + *self = Timer::ZeroWindowProbe { + expires_at: timestamp + delay, + delay, + } } } + + fn is_idle(&self) -> bool { + matches!(self, Timer::Idle { .. }) + } + + fn is_zero_window_probe(&self) -> bool { + matches!(self, Timer::ZeroWindowProbe { .. }) + } + + fn is_retransmit(&self) -> bool { + matches!(self, Timer::Retransmit { .. } | Timer::FastRetransmit) + } } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -1182,6 +1225,17 @@ impl<'a> Socket<'a> { self.remote_last_ts = None } + // if remote win is zero and we go from having no data to some data pending to + // send, start the zero window probe timer. + if self.remote_win_len == 0 && self.timer.is_idle() { + let delay = self.rtte.retransmission_timeout(); + tcp_trace!("starting zero-window-probe timer for t+{}", delay); + + // We don't have access to the current time here, so use Instant::ZERO instead. + // this will cause the first ZWP to be sent immediately, but that's okay. + self.timer.set_for_zero_window_probe(Instant::ZERO, delay); + } + #[cfg(any(test, feature = "verbose"))] tcp_trace!( "tx buffer: enqueueing {} octets (now {})", @@ -2032,6 +2086,20 @@ impl<'a> Socket<'a> { _ => {} } + // start/stop the Zero Window Probe timer. + if self.remote_win_len == 0 + && !self.tx_buffer.is_empty() + && (self.timer.is_idle() || ack_len > 0) + { + let delay = self.rtte.retransmission_timeout(); + tcp_trace!("starting zero-window-probe timer for t+{}", delay); + self.timer.set_for_zero_window_probe(cx.now(), delay); + } + if self.remote_win_len != 0 && self.timer.is_zero_window_probe() { + tcp_trace!("stopping zero-window-probe timer"); + self.timer.set_for_idle(cx.now(), self.keep_alive); + } + let payload_len = payload.len(); if payload_len == 0 { return None; @@ -2318,6 +2386,8 @@ impl<'a> Socket<'a> { } else if self.timer.should_keep_alive(cx.now()) { // If we need to transmit a keep-alive packet, do it. tcp_trace!("keep-alive timer expired"); + } else if self.timer.should_zero_window_probe(cx.now()) { + tcp_trace!("sending zero-window probe"); } else if self.timer.should_close(cx.now()) { // If we have spent enough time in the TIME-WAIT state, close the socket. tcp_trace!("TIME-WAIT timer expired"); @@ -2360,6 +2430,8 @@ impl<'a> Socket<'a> { payload: &[], }; + let mut is_zero_window_probe = false; + match self.state { // We transmit an RST in the CLOSED state. If we ended up in the CLOSED state // with a specified endpoint, it means that the socket was aborted. @@ -2401,7 +2473,7 @@ impl<'a> Socket<'a> { let win_right_edge = self.local_seq_no + self.remote_win_len; // Max amount of octets we're allowed to send according to the remote window. - let win_limit = if win_right_edge >= self.remote_last_seq { + let mut win_limit = if win_right_edge >= self.remote_last_seq { win_right_edge - self.remote_last_seq } else { // This can happen if we've sent some data and later the remote side @@ -2412,6 +2484,12 @@ impl<'a> Socket<'a> { 0 }; + // To send a zero-window-probe, force the window limit to at least 1 byte. + if win_limit == 0 && self.timer.should_zero_window_probe(cx.now()) { + win_limit = 1; + is_zero_window_probe = true; + } + // Maximum size we're allowed to send. This can be limited by 3 factors: // 1. remote window // 2. MSS the remote is willing to accept, probably determined by their MTU @@ -2510,6 +2588,12 @@ impl<'a> Socket<'a> { } self.ack_delay_timer = AckDelayTimer::Idle; + // Leave the rest of the state intact if sending a zero-window probe. + if is_zero_window_probe { + self.timer.rewind_zero_window_probe(cx.now()); + return Ok(()); + } + // Leave the rest of the state intact if sending a keep-alive packet, since those // carry a fake segment. if is_keep_alive { @@ -7068,6 +7152,312 @@ mod test { assert!(s.window_to_update()); } + // =========================================================================================// + // Tests for zero-window probes. + // =========================================================================================// + + #[test] + fn test_zero_window_probe_enter_on_win_update() { + let mut s = socket_established(); + + assert!(!s.timer.is_zero_window_probe()); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(!s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_enter_on_send() { + let mut s = socket_established(); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(!s.timer.is_zero_window_probe()); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_exit() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + + assert!(!s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + assert!(s.timer.is_zero_window_probe()); + + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 6, + ..SEND_TEMPL + } + ); + + assert!(!s.timer.is_zero_window_probe()); + } + + #[test] + fn test_zero_window_probe_exit_ack() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + send!( + s, + time 1010, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 2), + window_len: 6, + ..SEND_TEMPL + } + ); + + recv!( + s, + time 1010, + [TcpRepr { + seq_number: LOCAL_SEQ + 2, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"bcdef1"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_backoff_nack_reply() { + let mut s = socket_established(); + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + send!( + s, + time 1100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + send!( + s, + time 3100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 6999); + recv!( + s, + time 7000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_backoff_no_reply() { + let mut s = socket_established(); + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + } + + #[test] + fn test_zero_window_probe_shift() { + let mut s = socket_established(); + + s.send_slice(b"abcdef123456!@#$%^").unwrap(); + send!( + s, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_len: 0, + ..SEND_TEMPL + } + ); + + recv_nothing!(s, time 999); + recv!( + s, + time 1000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + recv_nothing!(s, time 2999); + recv!( + s, + time 3000, + [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"a"[..], + ..RECV_TEMPL + }] + ); + + // ack the ZWP byte, but still advertise zero window. + // this should restart the ZWP timer. + send!( + s, + time 3100, + TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 2), + window_len: 0, + ..SEND_TEMPL + } + ); + + // ZWP should be sent at 3100+1000 = 4100 + recv_nothing!(s, time 4099); + recv!( + s, + time 4100, + [TcpRepr { + seq_number: LOCAL_SEQ + 2, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"b"[..], + ..RECV_TEMPL + }] + ); + } + // =========================================================================================// // Tests for timeouts. // =========================================================================================//