diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 2f31928058..cc0092c504 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -26,7 +26,7 @@ use neqo_transport::{ use url::Url; use super::{get_output_file, qlog_new, Args, CloseState, Res}; -use crate::STREAM_IO_BUFFER_SIZE; +use crate::{client::local_addr_for, STREAM_IO_BUFFER_SIZE}; pub struct Handler<'a> { streams: HashMap>>, @@ -37,6 +37,7 @@ pub struct Handler<'a> { token: Option, needs_key_update: bool, read_buffer: Vec, + migration: Option<&'a (u16, SocketAddr)>, } impl Handler<'_> { @@ -85,6 +86,26 @@ impl super::Handler for Handler<'_> { self.download_urls(client); } } + ConnectionEvent::StateChange(State::Confirmed) => { + if let Some((local_port, migration_addr)) = self.migration.take() { + let local_addr = local_addr_for(migration_addr, *local_port); + qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); + client + .migrate( + Some(local_addr), + Some(*migration_addr), + false, + Instant::now(), + ) + .map(|()| { + qinfo!( + "Connection migrated to {:?} -> {:?}", + local_addr, + migration_addr + ); + })?; + } + } ConnectionEvent::StateChange( State::WaitInitial | State::Handshaking | State::Connected, ) => { @@ -211,7 +232,11 @@ impl super::Client for Connection { } impl<'b> Handler<'b> { - pub fn new(url_queue: VecDeque, args: &'b Args) -> Self { + pub fn new( + url_queue: VecDeque, + args: &'b Args, + migration: Option<&'b (u16, SocketAddr)>, + ) -> Self { Self { streams: HashMap::new(), url_queue, @@ -221,6 +246,7 @@ impl<'b> Handler<'b> { token: None, needs_key_update: args.key_update, read_buffer: vec![0; STREAM_IO_BUFFER_SIZE], + migration, } } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 8bfac59703..f0e959bb53 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -31,7 +31,7 @@ use neqo_crypto::{ use neqo_http3::Output; use neqo_transport::{AppError, CloseReason, ConnectionId, Version}; use tokio::time::Sleep; -use url::{Origin, Url}; +use url::{Host, Origin, Url}; use crate::SharedArgs; @@ -493,6 +493,29 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { .map_err(Error::QlogError) } +const fn local_addr_for(remote_addr: &SocketAddr, local_port: u16) -> SocketAddr { + match remote_addr { + SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port), + SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port), + } +} + +fn urls_by_origin(urls: &[Url]) -> impl Iterator)> { + urls.iter() + .fold(HashMap::>::new(), |mut urls, url| { + urls.entry(url.origin()).or_default().push_back(url.clone()); + urls + }) + .into_iter() + .filter_map(|(origin, urls)| match origin { + Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), + Origin::Opaque(x) => { + qwarn!("Opaque origin {x:?}"); + None + } + }) +} + pub async fn client(mut args: Args) -> Res<()> { neqo_common::log::init( args.shared @@ -506,46 +529,24 @@ pub async fn client(mut args: Args) -> Res<()> { init()?; - let urls_by_origin = args - .urls - .clone() - .into_iter() - .fold(HashMap::>::new(), |mut urls, url| { - urls.entry(url.origin()).or_default().push_back(url); - urls - }) - .into_iter() - .filter_map(|(origin, urls)| match origin { - Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), - Origin::Opaque(x) => { - qwarn!("Opaque origin {x:?}"); - None - } - }); - - for ((host, port), mut urls) in urls_by_origin { + for ((host, port), mut urls) in urls_by_origin(&args.urls) { if args.resume && urls.len() < 2 { qerror!("Resumption to {host} cannot work without at least 2 URLs."); exit(127); } - let remote_addr = format!("{host}:{port}").to_socket_addrs()?.find(|addr| { + let mut remote_addrs = format!("{host}:{port}").to_socket_addrs()?.filter(|addr| { !matches!( (addr, args.ipv4_only, args.ipv6_only), (SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false) ) }); + let remote_addr = remote_addrs.next(); let Some(remote_addr) = remote_addr else { qerror!("No compatible address found for: {host}"); exit(1); }; - - let local_addr = match remote_addr { - SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::from([0; 4])), 0), - SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), - }; - - let mut socket = crate::udp::Socket::bind(local_addr)?; + let mut socket = crate::udp::Socket::bind(local_addr_for(&remote_addr, 0))?; let real_local = socket.local_addr().unwrap(); qinfo!( "{} Client connecting: {:?} -> {:?}", @@ -554,6 +555,18 @@ pub async fn client(mut args: Args) -> Res<()> { remote_addr, ); + let migration = if args.shared.qns_test.as_deref() == Some("connectionmigration") { + #[allow(clippy::option_if_let_else)] + if let Some(addr) = remote_addrs.next() { + Some((real_local.port(), addr)) + } else { + qerror!("Cannot migrate from {host} when there is no address that follows"); + exit(127); + } + } else { + None + }; + let hostname = format!("{host}"); let mut token: Option = None; let mut first = true; @@ -571,7 +584,7 @@ pub async fn client(mut args: Args) -> Res<()> { http09::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http09::Handler::new(to_request, &args); + let handler = http09::Handler::new(to_request, &args, migration.as_ref()); Runner { args: &args,