From fb3e4f262aa3972caa2421ea345db2e8083a41c9 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Tue, 15 Oct 2024 11:31:19 +0200 Subject: [PATCH 1/7] feat: Support QNS `connectionmigration` test --- neqo-bin/src/client/http09.rs | 35 ++++++++++++++++++++++++++++++++--- neqo-bin/src/client/mod.rs | 33 ++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 2f31928058..2a5fcfc58f 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -26,7 +26,7 @@ use neqo_transport::{ use url::Url; use super::{get_output_file, qlog_new, Args, CloseState, Res}; -use crate::STREAM_IO_BUFFER_SIZE; +use crate::{client::unspecified_addr, STREAM_IO_BUFFER_SIZE}; pub struct Handler<'a> { streams: HashMap>>, @@ -37,6 +37,7 @@ pub struct Handler<'a> { token: Option, needs_key_update: bool, read_buffer: Vec, + migration: Option<&'a (u16, SocketAddr)>, } impl Handler<'_> { @@ -86,10 +87,33 @@ impl super::Handler for Handler<'_> { } } ConnectionEvent::StateChange( - State::WaitInitial | State::Handshaking | State::Connected, + State::WaitInitial | State::Handshaking | State::Connected | State::Confirmed, ) => { qdebug!("{event:?}"); self.download_urls(client); + if event == ConnectionEvent::StateChange(State::Confirmed) { + if let Some((local_port, migration_addr)) = &self.migration { + let mut local_addr = unspecified_addr(migration_addr); + local_addr.set_port(*local_port); + qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); + client + .migrate( + Some(local_addr), + Some(*migration_addr), + false, + Instant::now(), + ) + .map(|()| { + qinfo!( + "Connection migrated to {:?} -> {:?}", + local_addr, + migration_addr + ); + // Don't do another migration. + self.migration = None; + })?; + } + } } ConnectionEvent::ZeroRttRejected => { qdebug!("{event:?}"); @@ -211,7 +235,11 @@ impl super::Client for Connection { } impl<'b> Handler<'b> { - pub fn new(url_queue: VecDeque, args: &'b Args) -> Self { + pub fn new( + url_queue: VecDeque, + args: &'b Args, + migration: Option<&'b (u16, SocketAddr)>, + ) -> Self { Self { streams: HashMap::new(), url_queue, @@ -221,6 +249,7 @@ impl<'b> Handler<'b> { token: None, needs_key_update: args.key_update, read_buffer: vec![0; STREAM_IO_BUFFER_SIZE], + migration, } } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 8bfac59703..6cba50a00a 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -493,6 +493,14 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { .map_err(Error::QlogError) } +const fn unspecified_addr(addr: &SocketAddr) -> SocketAddr { + match addr { + SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), + SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), + } +} + +#[allow(clippy::too_many_lines)] pub async fn client(mut args: Args) -> Res<()> { neqo_common::log::init( args.shared @@ -529,23 +537,18 @@ pub async fn client(mut args: Args) -> Res<()> { exit(127); } - let remote_addr = format!("{host}:{port}").to_socket_addrs()?.find(|addr| { + let mut remote_addrs = format!("{host}:{port}").to_socket_addrs()?.filter(|addr| { !matches!( (addr, args.ipv4_only, args.ipv6_only), (SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false) ) }); + let remote_addr = remote_addrs.next(); let Some(remote_addr) = remote_addr else { qerror!("No compatible address found for: {host}"); exit(1); }; - - let local_addr = match remote_addr { - SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::from([0; 4])), 0), - SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::from([0; 16])), 0), - }; - - let mut socket = crate::udp::Socket::bind(local_addr)?; + let mut socket = crate::udp::Socket::bind(unspecified_addr(&remote_addr))?; let real_local = socket.local_addr().unwrap(); qinfo!( "{} Client connecting: {:?} -> {:?}", @@ -554,6 +557,18 @@ pub async fn client(mut args: Args) -> Res<()> { remote_addr, ); + let migration = if args.shared.qns_test == Some("connectionmigration".to_owned()) { + remote_addrs.next().map_or_else( + || { + qerror!("No migration address found for {host}"); + exit(127); + }, + |migration_addr| Some((real_local.port(), migration_addr)), + ) + } else { + None + }; + let hostname = format!("{host}"); let mut token: Option = None; let mut first = true; @@ -571,7 +586,7 @@ pub async fn client(mut args: Args) -> Res<()> { http09::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); - let handler = http09::Handler::new(to_request, &args); + let handler = http09::Handler::new(to_request, &args, migration.as_ref()); Runner { args: &args, From c97782bd7b31e03ebc266d5de105bd4df22b64cd Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Fri, 18 Oct 2024 11:44:14 +0300 Subject: [PATCH 2/7] Address review comments --- neqo-bin/src/client/http09.rs | 49 +++++++++++++-------------- neqo-bin/src/client/mod.rs | 64 +++++++++++++++++------------------ 2 files changed, 55 insertions(+), 58 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 2a5fcfc58f..62b79c7e95 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -26,7 +26,7 @@ use neqo_transport::{ use url::Url; use super::{get_output_file, qlog_new, Args, CloseState, Res}; -use crate::{client::unspecified_addr, STREAM_IO_BUFFER_SIZE}; +use crate::{client::local_addr_for, STREAM_IO_BUFFER_SIZE}; pub struct Handler<'a> { streams: HashMap>>, @@ -86,34 +86,33 @@ impl super::Handler for Handler<'_> { self.download_urls(client); } } + ConnectionEvent::StateChange(State::Confirmed) => { + if let Some((local_port, migration_addr)) = &self.migration { + let local_addr = local_addr_for(migration_addr, *local_port); + qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); + client + .migrate( + Some(local_addr), + Some(*migration_addr), + false, + Instant::now(), + ) + .map(|()| { + qinfo!( + "Connection migrated to {:?} -> {:?}", + local_addr, + migration_addr + ); + // Don't do another migration. + self.migration = None; + })?; + } + } ConnectionEvent::StateChange( - State::WaitInitial | State::Handshaking | State::Connected | State::Confirmed, + State::WaitInitial | State::Handshaking | State::Connected, ) => { qdebug!("{event:?}"); self.download_urls(client); - if event == ConnectionEvent::StateChange(State::Confirmed) { - if let Some((local_port, migration_addr)) = &self.migration { - let mut local_addr = unspecified_addr(migration_addr); - local_addr.set_port(*local_port); - qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); - client - .migrate( - Some(local_addr), - Some(*migration_addr), - false, - Instant::now(), - ) - .map(|()| { - qinfo!( - "Connection migrated to {:?} -> {:?}", - local_addr, - migration_addr - ); - // Don't do another migration. - self.migration = None; - })?; - } - } } ConnectionEvent::ZeroRttRejected => { qdebug!("{event:?}"); diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 6cba50a00a..f0e959bb53 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -31,7 +31,7 @@ use neqo_crypto::{ use neqo_http3::Output; use neqo_transport::{AppError, CloseReason, ConnectionId, Version}; use tokio::time::Sleep; -use url::{Origin, Url}; +use url::{Host, Origin, Url}; use crate::SharedArgs; @@ -493,14 +493,29 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { .map_err(Error::QlogError) } -const fn unspecified_addr(addr: &SocketAddr) -> SocketAddr { - match addr { - SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), +const fn local_addr_for(remote_addr: &SocketAddr, local_port: u16) -> SocketAddr { + match remote_addr { + SocketAddr::V4(..) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), local_port), + SocketAddr::V6(..) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port), } } -#[allow(clippy::too_many_lines)] +fn urls_by_origin(urls: &[Url]) -> impl Iterator)> { + urls.iter() + .fold(HashMap::>::new(), |mut urls, url| { + urls.entry(url.origin()).or_default().push_back(url.clone()); + urls + }) + .into_iter() + .filter_map(|(origin, urls)| match origin { + Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), + Origin::Opaque(x) => { + qwarn!("Opaque origin {x:?}"); + None + } + }) +} + pub async fn client(mut args: Args) -> Res<()> { neqo_common::log::init( args.shared @@ -514,24 +529,7 @@ pub async fn client(mut args: Args) -> Res<()> { init()?; - let urls_by_origin = args - .urls - .clone() - .into_iter() - .fold(HashMap::>::new(), |mut urls, url| { - urls.entry(url.origin()).or_default().push_back(url); - urls - }) - .into_iter() - .filter_map(|(origin, urls)| match origin { - Origin::Tuple(_scheme, h, p) => Some(((h, p), urls)), - Origin::Opaque(x) => { - qwarn!("Opaque origin {x:?}"); - None - } - }); - - for ((host, port), mut urls) in urls_by_origin { + for ((host, port), mut urls) in urls_by_origin(&args.urls) { if args.resume && urls.len() < 2 { qerror!("Resumption to {host} cannot work without at least 2 URLs."); exit(127); @@ -548,7 +546,7 @@ pub async fn client(mut args: Args) -> Res<()> { qerror!("No compatible address found for: {host}"); exit(1); }; - let mut socket = crate::udp::Socket::bind(unspecified_addr(&remote_addr))?; + let mut socket = crate::udp::Socket::bind(local_addr_for(&remote_addr, 0))?; let real_local = socket.local_addr().unwrap(); qinfo!( "{} Client connecting: {:?} -> {:?}", @@ -557,14 +555,14 @@ pub async fn client(mut args: Args) -> Res<()> { remote_addr, ); - let migration = if args.shared.qns_test == Some("connectionmigration".to_owned()) { - remote_addrs.next().map_or_else( - || { - qerror!("No migration address found for {host}"); - exit(127); - }, - |migration_addr| Some((real_local.port(), migration_addr)), - ) + let migration = if args.shared.qns_test.as_deref() == Some("connectionmigration") { + #[allow(clippy::option_if_let_else)] + if let Some(addr) = remote_addrs.next() { + Some((real_local.port(), addr)) + } else { + qerror!("Cannot migrate from {host} when there is no address that follows"); + exit(127); + } } else { None }; From 6a37f0236d094a8e3b2ac64d1038b9be92d74180 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 21 Oct 2024 09:44:45 +0300 Subject: [PATCH 3/7] Update neqo-bin/src/client/http09.rs Co-authored-by: Martin Thomson Signed-off-by: Lars Eggert --- neqo-bin/src/client/http09.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 62b79c7e95..caa7b2cc1b 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -87,7 +87,7 @@ impl super::Handler for Handler<'_> { } } ConnectionEvent::StateChange(State::Confirmed) => { - if let Some((local_port, migration_addr)) = &self.migration { + if let Some((local_port, migration_addr)) = self.migration.take() { let local_addr = local_addr_for(migration_addr, *local_port); qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); client From ee3f5ee33599e52c045faf124afda5cf017037d2 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 21 Oct 2024 09:44:58 +0300 Subject: [PATCH 4/7] Update neqo-bin/src/client/http09.rs Co-authored-by: Martin Thomson Signed-off-by: Lars Eggert --- neqo-bin/src/client/http09.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index caa7b2cc1b..fd78cb183b 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -88,7 +88,7 @@ impl super::Handler for Handler<'_> { } ConnectionEvent::StateChange(State::Confirmed) => { if let Some((local_port, migration_addr)) = self.migration.take() { - let local_addr = local_addr_for(migration_addr, *local_port); + let local_addr = local_addr_for(migration_addr, local_port); qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); client .migrate( From 7d15f35959b6ee82ae92c22cb2e469dff75efda3 Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 21 Oct 2024 09:45:03 +0300 Subject: [PATCH 5/7] Update neqo-bin/src/client/http09.rs Co-authored-by: Martin Thomson Signed-off-by: Lars Eggert --- neqo-bin/src/client/http09.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index fd78cb183b..9f6d142b4e 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -93,7 +93,7 @@ impl super::Handler for Handler<'_> { client .migrate( Some(local_addr), - Some(*migration_addr), + Some(migration_addr), false, Instant::now(), ) From a0548c3376c8da6979affca92e3d2485e20acd4d Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 21 Oct 2024 09:45:09 +0300 Subject: [PATCH 6/7] Update neqo-bin/src/client/http09.rs Co-authored-by: Martin Thomson Signed-off-by: Lars Eggert --- neqo-bin/src/client/http09.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 9f6d142b4e..8dc7a5f66e 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -103,8 +103,6 @@ impl super::Handler for Handler<'_> { local_addr, migration_addr ); - // Don't do another migration. - self.migration = None; })?; } } From a2c2eaa5d867a2a472148260f8a3525baa85dfef Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Mon, 21 Oct 2024 11:37:58 +0300 Subject: [PATCH 7/7] Fix --- neqo-bin/src/client/http09.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index 8dc7a5f66e..cc0092c504 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -88,12 +88,12 @@ impl super::Handler for Handler<'_> { } ConnectionEvent::StateChange(State::Confirmed) => { if let Some((local_port, migration_addr)) = self.migration.take() { - let local_addr = local_addr_for(migration_addr, local_port); + let local_addr = local_addr_for(migration_addr, *local_port); qdebug!("Migrating path to {:?} -> {:?}", local_addr, migration_addr); client .migrate( Some(local_addr), - Some(migration_addr), + Some(*migration_addr), false, Instant::now(), )