From 721f3e1ecedfc6301fd5c1800fe5601fd9218b02 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 8 Mar 2024 17:28:41 +0100 Subject: [PATCH] refactor(client): de-duplicate process & run and split into h3 & h09 The Neqo Client binary supports both http3 and http09 (prev. "old"). Before this commit both the http3 and the http09 implementation had their own `run` and `process` `fn`, orchestrating the interaction between handler, client and I/O. While similar, they had subtle differences e.g. when to terminate. This commit splits the http3 and http09 specific logic into two separate modules, but extracts duplicate logic (e.g. `run` and `process`) into the shared root module. --- neqo-bin/src/bin/client/http09.rs | 274 +++++++++ neqo-bin/src/bin/client/http3.rs | 456 +++++++++++++++ neqo-bin/src/bin/client/main.rs | 904 +++--------------------------- 3 files changed, 814 insertions(+), 820 deletions(-) create mode 100644 neqo-bin/src/bin/client/http09.rs create mode 100644 neqo-bin/src/bin/client/http3.rs diff --git a/neqo-bin/src/bin/client/http09.rs b/neqo-bin/src/bin/client/http09.rs new file mode 100644 index 0000000000..a7dc2c21c7 --- /dev/null +++ b/neqo-bin/src/bin/client/http09.rs @@ -0,0 +1,274 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! An [HTTP 0.9](https://www.w3.org/Protocols/HTTP/AsImplemented.html) client implementation. + +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + fs::File, + io::Write, + net::SocketAddr, + path::PathBuf, + rc::Rc, + time::Instant, +}; + +use neqo_common::{event::Provider, Datagram}; +use neqo_crypto::{AuthenticationStatus, ResumptionToken}; +use neqo_transport::{ + Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, + StreamType, +}; +use url::Url; + +use super::{get_output_file, Args, KeyUpdateState, Res}; +use crate::qlog_new; + +pub struct Handler<'a> { + streams: HashMap>, + url_queue: VecDeque, + all_paths: Vec, + args: &'a Args, + token: Option, + key_update: KeyUpdateState, +} + +impl<'a> super::Handler for Handler<'a> { + type Client = Connection; + + fn handle(&mut self, client: &mut Self::Client) -> Res { + while let Some(event) = client.next_event() { + match event { + ConnectionEvent::AuthenticationNeeded => { + client.authenticated(AuthenticationStatus::Ok, Instant::now()); + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + self.read(client, stream_id)?; + } + ConnectionEvent::SendStreamWritable { stream_id } => { + println!("stream {stream_id} writable"); + } + ConnectionEvent::SendStreamComplete { stream_id } => { + println!("stream {stream_id} complete"); + } + ConnectionEvent::SendStreamCreatable { stream_type } => { + println!("stream {stream_type:?} creatable"); + if stream_type == StreamType::BiDi { + self.download_urls(client); + } + } + ConnectionEvent::StateChange( + State::WaitInitial | State::Handshaking | State::Connected, + ) => { + println!("{event:?}"); + self.download_urls(client); + } + ConnectionEvent::StateChange(State::Confirmed) => { + self.maybe_key_update(client)?; + } + ConnectionEvent::ResumptionToken(token) => { + self.token = Some(token); + } + _ => { + println!("Unhandled event {event:?}"); + } + } + } + + if self.streams.is_empty() && self.url_queue.is_empty() { + // Handler is done. + return Ok(true); + } + + Ok(false) + } + + fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()> { + self.key_update.maybe_update(|| c.initiate_key_update())?; + self.download_urls(c); + Ok(()) + } + + fn take_token(&mut self) -> Option { + self.token.take() + } + + fn has_token(&self) -> bool { + self.token.is_some() + } +} + +pub(crate) fn create_client( + args: &Args, + local_addr: SocketAddr, + remote_addr: SocketAddr, + hostname: &str, + resumption_token: Option, +) -> Res { + let alpn = match args.shared.alpn.as_str() { + "hq-29" | "hq-30" | "hq-31" | "hq-32" => args.shared.alpn.as_str(), + _ => "hq-interop", + }; + + let mut client = Connection::new_client( + hostname, + &[alpn], + Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), + local_addr, + remote_addr, + args.shared.quic_parameters.get(alpn), + Instant::now(), + )?; + + if let Some(tok) = resumption_token { + client.enable_resumption(Instant::now(), tok)?; + } + + let ciphers = args.get_ciphers(); + if !ciphers.is_empty() { + client.set_ciphers(&ciphers)?; + } + + client.set_qlog(qlog_new(args, hostname, client.odcid().unwrap())?); + + Ok(client) +} + +impl super::Client for Connection { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.process(dgram, now) + } + + fn close(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S) + where + S: AsRef + std::fmt::Display, + { + self.close(now, app_error, msg); + } + + fn is_closed(&self) -> bool { + matches!(self.state(), State::Closed(..)) + } +} + +impl<'b> Handler<'b> { + pub fn new(url_queue: VecDeque, args: &'b Args, key_update: KeyUpdateState) -> Self { + Self { + streams: HashMap::new(), + url_queue, + all_paths: Vec::new(), + args, + token: None, + key_update, + } + } + + fn download_urls(&mut self, client: &mut Connection) { + loop { + if self.url_queue.is_empty() { + break; + } + if self.streams.len() >= self.args.concurrency { + break; + } + if !self.download_next(client) { + break; + } + } + } + + fn download_next(&mut self, client: &mut Connection) -> bool { + if self.key_update.needed() { + println!("Deferring requests until after first key update"); + return false; + } + let url = self + .url_queue + .pop_front() + .expect("download_next called with empty queue"); + match client.stream_create(StreamType::BiDi) { + Ok(client_stream_id) => { + println!("Created stream {client_stream_id} for {url}"); + let req = format!("GET {}\r\n", url.path()); + _ = client + .stream_send(client_stream_id, req.as_bytes()) + .unwrap(); + client.stream_close_send(client_stream_id).unwrap(); + let out_file = get_output_file(&url, &self.args.output_dir, &mut self.all_paths); + self.streams.insert(client_stream_id, out_file); + true + } + Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => { + println!("Cannot create stream {e:?}"); + self.url_queue.push_front(url); + false + } + Err(e) => { + panic!("Error creating stream {e:?}"); + } + } + } + + /// Read and maybe print received data from a stream. + // Returns bool: was fin received? + fn read_from_stream( + client: &mut Connection, + stream_id: StreamId, + output_read_data: bool, + maybe_out_file: &mut Option, + ) -> Res { + let mut data = vec![0; 4096]; + loop { + let (sz, fin) = client.stream_recv(stream_id, &mut data)?; + if sz == 0 { + return Ok(fin); + } + + if let Some(out_file) = maybe_out_file { + out_file.write_all(&data[..sz])?; + } else if !output_read_data { + println!("READ[{stream_id}]: {sz} bytes"); + } else { + println!( + "READ[{}]: {}", + stream_id, + String::from_utf8(data.clone()).unwrap() + ); + } + if fin { + return Ok(true); + } + } + } + + fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> { + let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id); + match &mut maybe_maybe_out_file { + None => { + println!("Data on unexpected stream: {stream_id}"); + return Ok(()); + } + Some(maybe_out_file) => { + let fin_recvd = Self::read_from_stream( + client, + stream_id, + self.args.output_read_data, + maybe_out_file, + )?; + + if fin_recvd { + if maybe_out_file.is_none() { + println!(""); + } + self.streams.remove(&stream_id); + self.download_urls(client); + } + } + } + Ok(()) + } +} diff --git a/neqo-bin/src/bin/client/http3.rs b/neqo-bin/src/bin/client/http3.rs new file mode 100644 index 0000000000..754de9cb16 --- /dev/null +++ b/neqo-bin/src/bin/client/http3.rs @@ -0,0 +1,456 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! An HTTP 3 client implementation. + +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + fmt::Display, + fs::File, + io::Write, + net::SocketAddr, + path::PathBuf, + rc::Rc, + time::Instant, +}; + +use neqo_common::{event::Provider, hex, Datagram, Header}; +use neqo_crypto::{AuthenticationStatus, ResumptionToken}; +use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority}; +use neqo_transport::{ + AppError, Connection, EmptyConnectionIdGenerator, Error as TransportError, Output, StreamId, +}; +use url::Url; + +use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; + +pub(crate) struct Handler<'a> { + #[allow( + unknown_lints, + clippy::struct_field_names, + clippy::redundant_field_names + )] + url_handler: UrlHandler<'a>, + key_update: KeyUpdateState, + token: Option, + output_read_data: bool, +} + +impl<'a> Handler<'a> { + pub(crate) fn new( + url_queue: VecDeque, + args: &'a Args, + key_update: KeyUpdateState, + ) -> Self { + let url_handler = UrlHandler { + url_queue, + stream_handlers: HashMap::new(), + all_paths: Vec::new(), + handler_type: if args.test.is_some() { + StreamHandlerType::Upload + } else { + StreamHandlerType::Download + }, + args, + }; + + Self { + url_handler, + key_update, + token: None, + output_read_data: args.output_read_data, + } + } +} + +pub(crate) fn create_client( + args: &Args, + local_addr: SocketAddr, + remote_addr: SocketAddr, + hostname: &str, + resumption_token: Option, +) -> Res { + let mut transport = Connection::new_client( + hostname, + &[&args.shared.alpn], + Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), + local_addr, + remote_addr, + args.shared.quic_parameters.get(args.shared.alpn.as_str()), + Instant::now(), + )?; + let ciphers = args.get_ciphers(); + if !ciphers.is_empty() { + transport.set_ciphers(&ciphers)?; + } + let mut client = Http3Client::new_with_conn( + transport, + Http3Parameters::default() + .max_table_size_encoder(args.shared.max_table_size_encoder) + .max_table_size_decoder(args.shared.max_table_size_decoder) + .max_blocked_streams(args.shared.max_blocked_streams) + .max_concurrent_push_streams(args.max_concurrent_push_streams), + ); + + let qlog = qlog_new(args, hostname, client.connection_id())?; + client.set_qlog(qlog); + if let Some(ech) = &args.ech { + client.enable_ech(ech).expect("enable ECH"); + } + if let Some(token) = resumption_token { + client + .enable_resumption(Instant::now(), token) + .expect("enable resumption"); + } + + Ok(client) +} + +impl super::Client for Http3Client { + fn is_closed(&self) -> bool { + matches!(self.state(), Http3State::Closed(..)) + } + + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.process(dgram, now) + } + + fn close(&mut self, now: Instant, app_error: AppError, msg: S) + where + S: AsRef + Display, + { + self.close(now, app_error, msg); + } +} + +impl<'a> super::Handler for Handler<'a> { + type Client = Http3Client; + + fn handle(&mut self, client: &mut Http3Client) -> Res { + while let Some(event) = client.next_event() { + match event { + Http3ClientEvent::AuthenticationNeeded => { + client.authenticated(AuthenticationStatus::Ok, Instant::now()); + } + Http3ClientEvent::HeaderReady { + stream_id, + headers, + fin, + .. + } => { + if let Some(handler) = self.url_handler.stream_handler(stream_id) { + handler.process_header_ready(stream_id, fin, headers); + } else { + println!("Data on unexpected stream: {stream_id}"); + } + if fin { + self.url_handler.on_stream_fin(client, stream_id); + } + } + Http3ClientEvent::DataReadable { stream_id } => { + let mut stream_done = false; + match self.url_handler.stream_handler(stream_id) { + None => { + println!("Data on unexpected stream: {stream_id}"); + } + Some(handler) => loop { + let mut data = vec![0; 4096]; + let (sz, fin) = client + .read_data(Instant::now(), stream_id, &mut data) + .expect("Read should succeed"); + + handler.process_data_readable( + stream_id, + fin, + data, + sz, + self.output_read_data, + )?; + + if fin { + stream_done = true; + break; + } + + if sz == 0 { + break; + } + }, + } + + if stream_done { + self.url_handler.on_stream_fin(client, stream_id); + } + } + Http3ClientEvent::DataWritable { stream_id } => { + match self.url_handler.stream_handler(stream_id) { + None => { + println!("Data on unexpected stream: {stream_id}"); + } + Some(handler) => { + handler.process_data_writable(client, stream_id); + } + } + } + Http3ClientEvent::StateChange(Http3State::Connected) + | Http3ClientEvent::RequestsCreatable => { + self.url_handler.process_urls(client); + } + Http3ClientEvent::ResumptionToken(t) => self.token = Some(t), + _ => { + println!("Unhandled event {event:?}"); + } + } + } + + Ok(self.url_handler.done()) + } + + fn maybe_key_update(&mut self, c: &mut Http3Client) -> Res<()> { + self.key_update.maybe_update(|| c.initiate_key_update())?; + self.url_handler.process_urls(c); + Ok(()) + } + + fn take_token(&mut self) -> Option { + self.token.take() + } + + fn has_token(&self) -> bool { + self.token.is_some() + } +} + +trait StreamHandler { + fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
); + fn process_data_readable( + &mut self, + stream_id: StreamId, + fin: bool, + data: Vec, + sz: usize, + output_read_data: bool, + ) -> Res; + fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId); +} + +enum StreamHandlerType { + Download, + Upload, +} + +impl StreamHandlerType { + fn make_handler( + handler_type: &Self, + url: &Url, + args: &Args, + all_paths: &mut Vec, + client: &mut Http3Client, + client_stream_id: StreamId, + ) -> Box { + match handler_type { + Self::Download => { + let out_file = get_output_file(url, &args.output_dir, all_paths); + client.stream_close_send(client_stream_id).unwrap(); + Box::new(DownloadStreamHandler { out_file }) + } + Self::Upload => Box::new(UploadStreamHandler { + data: vec![42; args.upload_size], + offset: 0, + chunk_size: 32768, + start: Instant::now(), + }), + } + } +} + +struct DownloadStreamHandler { + out_file: Option, +} + +impl StreamHandler for DownloadStreamHandler { + fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { + if self.out_file.is_none() { + println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); + } + } + + fn process_data_readable( + &mut self, + stream_id: StreamId, + fin: bool, + data: Vec, + sz: usize, + output_read_data: bool, + ) -> Res { + if let Some(out_file) = &mut self.out_file { + if sz > 0 { + out_file.write_all(&data[..sz])?; + } + return Ok(true); + } else if !output_read_data { + println!("READ[{stream_id}]: {sz} bytes"); + } else if let Ok(txt) = String::from_utf8(data.clone()) { + println!("READ[{stream_id}]: {txt}"); + } else { + println!("READ[{}]: 0x{}", stream_id, hex(&data)); + } + + if fin && self.out_file.is_none() { + println!(""); + } + + Ok(true) + } + + fn process_data_writable(&mut self, _client: &mut Http3Client, _stream_id: StreamId) {} +} + +struct UploadStreamHandler { + data: Vec, + offset: usize, + chunk_size: usize, + start: Instant, +} + +impl StreamHandler for UploadStreamHandler { + fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { + println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); + } + + fn process_data_readable( + &mut self, + stream_id: StreamId, + _fin: bool, + data: Vec, + _sz: usize, + _output_read_data: bool, + ) -> Res { + if let Ok(txt) = String::from_utf8(data.clone()) { + let trimmed_txt = txt.trim_end_matches(char::from(0)); + let parsed: usize = trimmed_txt.parse().unwrap(); + if parsed == self.data.len() { + let upload_time = Instant::now().duration_since(self.start); + println!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}"); + } + } else { + panic!("Unexpected data [{}]: 0x{}", stream_id, hex(&data)); + } + Ok(true) + } + + fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId) { + while self.offset < self.data.len() { + let end = self.offset + self.chunk_size.min(self.data.len() - self.offset); + let chunk = &self.data[self.offset..end]; + match client.send_data(stream_id, chunk) { + Ok(amount) => { + if amount == 0 { + break; + } + self.offset += amount; + if self.offset == self.data.len() { + client.stream_close_send(stream_id).unwrap(); + } + } + Err(_) => break, + }; + } + } +} + +struct UrlHandler<'a> { + url_queue: VecDeque, + stream_handlers: HashMap>, + all_paths: Vec, + handler_type: StreamHandlerType, + args: &'a Args, +} + +impl<'a> UrlHandler<'a> { + fn stream_handler(&mut self, stream_id: StreamId) -> Option<&mut Box> { + self.stream_handlers.get_mut(&stream_id) + } + + fn process_urls(&mut self, client: &mut Http3Client) { + loop { + if self.url_queue.is_empty() { + break; + } + if self.stream_handlers.len() >= self.args.concurrency { + break; + } + if !self.next_url(client) { + break; + } + } + } + + fn next_url(&mut self, client: &mut Http3Client) -> bool { + let url = self + .url_queue + .pop_front() + .expect("download_next called with empty queue"); + match client.fetch( + Instant::now(), + &self.args.method, + &url, + &to_headers(&self.args.header), + Priority::default(), + ) { + Ok(client_stream_id) => { + println!("Successfully created stream id {client_stream_id} for {url}"); + + let handler: Box = StreamHandlerType::make_handler( + &self.handler_type, + &url, + self.args, + &mut self.all_paths, + client, + client_stream_id, + ); + self.stream_handlers.insert(client_stream_id, handler); + true + } + Err( + Error::TransportError(TransportError::StreamLimitError) + | Error::StreamLimitError + | Error::Unavailable, + ) => { + self.url_queue.push_front(url); + false + } + Err(e) => { + panic!("Can't create stream {e}"); + } + } + } + + fn done(&mut self) -> bool { + self.stream_handlers.is_empty() && self.url_queue.is_empty() + } + + fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId) { + self.stream_handlers.remove(&stream_id); + self.process_urls(client); + } +} + +fn to_headers(values: &[impl AsRef]) -> Vec
{ + values + .iter() + .scan(None, |state, value| { + if let Some(name) = state.take() { + *state = None; + Some(Header::new(name, value.as_ref())) + } else { + *state = Some(value.as_ref().to_string()); + None + } + }) + .collect() +} diff --git a/neqo-bin/src/bin/client/main.rs b/neqo-bin/src/bin/client/main.rs index 2f9be1f3d7..4710f8b222 100644 --- a/neqo-bin/src/bin/client/main.rs +++ b/neqo-bin/src/bin/client/main.rs @@ -5,16 +5,14 @@ // except according to those terms. use std::{ - cell::RefCell, collections::{HashMap, VecDeque}, fmt::{self, Display}, fs::{create_dir_all, File, OpenOptions}, - io::{self, Write}, + io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, path::PathBuf, pin::Pin, process::exit, - rc::Rc, time::Instant, }; @@ -23,24 +21,20 @@ use futures::{ future::{select, Either}, FutureExt, TryFutureExt, }; -use neqo_common::{ - self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, udp, Datagram, Role, -}; +use neqo_common::{self as common, qdebug, qinfo, qlog::NeqoQlog, udp, Datagram, Role}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - init, AuthenticationStatus, Cipher, ResumptionToken, -}; -use neqo_http3::{ - Error, Header, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Output, Priority, -}; -use neqo_transport::{ - Connection, ConnectionId, EmptyConnectionIdGenerator, Error as TransportError, StreamId, - Version, + init, Cipher, ResumptionToken, }; +use neqo_http3::{Error, Output}; +use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; use qlog::{events::EventImportance, streamer::QlogStreamer}; use tokio::time::Sleep; use url::{Origin, Url}; +mod http09; +mod http3; + #[derive(Debug)] pub enum ClientError { ArgumentError(&'static str), @@ -207,7 +201,16 @@ impl Args { self.shared.quic_parameters.quic_version = vec![Version::Version1]; match testcase.as_str() { // TODO: Add "ecn" when that is ready. - "http3" => {} + "http3" => { + if let Some(testcase) = &self.test { + if testcase.as_str() != "upload" { + eprintln!("Unsupported test case: {testcase}"); + exit(127) + } + + self.method = String::from("POST"); + } + } "handshake" | "transfer" | "retry" => { self.shared.use_old_http = true; } @@ -304,421 +307,60 @@ async fn ready( select(socket_ready, timeout_ready).await.factor_first().0 } -trait StreamHandler { - fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
); - fn process_data_readable( - &mut self, - stream_id: StreamId, - fin: bool, - data: Vec, - sz: usize, - output_read_data: bool, - ) -> Res; - fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId); -} - -enum StreamHandlerType { - Download, - Upload, -} - -impl StreamHandlerType { - fn make_handler( - handler_type: &Self, - url: &Url, - args: &Args, - all_paths: &mut Vec, - client: &mut Http3Client, - client_stream_id: StreamId, - ) -> Box { - match handler_type { - Self::Download => { - let out_file = get_output_file(url, &args.output_dir, all_paths); - client.stream_close_send(client_stream_id).unwrap(); - Box::new(DownloadStreamHandler { out_file }) - } - Self::Upload => Box::new(UploadStreamHandler { - data: vec![42; args.upload_size], - offset: 0, - chunk_size: 32768, - start: Instant::now(), - }), - } - } -} - -struct DownloadStreamHandler { - out_file: Option, -} - -impl StreamHandler for DownloadStreamHandler { - fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { - if self.out_file.is_none() { - println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); - } - } - - fn process_data_readable( - &mut self, - stream_id: StreamId, - fin: bool, - data: Vec, - sz: usize, - output_read_data: bool, - ) -> Res { - if let Some(out_file) = &mut self.out_file { - if sz > 0 { - out_file.write_all(&data[..sz])?; - } - return Ok(true); - } else if !output_read_data { - println!("READ[{stream_id}]: {sz} bytes"); - } else if let Ok(txt) = String::from_utf8(data.clone()) { - println!("READ[{stream_id}]: {txt}"); - } else { - println!("READ[{}]: 0x{}", stream_id, hex(&data)); - } - - if fin && self.out_file.is_none() { - println!(""); - } - - Ok(true) - } - - fn process_data_writable(&mut self, _client: &mut Http3Client, _stream_id: StreamId) {} -} +/// Handles a given task on the provided [`Client`]. +trait Handler { + type Client: Client; -struct UploadStreamHandler { - data: Vec, - offset: usize, - chunk_size: usize, - start: Instant, + fn handle(&mut self, client: &mut Self::Client) -> Res; + fn maybe_key_update(&mut self, c: &mut Self::Client) -> Res<()>; + fn take_token(&mut self) -> Option; + fn has_token(&self) -> bool; } -impl StreamHandler for UploadStreamHandler { - fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec
) { - println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}"); - } - - fn process_data_readable( - &mut self, - stream_id: StreamId, - _fin: bool, - data: Vec, - _sz: usize, - _output_read_data: bool, - ) -> Res { - if let Ok(txt) = String::from_utf8(data.clone()) { - let trimmed_txt = txt.trim_end_matches(char::from(0)); - let parsed: usize = trimmed_txt.parse().unwrap(); - if parsed == self.data.len() { - let upload_time = Instant::now().duration_since(self.start); - println!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}"); - } - } else { - panic!("Unexpected data [{}]: 0x{}", stream_id, hex(&data)); - } - Ok(true) - } - - fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId) { - while self.offset < self.data.len() { - let end = self.offset + self.chunk_size.min(self.data.len() - self.offset); - let chunk = &self.data[self.offset..end]; - match client.send_data(stream_id, chunk) { - Ok(amount) => { - if amount == 0 { - break; - } - self.offset += amount; - if self.offset == self.data.len() { - client.stream_close_send(stream_id).unwrap(); - } - } - Err(_) => break, - }; - } - } +/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. +trait Client { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + fn close(&mut self, now: Instant, app_error: AppError, msg: S) + where + S: AsRef + Display; + fn is_closed(&self) -> bool; } -struct URLHandler<'a> { - url_queue: VecDeque, - stream_handlers: HashMap>, - all_paths: Vec, - handler_type: StreamHandlerType, +struct Runner<'a, H: Handler> { + local_addr: SocketAddr, + socket: &'a mut udp::Socket, + client: H::Client, + handler: H, + timeout: Option>>, args: &'a Args, } -impl<'a> URLHandler<'a> { - fn stream_handler(&mut self, stream_id: StreamId) -> Option<&mut Box> { - self.stream_handlers.get_mut(&stream_id) - } - - fn process_urls(&mut self, client: &mut Http3Client) { +impl<'a, H: Handler> Runner<'a, H> { + async fn run(mut self) -> Res> { loop { - if self.url_queue.is_empty() { - break; - } - if self.stream_handlers.len() >= self.args.concurrency { - break; - } - if !self.next_url(client) { - break; - } - } - } + let handler_done = self.handler.handle(&mut self.client)?; - fn next_url(&mut self, client: &mut Http3Client) -> bool { - let url = self - .url_queue - .pop_front() - .expect("download_next called with empty queue"); - match client.fetch( - Instant::now(), - &self.args.method, - &url, - &to_headers(&self.args.header), - Priority::default(), - ) { - Ok(client_stream_id) => { - println!("Successfully created stream id {client_stream_id} for {url}"); - - let handler: Box = StreamHandlerType::make_handler( - &self.handler_type, - &url, - self.args, - &mut self.all_paths, - client, - client_stream_id, - ); - self.stream_handlers.insert(client_stream_id, handler); - true - } - Err( - Error::TransportError(TransportError::StreamLimitError) - | Error::StreamLimitError - | Error::Unavailable, - ) => { - self.url_queue.push_front(url); - false - } - Err(e) => { - panic!("Can't create stream {e}"); - } - } - } - - fn done(&mut self) -> bool { - self.stream_handlers.is_empty() && self.url_queue.is_empty() - } - - fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId) -> bool { - self.stream_handlers.remove(&stream_id); - self.process_urls(client); - if self.done() { - client.close(Instant::now(), 0, "kthxbye!"); - return false; - } - true - } -} - -struct Handler<'a> { - #[allow( - unknown_lints, - clippy::struct_field_names, - clippy::redundant_field_names - )] - url_handler: URLHandler<'a>, - key_update: KeyUpdateState, - token: Option, - output_read_data: bool, -} - -impl<'a> Handler<'a> { - pub fn new( - url_handler: URLHandler<'a>, - key_update: KeyUpdateState, - output_read_data: bool, - ) -> Self { - Self { - url_handler, - key_update, - token: None, - output_read_data, - } - } - - fn maybe_key_update(&mut self, c: &mut Http3Client) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.url_handler.process_urls(c); - Ok(()) - } - - fn handle(&mut self, client: &mut Http3Client) -> Res { - while let Some(event) = client.next_event() { - match event { - Http3ClientEvent::AuthenticationNeeded => { - client.authenticated(AuthenticationStatus::Ok, Instant::now()); - } - Http3ClientEvent::HeaderReady { - stream_id, - headers, - fin, - .. - } => { - if let Some(handler) = self.url_handler.stream_handler(stream_id) { - handler.process_header_ready(stream_id, fin, headers); - } else { - println!("Data on unexpected stream: {stream_id}"); - return Ok(false); - } - if fin { - return Ok(self.url_handler.on_stream_fin(client, stream_id)); - } - } - Http3ClientEvent::DataReadable { stream_id } => { - let mut stream_done = false; - match self.url_handler.stream_handler(stream_id) { - None => { - println!("Data on unexpected stream: {stream_id}"); - return Ok(false); - } - Some(handler) => loop { - let mut data = vec![0; 4096]; - let (sz, fin) = client - .read_data(Instant::now(), stream_id, &mut data) - .expect("Read should succeed"); - - handler.process_data_readable( - stream_id, - fin, - data, - sz, - self.output_read_data, - )?; - - if fin { - stream_done = true; - break; - } - - if sz == 0 { - break; - } - }, - } - - if stream_done { - return Ok(self.url_handler.on_stream_fin(client, stream_id)); + match (handler_done, self.args.resume, self.handler.has_token()) { + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); } - } - Http3ClientEvent::DataWritable { stream_id } => { - match self.url_handler.stream_handler(stream_id) { - None => { - println!("Data on unexpected stream: {stream_id}"); - return Ok(false); - } - Some(handler) => { - handler.process_data_writable(client, stream_id); - return Ok(true); - } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); } } - Http3ClientEvent::StateChange(Http3State::Connected) - | Http3ClientEvent::RequestsCreatable => { - self.url_handler.process_urls(client); - } - Http3ClientEvent::ResumptionToken(t) => self.token = Some(t), - _ => { - println!("Unhandled event {event:?}"); - } - } - } - - Ok(true) - } -} - -fn to_headers(values: &[impl AsRef]) -> Vec
{ - values - .iter() - .scan(None, |state, value| { - if let Some(name) = state.take() { - *state = None; - Some(Header::new(name, value.as_ref())) - } else { - *state = Some(value.as_ref().to_string()); - None - } - }) - .collect() -} -struct ClientRunner<'a> { - local_addr: SocketAddr, - socket: &'a mut udp::Socket, - client: Http3Client, - handler: Handler<'a>, - timeout: Option>>, - args: &'a Args, -} - -impl<'a> ClientRunner<'a> { - fn new( - args: &'a mut Args, - socket: &'a mut udp::Socket, - local_addr: SocketAddr, - remote_addr: SocketAddr, - hostname: &str, - url_queue: VecDeque, - resumption_token: Option, - ) -> ClientRunner<'a> { - if let Some(testcase) = &args.test { - if testcase.as_str() != "upload" { - eprintln!("Unsupported test case: {testcase}"); - exit(127) - } - } - - let client = create_http3_client(args, local_addr, remote_addr, hostname, resumption_token) - .expect("failed to create client"); - if args.test.is_some() { - args.method = String::from("POST"); - } - let key_update = KeyUpdateState(args.key_update); - let url_handler = URLHandler { - url_queue, - stream_handlers: HashMap::new(), - all_paths: Vec::new(), - handler_type: if args.test.is_some() { - StreamHandlerType::Upload - } else { - StreamHandlerType::Download - }, - args, - }; - let handler = Handler::new(url_handler, key_update, args.output_read_data); - - Self { - local_addr, - socket, - client, - handler, - timeout: None, - args, - } - } + self.process(None).await?; - async fn run(mut self) -> Res> { - loop { - if !self.handler.handle(&mut self.client)? { - break; + if self.client.is_closed() { + return Ok(self.handler.take_token()); } - self.process(None).await?; - match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { let dgrams = self.socket.recv(&self.local_addr)?; @@ -734,24 +376,7 @@ impl<'a> ClientRunner<'a> { self.timeout = None; } } - - if let Http3State::Closed(..) = self.client.state() { - break; - } } - - let token = if self.args.test.is_none() && self.args.resume { - // If we haven't received an event, take a token if there is one. - // Lots of servers don't provide NEW_TOKEN, but a session ticket - // without NEW_TOKEN is better than nothing. - self.handler - .token - .take() - .or_else(|| self.client.take_resumption_token(Instant::now())) - } else { - None - }; - Ok(token) } async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { @@ -777,49 +402,6 @@ impl<'a> ClientRunner<'a> { } } -fn create_http3_client( - args: &mut Args, - local_addr: SocketAddr, - remote_addr: SocketAddr, - hostname: &str, - resumption_token: Option, -) -> Res { - let mut transport = Connection::new_client( - hostname, - &[&args.shared.alpn], - Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), - local_addr, - remote_addr, - args.shared.quic_parameters.get(args.shared.alpn.as_str()), - Instant::now(), - )?; - let ciphers = args.get_ciphers(); - if !ciphers.is_empty() { - transport.set_ciphers(&ciphers)?; - } - let mut client = Http3Client::new_with_conn( - transport, - Http3Parameters::default() - .max_table_size_encoder(args.shared.max_table_size_encoder) - .max_table_size_decoder(args.shared.max_table_size_decoder) - .max_blocked_streams(args.shared.max_blocked_streams) - .max_concurrent_push_streams(args.max_concurrent_push_streams), - ); - - let qlog = qlog_new(args, hostname, client.connection_id())?; - client.set_qlog(qlog); - if let Some(ech) = &args.ech { - client.enable_ech(ech).expect("enable ECH"); - } - if let Some(token) = resumption_token { - client - .enable_resumption(Instant::now(), token) - .expect("enable resumption"); - } - - Ok(client) -} - fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { if let Some(qlog_dir) = &args.shared.qlog_dir { let mut qlog_path = qlog_dir.clone(); @@ -916,28 +498,39 @@ async fn main() -> Res<()> { first = false; + let key_update = KeyUpdateState(args.key_update); + token = if args.shared.use_old_http { - old::ClientRunner::new( - &args, - &mut socket, - real_local, - remote_addr, - &hostname, - to_request, - token, - )? + let client = + http09::create_client(&args, real_local, remote_addr, &hostname, token) + .expect("failed to create client"); + + let handler = http09::Handler::new(to_request, &args, key_update); + + Runner { + args: &args, + client, + handler, + local_addr: real_local, + socket: &mut socket, + timeout: None, + } .run() .await? } else { - ClientRunner::new( - &mut args, - &mut socket, - real_local, - remote_addr, - &hostname, - to_request, - token, - ) + let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) + .expect("failed to create client"); + + let handler = http3::Handler::new(to_request, &args, key_update); + + Runner { + args: &args, + client, + handler, + local_addr: real_local, + socket: &mut socket, + timeout: None, + } .run() .await? }; @@ -946,332 +539,3 @@ async fn main() -> Res<()> { Ok(()) } - -mod old { - use std::{ - cell::RefCell, - collections::{HashMap, VecDeque}, - fs::File, - io::{self, Write}, - net::SocketAddr, - path::PathBuf, - pin::Pin, - rc::Rc, - time::Instant, - }; - - use neqo_common::{event::Provider, qdebug, qinfo, udp, Datagram}; - use neqo_crypto::{AuthenticationStatus, ResumptionToken}; - use neqo_transport::{ - Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId, - StreamType, - }; - use tokio::time::Sleep; - use url::Url; - - use super::{get_output_file, qlog_new, ready, Args, KeyUpdateState, Ready, Res}; - - struct HandlerOld<'b> { - streams: HashMap>, - url_queue: VecDeque, - all_paths: Vec, - args: &'b Args, - token: Option, - key_update: KeyUpdateState, - } - - impl<'b> HandlerOld<'b> { - fn download_urls(&mut self, client: &mut Connection) { - loop { - if self.url_queue.is_empty() { - break; - } - if self.streams.len() >= self.args.concurrency { - break; - } - if !self.download_next(client) { - break; - } - } - } - - fn download_next(&mut self, client: &mut Connection) -> bool { - if self.key_update.needed() { - println!("Deferring requests until after first key update"); - return false; - } - let url = self - .url_queue - .pop_front() - .expect("download_next called with empty queue"); - match client.stream_create(StreamType::BiDi) { - Ok(client_stream_id) => { - println!("Created stream {client_stream_id} for {url}"); - let req = format!("GET {}\r\n", url.path()); - _ = client - .stream_send(client_stream_id, req.as_bytes()) - .unwrap(); - client.stream_close_send(client_stream_id).unwrap(); - let out_file = - get_output_file(&url, &self.args.output_dir, &mut self.all_paths); - self.streams.insert(client_stream_id, out_file); - true - } - Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => { - println!("Cannot create stream {e:?}"); - self.url_queue.push_front(url); - false - } - Err(e) => { - panic!("Error creating stream {e:?}"); - } - } - } - - /// Read and maybe print received data from a stream. - // Returns bool: was fin received? - fn read_from_stream( - client: &mut Connection, - stream_id: StreamId, - output_read_data: bool, - maybe_out_file: &mut Option, - ) -> Res { - let mut data = vec![0; 4096]; - loop { - let (sz, fin) = client.stream_recv(stream_id, &mut data)?; - if sz == 0 { - return Ok(fin); - } - - if let Some(out_file) = maybe_out_file { - out_file.write_all(&data[..sz])?; - } else if !output_read_data { - println!("READ[{stream_id}]: {sz} bytes"); - } else { - println!( - "READ[{}]: {}", - stream_id, - String::from_utf8(data.clone()).unwrap() - ); - } - if fin { - return Ok(true); - } - } - } - - fn maybe_key_update(&mut self, c: &mut Connection) -> Res<()> { - self.key_update.maybe_update(|| c.initiate_key_update())?; - self.download_urls(c); - Ok(()) - } - - fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> { - let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id); - match &mut maybe_maybe_out_file { - None => { - println!("Data on unexpected stream: {stream_id}"); - return Ok(()); - } - Some(maybe_out_file) => { - let fin_recvd = Self::read_from_stream( - client, - stream_id, - self.args.output_read_data, - maybe_out_file, - )?; - - if fin_recvd { - if maybe_out_file.is_none() { - println!(""); - } - self.streams.remove(&stream_id); - self.download_urls(client); - } - } - } - Ok(()) - } - - /// Handle events on the connection. - /// - /// Returns `Ok(true)` when done, i.e. url queue is empty and streams are closed. - fn handle(&mut self, client: &mut Connection) -> Res { - while let Some(event) = client.next_event() { - match event { - ConnectionEvent::AuthenticationNeeded => { - client.authenticated(AuthenticationStatus::Ok, Instant::now()); - } - ConnectionEvent::RecvStreamReadable { stream_id } => { - self.read(client, stream_id)?; - } - ConnectionEvent::SendStreamWritable { stream_id } => { - println!("stream {stream_id} writable"); - } - ConnectionEvent::SendStreamComplete { stream_id } => { - println!("stream {stream_id} complete"); - } - ConnectionEvent::SendStreamCreatable { stream_type } => { - println!("stream {stream_type:?} creatable"); - if stream_type == StreamType::BiDi { - self.download_urls(client); - } - } - ConnectionEvent::StateChange( - State::WaitInitial | State::Handshaking | State::Connected, - ) => { - println!("{event:?}"); - self.download_urls(client); - } - ConnectionEvent::StateChange(State::Confirmed) => { - self.maybe_key_update(client)?; - } - ConnectionEvent::ResumptionToken(token) => { - self.token = Some(token); - } - _ => { - println!("Unhandled event {event:?}"); - } - } - } - - if self.streams.is_empty() && self.url_queue.is_empty() { - // Handler is done. - return Ok(true); - } - - Ok(false) - } - } - - pub struct ClientRunner<'a> { - local_addr: SocketAddr, - socket: &'a mut udp::Socket, - client: Connection, - handler: HandlerOld<'a>, - timeout: Option>>, - args: &'a Args, - } - - impl<'a> ClientRunner<'a> { - pub fn new( - args: &'a Args, - socket: &'a mut udp::Socket, - local_addr: SocketAddr, - remote_addr: SocketAddr, - origin: &str, - url_queue: VecDeque, - token: Option, - ) -> Res> { - let alpn = match args.shared.alpn.as_str() { - "hq-29" | "hq-30" | "hq-31" | "hq-32" => args.shared.alpn.as_str(), - _ => "hq-interop", - }; - - let mut client = Connection::new_client( - origin, - &[alpn], - Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), - local_addr, - remote_addr, - args.shared.quic_parameters.get(alpn), - Instant::now(), - )?; - - if let Some(tok) = token { - client.enable_resumption(Instant::now(), tok)?; - } - - let ciphers = args.get_ciphers(); - if !ciphers.is_empty() { - client.set_ciphers(&ciphers)?; - } - - client.set_qlog(qlog_new(args, origin, client.odcid().unwrap())?); - - let key_update = KeyUpdateState(args.key_update); - let handler = HandlerOld { - streams: HashMap::new(), - url_queue, - all_paths: Vec::new(), - args, - token: None, - key_update, - }; - - Ok(Self { - local_addr, - socket, - client, - handler, - timeout: None, - args, - }) - } - - pub async fn run(mut self) -> Res> { - loop { - let handler_done = self.handler.handle(&mut self.client)?; - - match (handler_done, self.args.resume, self.handler.token.is_some()) { - // Handler isn't done. Continue. - (false, _, _) => {}, - // Handler done. Resumption token needed but not present. Continue. - (true, true, false) => { - qdebug!("Handler done. Waiting for resumption token."); - } - // Handler is done, no resumption token needed. Close. - (true, false, _) | - // Handler is done, resumption token needed and present. Close. - (true, true, true) => { - self.client.close(Instant::now(), 0, "kthxbye!"); - } - } - - self.process(None).await?; - - if let State::Closed(..) = self.client.state() { - return Ok(self.handler.token.take()); - } - - match ready(self.socket, self.timeout.as_mut()).await? { - Ready::Socket => loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { - break; - } - for dgram in &dgrams { - self.process(Some(dgram)).await?; - } - self.handler.maybe_key_update(&mut self.client)?; - }, - Ready::Timeout => { - self.timeout = None; - } - } - } - } - - async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { - loop { - match self.client.process(dgram.take(), Instant::now()) { - Output::Datagram(dgram) => { - self.socket.writable().await?; - self.socket.send(dgram)?; - } - Output::Callback(new_timeout) => { - qinfo!("Setting timeout of {:?}", new_timeout); - self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout))); - break; - } - Output::None => { - qdebug!("Output::None"); - break; - } - } - } - - Ok(()) - } - } -}