From 2762b5b8a9fb1aa70542ecad2b93edc12911ac7f Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 22 May 2024 19:52:49 -0400 Subject: [PATCH 01/11] impl: ability to resize tcp receive buffer --- src/socket/tcp.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 42f99175f..a61bc2e79 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -523,7 +523,17 @@ impl<'a> Socket<'a> { panic!("receiving buffer too large, cannot exceed 1 GiB") } let rx_cap_log2 = mem::size_of::() * 8 - rx_capacity.leading_zeros() as usize; + let remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; + Self::new_with_window_scaling(rx_buffer, tx_buffer, remote_win_shift) + } + /// Create a socket using the given buffers and window scaling factor defined in [RFC 1323]. + /// + /// See also the [local_recv_win_scale](#method.local_recv_win_scale) method. + pub fn new_with_window_scaling(rx_buffer: T, tx_buffer: T, recv_win_scale: u8) -> Socket<'a> + where + T: Into>, + { Socket { state: State::Closed, timer: Timer::new(), @@ -543,7 +553,7 @@ impl<'a> Socket<'a> { remote_last_ack: None, remote_last_win: 0, remote_win_len: 0, - remote_win_shift: rx_cap_log2.saturating_sub(16) as u8, + remote_win_shift: recv_win_scale, remote_win_scale: None, remote_has_sack: false, remote_mss: DEFAULT_MSS, @@ -774,6 +784,14 @@ impl<'a> Socket<'a> { } } + /// Return the local receive window scaling factor defined in [RFC 1323]. + /// + /// The value will become constant after the connection is established. + /// It may be reset to 0 during the handshake if remote side does not support window scaling. + pub fn local_recv_win_scale(&self) -> u8 { + self.remote_win_shift + } + /// Return the time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. /// /// See also the [set_hop_limit](#method.set_hop_limit) method @@ -2603,6 +2621,46 @@ impl<'a> Socket<'a> { } } +impl Socket<'static> { + /// TODO: DOCS + pub fn replace_recv_buffer>>( + &mut self, + new_buffer: T, + ) -> Result, SocketBuffer<'static>> { + let mut replaced_buf = new_buffer.into(); + /* Check if the new buffer is valid + * Requirements: + * 1. The new buffer must be larger than the length of remaining data in the current buffer + * 2. The new buffer must be multiple of (1 << self.remote_win_shift) + */ + if replaced_buf.capacity() < self.rx_buffer.len() + || replaced_buf.capacity() % (1 << self.remote_win_shift) != 0 + { + return Err(replaced_buf); + } + replaced_buf.clear(); + self.rx_buffer.dequeue_many_with(|buf| { + let enqueued_len = replaced_buf.enqueue_slice(buf); + assert_eq!(enqueued_len, buf.len()); + (enqueued_len, replaced_buf.get_allocated(0, enqueued_len)) + }); + if self.rx_buffer.len() > 0 { + // copy the wrapped around part + self.rx_buffer.dequeue_many_with(|buf| { + let enqueued_len = replaced_buf.enqueue_slice(buf); + assert_eq!(enqueued_len, buf.len()); + ( + enqueued_len, + replaced_buf.get_allocated(buf.len() - enqueued_len, enqueued_len), + ) + }); + } + assert_eq!(self.rx_buffer.len(), 0); + mem::swap(&mut self.rx_buffer, &mut replaced_buf); + Ok(replaced_buf) + } +} + impl<'a> fmt::Write for Socket<'a> { fn write_str(&mut self, slice: &str) -> fmt::Result { let slice = slice.as_bytes(); From e219a482836aee93a2f0872817601fcfae518ab0 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 22 May 2024 20:01:35 -0400 Subject: [PATCH 02/11] impl: new_with_window_scaling will panic --- src/socket/tcp.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index a61bc2e79..1b6f65d85 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -529,11 +529,21 @@ impl<'a> Socket<'a> { /// Create a socket using the given buffers and window scaling factor defined in [RFC 1323]. /// + /// # Panics + /// + /// Panics if the window scaling factor is greater than 14. + /// /// See also the [local_recv_win_scale](#method.local_recv_win_scale) method. pub fn new_with_window_scaling(rx_buffer: T, tx_buffer: T, recv_win_scale: u8) -> Socket<'a> where T: Into>, { + if recv_win_scale > 14 { + panic!("window scaling factor too large, must be <= 14") + } + + let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into()); + Socket { state: State::Closed, timer: Timer::new(), From fee79d8f4d4e2826be1b87bdb348c7c7f1896980 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 22 May 2024 20:51:34 -0400 Subject: [PATCH 03/11] test: resize recv buffer --- src/socket/tcp.rs | 52 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 1b6f65d85..74c627264 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -8219,4 +8219,56 @@ mod test { }] ); } + + // =========================================================================================// + // Tests for window scaling + // =========================================================================================// + + fn socket_established_with_window_scaling() -> TestSocket { + let mut s = socket_established(); + s.remote_win_shift = 10; + const BASE: usize = 1 << 10; + s.tx_buffer = SocketBuffer::new(vec![0u8; 64 * BASE]); + s.rx_buffer = SocketBuffer::new(vec![0u8; 64 * BASE]); + s + } + + #[test] + #[should_panic] + fn test_new_with_too_large_window_scale() { + Socket::new_with_window_scaling( + SocketBuffer::new(Vec::with_capacity(128)), + SocketBuffer::new(Vec::with_capacity(128)), + 15, + ); + } + + #[test] + fn test_new_with_window_scale() { + let socket = Socket::new_with_window_scaling( + SocketBuffer::new(Vec::with_capacity(128)), + SocketBuffer::new(Vec::with_capacity(128)), + 14, + ); + assert_eq!(socket.local_recv_win_scale(), 14); + } + + #[test] + fn test_resize_recv_buffer_invalid_size() { + let mut s = socket_established_with_window_scaling(); + assert_eq!(s.rx_buffer.enqueue_slice(&[42; 31 * 1024]), 31 * 1024); + assert_eq!(s.rx_buffer.len(), 31 * 1024); + assert!(s + .replace_recv_buffer(SocketBuffer::new(vec![7u8; 32 * 1024 + 512])) + .is_err()); + assert!(s + .replace_recv_buffer(SocketBuffer::new(vec![7u8; 16 * 1024])) + .is_err()); + let old_buffer = s + .replace_recv_buffer(SocketBuffer::new(vec![7u8; 32 * 1024])) + .unwrap(); + assert_eq!(old_buffer.capacity(), 64 * 1024); + assert_eq!(s.rx_buffer.len(), 31 * 1024); + assert_eq!(s.rx_buffer.capacity(), 32 * 1024); + } } From e14df0dd2d8eed43e6088de2fff07462f7e4d724 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 22 May 2024 21:12:28 -0400 Subject: [PATCH 04/11] docs: replace_recv_buffer --- src/socket/tcp.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 74c627264..657178564 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2632,7 +2632,21 @@ impl<'a> Socket<'a> { } impl Socket<'static> { - /// TODO: DOCS + /// Replace the receive buffer with a new one. + /// + /// The requirements for the new buffer are: + /// 1. The new buffer must be larger than the length of remaining data in the current buffer + /// 2. The new buffer must be multiple of (1 << self.remote_win_shift) + /// + /// Note: self.remote_win_shift cannot be modified after the connection is established. Use + /// `new_with_window_scaling` to create a new socket with a pre-defined window scale. Details can + /// be found in [RFC 1323]. + /// + /// If the new buffer does not meet the requirements, the new buffer is returned as an error; + /// otherwise, the old buffer is returned as an Ok value. + /// + /// See also the [new_with_window_scaling](struct.Socket.html#method.new_with_window_scaling) and + /// [local_recv_win_scale](struct.Socket.html#method.local_recv_win_scale) methods. pub fn replace_recv_buffer>>( &mut self, new_buffer: T, From 13b9695ad6172f984110d406d22d261400817e8f Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 22 May 2024 21:21:46 -0400 Subject: [PATCH 05/11] lints: fix clippy --- src/socket/tcp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 657178564..ef4360ba6 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2668,7 +2668,7 @@ impl Socket<'static> { assert_eq!(enqueued_len, buf.len()); (enqueued_len, replaced_buf.get_allocated(0, enqueued_len)) }); - if self.rx_buffer.len() > 0 { + if !self.rx_buffer.is_empty() { // copy the wrapped around part self.rx_buffer.dequeue_many_with(|buf| { let enqueued_len = replaced_buf.enqueue_slice(buf); From 0cb9b064a1ee7b701a20599e0c878e3aecb83eda Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Fri, 14 Jun 2024 18:19:39 -0400 Subject: [PATCH 06/11] change: use set_* api for window scale --- src/socket/tcp.rs | 80 +++++++++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index ef4360ba6..0b02fbc70 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -524,24 +524,6 @@ impl<'a> Socket<'a> { } let rx_cap_log2 = mem::size_of::() * 8 - rx_capacity.leading_zeros() as usize; let remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; - Self::new_with_window_scaling(rx_buffer, tx_buffer, remote_win_shift) - } - - /// Create a socket using the given buffers and window scaling factor defined in [RFC 1323]. - /// - /// # Panics - /// - /// Panics if the window scaling factor is greater than 14. - /// - /// See also the [local_recv_win_scale](#method.local_recv_win_scale) method. - pub fn new_with_window_scaling(rx_buffer: T, tx_buffer: T, recv_win_scale: u8) -> Socket<'a> - where - T: Into>, - { - if recv_win_scale > 14 { - panic!("window scaling factor too large, must be <= 14") - } - let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into()); Socket { @@ -563,7 +545,7 @@ impl<'a> Socket<'a> { remote_last_ack: None, remote_last_win: 0, remote_win_len: 0, - remote_win_shift: recv_win_scale, + remote_win_shift, remote_win_scale: None, remote_has_sack: false, remote_mss: DEFAULT_MSS, @@ -802,6 +784,28 @@ impl<'a> Socket<'a> { self.remote_win_shift } + /// Set the local receive window scaling factor defined in [RFC 1323]. + /// + /// The value will become constant after the connection is established. + /// It may be reset to 0 during the handshake if remote side does not support window scaling. + /// + /// # Errors + /// Returns an error if the socket is not in the `Closed` or `Listen` state, or if the + /// receive buffer is smaller than (1< Result<(), ()> { + if self.rx_buffer.capacity() < (1 << scale) as usize { + return Err(()); + } + + match self.state { + State::Closed | State::Listen => { + self.remote_win_shift = scale; + Ok(()) + } + _ => Err(()), + } + } + /// Return the time-to-live (IPv4) or hop limit (IPv6) value used in outgoing packets. /// /// See also the [set_hop_limit](#method.set_hop_limit) method @@ -8248,23 +8252,37 @@ mod test { } #[test] - #[should_panic] - fn test_new_with_too_large_window_scale() { - Socket::new_with_window_scaling( - SocketBuffer::new(Vec::with_capacity(128)), - SocketBuffer::new(Vec::with_capacity(128)), - 15, + fn test_too_large_window_scale() { + let mut socket = Socket::new( + SocketBuffer::new(vec![0; 128]), + SocketBuffer::new(vec![0; 128]), ); + assert!(socket.set_local_recv_win_scale(15).is_err()) } #[test] - fn test_new_with_window_scale() { - let socket = Socket::new_with_window_scaling( - SocketBuffer::new(Vec::with_capacity(128)), - SocketBuffer::new(Vec::with_capacity(128)), - 14, + fn test_set_window_scale() { + let mut socket = Socket::new( + SocketBuffer::new(vec![0; 128]), + SocketBuffer::new(vec![0; 128]), ); - assert_eq!(socket.local_recv_win_scale(), 14); + assert!(matches!(socket.state, State::Closed)); + assert_eq!(socket.rx_buffer.capacity(), 128); + assert!(socket.set_local_recv_win_scale(6).is_ok()); + assert!(socket.set_local_recv_win_scale(14).is_err()); + assert_eq!(socket.local_recv_win_scale(), 6); + } + + #[test] + fn test_set_scale_with_tcp_state() { + let mut socket = socket(); + assert!(socket.set_local_recv_win_scale(1).is_ok()); + let mut socket = socket_established(); + assert!(socket.set_local_recv_win_scale(1).is_err()); + let mut socket = socket_listen(); + assert!(socket.set_local_recv_win_scale(1).is_ok()); + let mut socket = socket_syn_received(); + assert!(socket.set_local_recv_win_scale(1).is_err()); } #[test] From d7824c8d5081aeb264ccb14953ec391461313852 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Fri, 14 Jun 2024 18:38:52 -0400 Subject: [PATCH 07/11] lints: fix clippy --- src/socket/tcp.rs | 47 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 0b02fbc70..564c2dafe 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -63,6 +63,30 @@ impl Display for ConnectError { #[cfg(feature = "std")] impl std::error::Error for ConnectError {} +/// Error returned by set_* +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum ArgumentError { + InvalidArgs, + InvalidState, + InsufficientResource, +} + +impl Display for crate::socket::tcp::ArgumentError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + crate::socket::tcp::ArgumentError::InvalidArgs => write!(f, "invalid arguments by RFC"), + crate::socket::tcp::ArgumentError::InvalidState => write!(f, "invalid state"), + crate::socket::tcp::ArgumentError::InsufficientResource => { + write!(f, "insufficient runtime resource") + } + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for crate::socket::tcp::ArgumentError {} + /// Error returned by [`Socket::send`] #[derive(Debug, PartialEq, Eq, Clone, Copy)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -523,8 +547,6 @@ impl<'a> Socket<'a> { panic!("receiving buffer too large, cannot exceed 1 GiB") } let rx_cap_log2 = mem::size_of::() * 8 - rx_capacity.leading_zeros() as usize; - let remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; - let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into()); Socket { state: State::Closed, @@ -545,7 +567,7 @@ impl<'a> Socket<'a> { remote_last_ack: None, remote_last_win: 0, remote_win_len: 0, - remote_win_shift, + remote_win_shift: rx_cap_log2.saturating_sub(16) as u8, remote_win_scale: None, remote_has_sack: false, remote_mss: DEFAULT_MSS, @@ -790,11 +812,16 @@ impl<'a> Socket<'a> { /// It may be reset to 0 during the handshake if remote side does not support window scaling. /// /// # Errors - /// Returns an error if the socket is not in the `Closed` or `Listen` state, or if the - /// receive buffer is smaller than (1< Result<(), ()> { + /// `Err(ArgumentError::InvalidArgs)` if the scale is greater than 14. + /// `Err(ArgumentError::InvalidState)` if the socket is not in the `Closed` or `Listen` state. + /// `Err(ArgumentError::InsufficientResource)` if the receive buffer is smaller than (1< Result<(), ArgumentError> { + if scale > 14 { + return Err(ArgumentError::InvalidArgs); + } + if self.rx_buffer.capacity() < (1 << scale) as usize { - return Err(()); + return Err(ArgumentError::InsufficientResource); } match self.state { @@ -802,7 +829,7 @@ impl<'a> Socket<'a> { self.remote_win_shift = scale; Ok(()) } - _ => Err(()), + _ => Err(ArgumentError::InvalidState), } } @@ -8254,8 +8281,8 @@ mod test { #[test] fn test_too_large_window_scale() { let mut socket = Socket::new( - SocketBuffer::new(vec![0; 128]), - SocketBuffer::new(vec![0; 128]), + SocketBuffer::new(vec![0; 8 * (1 << 15)]), + SocketBuffer::new(vec![0; 8 * (1 << 15)]), ); assert!(socket.set_local_recv_win_scale(15).is_err()) } From 125a9b0b64824e9c0ecc668b90d6feea32a2e53e Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 10 Jul 2024 19:13:33 -0400 Subject: [PATCH 08/11] fix: lifetime constraint --- src/socket/tcp.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 564c2dafe..dd73d1d18 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2660,9 +2660,7 @@ impl<'a> Socket<'a> { .unwrap_or(&PollAt::Ingress) } } -} -impl Socket<'static> { /// Replace the receive buffer with a new one. /// /// The requirements for the new buffer are: @@ -2678,10 +2676,10 @@ impl Socket<'static> { /// /// See also the [new_with_window_scaling](struct.Socket.html#method.new_with_window_scaling) and /// [local_recv_win_scale](struct.Socket.html#method.local_recv_win_scale) methods. - pub fn replace_recv_buffer>>( + pub fn replace_recv_buffer>>( &mut self, new_buffer: T, - ) -> Result, SocketBuffer<'static>> { + ) -> Result, SocketBuffer<'a>> { let mut replaced_buf = new_buffer.into(); /* Check if the new buffer is valid * Requirements: From 5b4582c6905710b1308ad40a78300384f70fd239 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Wed, 10 Jul 2024 23:43:14 -0400 Subject: [PATCH 09/11] fix: configuration overridden by private reset() --- src/socket/tcp.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index dd73d1d18..c8be47e54 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -887,6 +887,7 @@ impl<'a> Socket<'a> { fn reset(&mut self) { let rx_cap_log2 = mem::size_of::() * 8 - self.rx_buffer.capacity().leading_zeros() as usize; + let new_rx_win_shift = rx_cap_log2.saturating_sub(16) as u8; self.state = State::Closed; self.timer = Timer::new(); @@ -904,7 +905,10 @@ impl<'a> Socket<'a> { self.remote_last_win = 0; self.remote_win_len = 0; self.remote_win_scale = None; - self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; + // keep user-specified window scaling across connect()/listen() + if self.remote_win_shift < new_rx_win_shift { + self.remote_win_shift = new_rx_win_shift; + } self.remote_mss = DEFAULT_MSS; self.remote_last_ts = None; self.ack_delay_timer = AckDelayTimer::Idle; @@ -2388,6 +2392,7 @@ impl<'a> Socket<'a> { } else if self.timer.should_close(cx.now()) { // If we have spent enough time in the TIME-WAIT state, close the socket. tcp_trace!("TIME-WAIT timer expired"); + self.remote_win_shift = 0; self.reset(); return Ok(()); } else { From f29fa12a3f4f2151eb22b4df6855ffb052811545 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Thu, 11 Jul 2024 00:10:00 -0400 Subject: [PATCH 10/11] fix: docs --- src/socket/tcp.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index c8be47e54..50eb597e2 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2672,15 +2672,10 @@ impl<'a> Socket<'a> { /// 1. The new buffer must be larger than the length of remaining data in the current buffer /// 2. The new buffer must be multiple of (1 << self.remote_win_shift) /// - /// Note: self.remote_win_shift cannot be modified after the connection is established. Use - /// `new_with_window_scaling` to create a new socket with a pre-defined window scale. Details can - /// be found in [RFC 1323]. - /// /// If the new buffer does not meet the requirements, the new buffer is returned as an error; /// otherwise, the old buffer is returned as an Ok value. /// - /// See also the [new_with_window_scaling](struct.Socket.html#method.new_with_window_scaling) and - /// [local_recv_win_scale](struct.Socket.html#method.local_recv_win_scale) methods. + /// See also the [local_recv_win_scale](struct.Socket.html#method.local_recv_win_scale) methods. pub fn replace_recv_buffer>>( &mut self, new_buffer: T, From 37eb312a1543ef7d1ccad8e558314bdcd47ea331 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Thu, 20 Feb 2025 02:07:07 -0500 Subject: [PATCH 11/11] fix: copy unallocated data for assembler --- src/socket/tcp.rs | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 50eb597e2..fc8bf6b22 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2692,23 +2692,33 @@ impl<'a> Socket<'a> { return Err(replaced_buf); } replaced_buf.clear(); - self.rx_buffer.dequeue_many_with(|buf| { - let enqueued_len = replaced_buf.enqueue_slice(buf); - assert_eq!(enqueued_len, buf.len()); - (enqueued_len, replaced_buf.get_allocated(0, enqueued_len)) - }); - if !self.rx_buffer.is_empty() { - // copy the wrapped around part - self.rx_buffer.dequeue_many_with(|buf| { - let enqueued_len = replaced_buf.enqueue_slice(buf); - assert_eq!(enqueued_len, buf.len()); - ( - enqueued_len, - replaced_buf.get_allocated(buf.len() - enqueued_len, enqueued_len), - ) - }); + + // We should copy both allocated data and unallocated data (for assembler) + let allocated1 = self.rx_buffer.get_allocated(0, self.rx_buffer.len()); + let l = replaced_buf.enqueue_slice(allocated1); + assert_eq!(l, allocated1.len()); + if allocated1.len() < self.rx_buffer.len() { + let allocated2 = self + .rx_buffer + .get_allocated(allocated1.len(), self.rx_buffer.len() - allocated1.len()); + let l = replaced_buf.enqueue_slice(allocated2); + assert_eq!(l, allocated2.len()); } - assert_eq!(self.rx_buffer.len(), 0); + + // make sure assembler can work properly + let unallocated1 = self.rx_buffer.get_unallocated(0, self.rx_buffer.window()); + let unallocated1_len = unallocated1.len(); + let l = replaced_buf.write_unallocated(0, unallocated1); + assert_eq!(l, unallocated1.len()); + if unallocated1_len < self.rx_buffer.window() { + let unallocated2 = self + .rx_buffer + .get_unallocated(unallocated1_len, self.rx_buffer.window() - unallocated1_len); + let l = replaced_buf.write_unallocated(unallocated1_len, unallocated2); + assert_eq!(l, unallocated2.len()); + } + assert_eq!(replaced_buf.len(), self.rx_buffer.len()); + mem::swap(&mut self.rx_buffer, &mut replaced_buf); Ok(replaced_buf) }