Skip to content

Commit

Permalink
fix(s2n-quic-transport): fix client connection migration when local a…
Browse files Browse the repository at this point in the history
…ddress handle changes (#1874)
  • Loading branch information
camshaft authored Aug 11, 2023
1 parent 2b5cb23 commit b1b5470
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ rust-toolchain
**/.github
**/docs
**/scripts
**/specs
**/specs
**/perf.data*
8 changes: 6 additions & 2 deletions quic/s2n-quic-core/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,12 @@ impl Handle for Tuple {

#[inline]
fn maybe_update(&mut self, other: &Self) {
// once we discover our path, update the address local address
if self.local_address.port() == 0 {
if other.local_address.port() == 0 {
return;
}

// once we discover our path, or the port changes, update the address with the new information
if self.local_address.port() != other.local_address.port() {
self.local_address = other.local_address;
}
}
Expand Down
8 changes: 6 additions & 2 deletions quic/s2n-quic-core/src/xdp/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ impl Handle for Tuple {

#[inline]
fn maybe_update(&mut self, other: &Self) {
// once we discover our path, update the address full address
if self.local_address.port == 0 {
if other.local_address.port == 0 {
return;
}

// once we discover our path, or the port changes, update the address with the new information
if self.local_address.port != other.local_address.port {
*self = *other;
}
}
Expand Down
8 changes: 6 additions & 2 deletions quic/s2n-quic-platform/src/message/msg/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ impl path::Handle for Handle {

#[inline]
fn maybe_update(&mut self, other: &Self) {
// once we discover our path, update the address local address
if self.local_address.port() == 0 {
if other.local_address.port() == 0 {
return;
}

// once we discover our path, or the port changes, update the address with the new information
if self.local_address.port() != other.local_address.port() {
self.local_address = other.local_address;
}
}
Expand Down
2 changes: 1 addition & 1 deletion quic/s2n-quic-transport/src/path/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ impl<Config: endpoint::Config> Manager<Config> {
//# the client MUST discard these packets.
if Config::ENDPOINT_TYPE.is_client() {
return Err(DatagramDropReason::UnknownServerAddress);
};
}

//= https://www.rfc-editor.org/rfc/rfc9000#section-9
//# The design of QUIC relies on endpoints retaining a stable address
Expand Down
17 changes: 10 additions & 7 deletions quic/s2n-quic-transport/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,14 +545,17 @@ impl<Config: endpoint::Config> Path<Config> {
cwnd.saturating_sub(bytes_in_flight) < mtu
}

// Compare a Path based on its PathHandle.
//
// Currently the local_address on the Client connection is unknown and set to
// a default un-specified value; therefore only the remote_address is used
// to compare Paths.
/// Compare a Path based on its PathHandle.
///
/// In case the local_address on the connection is unknown and set to
/// a default un-specified value only the remote_address is used
/// to compare Paths.
///
/// In the case of the local endpoint being a client, the remote address is only used
/// since the client might experience address rebinding.
#[inline]
fn eq_by_handle(&self, handle: &Config::PathHandle) -> bool {
if self.handle.local_address().port() == 0 {
// Only compare the remote address if we haven't updated the local yet
if Config::ENDPOINT_TYPE.is_client() || self.handle.local_address().port() == 0 {
s2n_quic_core::path::Handle::eq(&self.handle.remote_address(), &handle.remote_address())
} else {
self.handle.eq(handle)
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod setup;
use setup::*;

mod blackhole;
mod connection_migration;
mod interceptor;
mod mtu;
mod no_tls;
Expand Down
131 changes: 131 additions & 0 deletions quic/s2n-quic/src/tests/connection_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

fn run_test<F>(mut on_rebind: F)
where
F: FnMut(SocketAddr) -> SocketAddr + Send + 'static,
{
let model = Model::default();
let rtt = Duration::from_millis(10);
let rebind_rate = rtt * 2;
// we currently only support 4 migrations
let rebind_count = 4;

model.set_delay(rtt / 2);

let expected_paths = Arc::new(Mutex::new(vec![]));
let expected_paths_pub = expected_paths.clone();

let on_socket = move |socket: io::Socket| {
spawn(async move {
let mut local_addr = socket.local_addr().unwrap();
for _ in 0..rebind_count {
local_addr = on_rebind(local_addr);
delay(rebind_rate).await;
if let Ok(mut paths) = expected_paths_pub.lock() {
paths.push(local_addr);
}
socket.rebind(local_addr);
}
});
};

let active_paths = recorder::ActivePathUpdated::new();
let active_path_sub = active_paths.clone();

test(model, move |handle| {
let server = Server::builder()
.with_io(handle.builder().build()?)?
.with_tls(SERVER_CERTS)?
.with_event((events(), active_path_sub))?
.start()?;

let client_io = handle.builder().on_socket(on_socket).build()?;

let client = Client::builder()
.with_io(client_io)?
.with_tls(certificates::CERT_PEM)?
.with_event(events())?
.start()?;

let addr = start_server(server)?;
primary::spawn(async move {
let connect = Connect::new(addr).with_server_name("localhost");
let mut conn = client.connect(connect).await.unwrap();
let mut stream = conn.open_bidirectional_stream().await.unwrap();

stream.send(Bytes::from_static(b"A")).await.unwrap();

delay(rebind_rate / 2).await;

for _ in 0..rebind_count {
stream.send(Bytes::from_static(b"B")).await.unwrap();
delay(rebind_rate).await;
}

stream.finish().unwrap();

let chunk = stream
.receive()
.await
.unwrap()
.expect("a chunk should be available");
assert_eq!(&chunk[..], &b"ABBBB"[..]);

assert!(
stream.receive().await.unwrap().is_none(),
"stream should be finished"
);
});

Ok(addr)
})
.unwrap();

assert_eq!(
&*active_paths.events().lock().unwrap(),
&*expected_paths.lock().unwrap()
);
}

/// Rebinds the IP of an address
fn rebind_ip(mut addr: SocketAddr) -> SocketAddr {
let ip = match addr.ip() {
std::net::IpAddr::V4(ip) => {
let mut v = u32::from_be_bytes(ip.octets());
v += 1;
std::net::Ipv4Addr::from(v).into()
}
std::net::IpAddr::V6(ip) => {
let mut v = u128::from_be_bytes(ip.octets());
v += 1;
std::net::Ipv6Addr::from(v).into()
}
};
addr.set_ip(ip);
addr
}

/// Rebinds the port of an address
fn rebind_port(mut addr: SocketAddr) -> SocketAddr {
let port = addr.port() + 1;
addr.set_port(port);
addr
}

#[test]
fn ip_rebind_test() {
run_test(rebind_ip);
}

#[test]
fn port_rebind_test() {
run_test(rebind_port);
}

#[test]
fn ip_and_port_rebind_test() {
run_test(|addr| rebind_ip(rebind_port(addr)));
}
10 changes: 10 additions & 0 deletions quic/s2n-quic/src/tests/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ event_recorder!(
HandshakeStatusUpdated,
on_handshake_status_updated
);
event_recorder!(
ActivePathUpdated,
ActivePathUpdated,
on_active_path_updated,
SocketAddr,
|event: &events::ActivePathUpdated, storage: &mut Vec<SocketAddr>| {
let addr = (&event.active.remote_addr).into();
storage.push(addr);
}
);

0 comments on commit b1b5470

Please sign in to comment.