diff --git a/.gitignore b/.gitignore index ab41634c3d..1427245ea2 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ Cargo.lock dhat-heap.json flamegraph.svg perf.data* +*.snap.new diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 034631a94e..768aa9dbb0 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -439,6 +439,55 @@ pub mod testing { use crate::event::snapshot::Location; use core::sync::atomic::{AtomicU32, Ordering}; use std::sync::Mutex; + pub mod endpoint { + use super::*; + pub struct Subscriber { + location: Option, + output: Mutex>, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot_log(&self.output.lock().unwrap()); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::from_thread_name(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn named_snapshot(name: Name) -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Some(Location::new(name)); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + } + } + } + impl super::super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + } + } #[derive(Clone, Debug)] pub struct Subscriber { location: Option, diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 5070eb2cde..a418bd3005 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -1148,6 +1148,16 @@ pub mod api { pub struct PlatformTx { #[doc = " The number of packets sent"] pub count: usize, + #[doc = " The number of syscalls performed"] + pub syscalls: usize, + #[doc = " The number of syscalls that got blocked"] + pub blocked_syscalls: usize, + #[doc = " The total number of errors encountered since the last event"] + pub total_errors: usize, + #[doc = " The number of specific error codes dropped"] + #[doc = ""] + #[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"] + pub dropped_errors: usize, } impl Event for PlatformTx { const NAME: &'static str = "platform:tx"; @@ -1168,6 +1178,16 @@ pub mod api { pub struct PlatformRx { #[doc = " The number of packets received"] pub count: usize, + #[doc = " The number of syscalls performed"] + pub syscalls: usize, + #[doc = " The number of syscalls that got blocked"] + pub blocked_syscalls: usize, + #[doc = " The total number of errors encountered since the last event"] + pub total_errors: usize, + #[doc = " The number of specific error codes dropped"] + #[doc = ""] + #[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"] + pub dropped_errors: usize, } impl Event for PlatformRx { const NAME: &'static str = "platform:rx"; @@ -1215,6 +1235,15 @@ pub mod api { } #[derive(Clone, Debug)] #[non_exhaustive] + pub struct PlatformEventLoopStarted<'a> { + #[doc = " The local address of the socket"] + pub local_address: SocketAddress<'a>, + } + impl<'a> Event for PlatformEventLoopStarted<'a> { + const NAME: &'static str = "platform:started"; + } + #[derive(Clone, Debug)] + #[non_exhaustive] pub enum PlatformFeatureConfiguration { #[non_exhaustive] #[doc = " Emitted when segment offload was configured"] @@ -2397,8 +2426,14 @@ pub mod tracing { api::EndpointType::Client {} => self.client.id(), api::EndpointType::Server {} => self.server.id(), }; - let api::PlatformTx { count } = event; - tracing :: event ! (target : "platform_tx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count)); + let api::PlatformTx { + count, + syscalls, + blocked_syscalls, + total_errors, + dropped_errors, + } = event; + tracing :: event ! (target : "platform_tx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count) , syscalls = tracing :: field :: debug (syscalls) , blocked_syscalls = tracing :: field :: debug (blocked_syscalls) , total_errors = tracing :: field :: debug (total_errors) , dropped_errors = tracing :: field :: debug (dropped_errors)); } #[inline] fn on_platform_tx_error(&mut self, meta: &api::EndpointMeta, event: &api::PlatformTxError) { @@ -2415,8 +2450,14 @@ pub mod tracing { api::EndpointType::Client {} => self.client.id(), api::EndpointType::Server {} => self.server.id(), }; - let api::PlatformRx { count } = event; - tracing :: event ! (target : "platform_rx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count)); + let api::PlatformRx { + count, + syscalls, + blocked_syscalls, + total_errors, + dropped_errors, + } = event; + tracing :: event ! (target : "platform_rx" , parent : parent , tracing :: Level :: DEBUG , count = tracing :: field :: debug (count) , syscalls = tracing :: field :: debug (syscalls) , blocked_syscalls = tracing :: field :: debug (blocked_syscalls) , total_errors = tracing :: field :: debug (total_errors) , dropped_errors = tracing :: field :: debug (dropped_errors)); } #[inline] fn on_platform_rx_error(&mut self, meta: &api::EndpointMeta, event: &api::PlatformRxError) { @@ -2474,6 +2515,19 @@ pub mod tracing { } = event; tracing :: event ! (target : "platform_event_loop_sleep" , parent : parent , tracing :: Level :: DEBUG , timeout = tracing :: field :: debug (timeout) , processing_duration = tracing :: field :: debug (processing_duration)); } + #[inline] + fn on_platform_event_loop_started( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformEventLoopStarted, + ) { + let parent = match meta.endpoint_type { + api::EndpointType::Client {} => self.client.id(), + api::EndpointType::Server {} => self.server.id(), + }; + let api::PlatformEventLoopStarted { local_address } = event; + tracing :: event ! (target : "platform_event_loop_started" , parent : parent , tracing :: Level :: DEBUG , local_address = tracing :: field :: debug (local_address)); + } } } pub mod builder { @@ -4512,13 +4566,33 @@ pub mod builder { pub struct PlatformTx { #[doc = " The number of packets sent"] pub count: usize, + #[doc = " The number of syscalls performed"] + pub syscalls: usize, + #[doc = " The number of syscalls that got blocked"] + pub blocked_syscalls: usize, + #[doc = " The total number of errors encountered since the last event"] + pub total_errors: usize, + #[doc = " The number of specific error codes dropped"] + #[doc = ""] + #[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"] + pub dropped_errors: usize, } impl IntoEvent for PlatformTx { #[inline] fn into_event(self) -> api::PlatformTx { - let PlatformTx { count } = self; + let PlatformTx { + count, + syscalls, + blocked_syscalls, + total_errors, + dropped_errors, + } = self; api::PlatformTx { count: count.into_event(), + syscalls: syscalls.into_event(), + blocked_syscalls: blocked_syscalls.into_event(), + total_errors: total_errors.into_event(), + dropped_errors: dropped_errors.into_event(), } } } @@ -4542,13 +4616,33 @@ pub mod builder { pub struct PlatformRx { #[doc = " The number of packets received"] pub count: usize, + #[doc = " The number of syscalls performed"] + pub syscalls: usize, + #[doc = " The number of syscalls that got blocked"] + pub blocked_syscalls: usize, + #[doc = " The total number of errors encountered since the last event"] + pub total_errors: usize, + #[doc = " The number of specific error codes dropped"] + #[doc = ""] + #[doc = " This can happen when a burst of errors exceeds the capacity of the recorder"] + pub dropped_errors: usize, } impl IntoEvent for PlatformRx { #[inline] fn into_event(self) -> api::PlatformRx { - let PlatformRx { count } = self; + let PlatformRx { + count, + syscalls, + blocked_syscalls, + total_errors, + dropped_errors, + } = self; api::PlatformRx { count: count.into_event(), + syscalls: syscalls.into_event(), + blocked_syscalls: blocked_syscalls.into_event(), + total_errors: total_errors.into_event(), + dropped_errors: dropped_errors.into_event(), } } } @@ -4626,6 +4720,20 @@ pub mod builder { } } #[derive(Clone, Debug)] + pub struct PlatformEventLoopStarted<'a> { + #[doc = " The local address of the socket"] + pub local_address: SocketAddress<'a>, + } + impl<'a> IntoEvent> for PlatformEventLoopStarted<'a> { + #[inline] + fn into_event(self) -> api::PlatformEventLoopStarted<'a> { + let PlatformEventLoopStarted { local_address } = self; + api::PlatformEventLoopStarted { + local_address: local_address.into_event(), + } + } + } + #[derive(Clone, Debug)] pub enum PlatformFeatureConfiguration { #[doc = " Emitted when segment offload was configured"] Gso { @@ -5478,6 +5586,16 @@ mod traits { let _ = meta; let _ = event; } + #[doc = "Called when the `PlatformEventLoopStarted` event is triggered"] + #[inline] + fn on_platform_event_loop_started( + &mut self, + meta: &EndpointMeta, + event: &PlatformEventLoopStarted, + ) { + let _ = meta; + let _ = event; + } #[doc = r" Called for each event that relates to the endpoint and all connections"] #[inline] fn on_event(&mut self, meta: &M, event: &E) { @@ -6106,6 +6224,15 @@ mod traits { (self.1).on_platform_event_loop_sleep(meta, event); } #[inline] + fn on_platform_event_loop_started( + &mut self, + meta: &EndpointMeta, + event: &PlatformEventLoopStarted, + ) { + (self.0).on_platform_event_loop_started(meta, event); + (self.1).on_platform_event_loop_started(meta, event); + } + #[inline] fn on_event(&mut self, meta: &M, event: &E) { self.0.on_event(meta, event); self.1.on_event(meta, event); @@ -6173,6 +6300,8 @@ mod traits { fn on_platform_event_loop_wakeup(&mut self, event: builder::PlatformEventLoopWakeup); #[doc = "Publishes a `PlatformEventLoopSleep` event to the publisher's subscriber"] fn on_platform_event_loop_sleep(&mut self, event: builder::PlatformEventLoopSleep); + #[doc = "Publishes a `PlatformEventLoopStarted` event to the publisher's subscriber"] + fn on_platform_event_loop_started(&mut self, event: builder::PlatformEventLoopStarted); #[doc = r" Returns the QUIC version, if any"] fn quic_version(&self) -> Option; } @@ -6300,6 +6429,13 @@ mod traits { self.subscriber.on_event(&self.meta, &event); } #[inline] + fn on_platform_event_loop_started(&mut self, event: builder::PlatformEventLoopStarted) { + let event = event.into_event(); + self.subscriber + .on_platform_event_loop_started(&self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] fn quic_version(&self) -> Option { self.quic_version } @@ -7535,6 +7671,197 @@ pub mod metrics { pub mod testing { use super::*; use crate::event::snapshot::Location; + pub mod endpoint { + use super::*; + pub struct Subscriber { + location: Option, + output: Vec, + pub version_information: u32, + pub endpoint_packet_sent: u32, + pub endpoint_packet_received: u32, + pub endpoint_datagram_sent: u32, + pub endpoint_datagram_received: u32, + pub endpoint_datagram_dropped: u32, + pub endpoint_connection_attempt_failed: u32, + pub platform_tx: u32, + pub platform_tx_error: u32, + pub platform_rx: u32, + pub platform_rx_error: u32, + pub platform_feature_configured: u32, + pub platform_event_loop_wakeup: u32, + pub platform_event_loop_sleep: u32, + pub platform_event_loop_started: u32, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot_log(&self.output); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::from_thread_name(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn named_snapshot(name: Name) -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Some(Location::new(name)); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + version_information: 0, + endpoint_packet_sent: 0, + endpoint_packet_received: 0, + endpoint_datagram_sent: 0, + endpoint_datagram_received: 0, + endpoint_datagram_dropped: 0, + endpoint_connection_attempt_failed: 0, + platform_tx: 0, + platform_tx_error: 0, + platform_rx: 0, + platform_rx_error: 0, + platform_feature_configured: 0, + platform_event_loop_wakeup: 0, + platform_event_loop_sleep: 0, + platform_event_loop_started: 0, + } + } + } + impl super::super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &mut self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + fn on_version_information( + &mut self, + meta: &api::EndpointMeta, + event: &api::VersionInformation, + ) { + self.version_information += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_packet_sent( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointPacketSent, + ) { + self.endpoint_packet_sent += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_packet_received( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointPacketReceived, + ) { + self.endpoint_packet_received += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_datagram_sent( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointDatagramSent, + ) { + self.endpoint_datagram_sent += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_datagram_received( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointDatagramReceived, + ) { + self.endpoint_datagram_received += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_datagram_dropped( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointDatagramDropped, + ) { + self.endpoint_datagram_dropped += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_endpoint_connection_attempt_failed( + &mut self, + meta: &api::EndpointMeta, + event: &api::EndpointConnectionAttemptFailed, + ) { + self.endpoint_connection_attempt_failed += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_tx(&mut self, meta: &api::EndpointMeta, event: &api::PlatformTx) { + self.platform_tx += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_tx_error( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformTxError, + ) { + self.platform_tx_error += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_rx(&mut self, meta: &api::EndpointMeta, event: &api::PlatformRx) { + self.platform_rx += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_rx_error( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformRxError, + ) { + self.platform_rx_error += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_feature_configured( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformFeatureConfigured, + ) { + self.platform_feature_configured += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_event_loop_wakeup( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformEventLoopWakeup, + ) { + self.platform_event_loop_wakeup += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_event_loop_sleep( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformEventLoopSleep, + ) { + self.platform_event_loop_sleep += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + fn on_platform_event_loop_started( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformEventLoopStarted, + ) { + self.platform_event_loop_started += 1; + self.output.push(format!("{meta:?} {event:?}")); + } + } + } #[derive(Clone, Debug)] pub struct Subscriber { location: Option, @@ -7596,6 +7923,7 @@ pub mod testing { pub platform_feature_configured: u32, pub platform_event_loop_wakeup: u32, pub platform_event_loop_sleep: u32, + pub platform_event_loop_started: u32, } impl Drop for Subscriber { fn drop(&mut self) { @@ -7684,6 +8012,7 @@ pub mod testing { platform_feature_configured: 0, platform_event_loop_wakeup: 0, platform_event_loop_sleep: 0, + platform_event_loop_started: 0, } } } @@ -8265,6 +8594,14 @@ pub mod testing { self.platform_event_loop_sleep += 1; self.output.push(format!("{meta:?} {event:?}")); } + fn on_platform_event_loop_started( + &mut self, + meta: &api::EndpointMeta, + event: &api::PlatformEventLoopStarted, + ) { + self.platform_event_loop_started += 1; + self.output.push(format!("{meta:?} {event:?}")); + } } #[derive(Clone, Debug)] pub struct Publisher { @@ -8327,6 +8664,7 @@ pub mod testing { pub platform_feature_configured: u32, pub platform_event_loop_wakeup: u32, pub platform_event_loop_sleep: u32, + pub platform_event_loop_started: u32, } impl Publisher { #[doc = r" Creates a publisher with snapshot assertions enabled"] @@ -8405,6 +8743,7 @@ pub mod testing { platform_feature_configured: 0, platform_event_loop_wakeup: 0, platform_event_loop_sleep: 0, + platform_event_loop_started: 0, } } } @@ -8482,6 +8821,11 @@ pub mod testing { let event = event.into_event(); self.output.push(format!("{event:?}")); } + fn on_platform_event_loop_started(&mut self, event: builder::PlatformEventLoopStarted) { + self.platform_event_loop_started += 1; + let event = event.into_event(); + self.output.push(format!("{event:?}")); + } fn quic_version(&self) -> Option { Some(1) } diff --git a/quic/s2n-quic-core/src/io/event_loop.rs b/quic/s2n-quic-core/src/io/event_loop.rs index 365d48ba01..87a6c9eee5 100644 --- a/quic/s2n-quic-core/src/io/event_loop.rs +++ b/quic/s2n-quic-core/src/io/event_loop.rs @@ -3,7 +3,8 @@ use crate::{ endpoint::Endpoint, - event::{self, EndpointPublisher}, + event::{self, EndpointPublisher, IntoEvent as _}, + inet::SocketAddress, io::{rx::Rx, tx::Tx}, task::cooldown::Cooldown, time::clock::{ClockWithTimer, Timer}, @@ -13,29 +14,36 @@ use core::pin::Pin; pub mod select; use select::Select; -pub struct EventLoop { +pub trait Stats { + fn publish(&mut self, publisher: &mut P); +} + +pub struct EventLoop { pub endpoint: E, pub clock: C, pub rx: R, pub tx: T, pub cooldown: Cooldown, + pub stats: S, } -impl EventLoop +impl EventLoop where E: Endpoint, C: ClockWithTimer, R: Rx, T: Tx, + S: Stats, { /// Starts running the endpoint event loop in an async task - pub async fn start(self) { + pub async fn start(self, local_addr: SocketAddress) { let Self { mut endpoint, clock, mut rx, mut tx, mut cooldown, + mut stats, } = self; /// Creates a event publisher with the endpoint's subscriber @@ -54,6 +62,12 @@ where }}; } + publisher!(clock.get_time()).on_platform_event_loop_started( + event::builder::PlatformEventLoopStarted { + local_address: local_addr.into_event(), + }, + ); + let mut timer = clock.timer(); loop { @@ -97,14 +111,18 @@ where // notify the application that we woke up and why let wakeup_timestamp = clock.get_time(); - publisher!(wakeup_timestamp).on_platform_event_loop_wakeup( - event::builder::PlatformEventLoopWakeup { + { + let mut publisher = publisher!(wakeup_timestamp); + + publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup { timeout_expired, rx_ready: rx_result.is_some(), tx_ready: tx_result.is_some(), application_wakeup, - }, - ); + }); + + stats.publish(&mut publisher); + } match rx_result { Some(Ok(())) => { diff --git a/quic/s2n-quic-events/events/platform.rs b/quic/s2n-quic-events/events/platform.rs index fb2a8de420..c6919c7f99 100644 --- a/quic/s2n-quic-events/events/platform.rs +++ b/quic/s2n-quic-events/events/platform.rs @@ -7,6 +7,20 @@ struct PlatformTx { /// The number of packets sent count: usize, + + /// The number of syscalls performed + syscalls: usize, + + /// The number of syscalls that got blocked + blocked_syscalls: usize, + + /// The total number of errors encountered since the last event + total_errors: usize, + + /// The number of specific error codes dropped + /// + /// This can happen when a burst of errors exceeds the capacity of the recorder + dropped_errors: usize, } #[event("platform:tx_error")] @@ -30,6 +44,20 @@ impl From for std::io::Error { struct PlatformRx { /// The number of packets received count: usize, + + /// The number of syscalls performed + syscalls: usize, + + /// The number of syscalls that got blocked + blocked_syscalls: usize, + + /// The total number of errors encountered since the last event + total_errors: usize, + + /// The number of specific error codes dropped + /// + /// This can happen when a burst of errors exceeds the capacity of the recorder + dropped_errors: usize, } #[event("platform:rx_error")] @@ -91,3 +119,10 @@ struct PlatformEventLoopSleep { /// The amount of time spent processing endpoint events in a single event loop processing_duration: core::time::Duration, } + +#[event("platform:started")] +#[subject(endpoint)] +struct PlatformEventLoopStarted<'a> { + /// The local address of the socket + local_address: SocketAddress<'a>, +} diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index ec14301b0e..5b2728d57e 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -311,6 +311,9 @@ struct Output { pub testing_fields: TokenStream, pub testing_fields_init: TokenStream, pub subscriber_testing: TokenStream, + pub endpoint_subscriber_testing: TokenStream, + pub endpoint_testing_fields: TokenStream, + pub endpoint_testing_fields_init: TokenStream, pub endpoint_publisher_testing: TokenStream, pub connection_publisher_testing: TokenStream, pub metrics_fields: TokenStream, @@ -337,6 +340,9 @@ impl ToTokens for Output { testing_fields, testing_fields_init, subscriber_testing, + endpoint_subscriber_testing, + endpoint_testing_fields, + endpoint_testing_fields_init, endpoint_publisher_testing, connection_publisher_testing, metrics_fields, @@ -747,6 +753,64 @@ impl ToTokens for Output { #imports #mutex + pub mod endpoint { + use super::*; + + pub struct Subscriber { + location: Option, + output: #testing_output_type, + #endpoint_testing_fields + } + + impl Drop for Subscriber { + fn drop(&mut self) { + // don't make any assertions if we're already failing the test + if std::thread::panicking() { + return; + } + + if let Some(location) = self.location.as_ref() { + location.snapshot_log(&self.output #lock); + } + } + } + + impl Subscriber { + /// Creates a subscriber with snapshot assertions enabled + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::from_thread_name(); + sub + } + + /// Creates a subscriber with snapshot assertions enabled + #[track_caller] + pub fn named_snapshot(name: Name) -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Some(Location::new(name)); + sub + } + + /// Creates a subscriber with snapshot assertions disabled + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + #endpoint_testing_fields_init + } + } + } + + impl super::super::Subscriber for Subscriber { + type ConnectionContext = (); + + fn create_connection_context(&#mode self, _meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext {} + + #endpoint_subscriber_testing + } + } + #[derive(Clone, Debug)] pub struct Subscriber { location: Option, diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index 469c502671..c965825aa4 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -218,12 +218,25 @@ impl Struct { } )); - output.subscriber_testing.extend(quote!( - #allow_deprecated - fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { - self.#counter #counter_increment; - self.output #lock.push(format!("{meta:?} {event:?}")); - } + for subscriber in [ + &mut output.endpoint_subscriber_testing, + &mut output.subscriber_testing, + ] { + subscriber.extend(quote!( + #allow_deprecated + fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { + self.#counter #counter_increment; + self.output #lock.push(format!("{meta:?} {event:?}")); + } + )); + } + + // add a counter for testing structs + output.endpoint_testing_fields.extend(quote!( + pub #counter: #counter_type, + )); + output.endpoint_testing_fields_init.extend(quote!( + #counter: #counter_init, )); output.endpoint_publisher_testing.extend(quote!( diff --git a/quic/s2n-quic-platform/src/io/testing.rs b/quic/s2n-quic-platform/src/io/testing.rs index 19ff1d7559..ddce97af4f 100644 --- a/quic/s2n-quic-platform/src/io/testing.rs +++ b/quic/s2n-quic-platform/src/io/testing.rs @@ -342,9 +342,15 @@ impl Io { let handle = address.unwrap_or_else(|| buffers.generate_addr()); + let (stats_sender, stats_recv) = crate::socket::stats::channel(); + let socket = buffers.register(handle, mtu_config.max_mtu()); - let tx = socket.tx_task(mtu_config.max_mtu(), queue_send_buffer_size); - let rx = socket.rx_task(mtu_config.max_mtu(), queue_recv_buffer_size); + let tx = socket.tx_task( + mtu_config.max_mtu(), + queue_send_buffer_size, + stats_sender.clone(), + ); + let rx = socket.rx_task(mtu_config.max_mtu(), queue_recv_buffer_size, stats_sender); if let Some(on_socket) = on_socket { on_socket(socket); @@ -358,8 +364,9 @@ impl Io { tx, rx, cooldown: Default::default(), + stats: stats_recv, }; - let join = executor.spawn(event_loop.start()); + let join = executor.spawn(event_loop.start(handle)); Ok((join, handle)) } } diff --git a/quic/s2n-quic-platform/src/io/testing/socket.rs b/quic/s2n-quic-platform/src/io/testing/socket.rs index 3c0143b480..d6976a8060 100644 --- a/quic/s2n-quic-platform/src/io/testing/socket.rs +++ b/quic/s2n-quic-platform/src/io/testing/socket.rs @@ -8,7 +8,7 @@ use super::{ use crate::{ features::Gso, socket::{ - ring, task, + ring, stats, task, task::{rx, tx}, }, syscall::SocketEvents, @@ -21,8 +21,12 @@ use s2n_quic_core::{ use std::{fmt, io, sync::Arc}; /// A task to receive on a socket -pub async fn rx(socket: Socket, producer: ring::Producer) -> io::Result<()> { - let result = task::Receiver::new(producer, socket, Default::default()).await; +pub async fn rx( + socket: Socket, + producer: ring::Producer, + stats: stats::Sender, +) -> io::Result<()> { + let result = task::Receiver::new(producer, socket, Default::default(), stats).await; if let Some(err) = result { Err(err) } else { @@ -31,8 +35,13 @@ pub async fn rx(socket: Socket, producer: ring::Producer) -> io::Result } /// A task to send on a socket -pub async fn tx(socket: Socket, consumer: ring::Consumer, gso: Gso) -> io::Result<()> { - let result = task::Sender::new(consumer, socket, gso, Default::default()).await; +pub async fn tx( + socket: Socket, + consumer: ring::Consumer, + gso: Gso, + stats: stats::Sender, +) -> io::Result<()> { + let result = task::Sender::new(consumer, socket, gso, Default::default(), stats).await; if let Some(err) = result { Err(err) } else { @@ -169,6 +178,7 @@ impl Socket { &self, max_mtu: MaxMtu, queue_recv_buffer_size: Option, + stats: stats::Sender, ) -> impl s2n_quic_core::io::rx::Rx { let payload_len = { let max_mtu: u16 = max_mtu.into(); @@ -190,7 +200,7 @@ impl Socket { consumers.push(consumer); // spawn a task that actually reads from the socket into the ring buffer - super::spawn(super::socket::rx(self.clone(), producer)); + super::spawn(super::socket::rx(self.clone(), producer, stats)); // construct the RX side for the endpoint event loop let max_mtu = MaxMtu::try_from(payload_len as u16).unwrap(); @@ -203,6 +213,7 @@ impl Socket { &self, max_mtu: MaxMtu, queue_send_buffer_size: Option, + stats: stats::Sender, ) -> impl s2n_quic_core::io::tx::Tx { let gso = crate::features::Gso::default(); gso.disable(); @@ -229,7 +240,12 @@ impl Socket { producers.push(producer); // spawn a task that actually flushes the ring buffer to the socket - super::spawn(super::socket::tx(self.clone(), consumer, gso.clone())); + super::spawn(super::socket::tx( + self.clone(), + consumer, + gso.clone(), + stats, + )); // construct the TX side for the endpoint event loop crate::socket::io::tx::Tx::new(producers, gso, max_mtu) @@ -256,11 +272,22 @@ impl tx::Socket for Socket { _cx: &mut Context, entries: &mut [Message], events: &mut tx::Events, + stats: &stats::Sender, ) -> io::Result<()> { - self.0.buffers.tx_host(self.0.host, |queue| { - let count = queue.send(entries); + let mut count = 0; + + let res = self.0.buffers.tx_host(self.0.host, |queue| { + count = queue.send(entries); events.on_complete(count); - }) + }); + + if count > 0 { + stats.send().on_operation_result(&res, |_| count); + } else { + stats.send().on_operation_pending(); + } + + res } } @@ -273,14 +300,25 @@ impl rx::Socket for Socket { cx: &mut Context, entries: &mut [Message], events: &mut rx::Events, + stats: &stats::Sender, ) -> io::Result<()> { - self.0.buffers.rx_host(self.0.host, |queue| { - let count = queue.recv(cx, entries); + let mut count = 0; + + let res = self.0.buffers.rx_host(self.0.host, |queue| { + count = queue.recv(cx, entries); if count > 0 { events.on_complete(count); } else { events.blocked() } - }) + }); + + if count > 0 { + stats.recv().on_operation_result(&res, |_| count); + } else { + stats.recv().on_operation_pending(); + } + + res } } diff --git a/quic/s2n-quic-platform/src/io/tokio.rs b/quic/s2n-quic-platform/src/io/tokio.rs index 758803d53b..a1fbf8de67 100644 --- a/quic/s2n-quic-platform/src/io/tokio.rs +++ b/quic/s2n-quic-platform/src/io/tokio.rs @@ -169,6 +169,8 @@ impl Io { }, }); + let (stats_sender, stats_recv) = crate::socket::stats::channel(); + let rx = { // if GRO is enabled, then we need to provide the syscall with the maximum size buffer let payload_len = if gro_enabled { @@ -202,11 +204,21 @@ impl Io { // spawn a task that actually reads from the socket into the ring buffer if idx + 1 == rx_socket_count { - handle.spawn(task::rx(rx_socket, producer, rx_cooldown)); + handle.spawn(task::rx( + rx_socket, + producer, + rx_cooldown, + stats_sender.clone(), + )); break; } else { let rx_socket = rx_socket.try_clone()?; - handle.spawn(task::rx(rx_socket, producer, rx_cooldown.clone())); + handle.spawn(task::rx( + rx_socket, + producer, + rx_cooldown.clone(), + stats_sender.clone(), + )); } } @@ -249,7 +261,13 @@ impl Io { // spawn a task that actually flushes the ring buffer to the socket if idx + 1 == tx_socket_count { - handle.spawn(task::tx(tx_socket, consumer, gso.clone(), tx_cooldown)); + handle.spawn(task::tx( + tx_socket, + consumer, + gso.clone(), + tx_cooldown, + stats_sender.clone(), + )); break; } else { let tx_socket = tx_socket.try_clone()?; @@ -258,6 +276,7 @@ impl Io { consumer, gso.clone(), tx_cooldown.clone(), + stats_sender.clone(), )); } } @@ -276,8 +295,9 @@ impl Io { rx, tx, cooldown: cooldown("ENDPOINT"), + stats: stats_recv, } - .start(), + .start(rx_addr.into()), ); drop(guard); diff --git a/quic/s2n-quic-platform/src/io/tokio/task.rs b/quic/s2n-quic-platform/src/io/tokio/task.rs index a8b37c0fe8..6384dc17ef 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task.rs @@ -23,15 +23,20 @@ macro_rules! libc_msg { #[cfg($cfg)] mod $message { use super::unix; - use crate::{features::Gso, message::$message::Message, socket::ring}; + use crate::{ + features::Gso, + message::$message::Message, + socket::{ring, stats}, + }; use s2n_quic_core::task::cooldown::Cooldown; pub async fn rx>( socket: S, producer: ring::Producer, cooldown: Cooldown, + stats: stats::Sender, ) -> std::io::Result<()> { - unix::rx(socket, producer, cooldown).await + unix::rx(socket, producer, cooldown, stats).await } pub async fn tx>( @@ -39,8 +44,9 @@ macro_rules! libc_msg { consumer: ring::Consumer, gso: Gso, cooldown: Cooldown, + stats: stats::Sender, ) -> std::io::Result<()> { - unix::tx(socket, consumer, gso, cooldown).await + unix::tx(socket, consumer, gso, cooldown, stats).await } } }; diff --git a/quic/s2n-quic-platform/src/io/tokio/task/simple.rs b/quic/s2n-quic-platform/src/io/tokio/task/simple.rs index 90f0137cdb..989282ab9d 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task/simple.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task/simple.rs @@ -5,7 +5,7 @@ use crate::{ features::Gso, message::{simple::Message, Message as _}, socket::{ - ring, task, + ring, stats, task, task::{rx, tx}, }, syscall::SocketEvents, @@ -18,12 +18,13 @@ pub async fn rx>( socket: S, producer: ring::Producer, cooldown: Cooldown, + stats: stats::Sender, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); - let result = task::Receiver::new(producer, socket, cooldown).await; + let result = task::Receiver::new(producer, socket, cooldown, stats).await; if let Some(err) = result { Err(err) } else { @@ -36,12 +37,13 @@ pub async fn tx>( consumer: ring::Consumer, gso: Gso, cooldown: Cooldown, + stats: stats::Sender, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = UdpSocket::from_std(socket).unwrap(); - let result = task::Sender::new(consumer, socket, gso, cooldown).await; + let result = task::Sender::new(consumer, socket, gso, cooldown, stats).await; if let Some(err) = result { Err(err) } else { @@ -58,11 +60,15 @@ impl tx::Socket for UdpSocket { cx: &mut Context, entries: &mut [Message], events: &mut tx::Events, + stats: &stats::Sender, ) -> io::Result<()> { for entry in entries { let target = (*entry.remote_address()).into(); let payload = entry.payload_mut(); - match self.poll_send_to(cx, payload, target) { + + let res = self.poll_send_to(cx, payload, target); + stats.send().on_operation(&res, |_len| 1); + match res { Poll::Ready(Ok(_)) => { if events.on_complete(1).is_break() { return Ok(()); @@ -93,11 +99,15 @@ impl rx::Socket for UdpSocket { cx: &mut Context, entries: &mut [Message], events: &mut rx::Events, + stats: &stats::Sender, ) -> io::Result<()> { for entry in entries { let payload = entry.payload_mut(); let mut buf = io::ReadBuf::new(payload); - match self.poll_recv_from(cx, &mut buf) { + + let res = self.poll_recv_from(cx, &mut buf); + stats.recv().on_operation(&res, |_len| 1); + match res { Poll::Ready(Ok(addr)) => { unsafe { let len = buf.filled().len(); diff --git a/quic/s2n-quic-platform/src/io/tokio/task/unix.rs b/quic/s2n-quic-platform/src/io/tokio/task/unix.rs index 07fa08f739..f6c933ea2b 100644 --- a/quic/s2n-quic-platform/src/io/tokio/task/unix.rs +++ b/quic/s2n-quic-platform/src/io/tokio/task/unix.rs @@ -4,7 +4,7 @@ use crate::{ features::Gso, socket::{ - ring, + ring, stats, task::{rx, tx}, }, syscall::{SocketType, UnixMessage}, @@ -18,12 +18,13 @@ pub async fn rx, M: UnixMessage + Unpin>( socket: S, producer: ring::Producer, cooldown: Cooldown, + stats: stats::Sender, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = AsyncFd::new(socket).unwrap(); - let result = rx::Receiver::new(producer, socket, cooldown).await; + let result = rx::Receiver::new(producer, socket, cooldown, stats).await; if let Some(err) = result { Err(err) } else { @@ -36,12 +37,13 @@ pub async fn tx, M: UnixMessage + Unpin>( consumer: ring::Consumer, gso: Gso, cooldown: Cooldown, + stats: stats::Sender, ) -> io::Result<()> { let socket = socket.into(); socket.set_nonblocking(true).unwrap(); let socket = AsyncFd::new(socket).unwrap(); - let result = tx::Sender::new(consumer, socket, gso, cooldown).await; + let result = tx::Sender::new(consumer, socket, gso, cooldown, stats).await; if let Some(err) = result { Err(err) } else { @@ -58,6 +60,7 @@ impl tx::Socket for AsyncFd { cx: &mut Context, entries: &mut [M], events: &mut tx::Events, + stats: &stats::Sender, ) -> io::Result<()> { // Call the syscall for the socket // @@ -65,7 +68,7 @@ impl tx::Socket for AsyncFd { // assume the socket is ready in the general case and then fall back to querying // socket readiness if it's not. This can avoid some things like having to construct // a `std::io::Error` with `WouldBlock` and dereferencing the registration. - M::send(self.get_ref().as_raw_fd(), entries, events); + M::send(self.get_ref().as_raw_fd(), entries, events, stats); // yield back if we weren't blocked if !events.is_blocked() { @@ -107,6 +110,7 @@ impl rx::Socket for AsyncFd { cx: &mut Context, entries: &mut [M], events: &mut rx::Events, + stats: &stats::Sender, ) -> io::Result<()> { // Call the syscall for the socket // @@ -119,6 +123,7 @@ impl rx::Socket for AsyncFd { SocketType::NonBlocking, entries, events, + stats, ); // yield back if we weren't blocked diff --git a/quic/s2n-quic-platform/src/io/turmoil.rs b/quic/s2n-quic-platform/src/io/turmoil.rs index 1cb99aa902..1c58a1a550 100644 --- a/quic/s2n-quic-platform/src/io/turmoil.rs +++ b/quic/s2n-quic-platform/src/io/turmoil.rs @@ -7,6 +7,7 @@ use crate::{ socket::{ io::{rx, tx}, ring::{self, Consumer, Producer}, + stats, }, }; use core::future::Future; @@ -110,9 +111,11 @@ impl Io { (tx, consumer) }; + let (stats_sender, stats_recv) = stats::channel(); + // Spawn a task that does the actual socket calls and coordinates with the event loop // through the ring buffers - tokio::spawn(run_io(socket, rx_producer, tx_consumer)); + tokio::spawn(run_io(socket, rx_producer, tx_consumer, stats_sender)); let event_loop = EventLoop { clock, @@ -120,8 +123,9 @@ impl Io { tx, endpoint, cooldown: Default::default(), + stats: stats_recv, } - .start(); + .start(local_addr); Ok((event_loop, local_addr)) } @@ -159,6 +163,7 @@ async fn run_io( socket: UdpSocket, mut producer: Producer, mut consumer: Consumer, + stats: stats::Sender, ) -> io::Result<()> { let mut poll_producer = false; @@ -195,7 +200,9 @@ async fn run_io( for entry in producer.data() { // Since UDP sockets are stateless, the only errors we should back is a WouldBlock. // If we get any errors, we'll try again later. - if let Ok((len, addr)) = socket.try_recv_from(entry.payload_mut()) { + let res = socket.try_recv_from(entry.payload_mut()); + stats.recv().on_operation_result(&res, |_len| 1); + if let Ok((len, addr)) = res { count += 1; // update the packet information entry.set_remote_address(&(addr.into())); @@ -220,9 +227,11 @@ async fn run_io( let addr = *entry.remote_address(); let addr: std::net::SocketAddr = addr.into(); let payload = entry.payload_mut(); + let res = socket.try_send_to(payload, addr); + stats.recv().on_operation_result(&res, |_len| 1); // Since UDP sockets are stateless, the only errors we should back is a WouldBlock. // If we get any errors, we'll try again later. - if socket.try_send_to(payload, addr).is_ok() { + if res.is_ok() { count += 1; } else { break; diff --git a/quic/s2n-quic-platform/src/io/xdp.rs b/quic/s2n-quic-platform/src/io/xdp.rs index cac439681f..7394b5eadf 100644 --- a/quic/s2n-quic-platform/src/io/xdp.rs +++ b/quic/s2n-quic-platform/src/io/xdp.rs @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use crate::io::tokio::Clock; +use crate::{io::tokio::Clock, socket::stats}; use s2n_quic_core::{ endpoint::Endpoint, inet::SocketAddress, io::event_loop::EventLoop, path::mtu, }; @@ -20,6 +20,7 @@ pub mod encoder { // export socket types and helpers pub mod socket { + pub use crate::socket::stats; pub use s2n_quic_xdp::socket::*; /// Binds a UDP socket to a particular interface and socket address @@ -39,10 +40,12 @@ pub mod socket { } pub mod tx { + use crate::socket::stats; pub use s2n_quic_core::io::tx::*; pub fn channel( socket: ::std::net::UdpSocket, + stats: stats::Sender, ) -> ( impl Tx, impl core::future::Future>, @@ -77,7 +80,7 @@ pub mod tx { // spawn a task that actually flushes the ring buffer to the socket let cooldown = s2n_quic_core::task::cooldown::Cooldown::default(); - let task = crate::io::tokio::task::tx(socket, consumer, gso.clone(), cooldown); + let task = crate::io::tokio::task::tx(socket, consumer, gso.clone(), cooldown, stats); // construct the TX side for the endpoint event loop let io = crate::socket::io::tx::Tx::new(producers, gso, max_mtu); @@ -117,6 +120,7 @@ pub struct Provider { rx: Rx, tx: Tx, mtu_config_builder: mtu::Builder, + stats: stats::Receiver, handle: Option, } @@ -141,6 +145,7 @@ where rx, mtu_config_builder, handle, + stats, } = self; let mtu_config = mtu_config_builder @@ -160,15 +165,18 @@ where rx, tx, cooldown: crate::io::tokio::cooldown("ENDPOINT"), + stats, }; + let local_addr = SocketAddress::default(); + // spawn the event loop on to the tokio handle let task = if let Some(handle) = handle { - handle.spawn(event_loop.start()) + handle.spawn(event_loop.start(local_addr)) } else { - tokio::spawn(event_loop.start()) + tokio::spawn(event_loop.start(local_addr)) }; - Ok((task, SocketAddress::default())) + Ok((task, local_addr)) } } diff --git a/quic/s2n-quic-platform/src/io/xdp/builder.rs b/quic/s2n-quic-platform/src/io/xdp/builder.rs index bd0b4e110f..7e9308925e 100644 --- a/quic/s2n-quic-platform/src/io/xdp/builder.rs +++ b/quic/s2n-quic-platform/src/io/xdp/builder.rs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +use crate::socket::stats; use core::mem::size_of; use s2n_quic_core::{ inet::{ethernet, ipv4, udp}, @@ -18,6 +19,7 @@ const MIN_FRAME_OVERHEAD: u16 = pub struct Builder { rx: Rx, tx: Tx, + stats: Option, mtu_config_builder: mtu::Builder, handle: Option, } @@ -27,6 +29,7 @@ impl Default for Builder<(), ()> { Self { rx: (), tx: (), + stats: None, mtu_config_builder: mtu::Config::builder() .with_max_mtu(DEFAULT_FRAME_SIZE as u16 - MIN_FRAME_OVERHEAD) .unwrap(), @@ -42,6 +45,11 @@ impl Builder { self } + pub fn with_stats(mut self, stats: stats::Receiver) -> Self { + self.stats = Some(stats); + self + } + /// Sets the UMEM frame size for the provider pub fn with_frame_size(mut self, frame_size: u16) -> Result { self.mtu_config_builder = self @@ -58,6 +66,7 @@ impl Builder { let Self { tx, handle, + stats, mtu_config_builder, .. } = self; @@ -65,6 +74,7 @@ impl Builder { rx, tx, handle, + stats, mtu_config_builder, } } @@ -77,6 +87,7 @@ impl Builder { let Self { rx, handle, + stats, mtu_config_builder, .. } = self; @@ -84,6 +95,7 @@ impl Builder { rx, tx, handle, + stats, mtu_config_builder, } } @@ -98,13 +110,21 @@ where let Self { rx, tx, + stats, handle, mtu_config_builder, } = self; + + let stats = stats.unwrap_or_else(|| { + let (_sender, receiver) = stats::channel(); + receiver + }); + super::Provider { rx, tx, handle, + stats, mtu_config_builder, } } diff --git a/quic/s2n-quic-platform/src/socket.rs b/quic/s2n-quic-platform/src/socket.rs index 1ced24bfef..fd83ba466c 100644 --- a/quic/s2n-quic-platform/src/socket.rs +++ b/quic/s2n-quic-platform/src/socket.rs @@ -4,4 +4,5 @@ pub mod io; pub mod options; pub mod ring; +pub mod stats; pub mod task; diff --git a/quic/s2n-quic-platform/src/socket/stats.rs b/quic/s2n-quic-platform/src/socket/stats.rs new file mode 100644 index 0000000000..67da63ddcc --- /dev/null +++ b/quic/s2n-quic-platform/src/socket/stats.rs @@ -0,0 +1,284 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use core::{ + fmt, + sync::atomic::{AtomicU64, Ordering}, + task::Poll, +}; +use s2n_quic_core::{ + event::{self, EndpointPublisher}, + io::event_loop, +}; +use std::{ + collections::VecDeque, + ffi::c_int, + io, + sync::{Arc, Mutex}, +}; + +const ERROR_QUEUE_CAP: usize = 256; +type Error = c_int; + +pub fn channel() -> (Sender, Receiver) { + let state = Arc::new(State::default()); + + let sender = Sender(state.clone()); + + let recv = Receiver { + state, + pending_errors: VecDeque::with_capacity(ERROR_QUEUE_CAP), + }; + + (sender, recv) +} + +#[derive(Clone)] +pub struct Sender(Arc); + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").finish_non_exhaustive() + } +} + +impl Sender { + #[inline] + pub fn send(&self) -> &Stats { + &self.0.send + } + + #[inline] + pub fn recv(&self) -> &Stats { + &self.0.recv + } +} + +pub struct Receiver { + state: Arc, + pending_errors: VecDeque, +} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").finish_non_exhaustive() + } +} + +impl event_loop::Stats for Receiver { + #[inline] + fn publish(&mut self, publisher: &mut P) { + self.state.send.publish( + publisher, + &mut self.pending_errors, + |publisher, errno| { + publisher.on_platform_tx_error(event::builder::PlatformTxError { errno }); + }, + |publisher, metrics| publisher.on_platform_tx(metrics.into()), + ); + self.state.recv.publish( + publisher, + &mut self.pending_errors, + |publisher, errno| { + publisher.on_platform_rx_error(event::builder::PlatformRxError { errno }); + }, + |publisher, metrics| publisher.on_platform_rx(metrics.into()), + ); + } +} + +#[derive(Default)] +struct State { + send: Stats, + recv: Stats, +} + +pub struct Stats { + syscalls: AtomicU64, + blocked: AtomicU64, + packets: AtomicU64, + errors: Mutex>, + total_errors: AtomicU64, + dropped_errors: AtomicU64, +} + +impl Default for Stats { + fn default() -> Self { + Self { + syscalls: Default::default(), + blocked: Default::default(), + packets: Default::default(), + errors: Mutex::new(VecDeque::with_capacity(ERROR_QUEUE_CAP)), + total_errors: Default::default(), + dropped_errors: Default::default(), + } + } +} + +impl fmt::Debug for Stats { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Stats").finish_non_exhaustive() + } +} + +impl Stats { + #[inline] + pub fn on_operation(&self, res: &Poll>, count_packets: F) + where + F: FnOnce(&T) -> usize, + { + match res { + Poll::Ready(res) => { + self.on_operation_result(res, count_packets); + } + Poll::Pending => { + self.on_operation_pending(); + } + } + } + + #[inline] + pub fn on_operation_result(&self, res: &io::Result, count_packets: F) + where + F: FnOnce(&T) -> usize, + { + match res { + Ok(value) => { + let packets = count_packets(value); + self.on_operation_ready(packets); + } + Err(err) + if matches!( + err.kind(), + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted + ) => + { + self.on_operation_pending(); + } + Err(err) => { + self.on_operation_ready(0); + if let Some(err) = err.raw_os_error() { + self.on_error(err); + } else { + self.dropped_errors.fetch_add(1, Ordering::Relaxed); + } + } + } + } + + #[inline] + pub fn on_operation_ready(&self, packets: usize) { + if packets > 0 { + self.packets.fetch_add(packets as _, Ordering::Relaxed); + } + self.syscalls.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn on_operation_pending(&self) { + self.syscalls.fetch_add(1, Ordering::Relaxed); + self.blocked.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn on_error(&self, error: Error) { + self.total_errors.fetch_add(1, Ordering::Relaxed); + + let mut did_drop = false; + if let Ok(mut queue) = self.errors.try_lock() { + // drop old errors + if queue.len() == ERROR_QUEUE_CAP { + let _ = queue.pop_front(); + did_drop = true; + } + + queue.push_back(error); + } else { + did_drop = true; + }; + + if did_drop { + self.dropped_errors.fetch_add(1, Ordering::Relaxed); + } + } + + #[inline] + fn publish( + &self, + publisher: &mut P, + errors: &mut VecDeque, + on_error: OnError, + on_metrics: OnMetrics, + ) where + OnError: Fn(&mut P, Error), + OnMetrics: Fn(&mut P, Metrics), + { + core::mem::swap(&mut *self.errors.lock().unwrap(), errors); + + for error in errors.drain(..) { + on_error(publisher, error); + } + + let metrics = self.metrics(); + if metrics.syscalls > 0 { + on_metrics(publisher, metrics); + } + } + + #[inline] + fn metrics(&self) -> Metrics { + macro_rules! take { + ($field:ident) => {{ + let value = self.$field.swap(0, Ordering::Relaxed); + value.try_into().unwrap_or(usize::MAX) + }}; + } + + let packets = take!(packets); + let syscalls = take!(syscalls); + let blocked_syscalls = take!(blocked); + let total_errors = take!(total_errors); + let dropped_errors = take!(dropped_errors); + + Metrics { + packets, + syscalls, + blocked_syscalls, + total_errors, + dropped_errors, + } + } +} + +#[derive(Clone, Copy)] +struct Metrics { + packets: usize, + syscalls: usize, + blocked_syscalls: usize, + total_errors: usize, + dropped_errors: usize, +} + +impl From for event::builder::PlatformRx { + fn from(value: Metrics) -> Self { + Self { + count: value.packets, + syscalls: value.syscalls, + blocked_syscalls: value.blocked_syscalls, + total_errors: value.total_errors, + dropped_errors: value.dropped_errors, + } + } +} + +impl From for event::builder::PlatformTx { + fn from(value: Metrics) -> Self { + Self { + count: value.packets, + syscalls: value.syscalls, + blocked_syscalls: value.blocked_syscalls, + total_errors: value.total_errors, + dropped_errors: value.dropped_errors, + } + } +} diff --git a/quic/s2n-quic-platform/src/socket/std.rs b/quic/s2n-quic-platform/src/socket/std.rs deleted file mode 100644 index 6affa00330..0000000000 --- a/quic/s2n-quic-platform/src/socket/std.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - buffer::Buffer, - message::{ - queue, - simple::{self, Message, Ring}, - Message as _, - }, -}; -use errno::errno; -use s2n_quic_core::{event, inet::SocketAddress, path::LocalAddress}; - -pub use simple::Handle; - -pub trait Socket { - type Error: Error; - - /// Receives a payload and returns the length and source address - fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Option), Self::Error>; - - /// Sends a payload to the given address and returns the length of the sent payload - fn send_to(&self, buf: &[u8], addr: &SocketAddress) -> Result; -} - -#[cfg(feature = "std")] -impl Socket for std::net::UdpSocket { - type Error = std::io::Error; - - fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, Option), Self::Error> { - debug_assert!(!buf.is_empty()); - let (len, addr) = self.recv_from(buf)?; - Ok((len, Some(addr.into()))) - } - - fn send_to(&self, buf: &[u8], addr: &SocketAddress) -> Result { - debug_assert!(!buf.is_empty()); - let addr: std::net::SocketAddr = (*addr).into(); - self.send_to(buf, addr) - } -} - -pub trait Error { - fn would_block(&self) -> bool; - fn was_interrupted(&self) -> bool; - fn permission_denied(&self) -> bool; - fn connection_reset(&self) -> bool; -} - -#[cfg(feature = "std")] -impl Error for std::io::Error { - fn would_block(&self) -> bool { - self.kind() == std::io::ErrorKind::WouldBlock - } - - fn was_interrupted(&self) -> bool { - self.kind() == std::io::ErrorKind::Interrupted - } - - fn permission_denied(&self) -> bool { - self.kind() == std::io::ErrorKind::PermissionDenied - } - - fn connection_reset(&self) -> bool { - self.kind() == std::io::ErrorKind::ConnectionReset - } -} - -#[derive(Debug, Default)] -pub struct Queue(queue::Queue>); - -impl Queue { - pub fn new(buffer: B) -> Self { - let queue = queue::Queue::new(Ring::new(buffer, 1)); - - Self(queue) - } - - pub fn free_len(&self) -> usize { - self.0.free_len() - } - - pub fn occupied_len(&self) -> usize { - self.0.occupied_len() - } - - pub fn set_local_address(&mut self, local_address: LocalAddress) { - self.0.set_local_address(local_address) - } - - pub fn tx( - &mut self, - socket: &S, - publisher: &mut Publisher, - ) -> Result { - let mut count = 0; - let mut entries = self.0.occupied_mut(); - - for entry in entries.as_mut() { - let remote_address = *entry.remote_address(); - match socket.send_to(entry.payload_mut(), &remote_address) { - Ok(_) => { - count += 1; - - publisher.on_platform_tx(event::builder::PlatformTx { count: 1 }); - } - Err(err) if count > 0 && err.would_block() => { - break; - } - Err(err) if err.was_interrupted() || err.permission_denied() => { - break; - } - Err(err) => { - entries.finish(count); - - publisher - .on_platform_tx_error(event::builder::PlatformTxError { errno: errno().0 }); - - return Err(err); - } - } - } - - entries.finish(count); - - Ok(count) - } - - pub fn rx( - &mut self, - socket: &S, - publisher: &mut Publisher, - ) -> Result { - let mut count = 0; - let mut entries = self.0.free_mut(); - - while let Some(entry) = entries.get_mut(count) { - match socket.recv_from(entry.payload_mut()) { - Ok((payload_len, Some(remote_address))) => { - entry.set_remote_address(&remote_address); - unsafe { - // Safety: The payload_len should not be bigger than the number of - // allocated bytes. - - debug_assert!(payload_len < entry.payload_len()); - let payload_len = payload_len.min(entry.payload_len()); - - entry.set_payload_len(payload_len); - } - - count += 1; - - publisher.on_platform_rx(event::builder::PlatformRx { count: 1 }); - } - Ok((_payload_len, None)) => {} - Err(err) if count > 0 && err.would_block() => { - break; - } - Err(err) if err.was_interrupted() => { - break; - } - Err(err) if err.connection_reset() => { - count += 1; - } - Err(err) => { - entries.finish(count); - - publisher - .on_platform_rx_error(event::builder::PlatformRxError { errno: errno().0 }); - - return Err(err); - } - } - } - - entries.finish(count); - - Ok(count) - } - - pub fn rx_queue(&mut self) -> queue::Occupied { - self.0.occupied_mut() - } - - pub fn tx_queue(&mut self) -> queue::Free { - self.0.free_mut() - } -} diff --git a/quic/s2n-quic-platform/src/socket/task/rx.rs b/quic/s2n-quic-platform/src/socket/task/rx.rs index 1d25d06664..a1c1816b58 100644 --- a/quic/s2n-quic-platform/src/socket/task/rx.rs +++ b/quic/s2n-quic-platform/src/socket/task/rx.rs @@ -3,7 +3,7 @@ use crate::{ message::Message, - socket::{ring::Producer, task::events}, + socket::{ring::Producer, stats, task::events}, }; use core::{ future::Future, @@ -22,6 +22,7 @@ pub trait Socket { cx: &mut Context, entries: &mut [T], events: &mut Events, + stats: &stats::Sender, ) -> Result<(), Self::Error>; } @@ -31,6 +32,7 @@ pub struct Receiver> { rx: S, ring_cooldown: Cooldown, io_cooldown: Cooldown, + stats: stats::Sender, } impl Receiver @@ -39,12 +41,13 @@ where S: Socket + Unpin, { #[inline] - pub fn new(ring: Producer, rx: S, cooldown: Cooldown) -> Self { + pub fn new(ring: Producer, rx: S, cooldown: Cooldown, stats: stats::Sender) -> Self { Self { ring, rx, ring_cooldown: cooldown.clone(), io_cooldown: cooldown, + stats, } } @@ -108,7 +111,7 @@ where let entries = this.ring.data(); // perform the recv syscall - match this.rx.recv(cx, entries, &mut events) { + match this.rx.recv(cx, entries, &mut events, &this.stats) { Ok(_) => { // increment the number of received messages let count = events.take_count() as u32; diff --git a/quic/s2n-quic-platform/src/socket/task/tx.rs b/quic/s2n-quic-platform/src/socket/task/tx.rs index 94407b683e..b2cc649b58 100644 --- a/quic/s2n-quic-platform/src/socket/task/tx.rs +++ b/quic/s2n-quic-platform/src/socket/task/tx.rs @@ -4,7 +4,7 @@ use crate::{ features::Gso, message::Message, - socket::{ring::Consumer, task::events}, + socket::{ring::Consumer, stats, task::events}, }; use core::{ future::Future, @@ -23,6 +23,7 @@ pub trait Socket { cx: &mut Context, entries: &mut [T], events: &mut Events, + stats: &stats::Sender, ) -> Result<(), Self::Error>; } @@ -33,6 +34,7 @@ pub struct Sender> { events: Events, ring_cooldown: Cooldown, io_cooldown: Cooldown, + stats: stats::Sender, } impl Sender @@ -41,13 +43,20 @@ where S: Socket + Unpin, { #[inline] - pub fn new(ring: Consumer, tx: S, gso: Gso, cooldown: Cooldown) -> Self { + pub fn new( + ring: Consumer, + tx: S, + gso: Gso, + cooldown: Cooldown, + stats: stats::Sender, + ) -> Self { Self { ring, tx, events: Events::new(gso), ring_cooldown: cooldown.clone(), io_cooldown: cooldown, + stats, } } @@ -110,7 +119,7 @@ where let entries = this.ring.data(); // perform the send syscall - match this.tx.send(cx, entries, &mut this.events) { + match this.tx.send(cx, entries, &mut this.events, &this.stats) { Ok(_) => { // increment the number of received messages let count = this.events.take_count() as u32; diff --git a/quic/s2n-quic-platform/src/syscall.rs b/quic/s2n-quic-platform/src/syscall.rs index 2e1cededc3..346d736539 100644 --- a/quic/s2n-quic-platform/src/syscall.rs +++ b/quic/s2n-quic-platform/src/syscall.rs @@ -4,6 +4,7 @@ // some platforms contain empty implementations so disable any warnings from those #![allow(unused_variables, unused_macros, unused_mut, clippy::let_and_return)] +use crate::socket::stats; use core::ops::ControlFlow; use socket2::{Domain, Protocol, Socket, Type}; use std::io; @@ -50,12 +51,18 @@ pub trait SocketEvents { #[cfg(unix)] pub trait UnixMessage: crate::message::Message { - fn send(fd: std::os::unix::io::RawFd, entries: &mut [Self], events: &mut E); + fn send( + fd: std::os::unix::io::RawFd, + entries: &mut [Self], + events: &mut E, + stats: &stats::Sender, + ); fn recv( fd: std::os::unix::io::RawFd, ty: SocketType, entries: &mut [Self], events: &mut E, + stats: &stats::Sender, ); } diff --git a/quic/s2n-quic-platform/src/syscall/mmsg.rs b/quic/s2n-quic-platform/src/syscall/mmsg.rs index fdd5fced31..2621688b28 100644 --- a/quic/s2n-quic-platform/src/syscall/mmsg.rs +++ b/quic/s2n-quic-platform/src/syscall/mmsg.rs @@ -2,18 +2,30 @@ // SPDX-License-Identifier: Apache-2.0 use super::{SocketEvents, SocketType, UnixMessage}; +use crate::socket::stats; use libc::mmsghdr; use std::os::unix::io::{AsRawFd, RawFd}; impl UnixMessage for mmsghdr { #[inline] - fn send(fd: RawFd, entries: &mut [Self], events: &mut E) { - send(&fd, entries, events) + fn send( + fd: RawFd, + entries: &mut [Self], + events: &mut E, + stats: &stats::Sender, + ) { + send(&fd, entries, events, stats) } #[inline] - fn recv(fd: RawFd, ty: SocketType, entries: &mut [Self], events: &mut E) { - recv(&fd, ty, entries, events) + fn recv( + fd: RawFd, + ty: SocketType, + entries: &mut [Self], + events: &mut E, + stats: &stats::Sender, + ) { + recv(&fd, ty, entries, events, stats) } } @@ -22,6 +34,7 @@ pub fn send( socket: &Sock, packets: &mut [mmsghdr], events: &mut E, + stats: &stats::Sender, ) { if packets.is_empty() { return; @@ -72,6 +85,8 @@ pub fn send( let res = libc!(sendmmsg(sockfd, msgvec, vlen, flags)); + stats.send().on_operation_result(&res, |count| *count as _); + let _ = match res { Ok(count) => events.on_complete(count as _), Err(error) => events.on_error(error), @@ -84,6 +99,7 @@ pub fn recv( socket_type: SocketType, packets: &mut [mmsghdr], events: &mut E, + stats: &stats::Sender, ) { if packets.is_empty() { return; @@ -144,6 +160,8 @@ pub fn recv( let res = libc!(recvmmsg(sockfd, msgvec, vlen, flags, timeout)); + stats.recv().on_operation_result(&res, |count| *count as _); + let _ = match res { Ok(count) => events.on_complete(count as _), Err(error) => events.on_error(error), diff --git a/quic/s2n-quic-platform/src/syscall/msg.rs b/quic/s2n-quic-platform/src/syscall/msg.rs index 2ccc058974..0158634cd5 100644 --- a/quic/s2n-quic-platform/src/syscall/msg.rs +++ b/quic/s2n-quic-platform/src/syscall/msg.rs @@ -2,19 +2,30 @@ // SPDX-License-Identifier: Apache-2.0 use super::{SocketEvents, SocketType, UnixMessage}; -use crate::message::Message as _; +use crate::{message::Message as _, socket::stats}; use libc::msghdr; use std::os::unix::io::{AsRawFd, RawFd}; impl UnixMessage for msghdr { #[inline] - fn send(fd: RawFd, entries: &mut [Self], events: &mut E) { - send(&fd, entries, events) + fn send( + fd: RawFd, + entries: &mut [Self], + events: &mut E, + stats: &stats::Sender, + ) { + send(&fd, entries, events, stats) } #[inline] - fn recv(fd: RawFd, ty: SocketType, entries: &mut [Self], events: &mut E) { - recv(&fd, ty, entries, events) + fn recv( + fd: RawFd, + ty: SocketType, + entries: &mut [Self], + events: &mut E, + stats: &stats::Sender, + ) { + recv(&fd, ty, entries, events, stats) } } @@ -23,6 +34,7 @@ pub fn send<'a, Sock: AsRawFd, P: IntoIterator, E: Socket socket: &Sock, packets: P, events: &mut E, + stats: &stats::Sender, ) { for packet in packets { #[cfg(debug_assertions)] @@ -81,6 +93,8 @@ pub fn send<'a, Sock: AsRawFd, P: IntoIterator, E: Socket ); } + stats.send().on_operation_result(&result, |_len| 1); + let cf = match result { Ok(_) => events.on_complete(1), Err(err) => events.on_error(err), @@ -98,6 +112,7 @@ pub fn recv<'a, Sock: AsRawFd, P: IntoIterator, E: Socket socket_type: SocketType, packets: P, events: &mut E, + stats: &stats::Sender, ) { let mut flags = match socket_type { SocketType::Blocking => Default::default(), @@ -149,6 +164,8 @@ pub fn recv<'a, Sock: AsRawFd, P: IntoIterator, E: Socket ); } + stats.recv().on_operation_result(&result, |_len| 1); + let cf = match result { Ok(payload_len) => { // update the message based on the return size of the syscall diff --git a/quic/s2n-quic-qns/src/xdp.rs b/quic/s2n-quic-qns/src/xdp.rs index 3407f18cc2..56c7ac2fed 100644 --- a/quic/s2n-quic-qns/src/xdp.rs +++ b/quic/s2n-quic-qns/src/xdp.rs @@ -347,13 +347,15 @@ impl Xdp { self.bpf_task(addr.port(), rx_fds)?; + let (stats_sender, stats_recv) = socket::stats::channel(); + let io_rx = xdp_io::rx::Rx::new(rx, umem.clone()); let io_tx = { let tx = xdp_io::tx::Tx::new(tx, umem, self.tx_encoder()); let udp_tx = { - let (udp_tx, udp_task) = tx::channel(udp_socket); + let (udp_tx, udp_task) = tx::channel(udp_socket, stats_sender); tokio::spawn(udp_task); @@ -372,6 +374,7 @@ impl Xdp { .with_rx(io_rx) .with_tx(io_tx) .with_frame_size(self.frame_size as _)? + .with_stats(stats_recv) .build(); if let Ok(udp_socket) = recv_udp_socket { diff --git a/quic/s2n-quic/src/tests.rs b/quic/s2n-quic/src/tests.rs index c9823b86f1..e4cb361238 100644 --- a/quic/s2n-quic/src/tests.rs +++ b/quic/s2n-quic/src/tests.rs @@ -35,6 +35,7 @@ mod handshake_cid_rotation; mod interceptor; mod mtu; mod no_tls; +mod platform_events; mod pto; mod self_test; mod skip_packets; diff --git a/quic/s2n-quic/src/tests/platform_events.rs b/quic/s2n-quic/src/tests/platform_events.rs new file mode 100644 index 0000000000..6c5b979810 --- /dev/null +++ b/quic/s2n-quic/src/tests/platform_events.rs @@ -0,0 +1,37 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use s2n_quic_core::event::testing::endpoint; + +#[test] +fn platform_events() { + let model = Model::default(); + + test(model, |handle| { + let server = Server::builder() + .with_io(handle.builder().build()?)? + .with_tls((certificates::CERT_PEM, certificates::KEY_PEM))? + .with_event(( + tracing_events(), + endpoint::Subscriber::named_snapshot("platform_events__server"), + ))? + .with_random(Random::with_seed(456))? + .start()?; + + let client = Client::builder() + .with_io(handle.builder().build()?)? + .with_tls(certificates::CERT_PEM)? + .with_event(( + tracing_events(), + endpoint::Subscriber::named_snapshot("platform_events__client"), + ))? + .with_random(Random::with_seed(456))? + .start()?; + + let addr = start_server(server)?; + start_client(client, addr, Data::new(1000))?; + Ok(addr) + }) + .unwrap(); +} diff --git a/quic/s2n-quic/src/tests/snapshots/platform_events__client.snap b/quic/s2n-quic/src/tests/snapshots/platform_events__client.snap new file mode 100644 index 0000000000..0fccbe243e --- /dev/null +++ b/quic/s2n-quic/src/tests/snapshots/platform_events__client.snap @@ -0,0 +1,27 @@ +--- +source: quic/s2n-quic-core/src/event/snapshot.rs +input_file: quic/s2n-quic/src/tests/platform_events.rs +--- +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopStarted { local_address: 1.0.0.1:49153 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: true, application_wakeup: false } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformRx { count: 0, syscalls: 1, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopSleep { timeout: None, processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: false, application_wakeup: true } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopSleep { timeout: Some(999ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformTx { count: 1, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformRx { count: 1, syscalls: 2, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformEventLoopSleep { timeout: Some(299.995ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: false, application_wakeup: true } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformTx { count: 1, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.100000)) } PlatformEventLoopSleep { timeout: Some(299.995ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformTx { count: 1, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformRx { count: 3, syscalls: 2, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopSleep { timeout: Some(274.995ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: false, application_wakeup: true } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformTx { count: 2, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopSleep { timeout: Some(274.995ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformTx { count: 0, syscalls: 1, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Client, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformRx { count: 0, syscalls: 1, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } diff --git a/quic/s2n-quic/src/tests/snapshots/platform_events__server.snap b/quic/s2n-quic/src/tests/snapshots/platform_events__server.snap new file mode 100644 index 0000000000..2d2427ad0d --- /dev/null +++ b/quic/s2n-quic/src/tests/snapshots/platform_events__server.snap @@ -0,0 +1,25 @@ +--- +source: quic/s2n-quic-core/src/event/snapshot.rs +input_file: quic/s2n-quic/src/tests/platform_events.rs +--- +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopStarted { local_address: 1.0.0.0:49152 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: true, application_wakeup: false } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformRx { count: 0, syscalls: 1, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.000001)) } PlatformEventLoopSleep { timeout: None, processing_duration: 1µs } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.050000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.050000)) } PlatformRx { count: 1, syscalls: 2, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.050000)) } VersionInformation { server_versions: [1], client_versions: [], chosen_version: Some(1) } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.050000)) } PlatformEventLoopSleep { timeout: Some(999ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformTx { count: 1, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformRx { count: 2, syscalls: 2, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } VersionInformation { server_versions: [1], client_versions: [], chosen_version: Some(1) } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformEventLoopSleep { timeout: Some(325ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: false, tx_ready: false, application_wakeup: true } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformTx { count: 2, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.150000)) } PlatformEventLoopSleep { timeout: Some(325ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopWakeup { timeout_expired: true, rx_ready: false, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformTx { count: 1, syscalls: 1, blocked_syscalls: 0, total_errors: 0, dropped_errors: 0 } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopSleep { timeout: Some(275ms), processing_duration: 1µs } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformEventLoopWakeup { timeout_expired: false, rx_ready: true, tx_ready: false, application_wakeup: false } +EndpointMeta { endpoint_type: Server, timestamp: Timestamp(Timestamp(0:00:00.200000)) } PlatformRx { count: 0, syscalls: 1, blocked_syscalls: 1, total_errors: 0, dropped_errors: 0 }