From 2c42747544f4d3c0d13763cad7534d8b8a1fc026 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 23 Jan 2024 14:36:52 +0200 Subject: [PATCH] Initial commit --- neqo-client/Cargo.toml | 8 ++- neqo-client/src/main.rs | 58 +++++++-------- neqo-common/Cargo.toml | 4 +- neqo-common/src/lib.rs | 2 + neqo-common/src/tos.rs | 23 ++++++ neqo-common/src/udp.rs | 150 +++++++++++++++++++++++++++++++++++++++ neqo-interop/src/main.rs | 46 +++++++----- neqo-server/src/main.rs | 35 ++++----- 8 files changed, 256 insertions(+), 70 deletions(-) create mode 100644 neqo-common/src/udp.rs diff --git a/neqo-client/Cargo.toml b/neqo-client/Cargo.toml index ca11186f95..20e1f4e944 100644 --- a/neqo-client/Cargo.toml +++ b/neqo-client/Cargo.toml @@ -1,16 +1,18 @@ [package] name = "neqo-client" version = "0.6.8" -authors = ["Martin Thomson ", +authors = [ + "Martin Thomson ", "Dragana Damjanovic ", - "Andy Grover "] + "Andy Grover ", +] edition = "2018" rust-version = "1.70.0" license = "MIT OR Apache-2.0" [dependencies] mio = "~0.6.23" -neqo-common = { path="./../neqo-common" } +neqo-common = { path = "./../neqo-common", features = ["udp"] } neqo-crypto = { path = "./../neqo-crypto" } neqo-http3 = { path = "./../neqo-http3" } neqo-qpack = { path = "./../neqo-qpack" } diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index 677829ad05..87436610ce 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -7,12 +7,11 @@ #![cfg_attr(feature = "deny-warnings", deny(warnings))] #![warn(clippy::use_self)] -use common::IpTos; use qlog::{events::EventImportance, streamer::QlogStreamer}; -use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token}; +use mio::{Events, Poll, PollOpt, Ready, Token}; -use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, Datagram, Role}; +use neqo_common::{self as common, event::Provider, hex, qlog::NeqoQlog, udp, Datagram, Role}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init, AuthenticationStatus, Cipher, ResumptionToken, @@ -33,7 +32,8 @@ use std::{ fmt::{self, Display}, fs::{create_dir_all, File, OpenOptions}, io::{self, ErrorKind, Write}, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket}, + os::fd::{AsRawFd, FromRawFd}, path::PathBuf, process::exit, rc::Rc, @@ -346,14 +346,6 @@ impl QuicParameters { } } -fn emit_datagram(socket: &mio::net::UdpSocket, d: Datagram) -> io::Result<()> { - let sent = socket.send_to(&d[..], &d.destination())?; - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); - } - Ok(()) -} - fn get_output_file( url: &Url, output_dir: &Option, @@ -414,7 +406,9 @@ fn process_loop( let mut datagrams: Vec = Vec::new(); 'read: loop { - match socket.recv_from(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + match udp::rx(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => @@ -432,7 +426,7 @@ fn process_loop( } if sz > 0 { let d = - Datagram::new(remote, *local_addr, IpTos::default(), None, &buf[..sz]); + Datagram::new(remote, *local_addr, tos.into(), Some(ttl), &buf[..sz]); datagrams.push(d); } } @@ -452,7 +446,7 @@ fn process_loop( 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { - if let Err(e) = emit_datagram(socket, dgram) { + if let Err(e) = udp::tx(socket, &dgram) { eprintln!("UDP write error: {e}"); client.close(Instant::now(), 0, e.to_string()); exiting = true; @@ -1060,7 +1054,7 @@ fn main() -> Res<()> { SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), }; - let socket = match UdpSocket::bind(&local_addr) { + let socket = match UdpSocket::bind(local_addr) { Err(e) => { eprintln!("Unable to bind UDP socket: {e}"); exit(1) @@ -1068,15 +1062,17 @@ fn main() -> Res<()> { Ok(s) => s, }; + let real_local = socket.local_addr().unwrap(); + let mio_socket = + unsafe { ::from_raw_fd(socket.as_raw_fd()) }; let poll = Poll::new()?; poll.register( - &socket, + &mio_socket, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge(), )?; - let real_local = socket.local_addr().unwrap(); println!( "{} Client connecting: {:?} -> {:?}", if args.use_old_http { "H9" } else { "H3" }, @@ -1131,7 +1127,7 @@ mod old { collections::{HashMap, VecDeque}, fs::File, io::{ErrorKind, Write}, - net::SocketAddr, + net::{SocketAddr, UdpSocket}, path::PathBuf, process::exit, rc::Rc, @@ -1142,14 +1138,18 @@ mod old { use super::{qlog_new, KeyUpdateState, Res}; use mio::{Events, Poll}; - use neqo_common::{event::Provider, Datagram, IpTos}; + use neqo_common::{ + event::Provider, + udp::{self}, + Datagram, + }; use neqo_crypto::{AuthenticationStatus, ResumptionToken}; use neqo_transport::{ Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, StreamType, }; - use super::{emit_datagram, get_output_file, Args}; + use super::{get_output_file, Args}; struct HandlerOld<'b> { streams: HashMap>, @@ -1335,12 +1335,12 @@ mod old { fn process_loop_old( local_addr: &SocketAddr, - socket: &mio::net::UdpSocket, + socket: &UdpSocket, poll: &Poll, client: &mut Connection, handler: &mut HandlerOld, ) -> Res { - let buf = &mut [0u8; 2048]; + let mut buf = [0; u16::MAX as usize]; let mut events = Events::with_capacity(1024); let mut timeout: Option = None; loop { @@ -1350,7 +1350,9 @@ mod old { )?; 'read: loop { - match socket.recv_from(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + match udp::rx(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::Interrupted => @@ -1370,8 +1372,8 @@ mod old { let d = Datagram::new( remote, *local_addr, - IpTos::default(), - None, + tos.into(), + Some(ttl), &buf[..sz], ); client.process_input(&d, Instant::now()); @@ -1390,7 +1392,7 @@ mod old { 'write: loop { match client.process_output(Instant::now()) { Output::Datagram(dgram) => { - if let Err(e) = emit_datagram(socket, dgram) { + if let Err(e) = udp::tx(socket, &dgram) { eprintln!("UDP write error: {e}"); client.close(Instant::now(), 0, e.to_string()); exiting = true; @@ -1418,7 +1420,7 @@ mod old { #[allow(clippy::too_many_arguments)] pub fn old_client( args: &Args, - socket: &mio::net::UdpSocket, + socket: &UdpSocket, poll: &Poll, local_addr: SocketAddr, remote_addr: SocketAddr, diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index f6fd952a18..6ad9599eb7 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -13,7 +13,8 @@ env_logger = { version = "0.10", default-features = false } lazy_static = "1.4" log = { version = "0.4", default-features = false } qlog = "0.11.0" -time = {version = "0.3.23", features = ["formatting"]} +time = { version = "0.3.23", features = ["formatting"] } +quinn-udp = { git = "https://github.com/quinn-rs/quinn/", optional = true } [dev-dependencies] test-fixture = { path = "../test-fixture" } @@ -21,6 +22,7 @@ test-fixture = { path = "../test-fixture" } [features] deny-warnings = [] ci = [] +udp = ["dep:quinn-udp"] [target."cfg(windows)".dependencies.winapi] version = "0.3" diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index 202f39e0fb..ad7d765436 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -17,6 +17,8 @@ pub mod log; pub mod qlog; pub mod timer; pub mod tos; +#[cfg(feature = "udp")] +pub mod udp; pub use self::codec::{Decoder, Encoder}; pub use self::datagram::Datagram; diff --git a/neqo-common/src/tos.rs b/neqo-common/src/tos.rs index aa360d1d53..9e937b3266 100644 --- a/neqo-common/src/tos.rs +++ b/neqo-common/src/tos.rs @@ -169,22 +169,37 @@ impl From for IpTos { Self(u8::from(v)) } } + impl From for IpTos { fn from(v: IpTosDscp) -> Self { Self(u8::from(v)) } } + impl From<(IpTosDscp, IpTosEcn)> for IpTos { fn from(v: (IpTosDscp, IpTosEcn)) -> Self { Self(u8::from(v.0) | u8::from(v.1)) } } + +impl From<(IpTosEcn, IpTosDscp)> for IpTos { + fn from(v: (IpTosEcn, IpTosDscp)) -> Self { + Self(u8::from(v.0) | u8::from(v.1)) + } +} + impl From for u8 { fn from(v: IpTos) -> Self { v.0 } } +impl From for IpTos { + fn from(v: u8) -> Self { + Self(v) + } +} + impl Debug for IpTos { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("IpTos") @@ -287,4 +302,12 @@ mod tests { let iptos_dscp: IpTos = dscp.into(); assert_eq!(u8::from(iptos_dscp), dscp as u8); } + + #[test] + fn u8_to_iptos() { + let tos = 0x8b; + let iptos: IpTos = (IpTosEcn::Ce, IpTosDscp::Af41).into(); + assert_eq!(tos, u8::from(iptos)); + assert_eq!(IpTos::from(tos), iptos); + } } diff --git a/neqo-common/src/udp.rs b/neqo-common/src/udp.rs new file mode 100644 index 0000000000..d248659d35 --- /dev/null +++ b/neqo-common/src/udp.rs @@ -0,0 +1,150 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + io::{self, IoSliceMut}, + net::{SocketAddr, UdpSocket}, + slice, +}; + +use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; + +use crate::{Datagram, IpTos}; + +/// Send the UDP datagram on the specified socket. +/// +/// # Arguments +/// +/// * `socket` - The UDP socket to send the datagram on. +/// * `d` - The datagram to send. +/// +/// # Returns +/// +/// An `io::Result` indicating whether the datagram was sent successfully. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to send the datagram. +/// +/// # Panics +/// +/// Panics if the datagram is too large to send. +pub fn tx(socket: &UdpSocket, d: &Datagram) -> io::Result { + let send_state = UdpSocketState::new(socket.into()).unwrap(); + let transmit = Transmit { + destination: d.destination(), + ecn: EcnCodepoint::from_bits(Into::::into(d.tos())), + contents: d[..].to_vec().into(), + segment_size: None, + src_ip: Some(d.source().ip()), + }; + let n = send_state + .send(socket.into(), slice::from_ref(&transmit)) + .unwrap(); + Ok(n) +} + +/// Receive a UDP datagram on the specified socket. +/// +/// # Arguments +/// +/// * `socket` - The UDP socket to receive the datagram on. +/// * `buf` - The buffer to receive the datagram into. +/// * `tos` - The type-of-service (TOS) or traffic class (TC) value of the received datagram. +/// * `ttl` - The time-to-live (TTL) or hop limit (HL) value of the received datagram. +/// +/// # Returns +/// +/// An `io::Result` indicating the size of the received datagram and the source address. +/// +/// # Errors +/// +/// Returns an `io::Error` if the UDP socket fails to receive the datagram. +/// +/// # Panics +/// +/// Panics if the datagram is too large to receive. +pub fn rx( + socket: &UdpSocket, + buf: &mut [u8], + tos: &mut u8, + ttl: &mut u8, +) -> io::Result<(usize, SocketAddr)> { + let mut meta = RecvMeta::default(); + let recv_state = UdpSocketState::new(socket.into()).unwrap(); + + #[cfg(test)] + // `UdpSocketState` switches to non-blocking mode, undo that for the tests. + socket.set_nonblocking(false).unwrap(); + + match recv_state.recv( + socket.into(), + &mut [IoSliceMut::new(buf)], + slice::from_mut(&mut meta), + ) { + Err(e) => Err(e), + Ok(n) => { + *tos = if meta.ecn.is_some() { + meta.ecn.unwrap() as u8 + } else { + IpTos::default().into() + }; + *ttl = 0xff; // TODO: get the real TTL + Ok((n, meta.addr)) + } + } +} + +#[cfg(test)] +mod tests { + use crate::{IpTos, IpTosDscp, IpTosEcn}; + + use super::*; + + #[test] + fn datagram_io() { + // Create UDP sockets for testing. + let sender = UdpSocket::bind("127.0.0.1:0").unwrap(); + let receiver = UdpSocket::bind("127.0.0.1:8080").unwrap(); + + // Create a sample datagram. + let tos_tx = IpTos::from((IpTosDscp::Le, IpTosEcn::Ce)); + let ttl_tx = 128; + let datagram = Datagram::new( + sender.local_addr().unwrap(), + receiver.local_addr().unwrap(), + tos_tx, + Some(ttl_tx), + "Hello, world!".as_bytes().to_vec(), + ); + + // Call the emit_datagram function. + let result = tx(&sender, &datagram); + + // Assert that the datagram was sent successfully. + assert!(result.is_ok()); + + // Create a buffer for receiving the datagram. + let mut buf = [0; u16::MAX as usize]; + + // Create variables for storing TOS and TTL values. + let mut tos_rx = 0; + let mut ttl_rx = 0; + + // Call the recv_datagram function. + let result = rx(&receiver, &mut buf, &mut tos_rx, &mut ttl_rx); + + // Assert that the datagram was received successfully. + println!("Received {result:?}"); + assert!(result.is_ok()); + + // Assert that the ECN and TTL values are correct. + // TODO: Also check DSCP once quinn-udp supports it. + // assert_eq!(IpTosEcn::from(u8::from(tos_tx)), IpTosEcn::from(tos_rx)); + // assert_eq!(tos_tx, tos_rx.into()); + assert_ne!(ttl_tx, ttl_rx); + } +} diff --git a/neqo-interop/src/main.rs b/neqo-interop/src/main.rs index 254b953f22..a6b5d33162 100644 --- a/neqo-interop/src/main.rs +++ b/neqo-interop/src/main.rs @@ -7,7 +7,12 @@ #![cfg_attr(feature = "deny-warnings", deny(warnings))] #![warn(clippy::use_self)] -use neqo_common::{event::Provider, hex, Datagram, IpTos}; +use neqo_common::{ + event::Provider, + hex, + udp::{rx, tx}, + Datagram, +}; use neqo_crypto::{init, AuthenticationStatus, ResumptionToken}; use neqo_http3::{Header, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; use neqo_transport::{ @@ -60,13 +65,6 @@ trait Handler { } } -fn emit_datagram(socket: &UdpSocket, d: Datagram) { - let sent = socket.send(&d[..]).expect("Error sending datagram"); - if sent != d.len() { - eprintln!("Unable to send all {} bytes of datagram", d.len()); - } -} - lazy_static::lazy_static! { static ref TEST_TIMEOUT: Mutex = Mutex::new(Duration::from_secs(5)); } @@ -103,7 +101,7 @@ fn process_loop( client: &mut Connection, handler: &mut dyn Handler, ) -> Result { - let buf = &mut [0u8; 2048]; + let mut buf = [0; u16::MAX as usize]; let timer = Timer::new(); loop { @@ -116,7 +114,10 @@ fn process_loop( match output { Output::Datagram(dgram) => { let dgram = handler.rewrite_out(&dgram).unwrap_or(dgram); - emit_datagram(&nctx.socket, dgram); + if let Err(e) = tx(&nctx.socket, &dgram) { + eprintln!("UDP write error: {e}"); + continue; + } } Output::Callback(duration) => { let delay = min(timer.check()?, duration); @@ -133,7 +134,9 @@ fn process_loop( return Ok(client.state().clone()); } - let sz = match nctx.socket.recv(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + let (sz, _) = match rx(&nctx.socket, &mut buf[..], &mut tos, &mut ttl) { Ok(sz) => sz, Err(e) => { return Err(String::from(match e.kind() { @@ -151,8 +154,8 @@ fn process_loop( let received = Datagram::new( nctx.remote_addr, nctx.local_addr, - IpTos::default(), - None, + tos.into(), + Some(ttl), &buf[..sz], ); client.process_input(&received, Instant::now()); @@ -268,7 +271,7 @@ fn process_loop_h3( connect: bool, close: bool, ) -> Result { - let buf = &mut [0u8; 2048]; + let mut buf = [0; u16::MAX as usize]; let timer = Timer::new(); loop { @@ -285,7 +288,12 @@ fn process_loop_h3( loop { let output = handler.h3.conn().process_output(Instant::now()); match output { - Output::Datagram(dgram) => emit_datagram(&nctx.socket, dgram), + Output::Datagram(dgram) => { + if let Err(e) = tx(&nctx.socket, &dgram) { + eprintln!("UDP write error: {e}"); + break; + } + } Output::Callback(duration) => { let delay = min(timer.check()?, duration); nctx.socket.set_read_timeout(Some(delay)).unwrap(); @@ -300,7 +308,9 @@ fn process_loop_h3( return Ok(handler.h3.conn().state().clone()); } - let sz = match nctx.socket.recv(&mut buf[..]) { + let mut tos = 0; + let mut ttl = 0; + let (sz, _) = match rx(&nctx.socket, &mut buf[..], &mut tos, &mut ttl) { Ok(sz) => sz, Err(e) => { return Err(String::from(match e.kind() { @@ -318,8 +328,8 @@ fn process_loop_h3( let received = Datagram::new( nctx.remote_addr, nctx.local_addr, - IpTos::default(), - None, + tos.into(), + Some(ttl), &buf[..sz], ); handler.h3.process_input(&received, Instant::now()); diff --git a/neqo-server/src/main.rs b/neqo-server/src/main.rs index eb6a00b8cc..97c53e92a8 100644 --- a/neqo-server/src/main.rs +++ b/neqo-server/src/main.rs @@ -17,7 +17,7 @@ use std::{ io, io::Read, mem, - net::{SocketAddr, ToSocketAddrs}, + net::{SocketAddr, ToSocketAddrs, UdpSocket}, path::PathBuf, process::exit, rc::Rc, @@ -25,12 +25,12 @@ use std::{ time::{Duration, Instant}, }; -use mio::{net::UdpSocket, Events, Poll, PollOpt, Ready, Token}; +use mio::{Events, Poll, PollOpt, Ready, Token}; use mio_extras::timer::{Builder, Timeout, Timer}; use neqo_transport::ConnectionIdGenerator; use structopt::StructOpt; -use neqo_common::{hex, qdebug, qinfo, qwarn, Datagram, Header, IpTos}; +use neqo_common::{hex, qdebug, qinfo, qwarn, udp, Datagram, Header}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, generate_ech_keys, init_db, random, AntiReplay, Cipher, @@ -317,15 +317,6 @@ impl QuicParameters { } } -fn emit_packet(socket: &mut UdpSocket, out_dgram: Datagram) { - let sent = socket - .send_to(&out_dgram, &out_dgram.destination()) - .expect("Error sending datagram"); - if sent != out_dgram.len() { - eprintln!("Unable to send all {} bytes of datagram", out_dgram.len()); - } -} - fn qns_read_response(filename: &str) -> Option> { let mut file_path = PathBuf::from("/www"); file_path.push(filename.trim_matches(|p| p == '/')); @@ -588,8 +579,10 @@ fn read_dgram( socket: &mut UdpSocket, local_address: &SocketAddr, ) -> Result, io::Error> { - let buf = &mut [0u8; 2048]; - let (sz, remote_addr) = match socket.recv_from(&mut buf[..]) { + let mut buf = [0; u16::MAX as usize]; + let mut tos = 0; + let mut ttl = 0; + let (sz, remote_addr) = match udp::rx(socket, &mut buf[..], &mut tos, &mut ttl) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None), Err(err) => { eprintln!("UDP recv error: {err:?}"); @@ -609,8 +602,8 @@ fn read_dgram( Ok(Some(Datagram::new( remote_addr, *local_address, - IpTos::default(), - None, + tos.into(), + Some(ttl), &buf[..sz], ))) } @@ -672,15 +665,15 @@ impl ServersRunner { Ok(s) => s, }; - let also_v4 = if socket.only_v6().unwrap_or(true) { + let mio_socket = mio::net::UdpSocket::from_socket(socket.try_clone()?)?; + let also_v4 = if mio_socket.only_v6().unwrap_or(true) { "" } else { " as well as V4" }; println!("Server waiting for connection on: {local_addr:?}{also_v4}"); - self.poll.register( - &socket, + &mio_socket, Token(i), Ready::readable() | Ready::writable(), PollOpt::edge(), @@ -744,7 +737,9 @@ impl ServersRunner { match self.server.process(dgram, self.args.now()) { Output::Datagram(dgram) => { let socket = self.find_socket(dgram.source()); - emit_packet(socket, dgram); + if let Err(e) = udp::tx(socket, &dgram) { + eprintln!("UDP write error: {}", e); + } true } Output::Callback(new_timeout) => {