Skip to content

Commit

Permalink
feat(s2n-quic): implement updatable connection limits (#2508)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddeleine authored and camshaft committed Mar 4, 2025
1 parent f21fb99 commit 55cd809
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 9 deletions.
65 changes: 64 additions & 1 deletion quic/s2n-quic-core/src/connection/limits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "alloc")]
use crate::application::ServerName;
use crate::{
ack,
event::{api::SocketAddress, IntoEvent},
Expand All @@ -12,6 +13,8 @@ use crate::{
MaxDatagramFrameSize, MaxIdleTimeout, MigrationSupport, TransportParameters,
},
};
#[cfg(feature = "alloc")]
use bytes::Bytes;
use core::time::Duration;
use s2n_codec::decoder_invariant;

Expand Down Expand Up @@ -53,6 +56,30 @@ impl<'a> ConnectionInfo<'a> {
}
}

#[non_exhaustive]
#[derive(Debug)]
#[cfg(feature = "alloc")]
pub struct HandshakeInfo<'a> {
pub remote_address: SocketAddress<'a>,
pub server_name: Option<&'a ServerName>,
pub application_protocol: &'a Bytes,
}

#[cfg(feature = "alloc")]
impl<'a> HandshakeInfo<'a> {
pub fn new(
remote_address: &'a inet::SocketAddress,
server_name: Option<&'a ServerName>,
application_protocol: &'a Bytes,
) -> HandshakeInfo<'a> {
Self {
remote_address: remote_address.into_event(),
server_name,
application_protocol,
}
}
}

#[derive(Clone, Copy, Debug)]
pub struct Limits {
pub(crate) max_idle_timeout: MaxIdleTimeout,
Expand Down Expand Up @@ -397,16 +424,41 @@ impl Limits {
}
}

#[must_use]
#[derive(Debug)]
pub struct UpdatableLimits<'a>(&'a mut Limits);

impl<'a> UpdatableLimits<'a> {
pub fn new(limits: &'a mut Limits) -> UpdatableLimits<'a> {
UpdatableLimits(limits)
}

pub fn with_stream_batch_size(&mut self, size: u8) {
self.0.stream_batch_size = size;
}
}

/// Creates limits for a given connection
pub trait Limiter: 'static + Send {
fn on_connection(&mut self, info: &ConnectionInfo) -> Limits;

/// Provides another opportunity to change connection limits with information
/// from the handshake
#[inline]
#[cfg(feature = "alloc")]
fn on_post_handshake(&mut self, info: &HandshakeInfo, limits: &mut UpdatableLimits) {
let _ = info;
let _ = limits;
}
}

/// Implement Limiter for a Limits struct
impl Limiter for Limits {
fn on_connection(&mut self, _into: &ConnectionInfo) -> Limits {
*self
}
#[cfg(feature = "alloc")]
fn on_post_handshake(&mut self, _info: &HandshakeInfo, _limits: &mut UpdatableLimits) {}
}

#[cfg(test)]
Expand All @@ -429,4 +481,15 @@ mod tests {
assert!(limits.with_bidirectional_remote_data_window(data).is_ok());
assert!(limits.with_unidirectional_data_window(data).is_ok());
}

