diff --git a/quic/s2n-quic-core/src/connection/limits.rs b/quic/s2n-quic-core/src/connection/limits.rs index 233c890e5e..17bdc8dbe6 100644 --- a/quic/s2n-quic-core/src/connection/limits.rs +++ b/quic/s2n-quic-core/src/connection/limits.rs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 - +#[cfg(feature = "alloc")] +use crate::application::ServerName; use crate::{ ack, event::{api::SocketAddress, IntoEvent}, @@ -12,6 +13,8 @@ use crate::{ MaxDatagramFrameSize, MaxIdleTimeout, MigrationSupport, TransportParameters, }, }; +#[cfg(feature = "alloc")] +use bytes::Bytes; use core::time::Duration; use s2n_codec::decoder_invariant; @@ -53,6 +56,30 @@ impl<'a> ConnectionInfo<'a> { } } +#[non_exhaustive] +#[derive(Debug)] +#[cfg(feature = "alloc")] +pub struct HandshakeInfo<'a> { + pub remote_address: SocketAddress<'a>, + pub server_name: Option<&'a ServerName>, + pub application_protocol: &'a Bytes, +} + +#[cfg(feature = "alloc")] +impl<'a> HandshakeInfo<'a> { + pub fn new( + remote_address: &'a inet::SocketAddress, + server_name: Option<&'a ServerName>, + application_protocol: &'a Bytes, + ) -> HandshakeInfo<'a> { + Self { + remote_address: remote_address.into_event(), + server_name, + application_protocol, + } + } +} + #[derive(Clone, Copy, Debug)] pub struct Limits { pub(crate) max_idle_timeout: MaxIdleTimeout, @@ -397,9 +424,32 @@ impl Limits { } } +#[must_use] +#[derive(Debug)] +pub struct UpdatableLimits<'a>(&'a mut Limits); + +impl<'a> UpdatableLimits<'a> { + pub fn new(limits: &'a mut Limits) -> UpdatableLimits<'a> { + UpdatableLimits(limits) + } + + pub fn with_stream_batch_size(&mut self, size: u8) { + self.0.stream_batch_size = size; + } +} + /// Creates limits for a given connection pub trait Limiter: 'static + Send { fn on_connection(&mut self, info: &ConnectionInfo) -> Limits; + + /// Provides another opportunity to change connection limits with information + /// from the handshake + #[inline] + #[cfg(feature = "alloc")] + fn on_post_handshake(&mut self, info: &HandshakeInfo, limits: &mut UpdatableLimits) { + let _ = info; + let _ = limits; + } } /// Implement Limiter for a Limits struct @@ -407,6 +457,8 @@ impl Limiter for Limits { fn on_connection(&mut self, _into: &ConnectionInfo) -> Limits { *self } + #[cfg(feature = "alloc")] + fn on_post_handshake(&mut self, _info: &HandshakeInfo, _limits: &mut UpdatableLimits) {} } #[cfg(test)] @@ -429,4 +481,15 @@ mod tests { assert!(limits.with_bidirectional_remote_data_window(data).is_ok()); assert!(limits.with_unidirectional_data_window(data).is_ok()); } + + // Limits can be updated through the UpdatableLimits wrapper + #[test] + fn updatable_limits() { + let mut limits = Limits::default(); + assert_eq!(limits.stream_batch_size, 1); + let mut updatable_limits = UpdatableLimits::new(&mut limits); + let new_size = 10; + updatable_limits.with_stream_batch_size(new_size); + assert_eq!(limits.stream_batch_size, new_size); + } } diff --git a/quic/s2n-quic-transport/src/connection/connection_container/tests.rs b/quic/s2n-quic-transport/src/connection/connection_container/tests.rs index 45094e79c8..ceef14dba9 100644 --- a/quic/s2n-quic-transport/src/connection/connection_container/tests.rs +++ b/quic/s2n-quic-transport/src/connection/connection_container/tests.rs @@ -134,6 +134,7 @@ impl connection::Trait for TestConnection { _subscriber: &mut ::EventSubscriber, _datagram: &mut ::DatagramEndpoint, _dc_endpoint: &mut ::DcEndpoint, + _conn_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), connection::Error> { Ok(()) } @@ -148,6 +149,7 @@ impl connection::Trait for TestConnection { _packet_interceptor: &mut ::PacketInterceptor, _datagram_endpoint: &mut ::DatagramEndpoint, _dc_endpoint: &mut ::DcEndpoint, + _conn_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError> { Ok(()) } @@ -163,6 +165,7 @@ impl connection::Trait for TestConnection { _packet_interceptor: &mut ::PacketInterceptor, _datagram_endpoint: &mut ::DatagramEndpoint, _dc_endpoint: &mut ::DcEndpoint, + _conn_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError> { Ok(()) } @@ -178,6 +181,7 @@ impl connection::Trait for TestConnection { _packet_interceptor: &mut ::PacketInterceptor, _datagram_endpoint: &mut ::DatagramEndpoint, _dc_endpoint: &mut ::DcEndpoint, + _connection_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError> { Ok(()) } @@ -193,6 +197,7 @@ impl connection::Trait for TestConnection { _packet_interceptor: &mut ::PacketInterceptor, _datagram_endpoint: &mut ::DatagramEndpoint, _dc_endpoint: &mut ::DcEndpoint, + _limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError> { Ok(()) } diff --git a/quic/s2n-quic-transport/src/connection/connection_impl.rs b/quic/s2n-quic-transport/src/connection/connection_impl.rs index f61030acf4..934853f27b 100644 --- a/quic/s2n-quic-transport/src/connection/connection_impl.rs +++ b/quic/s2n-quic-transport/src/connection/connection_impl.rs @@ -272,6 +272,7 @@ impl ConnectionImpl { subscriber: &mut Config::EventSubscriber, datagram: &mut Config::DatagramEndpoint, dc: &mut Config::DcEndpoint, + limits: &mut Config::ConnectionLimits, ) -> Result<(), connection::Error> { let mut publisher = self.event_context.publisher(timestamp, subscriber); let space_manager = &mut self.space_manager; @@ -285,6 +286,7 @@ impl ConnectionImpl { &mut publisher, datagram, dc, + limits, ) { Poll::Ready(Ok(())) => {} // use `from` instead of `into` so the location is correctly captured @@ -658,6 +660,7 @@ impl connection::Trait for ConnectionImpl { parameters.event_subscriber, parameters.datagram_endpoint, parameters.dc_endpoint, + parameters.limits_endpoint, ) { connection.with_event_publisher( parameters.timestamp, @@ -1128,12 +1131,13 @@ impl connection::Trait for ConnectionImpl { subscriber: &mut Config::EventSubscriber, datagram: &mut Config::DatagramEndpoint, dc: &mut Config::DcEndpoint, + conn_limits: &mut Config::ConnectionLimits, ) -> Result<(), connection::Error> { // reset the queued state first so that new wakeup request are not missed self.wakeup_handle.wakeup_handled(); // check if crypto progress can be made - self.update_crypto_state(timestamp, subscriber, datagram, dc)?; + self.update_crypto_state(timestamp, subscriber, datagram, dc, conn_limits)?; // return an error if the application set one self.error?; @@ -1220,6 +1224,7 @@ impl connection::Trait for ConnectionImpl { packet_interceptor: &mut Config::PacketInterceptor, datagram_endpoint: &mut Config::DatagramEndpoint, dc_endpoint: &mut Config::DcEndpoint, + connection_limits_endpoint: &mut Config::ConnectionLimits, ) -> Result<(), ProcessingError> { //= https://www.rfc-editor.org/rfc/rfc9000#section-7.2 //= type=TODO @@ -1261,6 +1266,7 @@ impl connection::Trait for ConnectionImpl { packet_interceptor, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, )?; } @@ -1278,6 +1284,7 @@ impl connection::Trait for ConnectionImpl { packet_interceptor: &mut Config::PacketInterceptor, datagram_endpoint: &mut Config::DatagramEndpoint, dc_endpoint: &mut Config::DcEndpoint, + connection_limits_endpoint: &mut Config::ConnectionLimits, ) -> Result<(), ProcessingError> { if let Some((space, handshake_status)) = self.space_manager.initial_mut() { let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber); @@ -1339,6 +1346,7 @@ impl connection::Trait for ConnectionImpl { subscriber, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, )?; // notify the connection a packet was processed @@ -1359,6 +1367,7 @@ impl connection::Trait for ConnectionImpl { packet_interceptor: &mut Config::PacketInterceptor, datagram_endpoint: &mut Config::DatagramEndpoint, dc_endpoint: &mut Config::DcEndpoint, + connection_limits_endpoint: &mut Config::ConnectionLimits, ) -> Result<(), ProcessingError> { let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber); @@ -1444,6 +1453,7 @@ impl connection::Trait for ConnectionImpl { subscriber, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, )?; // notify the connection a packet was processed @@ -1464,6 +1474,7 @@ impl connection::Trait for ConnectionImpl { packet_interceptor: &mut Config::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut Config::DcEndpoint, + limits_endpoint: &mut Config::ConnectionLimits, ) -> Result<(), ProcessingError> { let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber); @@ -1561,6 +1572,7 @@ impl connection::Trait for ConnectionImpl { &mut publisher, datagram_endpoint, dc_endpoint, + limits_endpoint, )?; } // notify the connection a packet was processed diff --git a/quic/s2n-quic-transport/src/connection/connection_trait.rs b/quic/s2n-quic-transport/src/connection/connection_trait.rs index b6f74b96b7..305c18e211 100644 --- a/quic/s2n-quic-transport/src/connection/connection_trait.rs +++ b/quic/s2n-quic-transport/src/connection/connection_trait.rs @@ -111,6 +111,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { subscriber: &mut ::EventSubscriber, datagram: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + conn_limits: &mut ::ConnectionLimits, ) -> Result<(), connection::Error>; // Packet handling @@ -126,6 +127,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + connection_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError>; /// Is called when an unprotected initial packet had been received @@ -139,6 +141,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + connection_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError>; /// Is called when a handshake packet had been received @@ -152,6 +155,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + connection_limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError>; /// Is called when a short packet had been received @@ -165,6 +169,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + limits_endpoint: &mut ::ConnectionLimits, ) -> Result<(), ProcessingError>; /// Is called when a version negotiation packet had been received @@ -225,6 +230,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + connection_limits_endpoint: &mut ::ConnectionLimits, check_for_stateless_reset: &mut bool, ) -> Result<(), connection::Error> { macro_rules! emit_drop_reason { @@ -286,6 +292,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, ), ProtectedPacket::VersionNegotiation(packet) => self.handle_version_negotiation_packet( datagram, @@ -303,6 +310,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, ), ProtectedPacket::ZeroRtt(packet) => self.handle_zero_rtt_packet( datagram, @@ -320,6 +328,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, ), ProtectedPacket::Retry(packet) => { self.handle_retry_packet(datagram, path_id, packet, subscriber, packet_interceptor) @@ -378,6 +387,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor: &mut ::PacketInterceptor, datagram_endpoint: &mut ::DatagramEndpoint, dc_endpoint: &mut ::DcEndpoint, + connection_limits_endpoint: &mut ::ConnectionLimits, check_for_stateless_reset: &mut bool, ) -> Result<(), connection::Error> { macro_rules! emit_drop_reason { @@ -432,6 +442,7 @@ pub trait ConnectionTrait: 'static + Send + Sized { packet_interceptor, datagram_endpoint, dc_endpoint, + connection_limits_endpoint, check_for_stateless_reset, ); diff --git a/quic/s2n-quic-transport/src/connection/mod.rs b/quic/s2n-quic-transport/src/connection/mod.rs index 8a6a932d48..800774f37f 100644 --- a/quic/s2n-quic-transport/src/connection/mod.rs +++ b/quic/s2n-quic-transport/src/connection/mod.rs @@ -87,4 +87,6 @@ pub struct Parameters<'a, Cfg: endpoint::Config> { pub dc_endpoint: &'a mut Cfg::DcEndpoint, /// The event subscriber for the endpoint pub event_subscriber: &'a mut Cfg::EventSubscriber, + /// The connection limits provider + pub limits_endpoint: &'a mut Cfg::ConnectionLimits, } diff --git a/quic/s2n-quic-transport/src/endpoint/initial.rs b/quic/s2n-quic-transport/src/endpoint/initial.rs index f3959fdc3c..ab7318e6c7 100644 --- a/quic/s2n-quic-transport/src/endpoint/initial.rs +++ b/quic/s2n-quic-transport/src/endpoint/initial.rs @@ -315,6 +315,7 @@ impl endpoint::Endpoint { datagram_endpoint: endpoint_context.datagram, dc_endpoint: endpoint_context.dc, open_registry: None, + limits_endpoint: endpoint_context.connection_limits, }; let mut connection = ::Connection::new(connection_parameters)?; @@ -367,6 +368,7 @@ impl endpoint::Endpoint { endpoint_context.packet_interceptor, endpoint_context.datagram, endpoint_context.dc, + endpoint_context.connection_limits, ) .map_err(|err| { use connection::ProcessingError; @@ -391,6 +393,7 @@ impl endpoint::Endpoint { endpoint_context.packet_interceptor, endpoint_context.datagram, endpoint_context.dc, + endpoint_context.connection_limits, &mut false, )?; diff --git a/quic/s2n-quic-transport/src/endpoint/mod.rs b/quic/s2n-quic-transport/src/endpoint/mod.rs index ee41fb630f..ea6ccbc2e0 100644 --- a/quic/s2n-quic-transport/src/endpoint/mod.rs +++ b/quic/s2n-quic-transport/src/endpoint/mod.rs @@ -214,6 +214,7 @@ impl s2n_quic_core::endpoint::Endpoint for Endpoint { endpoint_context.event_subscriber, endpoint_context.datagram, endpoint_context.dc, + endpoint_context.connection_limits, ) { conn.close( error, @@ -607,6 +608,7 @@ impl Endpoint { endpoint_context.packet_interceptor, endpoint_context.datagram, endpoint_context.dc, + endpoint_context.connection_limits, &mut check_for_stateless_reset, ) { //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1 @@ -633,6 +635,7 @@ impl Endpoint { endpoint_context.packet_interceptor, endpoint_context.datagram, endpoint_context.dc, + endpoint_context.connection_limits, &mut check_for_stateless_reset, ) { //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.1 @@ -1228,6 +1231,7 @@ impl Endpoint { datagram_endpoint: endpoint_context.datagram, dc_endpoint: endpoint_context.dc, open_registry, + limits_endpoint: endpoint_context.connection_limits, }; let connection = ::Connection::new(connection_parameters)?; self.connections diff --git a/quic/s2n-quic-transport/src/space/mod.rs b/quic/s2n-quic-transport/src/space/mod.rs index 9c02489683..dc0241d237 100644 --- a/quic/s2n-quic-transport/src/space/mod.rs +++ b/quic/s2n-quic-transport/src/space/mod.rs @@ -248,6 +248,7 @@ impl PacketSpaceManager { publisher: &mut Pub, datagram: &mut Config::DatagramEndpoint, dc: &mut Config::DcEndpoint, + limits_endpoint: &mut Config::ConnectionLimits, ) -> Poll> { if let Some(session_info) = self.session_info.as_mut() { let mut context: SessionContext = SessionContext { @@ -268,6 +269,7 @@ impl PacketSpaceManager { publisher, datagram, dc, + limits_endpoint, }; match session_info.session.poll(&mut context)? { @@ -295,6 +297,7 @@ impl PacketSpaceManager { publisher: &mut Pub, datagram: &mut Config::DatagramEndpoint, dc: &mut Config::DcEndpoint, + limits_endpoint: &mut Config::ConnectionLimits, ) -> Result<(), transport::Error> { if let Some(session_info) = self.session_info.as_mut() { let mut context: SessionContext = SessionContext { @@ -315,6 +318,7 @@ impl PacketSpaceManager { publisher, datagram, dc, + limits_endpoint, }; session_info diff --git a/quic/s2n-quic-transport/src/space/session_context.rs b/quic/s2n-quic-transport/src/space/session_context.rs index 374a4555da..438b18e1fa 100644 --- a/quic/s2n-quic-transport/src/space/session_context.rs +++ b/quic/s2n-quic-transport/src/space/session_context.rs @@ -17,15 +17,20 @@ use s2n_codec::{DecoderBuffer, DecoderValue}; use s2n_quic_core::{ ack, application::ServerName, - connection::{InitialId, PeerId}, - crypto, - crypto::{tls, tls::ApplicationParameters, CryptoSuite, Key}, + connection::{ + limits::{HandshakeInfo, Limiter, UpdatableLimits}, + InitialId, PeerId, + }, + crypto::{ + self, + tls::{self, ApplicationParameters}, + CryptoSuite, Key, + }, ct::ConstantTimeEq, datagram::{ConnectionInfo, Endpoint}, - dc, - dc::Endpoint as _, - event, + dc::{self, Endpoint as _}, event::{ + self, builder::{DcState, DcStateChanged}, IntoEvent, }, @@ -62,6 +67,7 @@ pub struct SessionContext<'a, Config: endpoint::Config, Pub: event::ConnectionPu pub publisher: &'a mut Pub, pub datagram: &'a mut Config::DatagramEndpoint, pub dc: &'a mut Config::DcEndpoint, + pub limits_endpoint: &'a mut Config::ConnectionLimits, } impl SessionContext<'_, Config, Pub> { @@ -411,6 +417,16 @@ impl endpoint::Type::Server => self.on_client_params(param_decoder)?, }; + let remote_address = self.path_manager.active_path().remote_address().0; + let info = HandshakeInfo::new( + &remote_address, + self.server_name.as_ref(), + self.application_protocol, + ); + let mut updatable_limits = UpdatableLimits::new(self.limits); + self.limits_endpoint + .on_post_handshake(&info, &mut updatable_limits); + self.local_id_registry .set_active_connection_id_limit(active_connection_id_limit.as_u64()); diff --git a/quic/s2n-quic/src/provider/limits.rs b/quic/s2n-quic/src/provider/limits.rs index 3f6649a33a..7869340b2b 100644 --- a/quic/s2n-quic/src/provider/limits.rs +++ b/quic/s2n-quic/src/provider/limits.rs @@ -3,7 +3,9 @@ //! Provides limits support for a connection -pub use s2n_quic_core::connection::limits::{ConnectionInfo, Limiter, Limits}; +pub use s2n_quic_core::connection::limits::{ + ConnectionInfo, HandshakeInfo, Limiter, Limits, UpdatableLimits, +}; pub trait Provider { type Limits: 'static + Send + Limiter; diff --git a/quic/s2n-quic/src/tests.rs b/quic/s2n-quic/src/tests.rs index e4cb361238..d612ac8137 100644 --- a/quic/s2n-quic/src/tests.rs +++ b/quic/s2n-quic/src/tests.rs @@ -24,6 +24,7 @@ use std::{ #[macro_use] mod recorder; +mod connection_limits; mod resumption; mod setup; use setup::*; diff --git a/quic/s2n-quic/src/tests/connection_limits.rs b/quic/s2n-quic/src/tests/connection_limits.rs new file mode 100644 index 0000000000..5664291822 --- /dev/null +++ b/quic/s2n-quic/src/tests/connection_limits.rs @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use s2n_quic_core::connection::limits::{ + ConnectionInfo, HandshakeInfo, Limiter, Limits, UpdatableLimits, +}; + +#[test] +fn connection_limits() { + struct LimitsProvider; + impl Limiter for LimitsProvider { + fn on_connection(&mut self, info: &ConnectionInfo) -> Limits { + let addr: [u8; 4] = [1, 0, 0, 1]; + let port = 49153; + assert_eq!(info.remote_address.ip(), addr); + assert_eq!(info.remote_address.port(), port); + Limits::default() + } + + fn on_post_handshake(&mut self, info: &HandshakeInfo, limits: &mut UpdatableLimits) { + let addr: [u8; 4] = [1, 0, 0, 1]; + let port = 49153; + assert_eq!(info.remote_address.ip(), addr); + assert_eq!(info.remote_address.port(), port); + assert_eq!(*info.server_name.unwrap(), "localhost".into()); + assert_eq!(info.application_protocol, "h3"); + limits.with_stream_batch_size(10); + } + } + + let model = Model::default(); + test(model, |handle| { + let server = Server::builder() + .with_io(handle.builder().build()?)? + .with_tls(SERVER_CERTS)? + .with_limits(LimitsProvider)? + .start()?; + + let client = Client::builder() + .with_io(handle.builder().build().unwrap())? + .with_tls(certificates::CERT_PEM)? + .start()?; + let addr = start_server(server)?; + start_client(client, addr, Data::new(1000))?; + + Ok(addr) + }) + .unwrap(); +}