Skip to content

Commit

Permalink
support retrieving rx timestamp for TcpStream
Browse files Browse the repository at this point in the history
Documentation: https://docs.kernel.org/networking/timestamping.html

This modifies Stream type to be able to setup rx timestamp on its
socket, and use `recvmsg` to read the returned timestamp. The rx
timestamp of the last recvmsg is stored as `rx_ts`

In the context of tcp stream, if the returned byte range spans multiple
timestamps, the last one is returned.
  • Loading branch information
dqminh authored and gumpt committed Sep 23, 2024
1 parent 75d6e00 commit 7e0368d
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7e74e88f3e50bfdb38eeea244f3100d8477db7e4
2b28af8029c2e74b642c3a7445dfd6768eda1b24
242 changes: 235 additions & 7 deletions pingora-core/src/protocols/l4/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
use async_trait::async_trait;
use futures::FutureExt;
use log::{debug, error};

use pingora_error::{ErrorType::*, OrErr, Result};
use std::io::IoSliceMut;
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf};
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, Interest, ReadBuf};
use tokio::net::{TcpStream, UnixStream};

use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive};
Expand Down Expand Up @@ -118,6 +120,162 @@ impl AsRawFd for RawStream {
}
}

#[derive(Debug)]
struct RawStreamWrapper {
pub(crate) stream: RawStream,
/// store the last rx timestamp of the stream.
pub(crate) rx_ts: Option<SystemTime>,
#[cfg(target_os = "linux")]
/// This can be reused across multiple recvmsg calls. The cmsg buffer may
/// come from old sockets created by older version of pingora and so,
/// this vector can only grow.
reusable_cmsg_space: Vec<u8>,
}

impl RawStreamWrapper {
pub fn new(stream: RawStream) -> Self {
RawStreamWrapper {
stream,
rx_ts: None,
#[cfg(target_os = "linux")]
reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec),
}
}
}

impl AsyncRead for RawStreamWrapper {
#[cfg(not(target_os = "linux"))]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: Basic enum pin projection
unsafe {
let rs_wrapper = Pin::get_unchecked_mut(self);
match &mut rs_wrapper.stream {
RawStream::Tcp(s) => Pin::new_unchecked(s).poll_read(cx, buf),
RawStream::Unix(s) => Pin::new_unchecked(s).poll_read(cx, buf),
}
}
}

#[cfg(target_os = "linux")]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
use futures::ready;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage};

// Safety: Basic pin projection to get mutable stream
let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) };
match &mut rs_wrapper.stream {
RawStream::Tcp(s) => {
loop {
ready!(s.poll_read_ready(cx))?;
// Safety: maybe uninitialized bytes will only be passed to recvmsg
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>]
as *mut [u8])
};
let mut iov = [IoSliceMut::new(b)];
rs_wrapper.reusable_cmsg_space.clear();

match s.try_io(Interest::READABLE, || {
recvmsg::<SockaddrStorage>(
s.as_raw_fd(),
&mut iov,
Some(&mut rs_wrapper.reusable_cmsg_space),
MsgFlags::empty(),
)
.map_err(|errno| errno.into())
}) {
Ok(r) => {
if let Some(ControlMessageOwned::ScmTimestampsns(rtime)) = r
.cmsgs()
.find(|i| matches!(i, ControlMessageOwned::ScmTimestampsns(_)))
{
// The returned timestamp is a real (i.e. not monotonic) timestamp
// https://docs.kernel.org/networking/timestamping.html
rs_wrapper.rx_ts =
SystemTime::UNIX_EPOCH.checked_add(rtime.system.into());
}
// Safety: We trust `recvmsg` to have filled up `r.bytes` bytes in the buffer.
unsafe {
buf.assume_init(r.bytes);
}
buf.advance(r.bytes);
return Poll::Ready(Ok(()));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Poll::Ready(Err(e)),
}
}
}
// Unix RX timestamp only works with datagram for now, so we do not care about it
RawStream::Unix(s) => unsafe { Pin::new_unchecked(s).poll_read(cx, buf) },
}
}
}

impl AsyncWrite for RawStreamWrapper {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
// Safety: Basic enum pin projection
unsafe {
match &mut Pin::get_unchecked_mut(self).stream {
RawStream::Tcp(s) => Pin::new_unchecked(s).poll_write(cx, buf),
RawStream::Unix(s) => Pin::new_unchecked(s).poll_write(cx, buf),
}
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
// Safety: Basic enum pin projection
unsafe {
match &mut Pin::get_unchecked_mut(self).stream {
RawStream::Tcp(s) => Pin::new_unchecked(s).poll_flush(cx),
RawStream::Unix(s) => Pin::new_unchecked(s).poll_flush(cx),
}
}
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
// Safety: Basic enum pin projection
unsafe {
match &mut Pin::get_unchecked_mut(self).stream {
RawStream::Tcp(s) => Pin::new_unchecked(s).poll_shutdown(cx),
RawStream::Unix(s) => Pin::new_unchecked(s).poll_shutdown(cx),
}
}
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
// Safety: Basic enum pin projection
unsafe {
match &mut Pin::get_unchecked_mut(self).stream {
RawStream::Tcp(s) => Pin::new_unchecked(s).poll_write_vectored(cx, bufs),
RawStream::Unix(s) => Pin::new_unchecked(s).poll_write_vectored(cx, bufs),
}
}
}

fn is_write_vectored(&self) -> bool {
self.stream.is_write_vectored()
}
}

