From f0c42fbafefa9103bc6993a090a62fde6020fa60 Mon Sep 17 00:00:00 2001 From: Aperence Date: Wed, 13 Dec 2023 16:46:55 +0100 Subject: [PATCH] added support for cubic --- bob.bin | Bin 0 -> 125 bytes src/aioquic/quic/configuration.py | 2 +- src/aioquic/quic/congestion/base.py | 10 +- src/aioquic/quic/congestion/cubic.py | 216 +++++++++++++++++++++++++++ src/aioquic/quic/congestion/reno.py | 2 +- src/aioquic/quic/recovery.py | 9 +- tests/test_cubic.py | 68 +++++++++ 7 files changed, 297 insertions(+), 10 deletions(-) create mode 100644 bob.bin create mode 100644 src/aioquic/quic/congestion/cubic.py create mode 100644 tests/test_cubic.py diff --git a/bob.bin b/bob.bin new file mode 100644 index 0000000000000000000000000000000000000000..b9c24139db22baf498433c56de4045a36b4d4f86 GIT binary patch literal 125 zcmV-@0D}MU000012cWg)np) fnHetSX%xj`X_)K}Mj)#FLh;rxSc@lDS`yk^jTJa2 literal 0 HcmV?d00001 diff --git a/src/aioquic/quic/configuration.py b/src/aioquic/quic/configuration.py index 0bf919612..80738ea94 100644 --- a/src/aioquic/quic/configuration.py +++ b/src/aioquic/quic/configuration.py @@ -30,7 +30,7 @@ class QuicConfiguration: """ The name of the congestion control algorithm to use. - Currently supported algorithms: `"reno"`. + Currently supported algorithms: `"reno", `"cubic"`. """ connection_id_length: int = 8 diff --git a/src/aioquic/quic/congestion/base.py b/src/aioquic/quic/congestion/base.py index df6ca0028..9aa3d5d16 100644 --- a/src/aioquic/quic/congestion/base.py +++ b/src/aioquic/quic/congestion/base.py @@ -1,5 +1,5 @@ import abc -from typing import Dict, Iterable, Optional, Protocol +from typing import Any, Dict, Iterable, Optional, Protocol from ..packet_builder import QuicSentPacket @@ -21,7 +21,7 @@ def __init__(self, *, max_datagram_size: int) -> None: self.congestion_window = K_INITIAL_WINDOW * max_datagram_size @abc.abstractmethod - def on_packet_acked(self, *, packet: QuicSentPacket) -> None: + def on_packet_acked(self, *, now: float, packet: QuicSentPacket) -> None: ... # pragma: no cover @abc.abstractmethod @@ -40,6 +40,12 @@ def on_packets_lost(self, *, now: float, packets: Iterable[QuicSentPacket]) -> N def on_rtt_measurement(self, *, now: float, rtt: float) -> None: ... # pragma: no cover + def get_log_data(self) -> Dict[str, Any]: + data = {"cwnd": self.congestion_window} + if self.ssthresh is not None: + data["ssthresh"] = self.ssthresh + return data + class QuicCongestionControlFactory(Protocol): def __call__(self, *, max_datagram_size: int) -> QuicCongestionControl: diff --git a/src/aioquic/quic/congestion/cubic.py b/src/aioquic/quic/congestion/cubic.py new file mode 100644 index 000000000..62aaf0137 --- /dev/null +++ b/src/aioquic/quic/congestion/cubic.py @@ -0,0 +1,216 @@ +from typing import Any, Dict, Iterable + +from ..packet_builder import QuicSentPacket +from .base import ( + K_INITIAL_WINDOW, + K_MINIMUM_WINDOW, + QuicCongestionControl, + QuicRttMonitor, + register_congestion_control, +) + +# cubic specific variables (see https://www.rfc-editor.org/rfc/rfc9438.html#name-definitions) +K_CUBIC_C = 0.4 +K_CUBIC_LOSS_REDUCTION_FACTOR = 0.7 +K_CUBIC_MAX_IDLE_TIME = 2 # reset the cwnd after 2 seconds of inactivity + + +def better_cube_root(x: float) -> float: + if x < 0: + # avoid precision errors that make the cube root returns an imaginary number + return -((-x) ** (1.0 / 3.0)) + else: + return (x) ** (1.0 / 3.0) + + +class CubicCongestionControl(QuicCongestionControl): + """ + Cubic congestion control implementation for aioquic + """ + + def __init__(self, max_datagram_size: int) -> None: + super().__init__(max_datagram_size=max_datagram_size) + # increase by one segment + self.additive_increase_factor: int = max_datagram_size + self._max_datagram_size: int = max_datagram_size + self._congestion_recovery_start_time: float = 0.0 + + self._rtt_monitor = QuicRttMonitor() + + self.rtt: float = 0.02 # starting RTT is considered to be 20ms + + self.reset() + + self.last_ack: float = 0.0 + + def W_cubic(self, t) -> int: + W_max_segments = self._W_max / self._max_datagram_size + target_segments = K_CUBIC_C * (t - self.K) ** 3 + (W_max_segments) + return int(target_segments * self._max_datagram_size) + + def is_reno_friendly(self, t) -> bool: + return self.W_cubic(t) < self._W_est + + def is_concave(self) -> bool: + return self.congestion_window < self._W_max + + def reset(self) -> None: + self.congestion_window = K_INITIAL_WINDOW * self._max_datagram_size + self.ssthresh = None + + self._first_slow_start = True + self._starting_congestion_avoidance = False + self.K: float = 0.0 + self._W_est: int = 0 + self._cwnd_epoch: int = 0 + self._t_epoch: float = 0.0 + self._W_max = self.congestion_window + + def on_packet_acked(self, *, now: float, packet: QuicSentPacket) -> None: + self.bytes_in_flight -= packet.sent_bytes + self.last_ack = packet.sent_time + + if self.ssthresh is None or self.congestion_window < self.ssthresh: + # slow start + self.congestion_window += packet.sent_bytes + else: + # congestion avoidance + if self._first_slow_start and not self._starting_congestion_avoidance: + # exiting slow start without having a loss + self._first_slow_start = False + self._W_max = self.congestion_window + self._t_epoch = now + self._cwnd_epoch = self.congestion_window + self._W_est = self._cwnd_epoch + # calculate K + W_max_segments = self._W_max / self._max_datagram_size + cwnd_epoch_segments = self._cwnd_epoch / self._max_datagram_size + self.K = better_cube_root( + (W_max_segments - cwnd_epoch_segments) / K_CUBIC_C + ) + + # initialize the variables used at start of congestion avoidance + if self._starting_congestion_avoidance: + self._starting_congestion_avoidance = False + self._first_slow_start = False + self._t_epoch = now + self._cwnd_epoch = self.congestion_window + self._W_est = self._cwnd_epoch + # calculate K + W_max_segments = self._W_max / self._max_datagram_size + cwnd_epoch_segments = self._cwnd_epoch / self._max_datagram_size + self.K = better_cube_root( + (W_max_segments - cwnd_epoch_segments) / K_CUBIC_C + ) + + self._W_est = int( + self._W_est + + self.additive_increase_factor + * (packet.sent_bytes / self.congestion_window) + ) + + t = now - self._t_epoch + + target: int = 0 + W_cubic = self.W_cubic(t + self.rtt) + if W_cubic < self.congestion_window: + target = self.congestion_window + elif W_cubic > 1.5 * self.congestion_window: + target = int(self.congestion_window * 1.5) + else: + target = W_cubic + + if self.is_reno_friendly(t): + # reno friendly region of cubic + # (https://www.rfc-editor.org/rfc/rfc9438.html#name-reno-friendly-region) + self.congestion_window = self._W_est + elif self.is_concave(): + # concave region of cubic + # (https://www.rfc-editor.org/rfc/rfc9438.html#name-concave-region) + self.congestion_window = int( + self.congestion_window + + ( + (target - self.congestion_window) + * (self._max_datagram_size / self.congestion_window) + ) + ) + else: + # convex region of cubic + # (https://www.rfc-editor.org/rfc/rfc9438.html#name-convex-region) + self.congestion_window = int( + self.congestion_window + + ( + (target - self.congestion_window) + * (self._max_datagram_size / self.congestion_window) + ) + ) + + def on_packet_sent(self, *, packet: QuicSentPacket) -> None: + self.bytes_in_flight += packet.sent_bytes + if self.last_ack != 0.0: + return + elapsed_idle = packet.sent_time - self.last_ack + if elapsed_idle >= K_CUBIC_MAX_IDLE_TIME: + self.reset() + + def on_packets_expired(self, *, packets: Iterable[QuicSentPacket]) -> None: + for packet in packets: + self.bytes_in_flight -= packet.sent_bytes + + def on_packets_lost(self, *, now: float, packets: Iterable[QuicSentPacket]) -> None: + lost_largest_time = 0.0 + for packet in packets: + self.bytes_in_flight -= packet.sent_bytes + lost_largest_time = packet.sent_time + + # start a new congestion event if packet was sent after the + # start of the previous congestion recovery period. + if lost_largest_time > self._congestion_recovery_start_time: + + self._congestion_recovery_start_time = now + + # Normal congestion handle, can't be used in same time as fast convergence + # self._W_max = self.congestion_window + + # fast convergence + if self._W_max is not None and self.congestion_window < self._W_max: + self._W_max = int( + self.congestion_window * (1 + K_CUBIC_LOSS_REDUCTION_FACTOR) / 2 + ) + else: + self._W_max = self.congestion_window + + # normal congestion MD + flight_size = self.bytes_in_flight + new_ssthresh = max( + int(flight_size * K_CUBIC_LOSS_REDUCTION_FACTOR), + K_MINIMUM_WINDOW * self._max_datagram_size, + ) + self.ssthresh = new_ssthresh + self.congestion_window = max( + self.ssthresh, K_MINIMUM_WINDOW * self._max_datagram_size + ) + + # restart a new congestion avoidance phase + self._starting_congestion_avoidance = True + + def on_rtt_measurement(self, *, now: float, rtt: float) -> None: + self.rtt = rtt + # check whether we should exit slow start + if self.ssthresh is None and self._rtt_monitor.is_rtt_increasing( + rtt=rtt, now=now + ): + self.ssthresh = self.congestion_window + + def get_log_data(self) -> Dict[str, Any]: + data = super().get_log_data() + + if self._W_max is None: + data["cubic-wmax"] = None + else: + data["cubic-wmax"] = int(self._W_max) + + return data + + +register_congestion_control("cubic", CubicCongestionControl) diff --git a/src/aioquic/quic/congestion/reno.py b/src/aioquic/quic/congestion/reno.py index 3bd0ee1c0..0ccf0792a 100644 --- a/src/aioquic/quic/congestion/reno.py +++ b/src/aioquic/quic/congestion/reno.py @@ -23,7 +23,7 @@ def __init__(self, *, max_datagram_size: int) -> None: self._congestion_stash = 0 self._rtt_monitor = QuicRttMonitor() - def on_packet_acked(self, *, packet: QuicSentPacket) -> None: + def on_packet_acked(self, *, now: float, packet: QuicSentPacket) -> None: self.bytes_in_flight -= packet.sent_bytes # don't increase window in congestion recovery diff --git a/src/aioquic/quic/recovery.py b/src/aioquic/quic/recovery.py index 3c165a7ec..fe601b028 100644 --- a/src/aioquic/quic/recovery.py +++ b/src/aioquic/quic/recovery.py @@ -2,7 +2,7 @@ import math from typing import Any, Callable, Dict, Iterable, List, Optional -from .congestion import reno # noqa +from .congestion import cubic, reno # noqa from .congestion.base import K_GRANULARITY, create_congestion_control from .logger import QuicLoggerTrace from .packet_builder import QuicDeliveryState, QuicSentPacket @@ -199,7 +199,7 @@ def on_ack_received( is_ack_eliciting = True space.ack_eliciting_in_flight -= 1 if packet.in_flight: - self._cc.on_packet_acked(packet=packet) + self._cc.on_packet_acked(packet=packet, now=now) largest_newly_acked = packet_number largest_sent_time = packet.sent_time @@ -334,10 +334,7 @@ def _get_loss_space(self) -> Optional[QuicPacketSpace]: return loss_space def _log_metrics_updated(self, log_rtt=False) -> None: - data: Dict[str, Any] = { - "bytes_in_flight": self._cc.bytes_in_flight, - "cwnd": self._cc.congestion_window, - } + data: Dict[str, Any] = self._cc.get_log_data() if self._cc.ssthresh is not None: data["ssthresh"] = self._cc.ssthresh diff --git a/tests/test_cubic.py b/tests/test_cubic.py new file mode 100644 index 000000000..17ab86326 --- /dev/null +++ b/tests/test_cubic.py @@ -0,0 +1,68 @@ +import unittest + +from aioquic.quic.congestion.cubic import ( + K_CUBIC_C, + K_CUBIC_LOSS_REDUCTION_FACTOR, + CubicCongestionControl, + QuicSentPacket, + better_cube_root, +) + + +def W_cubic(t, K, W_max): + return K_CUBIC_C * (t - K) ** 3 + (W_max) + + +class CubicTests(unittest.TestCase): + def test_congestion_avoidance(self): + """ + Check if the cubic implementation respects the mathematical + formula defined in the rfc 9438 + """ + + max_datagram_size = 1440 + + n = 400 # number of ms to check + + W_max = 5 # starting W_max + K = better_cube_root(W_max * (1 - K_CUBIC_LOSS_REDUCTION_FACTOR) / K_CUBIC_C) + cwnd = W_max * K_CUBIC_LOSS_REDUCTION_FACTOR + + correct = [] + + test_range = range(n) + + for i in test_range: + correct.append(W_cubic(i / 1000, K, W_max) * max_datagram_size) + + cubic = CubicCongestionControl(max_datagram_size) + cubic.rtt = 0 + cubic._W_max = W_max * max_datagram_size + cubic._starting_congestion_avoidance = True + cubic.congestion_window = cwnd * max_datagram_size + cubic.ssthresh = cubic.congestion_window + cubic._W_est = 0 + + results = [] + for i in test_range: + cwnd = cubic.congestion_window // max_datagram_size # number of segments + + # simulate the reception of cwnd packets (a full window of acks) + for _ in range(int(cwnd)): + packet = QuicSentPacket(None, True, True, True, 0, 0) + packet.sent_bytes = 0 # won't affect results + + cubic.on_packet_acked(packet=packet, now=(i / 1000)) + + results.append(cubic.congestion_window) + + for i in test_range: + # check if it is almost equal to the value of W_cubic + self.assertTrue( + correct[i] * 0.99 <= results[i] <= 1.01 * correct[i], + f"Error at {i}ms, Result={results[i]}, Expected={correct[i]}", + ) + + +if __name__ == "__main__": + unittest.main()