Skip to content

Commit

Permalink
fix(s2n-quic-transport): send the largest acked for the current path …
Browse files Browse the repository at this point in the history
…to the congestion controller (#1996)
  • Loading branch information
WesleyRosenblum authored Oct 12, 2023
1 parent b955c59 commit 14d5703
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 95 deletions.
16 changes: 15 additions & 1 deletion quic/s2n-quic-core/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
event,
inet::{SocketAddress, SocketAddressV4, SocketAddressV6},
inet::{IpV4Address, IpV6Address, SocketAddress, SocketAddressV4, SocketAddressV6},
};
use core::{
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -127,6 +127,20 @@ macro_rules! impl_addr {
#[cfg_attr(any(test, feature = "generator"), derive(TypeGenerator))]
pub struct $name(pub SocketAddress);

impl From<event::api::SocketAddress<'_>> for $name {
#[inline]
fn from(value: event::api::SocketAddress<'_>) -> Self {
match value {
event::api::SocketAddress::IpV4 { ip, port } => {
$name(IpV4Address::new(*ip).with_port(port).into())
}
event::api::SocketAddress::IpV6 { ip, port } => {
$name(IpV6Address::new(*ip).with_port(port).into())
}
}
}
}

impl From<SocketAddress> for $name {
#[inline]
fn from(value: SocketAddress) -> Self {
Expand Down
33 changes: 26 additions & 7 deletions quic/s2n-quic-core/src/recovery/congestion_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ pub mod testing {

pub mod mock {
use super::*;
use crate::path::RemoteAddress;

#[derive(Debug, Default)]
pub struct Endpoint {}
Expand All @@ -394,15 +395,16 @@ pub mod testing {

fn new_congestion_controller(
&mut self,
_path_info: super::PathInfo,
path_info: super::PathInfo,
) -> Self::CongestionController {
CongestionController::default()
CongestionController::new(path_info.remote_address.into())
}
}

/// Returning this instead of a `()` ensures the information gets passed back in testing
#[derive(Clone, Copy, Debug, Default)]
pub struct PacketInfo(());
pub struct PacketInfo {
remote_address: RemoteAddress,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CongestionController {
Expand All @@ -419,6 +421,7 @@ pub mod testing {
pub loss_bursts: u32,
pub app_limited: Option<bool>,
pub slow_start: bool,
pub remote_address: RemoteAddress,
}

impl Default for CongestionController {
Expand All @@ -437,6 +440,16 @@ pub mod testing {
loss_bursts: 0,
app_limited: None,
slow_start: true,
remote_address: RemoteAddress::default(),
}
}
}

impl CongestionController {
pub fn new(remote_address: RemoteAddress) -> Self {
Self {
remote_address,
..Default::default()
}
}
}
Expand Down Expand Up @@ -471,7 +484,9 @@ pub mod testing {
self.bytes_in_flight += bytes_sent as u32;
self.requires_fast_retransmission = false;
self.app_limited = app_limited;
PacketInfo(())
PacketInfo {
remote_address: self.remote_address,
}
}

fn on_rtt_update<Pub: Publisher>(
Expand All @@ -488,25 +503,29 @@ pub mod testing {
&mut self,
_newest_acked_time_sent: Timestamp,
_sent_bytes: usize,
_newest_acked_packet_info: Self::PacketInfo,
newest_acked_packet_info: Self::PacketInfo,
_rtt_estimator: &RttEstimator,
_random_generator: &mut dyn random::Generator,
_ack_receive_time: Timestamp,
_publisher: &mut Pub,
) {
assert_eq!(self.remote_address, newest_acked_packet_info.remote_address);

self.on_packet_ack += 1;
}

fn on_packet_lost<Pub: Publisher>(
&mut self,
lost_bytes: u32,
_packet_info: Self::PacketInfo,
packet_info: Self::PacketInfo,
persistent_congestion: bool,
new_loss_burst: bool,
_random_generator: &mut dyn random::Generator,
_timestamp: Timestamp,
_publisher: &mut Pub,
) {
assert_eq!(self.remote_address, packet_info.remote_address);

self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
self.lost_bytes += lost_bytes;
self.persistent_congestion = Some(persistent_congestion);
Expand Down
23 changes: 13 additions & 10 deletions quic/s2n-quic-transport/src/recovery/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,8 @@ impl<Config: endpoint::Config> Manager<Config> {
}
};

let mut newly_acked_packets = SmallVec::<
[SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
>::new();
let mut newly_acked_packets =
SmallVec::<[PacketDetails<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY]>::new();
let (largest_newly_acked, includes_ack_eliciting) = self.process_ack_range(
&mut newly_acked_packets,
timestamp,
Expand Down Expand Up @@ -469,10 +468,8 @@ impl<Config: endpoint::Config> Manager<Config> {
publisher,
);

let (_, largest_newly_acked_info) = largest_newly_acked;
self.process_new_acked_packets(
&newly_acked_packets,
largest_newly_acked_info,
acked_new_largest_packet,
timestamp,
ecn_counts,
Expand All @@ -493,7 +490,7 @@ impl<Config: endpoint::Config> Manager<Config> {
fn process_ack_range<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
&mut self,
newly_acked_packets: &mut SmallVec<
[SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
[PacketDetails<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
>,
timestamp: Timestamp,
packet_number: PacketNumber,
Expand Down Expand Up @@ -528,7 +525,7 @@ impl<Config: endpoint::Config> Manager<Config> {
let mut newly_acked_range: Option<(PacketNumber, PacketNumber)> = None;

for (packet_number, acked_packet_info) in self.sent_packets.remove_range(pn_range) {
newly_acked_packets.push(acked_packet_info);
newly_acked_packets.push((packet_number, acked_packet_info));

if largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) {
largest_newly_acked = Some((packet_number, acked_packet_info));
Expand Down Expand Up @@ -629,9 +626,8 @@ impl<Config: endpoint::Config> Manager<Config> {
fn process_new_acked_packets<Ctx: Context<Config>, Pub: event::ConnectionPublisher>(
&mut self,
newly_acked_packets: &SmallVec<
[SentPacketInfo<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
[PacketDetails<packet_info_type!()>; ACKED_PACKETS_INITIAL_CAPACITY],
>,
largest_newly_acked: SentPacketInfo<packet_info_type!()>,
new_largest_packet: bool,
timestamp: Timestamp,
ecn_counts: Option<EcnCounts>,
Expand All @@ -648,16 +644,21 @@ impl<Config: endpoint::Config> Manager<Config> {
let current_path_id = context.path_id();
let is_handshake_confirmed = context.is_handshake_confirmed();
let mut current_path_acked_bytes = 0;
let mut current_path_largest_newly_acked = None;
let mut newly_acked_ecn_counts = EcnCounts::default();

for acked_packet_info in newly_acked_packets {
for (packet_number, acked_packet_info) in newly_acked_packets {
let path = context.path_mut_by_id(acked_packet_info.path_id);

let sent_bytes = acked_packet_info.sent_bytes as usize;
newly_acked_ecn_counts.increment(acked_packet_info.ecn);

if acked_packet_info.path_id == current_path_id {
current_path_acked_bytes += sent_bytes;

if current_path_largest_newly_acked.map_or(true, |(pn, _)| packet_number > pn) {
current_path_largest_newly_acked = Some((packet_number, acked_packet_info));
}
} else if sent_bytes > 0 {
path.congestion_controller.on_ack(
acked_packet_info.time_sent,
Expand Down Expand Up @@ -718,6 +719,8 @@ impl<Config: endpoint::Config> Manager<Config> {
}

if current_path_acked_bytes > 0 {
let (_, largest_newly_acked) = current_path_largest_newly_acked
.expect("At least some bytes were acknowledged on the current path");
let path = context.path_mut();
path.congestion_controller.on_ack(
largest_newly_acked.time_sent,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
---
source: quic/s2n-quic-transport/src/recovery/manager/tests.rs
assertion_line: 668
expression: ""
---
PathCreated { active: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, new: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.2:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 1, is_active: false } }
MtuUpdated { path_id: 1, mtu: 1200, cause: NewPath }
AckRangeReceived { packet_header: OneRtt { number: 1 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 1..=1 }
RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 500ms, smoothed_rtt: 500ms, latest_rtt: 500ms, rtt_variance: 250ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false }
AckRangeReceived { packet_header: OneRtt { number: 2 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 2..=2 }
RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 500ms, smoothed_rtt: 500ms, latest_rtt: 500ms, rtt_variance: 250ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false }
AckRangeReceived { packet_header: OneRtt { number: 1 }, path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, ack_range: 1..=2 }
RecoveryMetrics { path: Path { local_addr: 0.0.0.0:0, local_cid: 0x4c6f63616c4900000000000000004c6f63616c49, remote_addr: 127.0.0.1:80, remote_cid: 0x5065657249640000000000000000506565724964, id: 0, is_active: true }, min_rtt: 333ms, smoothed_rtt: 333ms, latest_rtt: 333ms, rtt_variance: 166.5ms, max_ack_delay: 100ms, pto_count: 0, congestion_window: 15000, bytes_in_flight: 128, congestion_limited: false }
114 changes: 42 additions & 72 deletions quic/s2n-quic-transport/src/recovery/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,25 +692,18 @@ fn process_new_acked_packets_update_pto_timer() {
#[test]
// congestion_controller.on_packet_ack should be updated for the path the packet was sent on
//
// Setup 1:
// Setup:
// - create path manager with two validated paths
// - send a packet on each path
// - packet 1 on path 1
// - packet 2 on path 2
//
// Trigger 1:
// - send ack for packet 1 on path 1
//
// Expectation 1:
// - cc.on_packet_ack should be updated for first_path
// - cc.on_packet_ack should not be updated for second_path
//
// Trigger 2:
// - send ack for packet 2 on path 2
// Trigger:
// - send ack for packet 1 and 2 on path 1
//
// Expectation 2:
// - cc.on_packet_ack should not be updated for first_path
// - cc.on_packet_ack should be updated for second_path
// Expectation:
// - cc.on_packet_ack should be incremented once for the first_path
// - cc.on_packet_ack should be incremented once for the second_path
fn process_new_acked_packets_congestion_controller() {
// Setup:
let space = PacketNumberSpace::ApplicationData;
Expand Down Expand Up @@ -757,40 +750,11 @@ fn process_new_acked_packets_congestion_controller() {
&mut publisher,
);

// Trigger 1:
// Ack packet 1 on path 1
let ack_receive_time = time_sent + Duration::from_millis(500);
helper_ack_packets_on_path(
1..=1,
ack_receive_time,
&mut context,
&mut manager,
first_addr,
None,
&mut publisher,
);

// Expectation 1:
assert_eq!(
context
.path_by_id(first_path_id)
.congestion_controller
.on_packet_ack,
1
);
assert_eq!(
context
.path_by_id(second_path_id)
.congestion_controller
.on_packet_ack,
0
);

// Trigger 2:
// Ack packet 2 on path 1
// Trigger:
// Ack packets 1 and 2 on path 1
let ack_receive_time = time_sent + Duration::from_millis(500);
helper_ack_packets_on_path(
2..=2,
1..=2,
ack_receive_time,
&mut context,
&mut manager,
Expand All @@ -799,7 +763,7 @@ fn process_new_acked_packets_congestion_controller() {
&mut publisher,
);

// Expectation 2:
// Expectation:
assert_eq!(
context
.path_by_id(first_path_id)
Expand Down Expand Up @@ -1837,31 +1801,37 @@ fn remove_lost_packets_persistent_congestion_path_aware() {
space.new_packet_number(VarInt::from_u8(9)),
space.new_packet_number(VarInt::from_u8(10)),
);
manager.sent_packets.insert(
space.new_packet_number(VarInt::from_u8(9)),
SentPacketInfo::new(
true,
1,
now,
AckElicitation::Eliciting,
first_path_id,
ecn,
transmission::Mode::Normal,
Default::default(),
),
context.set_path_id(first_path_id);
manager.on_packet_sent(
sent_packets_to_remove.start(),
Outcome {
ack_elicitation: AckElicitation::Eliciting,
is_congestion_controlled: true,
bytes_sent: 1,
bytes_progressed: 0,
},
now,
ecn,
transmission::Mode::Normal,
None,
&mut context,
&mut publisher,
);
manager.sent_packets.insert(
space.new_packet_number(VarInt::from_u8(10)),
SentPacketInfo::new(
true,
1,
now,
AckElicitation::Eliciting,
second_path_id,
ecn,
transmission::Mode::Normal,
Default::default(),
),
context.set_path_id(second_path_id);
manager.on_packet_sent(
sent_packets_to_remove.end(),
Outcome {
ack_elicitation: AckElicitation::Eliciting,
is_congestion_controlled: true,
bytes_sent: 1,
bytes_progressed: 0,
},
now,
ecn,
transmission::Mode::Normal,
None,
&mut context,
&mut publisher,
);

// Trigger:
Expand Down Expand Up @@ -3530,7 +3500,7 @@ fn helper_generate_path_manager_with_first_addr(
connection::PeerId::TEST_ID,
connection::LocalId::TEST_ID,
RttEstimator::new(max_ack_delay),
MockCongestionController::default(),
MockCongestionController::new(first_addr),
true,
DEFAULT_MAX_MTU,
);
Expand All @@ -3551,7 +3521,7 @@ fn helper_generate_client_path_manager(
connection::PeerId::TEST_ID,
connection::LocalId::TEST_ID,
RttEstimator::new(max_ack_delay),
MockCongestionController::default(),
MockCongestionController::new(first_addr),
false,
DEFAULT_MAX_MTU,
);
Expand Down

0 comments on commit 14d5703

Please sign in to comment.