From e85a76ce56c2bfc000d3a2a4e85192f8c2c15ce0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 17 Mar 2024 21:08:27 +0100 Subject: [PATCH] feat(bin): add single connection benchmark Benchmark neqo-server -> neqo-client 1GB transfer using criterion. --- neqo-bin/Cargo.toml | 12 + neqo-bin/benches/main.rs | 48 ++ neqo-bin/src/bin/client/main.rs | 534 +---------------- neqo-bin/src/bin/server/main.rs | 615 +------------------- neqo-bin/src/{bin => }/client/http09.rs | 3 +- neqo-bin/src/{bin => }/client/http3.rs | 2 +- neqo-bin/src/client/mod.rs | 562 ++++++++++++++++++ neqo-bin/src/lib.rs | 32 ++ neqo-bin/src/server/mod.rs | 633 +++++++++++++++++++++ neqo-bin/src/{bin => }/server/old_https.rs | 0 10 files changed, 1295 insertions(+), 1146 deletions(-) create mode 100644 neqo-bin/benches/main.rs rename neqo-bin/src/{bin => }/client/http09.rs (99%) rename neqo-bin/src/{bin => }/client/http3.rs (99%) create mode 100644 neqo-bin/src/client/mod.rs create mode 100644 neqo-bin/src/server/mod.rs rename neqo-bin/src/{bin => }/server/old_https.rs (100%) diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index d36d2ecdca..6d7cb019a7 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -39,6 +39,18 @@ regex = { version = "1.9", default-features = false, features = ["unicode-perl"] tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = { version = "2.5", default-features = false } +[dev-dependencies] +criterion = { version = "0.5", default-features = false, features = ["html_reports", "async_tokio"] } +tokio = { version = "1", default-features = false, features = ["sync"] } + +[features] +bench = [] + [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "main" +harness = false +required-features = ["bench"] diff --git a/neqo-bin/benches/main.rs b/neqo-bin/benches/main.rs new file mode 100644 index 0000000000..715dcd8b06 --- /dev/null +++ b/neqo-bin/benches/main.rs @@ -0,0 +1,48 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{path::PathBuf, str::FromStr}; + +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; + +fn benchmark_transfer(c: &mut Criterion) { + neqo_crypto::init_db(PathBuf::from_str("../test-fixture/db").unwrap()); + + let runtime = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("main"); + group.throughput(Throughput::Bytes(1073741824)); + + let (done_sender, mut done_receiver) = tokio::sync::oneshot::channel(); + std::thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut server = Box::pin(neqo_bin::server::server(neqo_bin::server::Args::new())); + tokio::select! { + _ = &mut done_receiver => {} + _ = &mut server => {} + } + }) + }); + + group.bench_function("single-connection", |b| { + b.to_async(&runtime).iter(|| async move { + neqo_bin::client::client(neqo_bin::client::Args::new()) + .await + .unwrap(); + }) + }); + + group.finish(); + + done_sender.send(()).unwrap(); +} + +criterion_group! { + name = transfer; + config = Criterion::default().sample_size(10); + targets = benchmark_transfer +} +criterion_main!(transfer); diff --git a/neqo-bin/src/bin/client/main.rs b/neqo-bin/src/bin/client/main.rs index d472dfb2bc..ec7dd0177c 100644 --- a/neqo-bin/src/bin/client/main.rs +++ b/neqo-bin/src/bin/client/main.rs @@ -4,539 +4,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{ - collections::{HashMap, VecDeque}, - fmt::{self, Display}, - fs::{create_dir_all, File, OpenOptions}, - io, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, - path::PathBuf, - pin::Pin, - process::exit, - time::Instant, -}; - use clap::Parser; -use futures::{ - future::{select, Either}, - FutureExt, TryFutureExt, -}; -use neqo_bin::udp; -use neqo_common::{self as common, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role}; -use neqo_crypto::{ - constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - init, Cipher, ResumptionToken, -}; -use neqo_http3::{Error, Output}; -use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; -use qlog::{events::EventImportance, streamer::QlogStreamer}; -use tokio::time::Sleep; -use url::{Origin, Url}; - -mod http09; -mod http3; - -#[derive(Debug)] -pub enum ClientError { - ArgumentError(&'static str), - Http3Error(neqo_http3::Error), - IoError(io::Error), - QlogError, - TransportError(neqo_transport::Error), -} - -impl From for ClientError { - fn from(err: io::Error) -> Self { - Self::IoError(err) - } -} - -impl From for ClientError { - fn from(err: neqo_http3::Error) -> Self { - Self::Http3Error(err) - } -} - -impl From for ClientError { - fn from(_err: qlog::Error) -> Self { - Self::QlogError - } -} - -impl From for ClientError { - fn from(err: neqo_transport::Error) -> Self { - Self::TransportError(err) - } -} - -impl Display for ClientError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Error: {self:?}")?; - Ok(()) - } -} - -impl std::error::Error for ClientError {} - -type Res = Result; - -/// Track whether a key update is needed. -#[derive(Debug, PartialEq, Eq)] -struct KeyUpdateState(bool); - -impl KeyUpdateState { - pub fn maybe_update(&mut self, update_fn: F) -> Res<()> - where - F: FnOnce() -> Result<(), E>, - E: Into, - { - if self.0 { - if let Err(e) = update_fn() { - let e = e.into(); - match e { - ClientError::TransportError(TransportError::KeyUpdateBlocked) - | ClientError::Http3Error(Error::TransportError( - TransportError::KeyUpdateBlocked, - )) => (), - _ => return Err(e), - } - } else { - println!("Keys updated"); - self.0 = false; - } - } - Ok(()) - } - - fn needed(&self) -> bool { - self.0 - } -} - -#[derive(Debug, Parser)] -#[command(author, version, about, long_about = None)] -#[allow(clippy::struct_excessive_bools)] // Not a good use of that lint. -pub struct Args { - #[command(flatten)] - shared: neqo_bin::SharedArgs, - - urls: Vec, - - #[arg(short = 'm', default_value = "GET")] - method: String, - - #[arg(short = 'H', long, number_of_values = 2)] - header: Vec, - - #[arg(name = "max-push", short = 'p', long, default_value = "10")] - max_concurrent_push_streams: u64, - - #[arg(name = "download-in-series", long)] - /// Download resources in series using separate connections. - download_in_series: bool, - - #[arg(name = "concurrency", long, default_value = "100")] - /// The maximum number of requests to have outstanding at one time. - concurrency: usize, - - #[arg(name = "output-read-data", long)] - /// Output received data to stdout - output_read_data: bool, - - #[arg(name = "output-dir", long)] - /// Save contents of fetched URLs to a directory - output_dir: Option, - - #[arg(short = 'r', long)] - /// Client attempts to resume by making multiple connections to servers. - /// Requires that 2 or more URLs are listed for each server. - /// Use this for 0-RTT: the stack always attempts 0-RTT on resumption. - resume: bool, - - #[arg(name = "key-update", long)] - /// Attempt to initiate a key update immediately after confirming the connection. - key_update: bool, - - #[arg(name = "ech", long, value_parser = |s: &str| hex::decode(s))] - /// Enable encrypted client hello (ECH). - /// This takes an encoded ECH configuration in hexadecimal format. - ech: Option>, - - #[arg(name = "ipv4-only", short = '4', long)] - /// Connect only over IPv4 - ipv4_only: bool, - - #[arg(name = "ipv6-only", short = '6', long)] - /// Connect only over IPv6 - ipv6_only: bool, - - /// The test that this client will run. Currently, we only support "upload". - #[arg(name = "test", long)] - test: Option, - - /// The request size that will be used for upload test. - #[arg(name = "upload-size", long, default_value = "100")] - upload_size: usize, -} - -impl Args { - fn get_ciphers(&self) -> Vec { - self.shared - .ciphers - .iter() - .filter_map(|c| match c.as_str() { - "TLS_AES_128_GCM_SHA256" => Some(TLS_AES_128_GCM_SHA256), - "TLS_AES_256_GCM_SHA384" => Some(TLS_AES_256_GCM_SHA384), - "TLS_CHACHA20_POLY1305_SHA256" => Some(TLS_CHACHA20_POLY1305_SHA256), - _ => None, - }) - .collect::>() - } - - fn update_for_tests(&mut self) { - let Some(testcase) = self.shared.qns_test.as_ref() else { - return; - }; - - // Only use v1 for most QNS tests. - self.shared.quic_parameters.quic_version = vec![Version::Version1]; - match testcase.as_str() { - // TODO: Add "ecn" when that is ready. - "http3" => { - if let Some(testcase) = &self.test { - if testcase.as_str() != "upload" { - eprintln!("Unsupported test case: {testcase}"); - exit(127) - } - - self.method = String::from("POST"); - } - } - "handshake" | "transfer" | "retry" => { - self.shared.use_old_http = true; - } - "zerortt" | "resumption" => { - if self.urls.len() < 2 { - eprintln!("Warning: resumption tests won't work without >1 URL"); - exit(127); - } - self.shared.use_old_http = true; - self.resume = true; - } - "multiconnect" => { - self.shared.use_old_http = true; - self.download_in_series = true; - } - "chacha20" => { - self.shared.use_old_http = true; - self.shared.ciphers.clear(); - self.shared - .ciphers - .extend_from_slice(&[String::from("TLS_CHACHA20_POLY1305_SHA256")]); - } - "keyupdate" => { - self.shared.use_old_http = true; - self.key_update = true; - } - "v2" => { - self.shared.use_old_http = true; - // Use default version set for this test (which allows compatible vneg.) - self.shared.quic_parameters.quic_version.clear(); - } - _ => exit(127), - } - } -} - -fn get_output_file( - url: &Url, - output_dir: &Option, - all_paths: &mut Vec, -) -> Option { - if let Some(ref dir) = output_dir { - let mut out_path = dir.clone(); - - let url_path = if url.path() == "/" { - // If no path is given... call it "root"? - "root" - } else { - // Omit leading slash - &url.path()[1..] - }; - out_path.push(url_path); - - if all_paths.contains(&out_path) { - eprintln!("duplicate path {}", out_path.display()); - return None; - } - - eprintln!("Saving {url} to {out_path:?}"); - - if let Some(parent) = out_path.parent() { - create_dir_all(parent).ok()?; - } - - let f = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&out_path) - .ok()?; - - all_paths.push(out_path); - Some(f) - } else { - None - } -} - -enum Ready { - Socket, - Timeout, -} - -// Wait for the socket to be readable or the timeout to fire. -async fn ready( - socket: &udp::Socket, - mut timeout: Option<&mut Pin>>, -) -> Result { - let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket); - let timeout_ready = timeout - .as_mut() - .map_or(Either::Right(futures::future::pending()), Either::Left) - .map(|()| Ok(Ready::Timeout)); - select(socket_ready, timeout_ready).await.factor_first().0 -} - -/// Handles a given task on the provided [`Client`]. -trait Handler { - type Client: Client; - - fn handle(&mut self, client: &mut Self::Client) -> Res; - fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()>; - fn take_token(&mut self) -> Option; - fn has_token(&self) -> bool; -} - -/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. -trait Client { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; - fn close(&mut self, now: Instant, app_error: AppError, msg: S) - where - S: AsRef + Display; - fn is_closed(&self) -> bool; -} - -struct Runner<'a, H: Handler> { - local_addr: SocketAddr, - socket: &'a mut udp::Socket, - client: H::Client, - handler: H, - timeout: Option>>, - args: &'a Args, -} - -impl<'a, H: Handler> Runner<'a, H> { - async fn run(mut self) -> Res> { - loop { - let handler_done = self.handler.handle(&mut self.client)?; - - match (handler_done, self.args.resume, self.handler.has_token()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } - } - - self.process(None).await?; - - if self.client.is_closed() { - return Ok(self.handler.take_token()); - } - - match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, - Ready::Timeout => { - self.timeout = None; - } - } - } - } - - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { - loop { - match self.client.process(dgram.take(), Instant::now()) { - Output::Datagram(dgram) => { - self.socket.writable().await?; - self.socket.send(dgram)?; - } - Output::Callback(new_timeout) => { - qinfo!("Setting timeout of {:?}", new_timeout); - self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); - break; - } - Output::None => { - qdebug!("Output::None"); - break; - } - } - } - - Ok(()) - } -} - -fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { - if let Some(qlog_dir) = &args.shared.qlog_dir { - let mut qlog_path = qlog_dir.clone(); - let filename = format!("{hostname}-{cid}.sqlog"); - qlog_path.push(filename); - - let f = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&qlog_path)?; - - let streamer = QlogStreamer::new( - qlog::QLOG_VERSION.to_string(), - Some("Example qlog".to_string()), - Some("Example qlog description".to_string()), - None, - std::time::Instant::now(), - common::qlog::new_trace(Role::Client), - EventImportance::Base, - Box::new(f), - ); - - Ok(NeqoQlog::enabled(streamer, qlog_path)?) - } else { - Ok(NeqoQlog::disabled()) - } -} #[tokio::main] -async fn main() -> Res<()> { - init(); - - let mut args = Args::parse(); - args.update_for_tests(); - - let urls_by_origin = args - .urls - .clone() - .into_iter() - .fold(HashMap::>::new(), |mut urls, url| { - urls.entry(url.origin()).or_default().push_back(url); - urls - }) - .into_iter() - .filter_map(|(origin, urls)| match origin { - Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), - Origin::Opaque(x) => { - eprintln!("Opaque origin {x:?}"); - None - } - }); - - for ((host, port), mut urls) in urls_by_origin { - if args.resume && urls.len() < 2 { - eprintln!("Resumption to {host} cannot work without at least 2 URLs."); - exit(127); - } - - let remote_addr = format!("{host}:{port}").to_socket_addrs()?.find(|addr| { - !matches!( - (addr, args.ipv4_only, args.ipv6_only), - (SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false) - ) - }); - let Some(remote_addr) = remote_addr else { - eprintln!("No compatible address found for: {host}"); - exit(1); - }; - - let local_addr = match remote_addr { - SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::from([0; 4])), 0), - SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), - }; - - let mut socket = udp::Socket::bind(local_addr)?; - let real_local = socket.local_addr().unwrap(); - println!( - "{} Client connecting: {:?} -> {:?}", - if args.shared.use_old_http { "H9" } else { "H3" }, - real_local, - remote_addr, - ); - - let hostname = format!("{host}"); - let mut token: Option = None; - let mut first = true; - while !urls.is_empty() { - let to_request = if (args.resume && first) || args.download_in_series { - urls.pop_front().into_iter().collect() - } else { - std::mem::take(&mut urls) - }; - - first = false; - - let key_update = KeyUpdateState(args.key_update); - - token = if args.shared.use_old_http { - let client = - http09::create_client(&args, real_local, remote_addr, &hostname, token) - .expect("failed to create client"); - - let handler = http09::Handler::new(to_request, &args, key_update); - - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? - } else { - let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) - .expect("failed to create client"); - - let handler = http3::Handler::new(to_request, &args, key_update); - - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? - }; - } - } +async fn main() -> Result<(), neqo_bin::client::ClientError> { + let args = neqo_bin::client::Args::parse(); - Ok(()) + neqo_bin::client::client(args).await } diff --git a/neqo-bin/src/bin/server/main.rs b/neqo-bin/src/bin/server/main.rs index f694cf98c1..8d166c7487 100644 --- a/neqo-bin/src/bin/server/main.rs +++ b/neqo-bin/src/bin/server/main.rs @@ -4,620 +4,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{ - cell::RefCell, - cmp::min, - collections::HashMap, - fmt::{self, Display}, - fs::OpenOptions, - io::{self, Read}, - net::{SocketAddr, ToSocketAddrs}, - path::PathBuf, - pin::Pin, - process::exit, - rc::Rc, - time::{Duration, Instant}, -}; - use clap::Parser; -use futures::{ - future::{select, select_all, Either}, - FutureExt, -}; -use neqo_bin::udp; -use neqo_common::{hex, qinfo, qwarn, Datagram, Header}; -use neqo_crypto::{ - constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - generate_ech_keys, init_db, random, AntiReplay, Cipher, -}; -use neqo_http3::{ - Error, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, -}; -use neqo_transport::{ - server::ValidateAddress, ConnectionIdGenerator, Output, RandomConnectionIdGenerator, Version, -}; -use tokio::time::Sleep; - -use crate::old_https::Http09Server; - -const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); - -mod old_https; - -#[derive(Debug)] -pub enum ServerError { - ArgumentError(&'static str), - Http3Error(neqo_http3::Error), - IoError(io::Error), - QlogError, - TransportError(neqo_transport::Error), -} - -impl From for ServerError { - fn from(err: io::Error) -> Self { - Self::IoError(err) - } -} - -impl From for ServerError { - fn from(err: neqo_http3::Error) -> Self { - Self::Http3Error(err) - } -} - -impl From for ServerError { - fn from(_err: qlog::Error) -> Self { - Self::QlogError - } -} - -impl From for ServerError { - fn from(err: neqo_transport::Error) -> Self { - Self::TransportError(err) - } -} - -impl Display for ServerError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Error: {self:?}")?; - Ok(()) - } -} - -impl std::error::Error for ServerError {} - -#[derive(Debug, Parser)] -#[command(author, version, about, long_about = None)] -struct Args { - #[command(flatten)] - shared: neqo_bin::SharedArgs, - - /// List of IP:port to listen on - #[arg(default_value = "[::]:4433")] - hosts: Vec, - - #[arg(short = 'd', long, default_value = "./test-fixture/db")] - /// NSS database directory. - db: PathBuf, - - #[arg(short = 'k', long, default_value = "key")] - /// Name of key from NSS database. - key: String, - - #[arg(name = "retry", long)] - /// Force a retry - retry: bool, - - #[arg(name = "ech", long)] - /// Enable encrypted client hello (ECH). - /// This generates a new set of ECH keys when it is invoked. - /// The resulting configuration is printed to stdout in hexadecimal format. - ech: bool, -} - -impl Args { - fn get_ciphers(&self) -> Vec { - self.shared - .ciphers - .iter() - .filter_map(|c| match c.as_str() { - "TLS_AES_128_GCM_SHA256" => Some(TLS_AES_128_GCM_SHA256), - "TLS_AES_256_GCM_SHA384" => Some(TLS_AES_256_GCM_SHA384), - "TLS_CHACHA20_POLY1305_SHA256" => Some(TLS_CHACHA20_POLY1305_SHA256), - _ => None, - }) - .collect::>() - } - - fn listen_addresses(&self) -> Vec { - self.hosts - .iter() - .filter_map(|host| host.to_socket_addrs().ok()) - .flatten() - .chain(self.shared.quic_parameters.preferred_address_v4()) - .chain(self.shared.quic_parameters.preferred_address_v6()) - .collect() - } - - fn now(&self) -> Instant { - if self.shared.qns_test.is_some() { - // When NSS starts its anti-replay it blocks any acceptance of 0-RTT for a - // single period. This ensures that an attacker that is able to force a - // server to reboot is unable to use that to flush the anti-replay buffers - // and have something replayed. - // - // However, this is a massive inconvenience for us when we are testing. - // As we can't initialize `AntiReplay` in the past (see `neqo_common::time` - // for why), fast forward time here so that the connections get times from - // in the future. - // - // This is NOT SAFE. Don't do this. - Instant::now() + ANTI_REPLAY_WINDOW - } else { - Instant::now() - } - } -} - -fn qns_read_response(filename: &str) -> Option> { - let mut file_path = PathBuf::from("/www"); - file_path.push(filename.trim_matches(|p| p == '/')); - - OpenOptions::new() - .read(true) - .open(&file_path) - .map_err(|_e| eprintln!("Could not open {}", file_path.display())) - .ok() - .and_then(|mut f| { - let mut data = Vec::new(); - match f.read_to_end(&mut data) { - Ok(sz) => { - println!("{} bytes read from {}", sz, file_path.display()); - Some(data) - } - Err(e) => { - eprintln!("Error reading data: {e:?}"); - None - } - } - }) -} - -trait HttpServer: Display { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; - fn process_events(&mut self, args: &Args, now: Instant); - fn set_qlog_dir(&mut self, dir: Option); - fn set_ciphers(&mut self, ciphers: &[Cipher]); - fn validate_address(&mut self, when: ValidateAddress); - fn enable_ech(&mut self) -> &[u8]; -} - -struct ResponseData { - data: Vec, - offset: usize, - remaining: usize, -} - -impl From<&[u8]> for ResponseData { - fn from(data: &[u8]) -> Self { - Self::from(data.to_vec()) - } -} - -impl From> for ResponseData { - fn from(data: Vec) -> Self { - let remaining = data.len(); - Self { - data, - offset: 0, - remaining, - } - } -} - -impl ResponseData { - fn repeat(buf: &[u8], total: usize) -> Self { - Self { - data: buf.to_owned(), - offset: 0, - remaining: total, - } - } - - fn send(&mut self, stream: &mut Http3OrWebTransportStream) { - while self.remaining > 0 { - let end = min(self.data.len(), self.offset + self.remaining); - let slice = &self.data[self.offset..end]; - match stream.send_data(slice) { - Ok(0) => { - return; - } - Ok(sent) => { - self.remaining -= sent; - self.offset = (self.offset + sent) % self.data.len(); - } - Err(e) => { - qwarn!("Error writing to stream {}: {:?}", stream, e); - return; - } - } - } - } - - fn done(&self) -> bool { - self.remaining == 0 - } -} - -struct SimpleServer { - server: Http3Server, - /// Progress writing to each stream. - remaining_data: HashMap, - posts: HashMap, -} - -impl SimpleServer { - const MESSAGE: &'static [u8] = b"I am the very model of a modern Major-General,\n\ - I've information vegetable, animal, and mineral,\n\ - I know the kings of England, and I quote the fights historical\n\ - From Marathon to Waterloo, in order categorical;\n\ - I'm very well acquainted, too, with matters mathematical,\n\ - I understand equations, both the simple and quadratical,\n\ - About binomial theorem, I'm teeming with a lot o' news,\n\ - With many cheerful facts about the square of the hypotenuse.\n"; - - pub fn new( - args: &Args, - anti_replay: AntiReplay, - cid_mgr: Rc>, - ) -> Self { - let server = Http3Server::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - Http3Parameters::default() - .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) - .max_table_size_encoder(args.shared.max_table_size_encoder) - .max_table_size_decoder(args.shared.max_table_size_decoder) - .max_blocked_streams(args.shared.max_blocked_streams), - None, - ) - .expect("We cannot make a server!"); - Self { - server, - remaining_data: HashMap::new(), - posts: HashMap::new(), - } - } -} - -impl Display for SimpleServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.server.fmt(f) - } -} - -impl HttpServer for SimpleServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.server.process(dgram, now) - } - - fn process_events(&mut self, args: &Args, _now: Instant) { - while let Some(event) = self.server.next_event() { - match event { - Http3ServerEvent::Headers { - mut stream, - headers, - fin, - } => { - println!("Headers (request={stream} fin={fin}): {headers:?}"); - - let post = if let Some(method) = headers.iter().find(|&h| h.name() == ":method") - { - method.value() == "POST" - } else { - false - }; - if post { - self.posts.insert(stream, 0); - continue; - } - - let mut response = - if let Some(path) = headers.iter().find(|&h| h.name() == ":path") { - if args.shared.qns_test.is_some() { - if let Some(data) = qns_read_response(path.value()) { - ResponseData::from(data) - } else { - ResponseData::from(Self::MESSAGE) - } - } else if let Ok(count) = - path.value().trim_matches(|p| p == '/').parse::() - { - ResponseData::repeat(Self::MESSAGE, count) - } else { - ResponseData::from(Self::MESSAGE) - } - } else { - stream - .cancel_fetch(Error::HttpRequestIncomplete.code()) - .unwrap(); - continue; - }; - - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("content-length", response.remaining.to_string()), - ]) - .unwrap(); - response.send(&mut stream); - if response.done() { - stream.stream_close_send().unwrap(); - } else { - self.remaining_data.insert(stream.stream_id(), response); - } - } - Http3ServerEvent::DataWritable { mut stream } => { - if self.posts.get_mut(&stream).is_none() { - if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { - remaining.send(&mut stream); - if remaining.done() { - self.remaining_data.remove(&stream.stream_id()); - stream.stream_close_send().unwrap(); - } - } - } - } - - Http3ServerEvent::Data { - mut stream, - data, - fin, - } => { - if let Some(received) = self.posts.get_mut(&stream) { - *received += data.len(); - } - if fin { - if let Some(received) = self.posts.remove(&stream) { - let msg = received.to_string().as_bytes().to_vec(); - stream - .send_headers(&[Header::new(":status", "200")]) - .unwrap(); - stream.send_data(&msg).unwrap(); - stream.stream_close_send().unwrap(); - } - } - } - _ => {} - } - } - } - - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .unwrap(); - self.server.ech_config() - } -} - -struct ServersRunner { - args: Args, - server: Box, - timeout: Option>>, - sockets: Vec<(SocketAddr, udp::Socket)>, -} - -impl ServersRunner { - pub fn new(args: Args) -> Result { - let hosts = args.listen_addresses(); - if hosts.is_empty() { - eprintln!("No valid hosts defined"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts")); - } - let sockets = hosts - .into_iter() - .map(|host| { - let socket = udp::Socket::bind(host)?; - let local_addr = socket.local_addr()?; - println!("Server waiting for connection on: {local_addr:?}"); - - Ok((host, socket)) - }) - .collect::>()?; - let server = Self::create_server(&args); - - Ok(Self { - args, - server, - timeout: None, - sockets, - }) - } - - fn create_server(args: &Args) -> Box { - // Note: this is the exception to the case where we use `Args::now`. - let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) - .expect("unable to setup anti-replay"); - let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); - - let mut svr: Box = if args.shared.use_old_http { - Box::new( - Http09Server::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - args.shared.quic_parameters.get(&args.shared.alpn), - ) - .expect("We cannot make a server!"), - ) - } else { - Box::new(SimpleServer::new(args, anti_replay, cid_mgr)) - }; - svr.set_ciphers(&args.get_ciphers()); - svr.set_qlog_dir(args.shared.qlog_dir.clone()); - if args.retry { - svr.validate_address(ValidateAddress::Always); - } - if args.ech { - let cfg = svr.enable_ech(); - println!("ECHConfigList: {}", hex(cfg)); - } - svr - } - - /// Tries to find a socket, but then just falls back to sending from the first. - fn find_socket(&mut self, addr: SocketAddr) -> &mut udp::Socket { - let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap(); - rest.iter_mut() - .map(|(_host, socket)| socket) - .find(|socket| { - socket - .local_addr() - .ok() - .map_or(false, |socket_addr| socket_addr == addr) - }) - .unwrap_or(first_socket) - } - - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { - loop { - match self.server.process(dgram.take(), self.args.now()) { - Output::Datagram(dgram) => { - let socket = self.find_socket(dgram.source()); - socket.writable().await?; - socket.send(dgram)?; - } - Output::Callback(new_timeout) => { - qinfo!("Setting timeout of {:?}", new_timeout); - self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); - break; - } - Output::None => { - break; - } - } - } - Ok(()) - } - - // Wait for any of the sockets to be readable or the timeout to fire. - async fn ready(&mut self) -> Result { - let sockets_ready = select_all( - self.sockets - .iter() - .map(|(_host, socket)| Box::pin(socket.readable())), - ) - .map(|(res, inx, _)| match res { - Ok(()) => Ok(Ready::Socket(inx)), - Err(e) => Err(e), - }); - let timeout_ready = self - .timeout - .as_mut() - .map_or(Either::Right(futures::future::pending()), Either::Left) - .map(|()| Ok(Ready::Timeout)); - select(sockets_ready, timeout_ready).await.factor_first().0 - } - - async fn run(&mut self) -> Result<(), io::Error> { - loop { - match self.ready().await? { - Ready::Socket(inx) => loop { - let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgrams = socket.recv(host)?; - if dgrams.is_empty() { - break; - } - for dgram in dgrams { - self.process(Some(&dgram)).await?; - } - }, - Ready::Timeout => { - self.timeout = None; - self.process(None).await?; - } - } - - self.server.process_events(&self.args, self.args.now()); - self.process(None).await?; - } - } -} - -enum Ready { - Socket(usize), - Timeout, -} #[tokio::main] -async fn main() -> Result<(), io::Error> { - const HQ_INTEROP: &str = "hq-interop"; - - let mut args = Args::parse(); - assert!(!args.key.is_empty(), "Need at least one key"); - - init_db(args.db.clone()); - - if let Some(testcase) = args.shared.qns_test.as_ref() { - if args.shared.quic_parameters.quic_version.is_empty() { - // Quic Interop Runner expects the server to support `Version1` - // only. Exceptions are testcases `versionnegotiation` (not yet - // implemented) and `v2`. - if testcase != "v2" { - args.shared.quic_parameters.quic_version = vec![Version::Version1]; - } - } else { - qwarn!("Both -V and --qns-test were set. Ignoring testcase specific versions."); - } - - // TODO: More options to deduplicate with client? - match testcase.as_str() { - "http3" => (), - "zerortt" => { - args.shared.use_old_http = true; - args.shared.alpn = String::from(HQ_INTEROP); - args.shared.quic_parameters.max_streams_bidi = 100; - } - "handshake" | "transfer" | "resumption" | "multiconnect" | "v2" => { - args.shared.use_old_http = true; - args.shared.alpn = String::from(HQ_INTEROP); - } - "chacha20" => { - args.shared.use_old_http = true; - args.shared.alpn = String::from(HQ_INTEROP); - args.shared.ciphers.clear(); - args.shared - .ciphers - .extend_from_slice(&[String::from("TLS_CHACHA20_POLY1305_SHA256")]); - } - "retry" => { - args.shared.use_old_http = true; - args.shared.alpn = String::from(HQ_INTEROP); - args.retry = true; - } - _ => exit(127), - } - } +async fn main() -> Result<(), std::io::Error> { + let args = neqo_bin::server::Args::parse(); - let mut servers_runner = ServersRunner::new(args)?; - servers_runner.run().await + neqo_bin::server::server(args).await } diff --git a/neqo-bin/src/bin/client/http09.rs b/neqo-bin/src/client/http09.rs similarity index 99% rename from neqo-bin/src/bin/client/http09.rs rename to neqo-bin/src/client/http09.rs index a7dc2c21c7..8c11cd2087 100644 --- a/neqo-bin/src/bin/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -25,8 +25,7 @@ use neqo_transport::{ }; use url::Url; -use super::{get_output_file, Args, KeyUpdateState, Res}; -use crate::qlog_new; +use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; pub struct Handler<'a> { streams: HashMap>, diff --git a/neqo-bin/src/bin/client/http3.rs b/neqo-bin/src/client/http3.rs similarity index 99% rename from neqo-bin/src/bin/client/http3.rs rename to neqo-bin/src/client/http3.rs index 754de9cb16..d38553fae6 100644 --- a/neqo-bin/src/bin/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -26,7 +26,7 @@ use neqo_transport::{ }; use url::Url; -use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; pub(crate) struct Handler<'a> { #[allow( diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs new file mode 100644 index 0000000000..074272dfdb --- /dev/null +++ b/neqo-bin/src/client/mod.rs @@ -0,0 +1,562 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + collections::{HashMap, VecDeque}, + fmt::{self, Display}, + fs::{create_dir_all, File, OpenOptions}, + io, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, + path::PathBuf, + pin::Pin, + process::exit, + str::FromStr, + time::Instant, +}; + +use crate::udp; +use clap::Parser; +use futures::{ + future::{select, Either}, + FutureExt, TryFutureExt, +}; +use neqo_common::{self as common, qdebug, qinfo, qlog::NeqoQlog, Datagram, Role}; +use neqo_crypto::{ + constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, + Cipher, ResumptionToken, +}; +use neqo_http3::{Error, Output}; +use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; +use qlog::{events::EventImportance, streamer::QlogStreamer}; +use tokio::time::Sleep; +use url::{Origin, Url}; + +mod http09; +mod http3; + +#[derive(Debug)] +pub enum ClientError { + ArgumentError(&'static str), + Http3Error(neqo_http3::Error), + IoError(io::Error), + QlogError, + TransportError(neqo_transport::Error), +} + +impl From for ClientError { + fn from(err: io::Error) -> Self { + Self::IoError(err) + } +} + +impl From for ClientError { + fn from(err: neqo_http3::Error) -> Self { + Self::Http3Error(err) + } +} + +impl From for ClientError { + fn from(_err: qlog::Error) -> Self { + Self::QlogError + } +} + +impl From for ClientError { + fn from(err: neqo_transport::Error) -> Self { + Self::TransportError(err) + } +} + +impl Display for ClientError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Error: {self:?}")?; + Ok(()) + } +} + +impl std::error::Error for ClientError {} + +type Res = Result; + +/// Track whether a key update is needed. +#[derive(Debug, PartialEq, Eq)] +struct KeyUpdateState(bool); + +impl KeyUpdateState { + pub fn maybe_update(&mut self, update_fn: F) -> Res<()> + where + F: FnOnce() -> Result<(), E>, + E: Into, + { + if self.0 { + if let Err(e) = update_fn() { + let e = e.into(); + match e { + ClientError::TransportError(TransportError::KeyUpdateBlocked) + | ClientError::Http3Error(Error::TransportError( + TransportError::KeyUpdateBlocked, + )) => (), + _ => return Err(e), + } + } else { + println!("Keys updated"); + self.0 = false; + } + } + Ok(()) + } + + fn needed(&self) -> bool { + self.0 + } +} + +#[derive(Debug, Parser)] +#[command(author, version, about, long_about = None)] +#[allow(clippy::struct_excessive_bools)] // Not a good use of that lint. +pub struct Args { + #[command(flatten)] + shared: crate::SharedArgs, + + urls: Vec, + + #[arg(short = 'm', default_value = "GET")] + method: String, + + #[arg(short = 'H', long, number_of_values = 2)] + header: Vec, + + #[arg(name = "max-push", short = 'p', long, default_value = "10")] + max_concurrent_push_streams: u64, + + #[arg(name = "download-in-series", long)] + /// Download resources in series using separate connections. + download_in_series: bool, + + #[arg(name = "concurrency", long, default_value = "100")] + /// The maximum number of requests to have outstanding at one time. + concurrency: usize, + + #[arg(name = "output-read-data", long)] + /// Output received data to stdout + output_read_data: bool, + + #[arg(name = "output-dir", long)] + /// Save contents of fetched URLs to a directory + output_dir: Option, + + #[arg(short = 'r', long)] + /// Client attempts to resume by making multiple connections to servers. + /// Requires that 2 or more URLs are listed for each server. + /// Use this for 0-RTT: the stack always attempts 0-RTT on resumption. + resume: bool, + + #[arg(name = "key-update", long)] + /// Attempt to initiate a key update immediately after confirming the connection. + key_update: bool, + + #[arg(name = "ech", long, value_parser = |s: &str| hex::decode(s))] + /// Enable encrypted client hello (ECH). + /// This takes an encoded ECH configuration in hexadecimal format. + ech: Option>, + + #[arg(name = "ipv4-only", short = '4', long)] + /// Connect only over IPv4 + ipv4_only: bool, + + #[arg(name = "ipv6-only", short = '6', long)] + /// Connect only over IPv6 + ipv6_only: bool, + + /// The test that this client will run. Currently, we only support "upload". + #[arg(name = "test", long)] + test: Option, + + /// The request size that will be used for upload test. + #[arg(name = "upload-size", long, default_value = "100")] + upload_size: usize, +} + +impl Args { + // TODO + pub fn new() -> Self { + Self { + shared: crate::SharedArgs::new(), + urls: vec![Url::from_str("http://127.0.0.1:12345/1073741824").unwrap()], + method: "GET".into(), + header: vec![], + max_concurrent_push_streams: 10, + download_in_series: false, + concurrency: 100, + output_read_data: false, + output_dir: Some("/tmp/out".into()), + resume: false, + key_update: false, + ech: None, + ipv4_only: false, + ipv6_only: false, + test: None, + upload_size: 100, + } + } + fn get_ciphers(&self) -> Vec { + self.shared + .ciphers + .iter() + .filter_map(|c| match c.as_str() { + "TLS_AES_128_GCM_SHA256" => Some(TLS_AES_128_GCM_SHA256), + "TLS_AES_256_GCM_SHA384" => Some(TLS_AES_256_GCM_SHA384), + "TLS_CHACHA20_POLY1305_SHA256" => Some(TLS_CHACHA20_POLY1305_SHA256), + _ => None, + }) + .collect::>() + } + + fn update_for_tests(&mut self) { + let Some(testcase) = self.shared.qns_test.as_ref() else { + return; + }; + + // Only use v1 for most QNS tests. + self.shared.quic_parameters.quic_version = vec![Version::Version1]; + match testcase.as_str() { + // TODO: Add "ecn" when that is ready. + "http3" => { + if let Some(testcase) = &self.test { + if testcase.as_str() != "upload" { + eprintln!("Unsupported test case: {testcase}"); + exit(127) + } + + self.method = String::from("POST"); + } + } + "handshake" | "transfer" | "retry" => { + self.shared.use_old_http = true; + } + "zerortt" | "resumption" => { + if self.urls.len() < 2 { + eprintln!("Warning: resumption tests won't work without >1 URL"); + exit(127); + } + self.shared.use_old_http = true; + self.resume = true; + } + "multiconnect" => { + self.shared.use_old_http = true; + self.download_in_series = true; + } + "chacha20" => { + self.shared.use_old_http = true; + self.shared.ciphers.clear(); + self.shared + .ciphers + .extend_from_slice(&[String::from("TLS_CHACHA20_POLY1305_SHA256")]); + } + "keyupdate" => { + self.shared.use_old_http = true; + self.key_update = true; + } + "v2" => { + self.shared.use_old_http = true; + // Use default version set for this test (which allows compatible vneg.) + self.shared.quic_parameters.quic_version.clear(); + } + _ => exit(127), + } + } +} + +fn get_output_file( + url: &Url, + output_dir: &Option, + all_paths: &mut Vec, +) -> Option { + if let Some(ref dir) = output_dir { + let mut out_path = dir.clone(); + + let url_path = if url.path() == "/" { + // If no path is given... call it "root"? + "root" + } else { + // Omit leading slash + &url.path()[1..] + }; + out_path.push(url_path); + + if all_paths.contains(&out_path) { + eprintln!("duplicate path {}", out_path.display()); + return None; + } + + eprintln!("Saving {url} to {out_path:?}"); + + if let Some(parent) = out_path.parent() { + create_dir_all(parent).ok()?; + } + + let f = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&out_path) + .ok()?; + + all_paths.push(out_path); + Some(f) + } else { + None + } +} + +enum Ready { + Socket, + Timeout, +} + +// Wait for the socket to be readable or the timeout to fire. +async fn ready( + socket: &udp::Socket, + mut timeout: Option<&mut Pin>>, +) -> Result { + let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket); + let timeout_ready = timeout + .as_mut() + .map_or(Either::Right(futures::future::pending()), Either::Left) + .map(|()| Ok(Ready::Timeout)); + select(socket_ready, timeout_ready).await.factor_first().0 +} + +/// Handles a given task on the provided [`Client`]. +trait Handler { + type Client: Client; + + fn handle(&mut self, client: &mut Self::Client) -> Res; + fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()>; + fn take_token(&mut self) -> Option; + fn has_token(&self) -> bool; +} + +/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. +trait Client { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn close(&mut self, now: Instant, app_error: AppError, msg: S) + where + S: AsRef + Display; + fn is_closed(&self) -> bool; +} + +struct Runner<'a, H: Handler> { + local_addr: SocketAddr, + socket: &'a mut udp::Socket, + client: H::Client, + handler: H, + timeout: Option>>, + args: &'a Args, +} + +impl<'a, H: Handler> Runner<'a, H> { + async fn run(mut self) -> Res> { + loop { + let handler_done = self.handler.handle(&mut self.client)?; + + match (handler_done, self.args.resume, self.handler.has_token()) { + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); + } + } + + self.process(None).await?; + + if self.client.is_closed() { + return Ok(self.handler.take_token()); + } + + match ready(self.socket, self.timeout.as_mut()).await? { + Ready::Socket => loop { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { + break; + } + for dgram in &dgrams { + self.process(Some(dgram)).await?; + } + self.handler.maybe_key_update(&mut self.client)?; + }, + Ready::Timeout => { + self.timeout = None; + } + } + } + } + + async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + loop { + match self.client.process(dgram.take(), Instant::now()) { + Output::Datagram(dgram) => { + self.socket.writable().await?; + self.socket.send(dgram)?; + } + Output::Callback(new_timeout) => { + qinfo!("Setting timeout of {:?}", new_timeout); + self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + break; + } + Output::None => { + qdebug!("Output::None"); + break; + } + } + } + + Ok(()) + } +} + +fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { + if let Some(qlog_dir) = &args.shared.qlog_dir { + let mut qlog_path = qlog_dir.clone(); + let filename = format!("{hostname}-{cid}.sqlog"); + qlog_path.push(filename); + + let f = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&qlog_path)?; + + let streamer = QlogStreamer::new( + qlog::QLOG_VERSION.to_string(), + Some("Example qlog".to_string()), + Some("Example qlog description".to_string()), + None, + std::time::Instant::now(), + common::qlog::new_trace(Role::Client), + EventImportance::Base, + Box::new(f), + ); + + Ok(NeqoQlog::enabled(streamer, qlog_path)?) + } else { + Ok(NeqoQlog::disabled()) + } +} + +pub async fn client(mut args: Args) -> Res<()> { + // TODO + // neqo_crypto::init(); + args.update_for_tests(); + + let urls_by_origin = args + .urls + .clone() + .into_iter() + .fold(HashMap::>::new(), |mut urls, url| { + urls.entry(url.origin()).or_default().push_back(url); + urls + }) + .into_iter() + .filter_map(|(origin, urls)| match origin { + Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), + Origin::Opaque(x) => { + eprintln!("Opaque origin {x:?}"); + None + } + }); + + for ((host, port), mut urls) in urls_by_origin { + if args.resume && urls.len() < 2 { + eprintln!("Resumption to {host} cannot work without at least 2 URLs."); + exit(127); + } + + let remote_addr = format!("{host}:{port}").to_socket_addrs()?.find(|addr| { + !matches!( + (addr, args.ipv4_only, args.ipv6_only), + (SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false) + ) + }); + let Some(remote_addr) = remote_addr else { + eprintln!("No compatible address found for: {host}"); + exit(1); + }; + + let local_addr = match remote_addr { + SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::from([0; 4])), 0), + SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), + }; + + let mut socket = udp::Socket::bind(local_addr)?; + let real_local = socket.local_addr().unwrap(); + println!( + "{} Client connecting: {:?} -> {:?}", + if args.shared.use_old_http { "H9" } else { "H3" }, + real_local, + remote_addr, + ); + + let hostname = format!("{host}"); + let mut token: Option = None; + let mut first = true; + while !urls.is_empty() { + let to_request = if (args.resume && first) || args.download_in_series { + urls.pop_front().into_iter().collect() + } else { + std::mem::take(&mut urls) + }; + + first = false; + + let key_update = KeyUpdateState(args.key_update); + + token = if args.shared.use_old_http { + let client = + http09::create_client(&args, real_local, remote_addr, &hostname, token) + .expect("failed to create client"); + + let handler = http09::Handler::new(to_request, &args, key_update); + + Runner { + args: &args, + client, + handler, + local_addr: real_local, + socket: &mut socket, + timeout: None, + } + .run() + .await? + } else { + let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) + .expect("failed to create client"); + + let handler = http3::Handler::new(to_request, &args, key_update); + + Runner { + args: &args, + client, + handler, + local_addr: real_local, + socket: &mut socket, + timeout: None, + } + .run() + .await? + }; + } + } + + Ok(()) +} diff --git a/neqo-bin/src/lib.rs b/neqo-bin/src/lib.rs index 8a7ff69b69..ca3a0ab489 100644 --- a/neqo-bin/src/lib.rs +++ b/neqo-bin/src/lib.rs @@ -17,6 +17,9 @@ use neqo_transport::{ Version, }; +// TODO: Needs to be pub? +pub mod client; +pub mod server; pub mod udp; #[derive(Debug, Parser)] @@ -57,6 +60,22 @@ pub struct SharedArgs { pub quic_parameters: QuicParameters, } +impl SharedArgs { + pub fn new() -> Self { + Self { + alpn: "h3".into(), + qlog_dir: None, + max_table_size_encoder: 16384, + max_table_size_decoder: 16384, + max_blocked_streams: 10, + ciphers: vec![], + qns_test: None, + use_old_http: false, + quic_parameters: QuicParameters::new(), + } + } +} + #[derive(Debug, Parser)] pub struct QuicParameters { #[arg( @@ -103,6 +122,19 @@ pub struct QuicParameters { } impl QuicParameters { + pub fn new() -> Self { + Self { + quic_version: vec![], + max_streams_bidi: 16, + max_streams_uni: 16, + idle_timeout: 30, + congestion_control: CongestionControlAlgorithm::NewReno, + pacing: true, + preferred_address_v4: None, + preferred_address_v6: None, + } + } + fn get_sock_addr(opt: &Option, v: &str, f: F) -> Option where F: FnMut(&SocketAddr) -> bool, diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs new file mode 100644 index 0000000000..bdc8fcda62 --- /dev/null +++ b/neqo-bin/src/server/mod.rs @@ -0,0 +1,633 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + cell::RefCell, + cmp::min, + collections::HashMap, + fmt::{self, Display}, + fs::OpenOptions, + io::{self, Read}, + net::{SocketAddr, ToSocketAddrs}, + path::PathBuf, + pin::Pin, + process::exit, + rc::Rc, + str::FromStr, + time::{Duration, Instant}, +}; + +use crate::udp; +use clap::Parser; +use futures::{ + future::{select, select_all, Either}, + FutureExt, +}; +use neqo_common::{hex, qinfo, qwarn, Datagram, Header}; +use neqo_crypto::{ + constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, + generate_ech_keys, init_db, random, AntiReplay, Cipher, +}; +use neqo_http3::{ + Error, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, +}; +use neqo_transport::{ + server::ValidateAddress, ConnectionIdGenerator, Output, RandomConnectionIdGenerator, Version, +}; +use tokio::time::Sleep; + +use old_https::Http09Server; + +const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); + +mod old_https; + +#[derive(Debug)] +pub enum ServerError { + ArgumentError(&'static str), + Http3Error(neqo_http3::Error), + IoError(io::Error), + QlogError, + TransportError(neqo_transport::Error), +} + +impl From for ServerError { + fn from(err: io::Error) -> Self { + Self::IoError(err) + } +} + +impl From for ServerError { + fn from(err: neqo_http3::Error) -> Self { + Self::Http3Error(err) + } +} + +impl From for ServerError { + fn from(_err: qlog::Error) -> Self { + Self::QlogError + } +} + +impl From for ServerError { + fn from(err: neqo_transport::Error) -> Self { + Self::TransportError(err) + } +} + +impl Display for ServerError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Error: {self:?}")?; + Ok(()) + } +} + +impl std::error::Error for ServerError {} + +#[derive(Debug, Parser)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[command(flatten)] + shared: crate::SharedArgs, + + /// List of IP:port to listen on + #[arg(default_value = "[::]:4433")] + hosts: Vec, + + #[arg(short = 'd', long, default_value = "./test-fixture/db")] + /// NSS database directory. + db: PathBuf, + + #[arg(short = 'k', long, default_value = "key")] + /// Name of key from NSS database. + key: String, + + #[arg(name = "retry", long)] + /// Force a retry + retry: bool, + + #[arg(name = "ech", long)] + /// Enable encrypted client hello (ECH). + /// This generates a new set of ECH keys when it is invoked. + /// The resulting configuration is printed to stdout in hexadecimal format. + ech: bool, +} + +impl Args { + pub fn new() -> Self { + Self { + shared: crate::SharedArgs::new(), + hosts: vec!["[::]:12345".to_string()], + db: PathBuf::from_str("/home/mxinden/code/github.com/mozilla/neqo/test-fixture/db") + .unwrap(), + key: "key".to_string(), + retry: false, + ech: false, + } + } + fn get_ciphers(&self) -> Vec { + self.shared + .ciphers + .iter() + .filter_map(|c| match c.as_str() { + "TLS_AES_128_GCM_SHA256" => Some(TLS_AES_128_GCM_SHA256), + "TLS_AES_256_GCM_SHA384" => Some(TLS_AES_256_GCM_SHA384), + "TLS_CHACHA20_POLY1305_SHA256" => Some(TLS_CHACHA20_POLY1305_SHA256), + _ => None, + }) + .collect::>() + } + + fn listen_addresses(&self) -> Vec { + self.hosts + .iter() + .filter_map(|host| host.to_socket_addrs().ok()) + .flatten() + .chain(self.shared.quic_parameters.preferred_address_v4()) + .chain(self.shared.quic_parameters.preferred_address_v6()) + .collect() + } + + fn now(&self) -> Instant { + if self.shared.qns_test.is_some() { + // When NSS starts its anti-replay it blocks any acceptance of 0-RTT for a + // single period. This ensures that an attacker that is able to force a + // server to reboot is unable to use that to flush the anti-replay buffers + // and have something replayed. + // + // However, this is a massive inconvenience for us when we are testing. + // As we can't initialize `AntiReplay` in the past (see `neqo_common::time` + // for why), fast forward time here so that the connections get times from + // in the future. + // + // This is NOT SAFE. Don't do this. + Instant::now() + ANTI_REPLAY_WINDOW + } else { + Instant::now() + } + } +} + +fn qns_read_response(filename: &str) -> Option> { + let mut file_path = PathBuf::from("/www"); + file_path.push(filename.trim_matches(|p| p == '/')); + + OpenOptions::new() + .read(true) + .open(&file_path) + .map_err(|_e| eprintln!("Could not open {}", file_path.display())) + .ok() + .and_then(|mut f| { + let mut data = Vec::new(); + match f.read_to_end(&mut data) { + Ok(sz) => { + println!("{} bytes read from {}", sz, file_path.display()); + Some(data) + } + Err(e) => { + eprintln!("Error reading data: {e:?}"); + None + } + } + }) +} + +trait HttpServer: Display { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn process_events(&mut self, args: &Args, now: Instant); + fn set_qlog_dir(&mut self, dir: Option); + fn set_ciphers(&mut self, ciphers: &[Cipher]); + fn validate_address(&mut self, when: ValidateAddress); + fn enable_ech(&mut self) -> &[u8]; +} + +struct ResponseData { + data: Vec, + offset: usize, + remaining: usize, +} + +impl From<&[u8]> for ResponseData { + fn from(data: &[u8]) -> Self { + Self::from(data.to_vec()) + } +} + +impl From> for ResponseData { + fn from(data: Vec) -> Self { + let remaining = data.len(); + Self { + data, + offset: 0, + remaining, + } + } +} + +impl ResponseData { + fn repeat(buf: &[u8], total: usize) -> Self { + Self { + data: buf.to_owned(), + offset: 0, + remaining: total, + } + } + + fn send(&mut self, stream: &mut Http3OrWebTransportStream) { + while self.remaining > 0 { + let end = min(self.data.len(), self.offset + self.remaining); + let slice = &self.data[self.offset..end]; + match stream.send_data(slice) { + Ok(0) => { + return; + } + Ok(sent) => { + self.remaining -= sent; + self.offset = (self.offset + sent) % self.data.len(); + } + Err(e) => { + qwarn!("Error writing to stream {}: {:?}", stream, e); + return; + } + } + } + } + + fn done(&self) -> bool { + self.remaining == 0 + } +} + +struct SimpleServer { + server: Http3Server, + /// Progress writing to each stream. + remaining_data: HashMap, + posts: HashMap, +} + +impl SimpleServer { + const MESSAGE: &'static [u8] = b"I am the very model of a modern Major-General,\n\ + I've information vegetable, animal, and mineral,\n\ + I know the kings of England, and I quote the fights historical\n\ + From Marathon to Waterloo, in order categorical;\n\ + I'm very well acquainted, too, with matters mathematical,\n\ + I understand equations, both the simple and quadratical,\n\ + About binomial theorem, I'm teeming with a lot o' news,\n\ + With many cheerful facts about the square of the hypotenuse.\n"; + + pub fn new( + args: &Args, + anti_replay: AntiReplay, + cid_mgr: Rc>, + ) -> Self { + let server = Http3Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + cid_mgr, + Http3Parameters::default() + .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) + .max_table_size_encoder(args.shared.max_table_size_encoder) + .max_table_size_decoder(args.shared.max_table_size_decoder) + .max_blocked_streams(args.shared.max_blocked_streams), + None, + ) + .expect("We cannot make a server!"); + Self { + server, + remaining_data: HashMap::new(), + posts: HashMap::new(), + } + } +} + +impl Display for SimpleServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.server.fmt(f) + } +} + +impl HttpServer for SimpleServer { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.server.process(dgram, now) + } + + fn process_events(&mut self, args: &Args, _now: Instant) { + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::Headers { + mut stream, + headers, + fin, + } => { + println!("Headers (request={stream} fin={fin}): {headers:?}"); + + let post = if let Some(method) = headers.iter().find(|&h| h.name() == ":method") + { + method.value() == "POST" + } else { + false + }; + if post { + self.posts.insert(stream, 0); + continue; + } + + let mut response = + if let Some(path) = headers.iter().find(|&h| h.name() == ":path") { + if args.shared.qns_test.is_some() { + if let Some(data) = qns_read_response(path.value()) { + ResponseData::from(data) + } else { + ResponseData::from(Self::MESSAGE) + } + } else if let Ok(count) = + path.value().trim_matches(|p| p == '/').parse::() + { + ResponseData::repeat(Self::MESSAGE, count) + } else { + ResponseData::from(Self::MESSAGE) + } + } else { + stream + .cancel_fetch(Error::HttpRequestIncomplete.code()) + .unwrap(); + continue; + }; + + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("content-length", response.remaining.to_string()), + ]) + .unwrap(); + response.send(&mut stream); + if response.done() { + stream.stream_close_send().unwrap(); + } else { + self.remaining_data.insert(stream.stream_id(), response); + } + } + Http3ServerEvent::DataWritable { mut stream } => { + if self.posts.get_mut(&stream).is_none() { + if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { + remaining.send(&mut stream); + if remaining.done() { + self.remaining_data.remove(&stream.stream_id()); + stream.stream_close_send().unwrap(); + } + } + } + } + + Http3ServerEvent::Data { + mut stream, + data, + fin, + } => { + if let Some(received) = self.posts.get_mut(&stream) { + *received += data.len(); + } + if fin { + if let Some(received) = self.posts.remove(&stream) { + let msg = received.to_string().as_bytes().to_vec(); + stream + .send_headers(&[Header::new(":status", "200")]) + .unwrap(); + stream.send_data(&msg).unwrap(); + stream.stream_close_send().unwrap(); + } + } + } + _ => {} + } + } + } + + fn set_qlog_dir(&mut self, dir: Option) { + self.server.set_qlog_dir(dir); + } + + fn validate_address(&mut self, v: ValidateAddress) { + self.server.set_validation(v); + } + + fn set_ciphers(&mut self, ciphers: &[Cipher]) { + self.server.set_ciphers(ciphers); + } + + fn enable_ech(&mut self) -> &[u8] { + let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); + self.server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .unwrap(); + self.server.ech_config() + } +} + +struct ServersRunner { + args: Args, + server: Box, + timeout: Option>>, + sockets: Vec<(SocketAddr, udp::Socket)>, +} + +impl ServersRunner { + pub fn new(args: Args) -> Result { + let hosts = args.listen_addresses(); + if hosts.is_empty() { + eprintln!("No valid hosts defined"); + return Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts")); + } + let sockets = hosts + .into_iter() + .map(|host| { + let socket = udp::Socket::bind(host)?; + let local_addr = socket.local_addr()?; + println!("Server waiting for connection on: {local_addr:?}"); + + Ok((host, socket)) + }) + .collect::>()?; + let server = Self::create_server(&args); + + Ok(Self { + args, + server, + timeout: None, + sockets, + }) + } + + fn create_server(args: &Args) -> Box { + // Note: this is the exception to the case where we use `Args::now`. + let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) + .expect("unable to setup anti-replay"); + let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); + + let mut svr: Box = if args.shared.use_old_http { + Box::new( + Http09Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + cid_mgr, + args.shared.quic_parameters.get(&args.shared.alpn), + ) + .expect("We cannot make a server!"), + ) + } else { + Box::new(SimpleServer::new(args, anti_replay, cid_mgr)) + }; + svr.set_ciphers(&args.get_ciphers()); + svr.set_qlog_dir(args.shared.qlog_dir.clone()); + if args.retry { + svr.validate_address(ValidateAddress::Always); + } + if args.ech { + let cfg = svr.enable_ech(); + println!("ECHConfigList: {}", hex(cfg)); + } + svr + } + + /// Tries to find a socket, but then just falls back to sending from the first. + fn find_socket(&mut self, addr: SocketAddr) -> &mut udp::Socket { + let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap(); + rest.iter_mut() + .map(|(_host, socket)| socket) + .find(|socket| { + socket + .local_addr() + .ok() + .map_or(false, |socket_addr| socket_addr == addr) + }) + .unwrap_or(first_socket) + } + + async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { + loop { + match self.server.process(dgram.take(), self.args.now()) { + Output::Datagram(dgram) => { + let socket = self.find_socket(dgram.source()); + socket.writable().await?; + socket.send(dgram)?; + } + Output::Callback(new_timeout) => { + qinfo!("Setting timeout of {:?}", new_timeout); + self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); + break; + } + Output::None => { + break; + } + } + } + Ok(()) + } + + // Wait for any of the sockets to be readable or the timeout to fire. + async fn ready(&mut self) -> Result { + let sockets_ready = select_all( + self.sockets + .iter() + .map(|(_host, socket)| Box::pin(socket.readable())), + ) + .map(|(res, inx, _)| match res { + Ok(()) => Ok(Ready::Socket(inx)), + Err(e) => Err(e), + }); + let timeout_ready = self + .timeout + .as_mut() + .map_or(Either::Right(futures::future::pending()), Either::Left) + .map(|()| Ok(Ready::Timeout)); + select(sockets_ready, timeout_ready).await.factor_first().0 + } + + async fn run(&mut self) -> Result<(), io::Error> { + loop { + match self.ready().await? { + Ready::Socket(inx) => loop { + let (host, socket) = self.sockets.get_mut(inx).unwrap(); + let dgrams = socket.recv(host)?; + if dgrams.is_empty() { + break; + } + for dgram in dgrams { + self.process(Some(&dgram)).await?; + } + }, + Ready::Timeout => { + self.timeout = None; + self.process(None).await?; + } + } + + self.server.process_events(&self.args, self.args.now()); + self.process(None).await?; + } + } +} + +enum Ready { + Socket(usize), + Timeout, +} + +pub async fn server(mut args: Args) -> Result<(), io::Error> { + const HQ_INTEROP: &str = "hq-interop"; + + assert!(!args.key.is_empty(), "Need at least one key"); + + init_db(args.db.clone()); + + if let Some(testcase) = args.shared.qns_test.as_ref() { + if args.shared.quic_parameters.quic_version.is_empty() { + // Quic Interop Runner expects the server to support `Version1` + // only. Exceptions are testcases `versionnegotiation` (not yet + // implemented) and `v2`. + if testcase != "v2" { + args.shared.quic_parameters.quic_version = vec![Version::Version1]; + } + } else { + qwarn!("Both -V and --qns-test were set. Ignoring testcase specific versions."); + } + + // TODO: More options to deduplicate with client? + match testcase.as_str() { + "http3" => (), + "zerortt" => { + args.shared.use_old_http = true; + args.shared.alpn = String::from(HQ_INTEROP); + args.shared.quic_parameters.max_streams_bidi = 100; + } + "handshake" | "transfer" | "resumption" | "multiconnect" | "v2" => { + args.shared.use_old_http = true; + args.shared.alpn = String::from(HQ_INTEROP); + } + "chacha20" => { + args.shared.use_old_http = true; + args.shared.alpn = String::from(HQ_INTEROP); + args.shared.ciphers.clear(); + args.shared + .ciphers + .extend_from_slice(&[String::from("TLS_CHACHA20_POLY1305_SHA256")]); + } + "retry" => { + args.shared.use_old_http = true; + args.shared.alpn = String::from(HQ_INTEROP); + args.retry = true; + } + _ => exit(127), + } + } + + let mut servers_runner = ServersRunner::new(args)?; + servers_runner.run().await +} diff --git a/neqo-bin/src/bin/server/old_https.rs b/neqo-bin/src/server/old_https.rs similarity index 100% rename from neqo-bin/src/bin/server/old_https.rs rename to neqo-bin/src/server/old_https.rs