impl AsRawFd for RawStreamWrapper {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.stream.as_raw_fd()
}
}

// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
Expand All @@ -133,7 +291,7 @@ const BUF_WRITE_SIZE: usize = 1460;
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
stream: BufStream<RawStream>,
stream: BufStream<RawStreamWrapper>,
buffer_write: bool,
proxy_digest: Option<Arc<ProxyDigest>>,
socket_digest: Option<Arc<SocketDigest>>,
Expand All @@ -143,12 +301,14 @@ pub struct Stream {
pub tracer: Option<Tracer>,
read_pending_time: AccumulatedDuration,
write_pending_time: AccumulatedDuration,
/// Last rx timestamp associated with the last recvmsg call.
pub rx_ts: Option<SystemTime>,
}

impl Stream {
/// set TCP nodelay for this connection if `self` is TCP
pub fn set_nodelay(&mut self) -> Result<()> {
if let RawStream::Tcp(s) = &self.stream.get_ref() {
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
s.set_nodelay(true)
.or_err(ConnectError, "failed to set_nodelay")?;
}
Expand All @@ -157,40 +317,68 @@ impl Stream {

/// set TCP keepalive settings for this connection if `self` is TCP
pub fn set_keepalive(&mut self, ka: &TcpKeepalive) -> Result<()> {
if let RawStream::Tcp(s) = &self.stream.get_ref() {
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
debug!("Setting tcp keepalive");
set_tcp_keepalive(s, ka)?;
}
Ok(())
}

#[cfg(target_os = "linux")]
pub fn set_rx_timestamp(&mut self) -> Result<()> {
use nix::sys::socket::{setsockopt, sockopt, TimestampingFlag};

if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
| TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
return setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
.or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE");
}
Ok(())
}

#[cfg(not(target_os = "linux"))]
pub fn set_rx_timestamp(&mut self) -> io::Result<()> {
Ok(())
}
}

impl From<TcpStream> for Stream {
fn from(s: TcpStream) -> Self {
Stream {
stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
stream: BufStream::with_capacity(
BUF_READ_SIZE,
BUF_WRITE_SIZE,
RawStreamWrapper::new(RawStream::Tcp(s)),
),
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
socket_digest: None,
tracer: None,
read_pending_time: AccumulatedDuration::new(),
write_pending_time: AccumulatedDuration::new(),
rx_ts: None,
}
}
}

impl From<UnixStream> for Stream {
fn from(s: UnixStream) -> Self {
Stream {
stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
stream: BufStream::with_capacity(
BUF_READ_SIZE,
BUF_WRITE_SIZE,
RawStreamWrapper::new(RawStream::Unix(s)),
),
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
socket_digest: None,
tracer: None,
read_pending_time: AccumulatedDuration::new(),
write_pending_time: AccumulatedDuration::new(),
rx_ts: None,
}
}
}
Expand Down Expand Up @@ -262,7 +450,7 @@ impl Drop for Stream {
t.0.on_disconnected();
}
/* use nodelay/local_addr function to detect socket status */
let ret = match &self.stream.get_ref() {
let ret = match &self.stream.get_ref().stream {
RawStream::Tcp(s) => s.nodelay().err(),
RawStream::Unix(s) => s.local_addr().err(),
};
Expand Down Expand Up @@ -298,6 +486,7 @@ impl AsyncRead for Stream {
) -> Poll<io::Result<()>> {
let result = Pin::new(&mut self.stream).poll_read(cx, buf);
self.read_pending_time.poll_time(&result);
self.rx_ts = self.stream.get_ref().rx_ts;
result
}
}
Expand Down Expand Up @@ -528,3 +717,42 @@ impl AccumulatedDuration {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::sync::Notify;

#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_rx_timestamp() {
let message = "hello world".as_bytes();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
notify2.notified().await;
stream.write_all(message).await.unwrap();
});

let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into();
stream.set_rx_timestamp().unwrap();
// Receive the message
// setsockopt for SO_TIMESTAMPING is asynchronous so sleep a little bit
// to let kernel do the work
std::thread::sleep(Duration::from_micros(100));
notify.notify_one();

let mut buffer = vec![0u8; message.len()];
let n = stream.read(buffer.as_mut_slice()).await.unwrap();
assert_eq!(n, message.len());
assert!(stream.rx_ts.is_some());
}
}

0 comments on commit 7e0368d

Please sign in to comment.