Skip to content

Commit

Permalink
Merge branch 'feat-ecn-io' of https://github.com/larseggert/neqo into…
Browse files Browse the repository at this point in the history
… quinn-udp
  • Loading branch information
mxinden committed Jan 30, 2024
2 parents f29ea6d + 2c42747 commit f421e91
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 34 deletions.
4 changes: 3 additions & 1 deletion neqo-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ env_logger = { version = "0.10", default-features = false }
lazy_static = "1.4"
log = { version = "0.4", default-features = false }
qlog = "0.11.0"
time = {version = "0.3.23", features = ["formatting"]}
time = { version = "0.3.23", features = ["formatting"] }
quinn-udp = { git = "https://github.com/quinn-rs/quinn/", optional = true }

[dev-dependencies]
test-fixture = { path = "../test-fixture" }

[features]
deny-warnings = []
ci = []
udp = ["dep:quinn-udp"]

[target."cfg(windows)".dependencies.winapi]
version = "0.3"
Expand Down
2 changes: 2 additions & 0 deletions neqo-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod log;
pub mod qlog;
pub mod timer;
pub mod tos;
#[cfg(feature = "udp")]
pub mod udp;

pub use self::codec::{Decoder, Encoder};
pub use self::datagram::Datagram;
Expand Down
23 changes: 23 additions & 0 deletions neqo-common/src/tos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,37 @@ impl From<IpTosEcn> for IpTos {
Self(u8::from(v))
}
}

impl From<IpTosDscp> for IpTos {
fn from(v: IpTosDscp) -> Self {
Self(u8::from(v))
}
}

impl From<(IpTosDscp, IpTosEcn)> for IpTos {
fn from(v: (IpTosDscp, IpTosEcn)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<(IpTosEcn, IpTosDscp)> for IpTos {
fn from(v: (IpTosEcn, IpTosDscp)) -> Self {
Self(u8::from(v.0) | u8::from(v.1))
}
}

impl From<IpTos> for u8 {
fn from(v: IpTos) -> Self {
v.0
}
}

impl From<u8> for IpTos {
fn from(v: u8) -> Self {
Self(v)
}
}

impl Debug for IpTos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("IpTos")
Expand Down Expand Up @@ -287,4 +302,12 @@ mod tests {
let iptos_dscp: IpTos = dscp.into();
assert_eq!(u8::from(iptos_dscp), dscp as u8);
}

#[test]
fn u8_to_iptos() {
let tos = 0x8b;
let iptos: IpTos = (IpTosEcn::Ce, IpTosDscp::Af41).into();
assert_eq!(tos, u8::from(iptos));
assert_eq!(IpTos::from(tos), iptos);
}
}
155 changes: 155 additions & 0 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
io::{self, IoSliceMut},
net::SocketAddr,
os::fd::AsFd,
slice,
};

use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};

use crate::{Datagram, IpTos};

/// Send the UDP datagram on the specified socket.
///
/// # Arguments
///
/// * `socket` - The UDP socket to send the datagram on.
/// * `d` - The datagram to send.
///
/// # Returns
///
/// An `io::Result` indicating whether the datagram was sent successfully.
///
/// # Errors
///
/// Returns an `io::Error` if the UDP socket fails to send the datagram.
///
/// # Panics
///
/// Panics if the datagram is too large to send.
pub fn tx(socket: impl AsFd, d: &Datagram) -> io::Result<usize> {
// TODO: Don't instantiate on each write.
let send_state = UdpSocketState::new((&socket).into()).unwrap();
let transmit = Transmit {
destination: d.destination(),
ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())),
contents: d[..].to_vec().into(),
segment_size: None,
src_ip: Some(d.source().ip()),
};
let n = send_state
.send((&socket).into(), slice::from_ref(&transmit))
.unwrap();
Ok(n)
}

/// Receive a UDP datagram on the specified socket.
///
/// # Arguments
///
/// * `socket` - The UDP socket to receive the datagram on.
/// * `buf` - The buffer to receive the datagram into.
/// * `tos` - The type-of-service (TOS) or traffic class (TC) value of the received datagram.
/// * `ttl` - The time-to-live (TTL) or hop limit (HL) value of the received datagram.
///
/// # Returns
///
/// An `io::Result` indicating the size of the received datagram and the source address.
///
/// # Errors
///
/// Returns an `io::Error` if the UDP socket fails to receive the datagram.
///
/// # Panics
///
/// Panics if the datagram is too large to receive.
pub fn rx(
socket: impl AsFd,
buf: &mut [u8],
// TODO: Can these be return values instead of mutable inputs?
tos: &mut u8,
ttl: &mut u8,
) -> io::Result<(usize, SocketAddr)> {
let mut meta = RecvMeta::default();
// TODO: Don't instantiate on each read.
let recv_state = UdpSocketState::new((&socket).into()).unwrap();

// TODO: needed?
// #[cfg(test)]
// // `UdpSocketState` switches to non-blocking mode, undo that for the tests.
// socket.set_nonblocking(false).unwrap();

match recv_state.recv(
(&socket).into(),
&mut [IoSliceMut::new(buf)],
slice::from_mut(&mut meta),
) {
Err(e) => Err(e),
Ok(n) => {
*tos = if meta.ecn.is_some() {
meta.ecn.unwrap() as u8
} else {
IpTos::default().into()
};
*ttl = 0xff; // TODO: get the real TTL
Ok((n, meta.addr))
}
}
}

