diff --git a/.bleep b/.bleep index a5d99c62..f8542c16 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -2b28af8029c2e74b642c3a7445dfd6768eda1b24 \ No newline at end of file +5bbb21bd377e352872ab767af7583e4d8e9022f8 \ No newline at end of file diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index 37f7486c..8c5f3cf1 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -125,6 +125,8 @@ struct RawStreamWrapper { pub(crate) stream: RawStream, /// store the last rx timestamp of the stream. pub(crate) rx_ts: Option, + /// enable reading rx timestamp + pub(crate) enable_rx_ts: bool, #[cfg(target_os = "linux")] /// This can be reused across multiple recvmsg calls. The cmsg buffer may /// come from old sockets created by older version of pingora and so, @@ -137,10 +139,15 @@ impl RawStreamWrapper { RawStreamWrapper { stream, rx_ts: None, + enable_rx_ts: false, #[cfg(target_os = "linux")] reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec), } } + + pub fn enable_rx_ts(&mut self, enable_rx_ts: bool) { + self.enable_rx_ts = enable_rx_ts; + } } impl AsyncRead for RawStreamWrapper { @@ -169,6 +176,18 @@ impl AsyncRead for RawStreamWrapper { use futures::ready; use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage}; + // if we do not need rx timestamp, then use the standard path + if !self.enable_rx_ts { + // Safety: Basic enum pin projection + unsafe { + let rs_wrapper = Pin::get_unchecked_mut(self); + match &mut rs_wrapper.stream { + RawStream::Tcp(s) => return Pin::new_unchecked(s).poll_read(cx, buf), + RawStream::Unix(s) => return Pin::new_unchecked(s).poll_read(cx, buf), + } + } + } + // Safety: Basic pin projection to get mutable stream let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) }; match &mut rs_wrapper.stream { @@ -331,9 +350,11 @@ impl Stream { if let RawStream::Tcp(s) = &self.stream.get_mut().stream { let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE | TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE; - return setsockopt(s.as_raw_fd(), sockopt::Timestamping, ×tamp_options) - .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE"); + setsockopt(s.as_raw_fd(), sockopt::Timestamping, ×tamp_options) + .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?; + self.stream.get_mut().enable_rx_ts(true); } + Ok(()) } @@ -755,4 +776,29 @@ mod tests { assert_eq!(n, message.len()); assert!(stream.rx_ts.is_some()); } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn test_rx_timestamp_standard_path() { + let message = "hello world".as_bytes(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + + tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + notify2.notified().await; + stream.write_all(message).await.unwrap(); + }); + + let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into(); + std::thread::sleep(Duration::from_micros(100)); + notify.notify_one(); + + let mut buffer = vec![0u8; message.len()]; + let n = stream.read(buffer.as_mut_slice()).await.unwrap(); + assert_eq!(n, message.len()); + assert!(stream.rx_ts.is_none()); + } }