// Limits can be updated through the UpdatableLimits wrapper
#[test]
fn updatable_limits() {
let mut limits = Limits::default();
assert_eq!(limits.stream_batch_size, 1);
let mut updatable_limits = UpdatableLimits::new(&mut limits);
let new_size = 10;
updatable_limits.with_stream_batch_size(new_size);
assert_eq!(limits.stream_batch_size, new_size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl connection::Trait for TestConnection {
_subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
_datagram: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), connection::Error> {
Ok(())
}
Expand All @@ -148,6 +149,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -163,6 +165,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -178,6 +181,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -193,6 +197,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand Down
14 changes: 13 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl<Config: endpoint::Config> ConnectionImpl<Config> {
subscriber: &mut Config::EventSubscriber,
datagram: &mut Config::DatagramEndpoint,
dc: &mut Config::DcEndpoint,
limits: &mut Config::ConnectionLimits,
) -> Result<(), connection::Error> {
let mut publisher = self.event_context.publisher(timestamp, subscriber);
let space_manager = &mut self.space_manager;
Expand All @@ -285,6 +286,7 @@ impl<Config: endpoint::Config> ConnectionImpl<Config> {
&mut publisher,
datagram,
dc,
limits,
) {
Poll::Ready(Ok(())) => {}
// use `from` instead of `into` so the location is correctly captured
Expand Down Expand Up @@ -658,6 +660,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
parameters.event_subscriber,
parameters.datagram_endpoint,
parameters.dc_endpoint,
parameters.limits_endpoint,
) {
connection.with_event_publisher(
parameters.timestamp,
Expand Down Expand Up @@ -1128,12 +1131,13 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber: &mut Config::EventSubscriber,
datagram: &mut Config::DatagramEndpoint,
dc: &mut Config::DcEndpoint,
conn_limits: &mut Config::ConnectionLimits,
) -> Result<(), connection::Error> {
// reset the queued state first so that new wakeup request are not missed
self.wakeup_handle.wakeup_handled();

// check if crypto progress can be made
self.update_crypto_state(timestamp, subscriber, datagram, dc)?;
self.update_crypto_state(timestamp, subscriber, datagram, dc, conn_limits)?;

// return an error if the application set one
self.error?;
Expand Down Expand Up @@ -1220,6 +1224,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
//= https://www.rfc-editor.org/rfc/rfc9000#section-7.2
//= type=TODO
Expand Down Expand Up @@ -1261,6 +1266,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;
}

Expand All @@ -1278,6 +1284,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
if let Some((space, handshake_status)) = self.space_manager.initial_mut() {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);
Expand Down Expand Up @@ -1339,6 +1346,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;

// notify the connection a packet was processed
Expand All @@ -1359,6 +1367,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);

Expand Down Expand Up @@ -1444,6 +1453,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;

// notify the connection a packet was processed
Expand All @@ -1464,6 +1474,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);

Expand Down Expand Up @@ -1561,6 +1572,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
&mut publisher,
datagram_endpoint,
dc_endpoint,
limits_endpoint,
)?;
}
// notify the connection a packet was processed
Expand Down
11 changes: 11 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
datagram: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
conn_limits: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), connection::Error>;

// Packet handling
Expand All @@ -126,6 +127,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when an unprotected initial packet had been received
Expand All @@ -139,6 +141,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a handshake packet had been received
Expand All @@ -152,6 +155,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a short packet had been received
Expand All @@ -165,6 +169,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a version negotiation packet had been received
Expand Down Expand Up @@ -225,6 +230,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
check_for_stateless_reset: &mut bool,
) -> Result<(), connection::Error> {
macro_rules! emit_drop_reason {
Expand Down Expand Up @@ -286,6 +292,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::VersionNegotiation(packet) => self.handle_version_negotiation_packet(
datagram,
Expand All @@ -303,6 +310,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::ZeroRtt(packet) => self.handle_zero_rtt_packet(
datagram,
Expand All @@ -320,6 +328,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::Retry(packet) => {
self.handle_retry_packet(datagram, path_id, packet, subscriber, packet_interceptor)
Expand Down Expand Up @@ -378,6 +387,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
check_for_stateless_reset: &mut bool,
) -> Result<(), connection::Error> {
macro_rules! emit_drop_reason {
Expand Down Expand Up @@ -432,6 +442,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
check_for_stateless_reset,
);

Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,6 @@ pub struct Parameters<'a, Cfg: endpoint::Config> {
pub dc_endpoint: &'a mut Cfg::DcEndpoint,
/// The event subscriber for the endpoint
pub event_subscriber: &'a mut Cfg::EventSubscriber,
/// The connection limits provider
pub limits_endpoint: &'a mut Cfg::ConnectionLimits,
}
3 changes: 3 additions & 0 deletions quic/s2n-quic-transport/src/endpoint/initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
datagram_endpoint: endpoint_context.datagram,
dc_endpoint: endpoint_context.dc,
open_registry: None,
limits_endpoint: endpoint_context.connection_limits,
};

let mut connection = <Config as endpoint::Config>::Connection::new(connection_parameters)?;
Expand Down Expand Up @@ -367,6 +368,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
endpoint_context.packet_interceptor,
endpoint_context.datagram,
endpoint_context.dc,
endpoint_context.connection_limits,
)
.map_err(|err| {
use connection::ProcessingError;
Expand All @@ -391,6 +393,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
endpoint_context.packet_interceptor,
endpoint_context.datagram,
endpoint_context.dc,
endpoint_context.connection_limits,
&mut false,
)?;

Expand Down
Loading

0 comments on commit 55cd809

Please sign in to comment.