#[cfg(test)]
mod tests {
use crate::{IpTos, IpTosDscp, IpTosEcn};

use super::*;

#[test]
fn datagram_io() {
// Create UDP sockets for testing.
let sender = UdpSocket::bind("127.0.0.1:0").unwrap();
let receiver = UdpSocket::bind("127.0.0.1:8080").unwrap();

// Create a sample datagram.
let tos_tx = IpTos::from((IpTosDscp::Le, IpTosEcn::Ce));
let ttl_tx = 128;
let datagram = Datagram::new(
sender.local_addr().unwrap(),
receiver.local_addr().unwrap(),
tos_tx,
Some(ttl_tx),
"Hello, world!".as_bytes().to_vec(),
);

// Call the emit_datagram function.
let result = tx(&sender, &datagram);

// Assert that the datagram was sent successfully.
assert!(result.is_ok());

// Create a buffer for receiving the datagram.
let mut buf = [0; u16::MAX as usize];

// Create variables for storing TOS and TTL values.
let mut tos_rx = 0;
let mut ttl_rx = 0;

// Call the recv_datagram function.
let result = rx(&receiver, &mut buf, &mut tos_rx, &mut ttl_rx);

// Assert that the datagram was received successfully.
println!("Received {result:?}");
assert!(result.is_ok());

// Assert that the ECN and TTL values are correct.
// TODO: Also check DSCP once quinn-udp supports it.
// assert_eq!(IpTosEcn::from(u8::from(tos_tx)), IpTosEcn::from(tos_rx));
// assert_eq!(tos_tx, tos_rx.into());
assert_ne!(ttl_tx, ttl_rx);
}
}
2 changes: 1 addition & 1 deletion neqo-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
futures = "0.3"
log = {version = "0.4.17", default-features = false}
neqo-common = { path="./../neqo-common" }
neqo-common = { path="./../neqo-common", features = ["udp"] }
neqo-crypto = { path = "./../neqo-crypto" }
neqo-http3 = { path = "./../neqo-http3" }
neqo-qpack = { path = "./../neqo-qpack" }
Expand Down
49 changes: 17 additions & 32 deletions neqo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use futures::{
future::{select, select_all, Either},
FutureExt,
};
use tokio::{net::UdpSocket, time::Sleep};
use tokio::time::Sleep;

use neqo_transport::ConnectionIdGenerator;
use structopt::StructOpt;

use neqo_common::{hex, qdebug, qinfo, qwarn, Datagram, Header, IpTos};
use neqo_common::{hex, qdebug, qinfo, qwarn, udp, Datagram, Header};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
generate_ech_keys, init_db, random, AntiReplay, Cipher,
Expand Down Expand Up @@ -320,21 +320,6 @@ impl QuicParameters {
}
}

async fn emit_packet(socket: &mut UdpSocket, out_dgram: Datagram) {
let sent = match socket.send_to(&out_dgram, &out_dgram.destination()).await {
Err(ref err) => {
if err.kind() != io::ErrorKind::WouldBlock || err.kind() == io::ErrorKind::Interrupted {
eprintln!("UDP send error: {err:?}");
}
0
}
Ok(res) => res,
};
if sent != out_dgram.len() {
eprintln!("Unable to send all {} bytes of datagram", out_dgram.len());
}
}

fn qns_read_response(filename: &str) -> Option<Vec<u8>> {
let mut file_path = PathBuf::from("/www");
file_path.push(filename.trim_matches(|p| p == '/'));
Expand Down Expand Up @@ -594,17 +579,14 @@ impl HttpServer for SimpleServer {
}

fn read_dgram(
socket: &mut UdpSocket,
socket: &mut tokio::net::UdpSocket,
local_address: &SocketAddr,
) -> Result<Option<Datagram>, io::Error> {
let buf = &mut [0u8; 2048];
let (sz, remote_addr) = match socket.try_recv_from(&mut buf[..]) {
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
}
let mut buf = [0; u16::MAX as usize];
let mut tos = 0;
let mut ttl = 0;
let (sz, remote_addr) = match udp::rx(socket, &mut buf[..], &mut tos, &mut ttl) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(err) => {
eprintln!("UDP recv error: {err:?}");
return Err(err);
Expand All @@ -623,8 +605,8 @@ fn read_dgram(
Ok(Some(Datagram::new(
remote_addr,
*local_address,
IpTos::default(),
None,
tos.into(),
Some(ttl),
&buf[..sz],
)))
}
Expand All @@ -634,7 +616,7 @@ struct ServersRunner {
args: Args,
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, UdpSocket)>,
sockets: Vec<(SocketAddr, tokio::net::UdpSocket)>,
}

impl ServersRunner {
Expand Down Expand Up @@ -678,13 +660,14 @@ impl ServersRunner {

print!("Server waiting for connection on: {local_addr:?}");

// TODO: needed?
socket
.set_nonblocking(true)
.expect("set_nonblocking to succeed");

self.sockets.push((
host,
UdpSocket::from_std(socket).expect("conversion to Tokio socket to succeed"),
tokio::net::UdpSocket::from_std(socket).expect("conversion to Tokio socket to succeed"),
));
}

Expand Down Expand Up @@ -725,7 +708,7 @@ impl ServersRunner {
}

/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut UdpSocket {
fn find_socket(&mut self, addr: SocketAddr) -> &mut tokio::net::UdpSocket {
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
rest.iter_mut()
.map(|(_host, socket)| socket)
Expand All @@ -743,7 +726,9 @@ impl ServersRunner {
match self.server.process(dgram.take(), self.args.now()) {
Output::Datagram(dgram) => {
let socket = self.find_socket(dgram.source());
emit_packet(socket, dgram).await;
if let Err(e) = udp::tx(socket, &dgram) {
eprintln!("UDP write error: {}", e);
}
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
Expand Down

0 comments on commit f421e91

Please sign in to comment.