From 14d5703e66492312d87c0baa3c5fc66053f96f98 Mon Sep 17 00:00:00 2001 From: Wesley Rosenblum <55108558+WesleyRosenblum@users.noreply.github.com> Date: Thu, 12 Oct 2023 13:45:43 -0700 Subject: [PATCH] fix(s2n-quic-transport): send the largest acked for the current path to the congestion controller (#1996) --- quic/s2n-quic-core/src/path/mod.rs | 16 ++- .../src/recovery/congestion_controller.rs | 33 +++-- .../src/recovery/manager.rs | 23 ++-- ...w_acked_packets_congestion_controller.snap | 7 +- .../src/recovery/manager/tests.rs | 114 +++++++----------- 5 files changed, 98 insertions(+), 95 deletions(-) diff --git a/quic/s2n-quic-core/src/path/mod.rs b/quic/s2n-quic-core/src/path/mod.rs index 349c3eed7e..99ebdc1a79 100644 --- a/quic/s2n-quic-core/src/path/mod.rs +++ b/quic/s2n-quic-core/src/path/mod.rs @@ -3,7 +3,7 @@ use crate::{ event, - inet::{SocketAddress, SocketAddressV4, SocketAddressV6}, + inet::{IpV4Address, IpV6Address, SocketAddress, SocketAddressV4, SocketAddressV6}, }; use core::{ convert::{TryFrom, TryInto}, @@ -127,6 +127,20 @@ macro_rules! impl_addr { #[cfg_attr(any(test, feature = "generator"), derive(TypeGenerator))] pub struct $name(pub SocketAddress); + impl From> for $name { + #[inline] + fn from(value: event::api::SocketAddress<'_>) -> Self { + match value { + event::api::SocketAddress::IpV4 { ip, port } => { + $name(IpV4Address::new(*ip).with_port(port).into()) + } + event::api::SocketAddress::IpV6 { ip, port } => { + $name(IpV6Address::new(*ip).with_port(port).into()) + } + } + } + } + impl From for $name { #[inline] fn from(value: SocketAddress) -> Self { diff --git a/quic/s2n-quic-core/src/recovery/congestion_controller.rs b/quic/s2n-quic-core/src/recovery/congestion_controller.rs index e7dcc67276..17cffe9fce 100644 --- a/quic/s2n-quic-core/src/recovery/congestion_controller.rs +++ b/quic/s2n-quic-core/src/recovery/congestion_controller.rs @@ -385,6 +385,7 @@ pub mod testing { pub mod mock { use super::*; + use crate::path::RemoteAddress; #[derive(Debug, Default)] pub struct Endpoint {} @@ -394,15 +395,16 @@ pub mod testing { fn new_congestion_controller( &mut self, - _path_info: super::PathInfo, + path_info: super::PathInfo, ) -> Self::CongestionController { - CongestionController::default() + CongestionController::new(path_info.remote_address.into()) } } - /// Returning this instead of a `()` ensures the information gets passed back in testing #[derive(Clone, Copy, Debug, Default)] - pub struct PacketInfo(()); + pub struct PacketInfo { + remote_address: RemoteAddress, + } #[derive(Clone, Copy, Debug, PartialEq)] pub struct CongestionController { @@ -419,6 +421,7 @@ pub mod testing { pub loss_bursts: u32, pub app_limited: Option, pub slow_start: bool, + pub remote_address: RemoteAddress, } impl Default for CongestionController { @@ -437,6 +440,16 @@ pub mod testing { loss_bursts: 0, app_limited: None, slow_start: true, + remote_address: RemoteAddress::default(), + } + } + } + + impl CongestionController { + pub fn new(remote_address: RemoteAddress) -> Self { + Self { + remote_address, + ..Default::default() } } } @@ -471,7 +484,9 @@ pub mod testing { self.bytes_in_flight += bytes_sent as u32; self.requires_fast_retransmission = false; self.app_limited = app_limited; - PacketInfo(()) + PacketInfo { + remote_address: self.remote_address, + } } fn on_rtt_update( @@ -488,25 +503,29 @@ pub mod testing { &mut self, _newest_acked_time_sent: Timestamp, _sent_bytes: usize, - _newest_acked_packet_info: Self::PacketInfo, + newest_acked_packet_info: Self::PacketInfo, _rtt_estimator: &RttEstimator, _random_generator: &mut dyn random::Generator, _ack_receive_time: Timestamp, _publisher: &mut Pub, ) { + assert_eq!(self.remote_address, newest_acked_packet_info.remote_address); + self.on_packet_ack += 1; } fn on_packet_lost( &mut self, lost_bytes: u32, - _packet_info: Self::PacketInfo, + packet_info: Self::PacketInfo, persistent_congestion: bool, new_loss_burst: bool, _random_generator: &mut dyn random::Generator, _timestamp: Timestamp, _publisher: &mut Pub, ) { + assert_eq!(self.remote_address, packet_info.remote_address); + self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes); self.lost_bytes += lost_bytes; self.persistent_congestion = Some(persistent_congestion); diff --git a/quic/s2n-quic-transport/src/recovery/manager.rs b/quic/s2n-quic-transport/src/recovery/manager.rs index ffb87c77b7..ed051e4161 100644 --- a/quic/s2n-quic-transport/src/recovery/manager.rs +++ b/quic/s2n-quic-transport/src/recovery/manager.rs @@ -439,9 +439,8 @@ impl Manager { } }; - let mut newly_acked_packets = SmallVec::< - [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], - >::new(); + let mut newly_acked_packets = + SmallVec::<[PacketDetails; ACKED_PACKETS_INITIAL_CAPACITY]>::new(); let (largest_newly_acked, includes_ack_eliciting) = self.process_ack_range( &mut newly_acked_packets, timestamp, @@ -469,10 +468,8 @@ impl Manager { publisher, ); - let (_, largest_newly_acked_info) = largest_newly_acked; self.process_new_acked_packets( &newly_acked_packets, - largest_newly_acked_info, acked_new_largest_packet, timestamp, ecn_counts, @@ -493,7 +490,7 @@ impl Manager { fn process_ack_range, Pub: event::ConnectionPublisher>( &mut self, newly_acked_packets: &mut SmallVec< - [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], + [PacketDetails; ACKED_PACKETS_INITIAL_CAPACITY], >, timestamp: Timestamp, packet_number: PacketNumber, @@ -528,7 +525,7 @@ impl Manager { let mut newly_acked_range: Option<(PacketNumber, PacketNumber)> = None; for (packet_number, acked_packet_info) in self.sent_packets.remove_range(pn_range) { - newly_acked_packets.push(acked_packet_info); + newly_acked_packets.push((packet_number, acked_packet_info)); if largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) { largest_newly_acked = Some((packet_number, acked_packet_info)); @@ -629,9 +626,8 @@ impl Manager { fn process_new_acked_packets, Pub: event::ConnectionPublisher>( &mut self, newly_acked_packets: &SmallVec< - [SentPacketInfo; ACKED_PACKETS_INITIAL_CAPACITY], + [PacketDetails; ACKED_PACKETS_INITIAL_CAPACITY], >, - largest_newly_acked: SentPacketInfo, new_largest_packet: bool, timestamp: Timestamp, ecn_counts: Option, @@ -648,9 +644,10 @@ impl Manager { let current_path_id = context.path_id(); let is_handshake_confirmed = context.is_handshake_confirmed(); let mut current_path_acked_bytes = 0; + let mut current_path_largest_newly_acked = None; let mut newly_acked_ecn_counts = EcnCounts::default(); - for acked_packet_info in newly_acked_packets { + for (packet_number, acked_packet_info) in newly_acked_packets { let path = context.path_mut_by_id(acked_packet_info.path_id); let sent_bytes = acked_packet_info.sent_bytes as usize; @@ -658,6 +655,10 @@ impl Manager { if acked_packet_info.path_id == current_path_id { current_path_acked_bytes += sent_bytes; + + if current_path_largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) { + current_path_largest_newly_acked = Some((packet_number, acked_packet_info)); + } } else if sent_bytes > 0 { path.congestion_controller.on_ack( acked_packet_info.time_sent, @@ -718,6 +719,8 @@ impl Manager { } if current_path_acked_bytes > 0 { + let (_, largest_newly_acked) = current_path_largest_newly_acked + .expect("At least some bytes were acknowledged on the current path"); let path = context.path_mut(); path.congestion_controller.on_ack( largest_newly_acked.time_sent, diff --git a/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__process_new_acked_packets_congestion_controller.snap b/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__process_new_acked_packets_congestion_controller.snap index 4503c3f173..283178b60b 100644 --- a/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__process_new_acked_packets_congestion_controller.snap +++ b/quic/s2n-quic-transport/src/recovery/manager/snapshots/quic__s2n-quic-transport__src__recovery__manager__tests__events__process_new_acked_packets_congestion_controller.snap @@ -1,11 +1,8 @@ --- source: quic/s2n-quic-transport/src/recovery/manager/tests.rs -assertion_line: 668 expression: "" --- PathCreated { active: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, new: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.2:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 1, is_active: false } } MtuUpdated { path_id: 1, mtu: 1200, cause: NewPath } -AckRangeReceived { packet_header: OneRtt { number: 1 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 1..=1 } -RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 500ms, smoothed_rtt: 500ms, latest_rtt: 500ms, rtt_variance: 250ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false } -AckRangeReceived { packet_header: OneRtt { number: 2 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 2..=2 } -RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 500ms, smoothed_rtt: 500ms, latest_rtt: 500ms, rtt_variance: 250ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false } +AckRangeReceived { packet_header: OneRtt { number: 1 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 1..=2 } +RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 333ms, smoothed_rtt: 333ms, latest_rtt: 333ms, rtt_variance: 166.5ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false } diff --git a/quic/s2n-quic-transport/src/recovery/manager/tests.rs b/quic/s2n-quic-transport/src/recovery/manager/tests.rs index 7f2e632986..d9e8820185 100644 --- a/quic/s2n-quic-transport/src/recovery/manager/tests.rs +++ b/quic/s2n-quic-transport/src/recovery/manager/tests.rs @@ -692,25 +692,18 @@ fn process_new_acked_packets_update_pto_timer() { #[test] // congestion_controller.on_packet_ack should be updated for the path the packet was sent on // -// Setup 1: +// Setup: // - create path manager with two validated paths // - send a packet on each path // - packet 1 on path 1 // - packet 2 on path 2 // -// Trigger 1: -// - send ack for packet 1 on path 1 -// -// Expectation 1: -// - cc.on_packet_ack should be updated for first_path -// - cc.on_packet_ack should not be updated for second_path -// -// Trigger 2: -// - send ack for packet 2 on path 2 +// Trigger: +// - send ack for packet 1 and 2 on path 1 // -// Expectation 2: -// - cc.on_packet_ack should not be updated for first_path -// - cc.on_packet_ack should be updated for second_path +// Expectation: +// - cc.on_packet_ack should be incremented once for the first_path +// - cc.on_packet_ack should be incremented once for the second_path fn process_new_acked_packets_congestion_controller() { // Setup: let space = PacketNumberSpace::ApplicationData; @@ -757,40 +750,11 @@ fn process_new_acked_packets_congestion_controller() { &mut publisher, ); - // Trigger 1: - // Ack packet 1 on path 1 - let ack_receive_time = time_sent + Duration::from_millis(500); - helper_ack_packets_on_path( - 1..=1, - ack_receive_time, - &mut context, - &mut manager, - first_addr, - None, - &mut publisher, - ); - - // Expectation 1: - assert_eq!( - context - .path_by_id(first_path_id) - .congestion_controller - .on_packet_ack, - 1 - ); - assert_eq!( - context - .path_by_id(second_path_id) - .congestion_controller - .on_packet_ack, - 0 - ); - - // Trigger 2: - // Ack packet 2 on path 1 + // Trigger: + // Ack packets 1 and 2 on path 1 let ack_receive_time = time_sent + Duration::from_millis(500); helper_ack_packets_on_path( - 2..=2, + 1..=2, ack_receive_time, &mut context, &mut manager, @@ -799,7 +763,7 @@ fn process_new_acked_packets_congestion_controller() { &mut publisher, ); - // Expectation 2: + // Expectation: assert_eq!( context .path_by_id(first_path_id) @@ -1837,31 +1801,37 @@ fn remove_lost_packets_persistent_congestion_path_aware() { space.new_packet_number(VarInt::from_u8(9)), space.new_packet_number(VarInt::from_u8(10)), ); - manager.sent_packets.insert( - space.new_packet_number(VarInt::from_u8(9)), - SentPacketInfo::new( - true, - 1, - now, - AckElicitation::Eliciting, - first_path_id, - ecn, - transmission::Mode::Normal, - Default::default(), - ), + context.set_path_id(first_path_id); + manager.on_packet_sent( + sent_packets_to_remove.start(), + Outcome { + ack_elicitation: AckElicitation::Eliciting, + is_congestion_controlled: true, + bytes_sent: 1, + bytes_progressed: 0, + }, + now, + ecn, + transmission::Mode::Normal, + None, + &mut context, + &mut publisher, ); - manager.sent_packets.insert( - space.new_packet_number(VarInt::from_u8(10)), - SentPacketInfo::new( - true, - 1, - now, - AckElicitation::Eliciting, - second_path_id, - ecn, - transmission::Mode::Normal, - Default::default(), - ), + context.set_path_id(second_path_id); + manager.on_packet_sent( + sent_packets_to_remove.end(), + Outcome { + ack_elicitation: AckElicitation::Eliciting, + is_congestion_controlled: true, + bytes_sent: 1, + bytes_progressed: 0, + }, + now, + ecn, + transmission::Mode::Normal, + None, + &mut context, + &mut publisher, ); // Trigger: @@ -3530,7 +3500,7 @@ fn helper_generate_path_manager_with_first_addr( connection::PeerId::TEST_ID, connection::LocalId::TEST_ID, RttEstimator::new(max_ack_delay), - MockCongestionController::default(), + MockCongestionController::new(first_addr), true, DEFAULT_MAX_MTU, ); @@ -3551,7 +3521,7 @@ fn helper_generate_client_path_manager( connection::PeerId::TEST_ID, connection::LocalId::TEST_ID, RttEstimator::new(max_ack_delay), - MockCongestionController::default(), + MockCongestionController::new(first_addr), false, DEFAULT_MAX_MTU, );