From f9ea87d7026ab24038a59f31e81ee61fb008c9dc Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Mon, 2 Dec 2024 17:28:36 -0700 Subject: [PATCH] feat(s2n-quic-dc): emit basic stream events --- dc/s2n-quic-dc/events/connection.rs | 34 +- dc/s2n-quic-dc/src/event/generated.rs | 404 ++++++++-- dc/s2n-quic-dc/src/event/generated/metrics.rs | 66 +- .../src/event/generated/metrics/aggregate.rs | 695 ++++++++++-------- .../src/event/generated/metrics/probe.rs | 211 +++--- dc/s2n-quic-dc/src/stream/recv/application.rs | 43 +- dc/s2n-quic-dc/src/stream/send/application.rs | 48 +- dc/s2n-quic-dc/src/stream/shared.rs | 17 +- 8 files changed, 1017 insertions(+), 501 deletions(-) diff --git a/dc/s2n-quic-dc/events/connection.rs b/dc/s2n-quic-dc/events/connection.rs index b64624c86..26cf70889 100644 --- a/dc/s2n-quic-dc/events/connection.rs +++ b/dc/s2n-quic-dc/events/connection.rs @@ -1,8 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -#[event("application:write")] -pub struct ApplicationWrite { +#[event("stream:write")] +pub struct StreamWrite { /// The number of bytes that the application tried to write #[measure("provided", Bytes)] total_len: usize, @@ -11,10 +11,25 @@ pub struct ApplicationWrite { #[measure("committed", Bytes)] #[counter("committed.total", Bytes)] write_len: usize, + + /// The operation was ready to be performed + #[bool_counter("ready")] + ready: bool, +} + +#[event("stream:write_shutdown")] +pub struct StreamWriteShutdown { + /// The number of bytes in the send buffer at the time of shutdown + #[measure("buffer_len", Bytes)] + buffer_len: usize, + + /// If the stream required a background task to drive the stream shutdown + #[bool_counter("background")] + background: bool, } -#[event("application:read")] -pub struct ApplicationRead { +#[event("stream:read")] +pub struct StreamRead { /// The number of bytes that the application tried to read #[measure("capacity", Bytes)] capacity: usize, @@ -23,4 +38,15 @@ pub struct ApplicationRead { #[measure("committed", Bytes)] #[counter("committed.total", Bytes)] read_len: usize, + + /// The operation was ready to be performed + #[bool_counter("ready")] + ready: bool, +} + +#[event("stream:read_shutdown")] +pub struct StreamReadShutdown { + /// If the stream required a background task to drive the stream shutdown + #[bool_counter("background")] + background: bool, } diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index d64c8cebd..55e33db0d 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -616,43 +616,86 @@ pub mod api { } #[derive(Clone, Debug)] #[non_exhaustive] - pub struct ApplicationWrite { + pub struct StreamWrite { #[doc = " The number of bytes that the application tried to write"] pub total_len: usize, #[doc = " The amount that was written"] pub write_len: usize, + #[doc = " The operation was ready to be performed"] + pub ready: bool, } #[cfg(any(test, feature = "testing"))] - impl crate::event::snapshot::Fmt for ApplicationWrite { + impl crate::event::snapshot::Fmt for StreamWrite { fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { - let mut fmt = fmt.debug_struct("ApplicationWrite"); + let mut fmt = fmt.debug_struct("StreamWrite"); fmt.field("total_len", &self.total_len); fmt.field("write_len", &self.write_len); + fmt.field("ready", &self.ready); fmt.finish() } } - impl Event for ApplicationWrite { - const NAME: &'static str = "application:write"; + impl Event for StreamWrite { + const NAME: &'static str = "stream:write"; } #[derive(Clone, Debug)] #[non_exhaustive] - pub struct ApplicationRead { + pub struct StreamWriteShutdown { + #[doc = " The number of bytes in the send buffer at the time of shutdown"] + pub buffer_len: usize, + #[doc = " If the stream required a background task to drive the stream shutdown"] + pub background: bool, + } + #[cfg(any(test, feature = "testing"))] + impl crate::event::snapshot::Fmt for StreamWriteShutdown { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + let mut fmt = fmt.debug_struct("StreamWriteShutdown"); + fmt.field("buffer_len", &self.buffer_len); + fmt.field("background", &self.background); + fmt.finish() + } + } + impl Event for StreamWriteShutdown { + const NAME: &'static str = "stream:write_shutdown"; + } + #[derive(Clone, Debug)] + #[non_exhaustive] + pub struct StreamRead { #[doc = " The number of bytes that the application tried to read"] pub capacity: usize, #[doc = " The amount that was read"] pub read_len: usize, + #[doc = " The operation was ready to be performed"] + pub ready: bool, } #[cfg(any(test, feature = "testing"))] - impl crate::event::snapshot::Fmt for ApplicationRead { + impl crate::event::snapshot::Fmt for StreamRead { fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { - let mut fmt = fmt.debug_struct("ApplicationRead"); + let mut fmt = fmt.debug_struct("StreamRead"); fmt.field("capacity", &self.capacity); fmt.field("read_len", &self.read_len); + fmt.field("ready", &self.ready); fmt.finish() } } - impl Event for ApplicationRead { - const NAME: &'static str = "application:read"; + impl Event for StreamRead { + const NAME: &'static str = "stream:read"; + } + #[derive(Clone, Debug)] + #[non_exhaustive] + pub struct StreamReadShutdown { + #[doc = " If the stream required a background task to drive the stream shutdown"] + pub background: bool, + } + #[cfg(any(test, feature = "testing"))] + impl crate::event::snapshot::Fmt for StreamReadShutdown { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + let mut fmt = fmt.debug_struct("StreamReadShutdown"); + fmt.field("background", &self.background); + fmt.finish() + } + } + impl Event for StreamReadShutdown { + const NAME: &'static str = "stream:read_shutdown"; } #[derive(Clone, Debug)] #[non_exhaustive] @@ -1542,29 +1585,59 @@ pub mod tracing { tracing :: event ! (target : "acceptor_stream_dequeued" , parent : parent , tracing :: Level :: DEBUG , remote_address = tracing :: field :: debug (remote_address) , credential_id = tracing :: field :: debug (credential_id) , stream_id = tracing :: field :: debug (stream_id) , sojourn_time = tracing :: field :: debug (sojourn_time)); } #[inline] - fn on_application_write( + fn on_stream_write( &self, context: &Self::ConnectionContext, _meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { let id = context.id(); - let api::ApplicationWrite { + let api::StreamWrite { total_len, write_len, + ready, + } = event; + tracing :: event ! (target : "stream_write" , parent : id , tracing :: Level :: DEBUG , total_len = tracing :: field :: debug (total_len) , write_len = tracing :: field :: debug (write_len) , ready = tracing :: field :: debug (ready)); + } + #[inline] + fn on_stream_write_shutdown( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::StreamWriteShutdown, + ) { + let id = context.id(); + let api::StreamWriteShutdown { + buffer_len, + background, + } = event; + tracing :: event ! (target : "stream_write_shutdown" , parent : id , tracing :: Level :: DEBUG , buffer_len = tracing :: field :: debug (buffer_len) , background = tracing :: field :: debug (background)); + } + #[inline] + fn on_stream_read( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + let id = context.id(); + let api::StreamRead { + capacity, + read_len, + ready, } = event; - tracing :: event ! (target : "application_write" , parent : id , tracing :: Level :: DEBUG , total_len = tracing :: field :: debug (total_len) , write_len = tracing :: field :: debug (write_len)); + tracing :: event ! (target : "stream_read" , parent : id , tracing :: Level :: DEBUG , capacity = tracing :: field :: debug (capacity) , read_len = tracing :: field :: debug (read_len) , ready = tracing :: field :: debug (ready)); } #[inline] - fn on_application_read( + fn on_stream_read_shutdown( &self, context: &Self::ConnectionContext, _meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamReadShutdown, ) { let id = context.id(); - let api::ApplicationRead { capacity, read_len } = event; - tracing :: event ! (target : "application_read" , parent : id , tracing :: Level :: DEBUG , capacity = tracing :: field :: debug (capacity) , read_len = tracing :: field :: debug (read_len)); + let api::StreamReadShutdown { background } = event; + tracing :: event ! (target : "stream_read_shutdown" , parent : id , tracing :: Level :: DEBUG , background = tracing :: field :: debug (background)); } #[inline] fn on_endpoint_initialized( @@ -2497,39 +2570,84 @@ pub mod builder { } } #[derive(Clone, Debug)] - pub struct ApplicationWrite { + pub struct StreamWrite { #[doc = " The number of bytes that the application tried to write"] pub total_len: usize, #[doc = " The amount that was written"] pub write_len: usize, + #[doc = " The operation was ready to be performed"] + pub ready: bool, } - impl IntoEvent for ApplicationWrite { + impl IntoEvent for StreamWrite { #[inline] - fn into_event(self) -> api::ApplicationWrite { - let ApplicationWrite { + fn into_event(self) -> api::StreamWrite { + let StreamWrite { total_len, write_len, + ready, } = self; - api::ApplicationWrite { + api::StreamWrite { total_len: total_len.into_event(), write_len: write_len.into_event(), + ready: ready.into_event(), } } } #[derive(Clone, Debug)] - pub struct ApplicationRead { + pub struct StreamWriteShutdown { + #[doc = " The number of bytes in the send buffer at the time of shutdown"] + pub buffer_len: usize, + #[doc = " If the stream required a background task to drive the stream shutdown"] + pub background: bool, + } + impl IntoEvent for StreamWriteShutdown { + #[inline] + fn into_event(self) -> api::StreamWriteShutdown { + let StreamWriteShutdown { + buffer_len, + background, + } = self; + api::StreamWriteShutdown { + buffer_len: buffer_len.into_event(), + background: background.into_event(), + } + } + } + #[derive(Clone, Debug)] + pub struct StreamRead { #[doc = " The number of bytes that the application tried to read"] pub capacity: usize, #[doc = " The amount that was read"] pub read_len: usize, + #[doc = " The operation was ready to be performed"] + pub ready: bool, } - impl IntoEvent for ApplicationRead { + impl IntoEvent for StreamRead { #[inline] - fn into_event(self) -> api::ApplicationRead { - let ApplicationRead { capacity, read_len } = self; - api::ApplicationRead { + fn into_event(self) -> api::StreamRead { + let StreamRead { + capacity, + read_len, + ready, + } = self; + api::StreamRead { capacity: capacity.into_event(), read_len: read_len.into_event(), + ready: ready.into_event(), + } + } + } + #[derive(Clone, Debug)] + pub struct StreamReadShutdown { + #[doc = " If the stream required a background task to drive the stream shutdown"] + pub background: bool, + } + impl IntoEvent for StreamReadShutdown { + #[inline] + fn into_event(self) -> api::StreamReadShutdown { + let StreamReadShutdown { background } = self; + api::StreamReadShutdown { + background: background.into_event(), } } } @@ -3370,25 +3488,49 @@ mod traits { let _ = meta; let _ = event; } - #[doc = "Called when the `ApplicationWrite` event is triggered"] + #[doc = "Called when the `StreamWrite` event is triggered"] + #[inline] + fn on_stream_write( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamWrite, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = "Called when the `StreamWriteShutdown` event is triggered"] + #[inline] + fn on_stream_write_shutdown( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamWriteShutdown, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = "Called when the `StreamRead` event is triggered"] #[inline] - fn on_application_write( + fn on_stream_read( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamRead, ) { let _ = context; let _ = meta; let _ = event; } - #[doc = "Called when the `ApplicationRead` event is triggered"] + #[doc = "Called when the `StreamReadShutdown` event is triggered"] #[inline] - fn on_application_read( + fn on_stream_read_shutdown( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamReadShutdown, ) { let _ = context; let _ = meta; @@ -3854,22 +3996,40 @@ mod traits { self.as_ref().on_acceptor_stream_dequeued(meta, event); } #[inline] - fn on_application_write( + fn on_stream_write( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { - self.as_ref().on_application_write(context, meta, event); + self.as_ref().on_stream_write(context, meta, event); } #[inline] - fn on_application_read( + fn on_stream_write_shutdown( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamWriteShutdown, ) { - self.as_ref().on_application_read(context, meta, event); + self.as_ref().on_stream_write_shutdown(context, meta, event); + } + #[inline] + fn on_stream_read( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + self.as_ref().on_stream_read(context, meta, event); + } + #[inline] + fn on_stream_read_shutdown( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamReadShutdown, + ) { + self.as_ref().on_stream_read_shutdown(context, meta, event); } #[inline] fn on_endpoint_initialized( @@ -4298,24 +4458,44 @@ mod traits { (self.1).on_acceptor_stream_dequeued(meta, event); } #[inline] - fn on_application_write( + fn on_stream_write( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { - (self.0).on_application_write(&context.0, meta, event); - (self.1).on_application_write(&context.1, meta, event); + (self.0).on_stream_write(&context.0, meta, event); + (self.1).on_stream_write(&context.1, meta, event); } #[inline] - fn on_application_read( + fn on_stream_write_shutdown( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamWriteShutdown, ) { - (self.0).on_application_read(&context.0, meta, event); - (self.1).on_application_read(&context.1, meta, event); + (self.0).on_stream_write_shutdown(&context.0, meta, event); + (self.1).on_stream_write_shutdown(&context.1, meta, event); + } + #[inline] + fn on_stream_read( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + (self.0).on_stream_read(&context.0, meta, event); + (self.1).on_stream_read(&context.1, meta, event); + } + #[inline] + fn on_stream_read_shutdown( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamReadShutdown, + ) { + (self.0).on_stream_read_shutdown(&context.0, meta, event); + (self.1).on_stream_read_shutdown(&context.1, meta, event); } #[inline] fn on_endpoint_initialized( @@ -5090,10 +5270,14 @@ mod traits { } } pub trait ConnectionPublisher { - #[doc = "Publishes a `ApplicationWrite` event to the publisher's subscriber"] - fn on_application_write(&self, event: builder::ApplicationWrite); - #[doc = "Publishes a `ApplicationRead` event to the publisher's subscriber"] - fn on_application_read(&self, event: builder::ApplicationRead); + #[doc = "Publishes a `StreamWrite` event to the publisher's subscriber"] + fn on_stream_write(&self, event: builder::StreamWrite); + #[doc = "Publishes a `StreamWriteShutdown` event to the publisher's subscriber"] + fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown); + #[doc = "Publishes a `StreamRead` event to the publisher's subscriber"] + fn on_stream_read(&self, event: builder::StreamRead); + #[doc = "Publishes a `StreamReadShutdown` event to the publisher's subscriber"] + fn on_stream_read_shutdown(&self, event: builder::StreamReadShutdown); #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] fn quic_version(&self) -> u32; #[doc = r" Returns the [`Subject`] for the current publisher"] @@ -5131,19 +5315,37 @@ mod traits { } impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { #[inline] - fn on_application_write(&self, event: builder::ApplicationWrite) { + fn on_stream_write(&self, event: builder::StreamWrite) { + let event = event.into_event(); + self.subscriber + .on_stream_write(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown) { + let event = event.into_event(); + self.subscriber + .on_stream_write_shutdown(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn on_stream_read(&self, event: builder::StreamRead) { let event = event.into_event(); self.subscriber - .on_application_write(self.context, &self.meta, &event); + .on_stream_read(self.context, &self.meta, &event); self.subscriber .on_connection_event(self.context, &self.meta, &event); self.subscriber.on_event(&self.meta, &event); } #[inline] - fn on_application_read(&self, event: builder::ApplicationRead) { + fn on_stream_read_shutdown(&self, event: builder::StreamReadShutdown) { let event = event.into_event(); self.subscriber - .on_application_read(self.context, &self.meta, &event); + .on_stream_read_shutdown(self.context, &self.meta, &event); self.subscriber .on_connection_event(self.context, &self.meta, &event); self.subscriber.on_event(&self.meta, &event); @@ -5867,8 +6069,10 @@ pub mod testing { pub acceptor_udp_io_error: AtomicU32, pub acceptor_stream_pruned: AtomicU32, pub acceptor_stream_dequeued: AtomicU32, - pub application_write: AtomicU32, - pub application_read: AtomicU32, + pub stream_write: AtomicU32, + pub stream_write_shutdown: AtomicU32, + pub stream_read: AtomicU32, + pub stream_read_shutdown: AtomicU32, pub endpoint_initialized: AtomicU32, pub path_secret_map_initialized: AtomicU32, pub path_secret_map_uninitialized: AtomicU32, @@ -5946,8 +6150,10 @@ pub mod testing { acceptor_udp_io_error: AtomicU32::new(0), acceptor_stream_pruned: AtomicU32::new(0), acceptor_stream_dequeued: AtomicU32::new(0), - application_write: AtomicU32::new(0), - application_read: AtomicU32::new(0), + stream_write: AtomicU32::new(0), + stream_write_shutdown: AtomicU32::new(0), + stream_read: AtomicU32::new(0), + stream_read_shutdown: AtomicU32::new(0), endpoint_initialized: AtomicU32::new(0), path_secret_map_initialized: AtomicU32::new(0), path_secret_map_uninitialized: AtomicU32::new(0), @@ -6198,13 +6404,13 @@ pub mod testing { let out = format!("{meta:?} {event:?}"); self.output.lock().unwrap().push(out); } - fn on_application_write( + fn on_stream_write( &self, _context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { - self.application_write.fetch_add(1, Ordering::Relaxed); + self.stream_write.fetch_add(1, Ordering::Relaxed); if self.location.is_some() { let meta = crate::event::snapshot::Fmt::to_snapshot(meta); let event = crate::event::snapshot::Fmt::to_snapshot(event); @@ -6212,13 +6418,41 @@ pub mod testing { self.output.lock().unwrap().push(out); } } - fn on_application_read( + fn on_stream_write_shutdown( &self, _context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamWriteShutdown, ) { - self.application_read.fetch_add(1, Ordering::Relaxed); + self.stream_write_shutdown.fetch_add(1, Ordering::Relaxed); + if self.location.is_some() { + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } + } + fn on_stream_read( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + self.stream_read.fetch_add(1, Ordering::Relaxed); + if self.location.is_some() { + let meta = crate::event::snapshot::Fmt::to_snapshot(meta); + let event = crate::event::snapshot::Fmt::to_snapshot(event); + let out = format!("{meta:?} {event:?}"); + self.output.lock().unwrap().push(out); + } + } + fn on_stream_read_shutdown( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamReadShutdown, + ) { + self.stream_read_shutdown.fetch_add(1, Ordering::Relaxed); if self.location.is_some() { let meta = crate::event::snapshot::Fmt::to_snapshot(meta); let event = crate::event::snapshot::Fmt::to_snapshot(event); @@ -6578,8 +6812,10 @@ pub mod testing { pub acceptor_udp_io_error: AtomicU32, pub acceptor_stream_pruned: AtomicU32, pub acceptor_stream_dequeued: AtomicU32, - pub application_write: AtomicU32, - pub application_read: AtomicU32, + pub stream_write: AtomicU32, + pub stream_write_shutdown: AtomicU32, + pub stream_read: AtomicU32, + pub stream_read_shutdown: AtomicU32, pub endpoint_initialized: AtomicU32, pub path_secret_map_initialized: AtomicU32, pub path_secret_map_uninitialized: AtomicU32, @@ -6647,8 +6883,10 @@ pub mod testing { acceptor_udp_io_error: AtomicU32::new(0), acceptor_stream_pruned: AtomicU32::new(0), acceptor_stream_dequeued: AtomicU32::new(0), - application_write: AtomicU32::new(0), - application_read: AtomicU32::new(0), + stream_write: AtomicU32::new(0), + stream_write_shutdown: AtomicU32::new(0), + stream_read: AtomicU32::new(0), + stream_read_shutdown: AtomicU32::new(0), endpoint_initialized: AtomicU32::new(0), path_secret_map_initialized: AtomicU32::new(0), path_secret_map_uninitialized: AtomicU32::new(0), @@ -7073,8 +7311,26 @@ pub mod testing { } } impl super::ConnectionPublisher for Publisher { - fn on_application_write(&self, event: builder::ApplicationWrite) { - self.application_write.fetch_add(1, Ordering::Relaxed); + fn on_stream_write(&self, event: builder::StreamWrite) { + self.stream_write.fetch_add(1, Ordering::Relaxed); + let event = event.into_event(); + if self.location.is_some() { + let event = crate::event::snapshot::Fmt::to_snapshot(&event); + let out = format!("{event:?}"); + self.output.lock().unwrap().push(out); + } + } + fn on_stream_write_shutdown(&self, event: builder::StreamWriteShutdown) { + self.stream_write_shutdown.fetch_add(1, Ordering::Relaxed); + let event = event.into_event(); + if self.location.is_some() { + let event = crate::event::snapshot::Fmt::to_snapshot(&event); + let out = format!("{event:?}"); + self.output.lock().unwrap().push(out); + } + } + fn on_stream_read(&self, event: builder::StreamRead) { + self.stream_read.fetch_add(1, Ordering::Relaxed); let event = event.into_event(); if self.location.is_some() { let event = crate::event::snapshot::Fmt::to_snapshot(&event); @@ -7082,8 +7338,8 @@ pub mod testing { self.output.lock().unwrap().push(out); } } - fn on_application_read(&self, event: builder::ApplicationRead) { - self.application_read.fetch_add(1, Ordering::Relaxed); + fn on_stream_read_shutdown(&self, event: builder::StreamReadShutdown) { + self.stream_read_shutdown.fetch_add(1, Ordering::Relaxed); let event = event.into_event(); if self.location.is_some() { let event = crate::event::snapshot::Fmt::to_snapshot(&event); diff --git a/dc/s2n-quic-dc/src/event/generated/metrics.rs b/dc/s2n-quic-dc/src/event/generated/metrics.rs index 5a38b228f..a30b7e7ad 100644 --- a/dc/s2n-quic-dc/src/event/generated/metrics.rs +++ b/dc/s2n-quic-dc/src/event/generated/metrics.rs @@ -26,8 +26,10 @@ where } pub struct Context { recorder: R, - application_write: AtomicU32, - application_read: AtomicU32, + stream_write: AtomicU32, + stream_write_shutdown: AtomicU32, + stream_read: AtomicU32, + stream_read_shutdown: AtomicU32, } impl event::Subscriber for Subscriber where @@ -41,42 +43,74 @@ where ) -> Self::ConnectionContext { Context { recorder: self.subscriber.create_connection_context(meta, info), - application_write: AtomicU32::new(0), - application_read: AtomicU32::new(0), + stream_write: AtomicU32::new(0), + stream_write_shutdown: AtomicU32::new(0), + stream_read: AtomicU32::new(0), + stream_read_shutdown: AtomicU32::new(0), } } #[inline] - fn on_application_write( + fn on_stream_write( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { - context.application_write.fetch_add(1, Ordering::Relaxed); + context.stream_write.fetch_add(1, Ordering::Relaxed); self.subscriber - .on_application_write(&context.recorder, meta, event); + .on_stream_write(&context.recorder, meta, event); } #[inline] - fn on_application_read( + fn on_stream_write_shutdown( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamWriteShutdown, ) { - context.application_read.fetch_add(1, Ordering::Relaxed); + context + .stream_write_shutdown + .fetch_add(1, Ordering::Relaxed); self.subscriber - .on_application_read(&context.recorder, meta, event); + .on_stream_write_shutdown(&context.recorder, meta, event); + } + #[inline] + fn on_stream_read( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + context.stream_read.fetch_add(1, Ordering::Relaxed); + self.subscriber + .on_stream_read(&context.recorder, meta, event); + } + #[inline] + fn on_stream_read_shutdown( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamReadShutdown, + ) { + context.stream_read_shutdown.fetch_add(1, Ordering::Relaxed); + self.subscriber + .on_stream_read_shutdown(&context.recorder, meta, event); } } impl Drop for Context { fn drop(&mut self) { self.recorder.increment_counter( - "application_write", - self.application_write.load(Ordering::Relaxed) as _, + "stream_write", + self.stream_write.load(Ordering::Relaxed) as _, + ); + self.recorder.increment_counter( + "stream_write_shutdown", + self.stream_write_shutdown.load(Ordering::Relaxed) as _, ); + self.recorder + .increment_counter("stream_read", self.stream_read.load(Ordering::Relaxed) as _); self.recorder.increment_counter( - "application_read", - self.application_read.load(Ordering::Relaxed) as _, + "stream_read_shutdown", + self.stream_read_shutdown.load(Ordering::Relaxed) as _, ); } } diff --git a/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs b/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs index c1e767ed2..f8371cdb2 100644 --- a/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs +++ b/dc/s2n-quic-dc/src/event/generated/metrics/aggregate.rs @@ -12,7 +12,7 @@ use crate::event::{ AsVariant, BoolRecorder, Info, Metric, NominalRecorder, Recorder, Registry, Units, }, }; -static INFO: &[Info; 126usize] = &[ +static INFO: &[Info; 133usize] = &[ info::Builder { id: 0usize, name: Str::new("acceptor_tcp_started\0"), @@ -297,474 +297,516 @@ static INFO: &[Info; 126usize] = &[ .build(), info::Builder { id: 47usize, - name: Str::new("application_write\0"), + name: Str::new("stream_write\0"), units: Units::None, } .build(), info::Builder { id: 48usize, - name: Str::new("application_write.provided\0"), + name: Str::new("stream_write.provided\0"), units: Units::Bytes, } .build(), info::Builder { id: 49usize, - name: Str::new("application_write.committed.total\0"), + name: Str::new("stream_write.committed.total\0"), units: Units::Bytes, } .build(), info::Builder { id: 50usize, - name: Str::new("application_write.committed\0"), + name: Str::new("stream_write.committed\0"), units: Units::Bytes, } .build(), info::Builder { id: 51usize, - name: Str::new("application_read\0"), + name: Str::new("stream_write.ready\0"), units: Units::None, } .build(), info::Builder { id: 52usize, - name: Str::new("application_read.capacity\0"), - units: Units::Bytes, + name: Str::new("stream_write_shutdown\0"), + units: Units::None, } .build(), info::Builder { id: 53usize, - name: Str::new("application_read.committed.total\0"), + name: Str::new("stream_write_shutdown.buffer_len\0"), units: Units::Bytes, } .build(), info::Builder { id: 54usize, - name: Str::new("application_read.committed\0"), - units: Units::Bytes, + name: Str::new("stream_write_shutdown.background\0"), + units: Units::None, } .build(), info::Builder { id: 55usize, - name: Str::new("endpoint_initialized\0"), + name: Str::new("stream_read\0"), units: Units::None, } .build(), info::Builder { id: 56usize, + name: Str::new("stream_read.capacity\0"), + units: Units::Bytes, + } + .build(), + info::Builder { + id: 57usize, + name: Str::new("stream_read.committed.total\0"), + units: Units::Bytes, + } + .build(), + info::Builder { + id: 58usize, + name: Str::new("stream_read.committed\0"), + units: Units::Bytes, + } + .build(), + info::Builder { + id: 59usize, + name: Str::new("stream_read.ready\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 60usize, + name: Str::new("stream_read_shutdown\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 61usize, + name: Str::new("stream_read_shutdown.background\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 62usize, + name: Str::new("endpoint_initialized\0"), + units: Units::None, + } + .build(), + info::Builder { + id: 63usize, name: Str::new("endpoint_initialized.acceptor.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 57usize, + id: 64usize, name: Str::new("endpoint_initialized.handshake.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 58usize, + id: 65usize, name: Str::new("endpoint_initialized.tcp\0"), units: Units::None, } .build(), info::Builder { - id: 59usize, + id: 66usize, name: Str::new("endpoint_initialized.udp\0"), units: Units::None, } .build(), info::Builder { - id: 60usize, + id: 67usize, name: Str::new("path_secret_map_initialized\0"), units: Units::None, } .build(), info::Builder { - id: 61usize, + id: 68usize, name: Str::new("path_secret_map_initialized.capacity\0"), units: Units::None, } .build(), info::Builder { - id: 62usize, + id: 69usize, name: Str::new("path_secret_map_uninitialized\0"), units: Units::None, } .build(), info::Builder { - id: 63usize, + id: 70usize, name: Str::new("path_secret_map_uninitialized.capacity\0"), units: Units::None, } .build(), info::Builder { - id: 64usize, + id: 71usize, name: Str::new("path_secret_map_uninitialized.entries\0"), units: Units::None, } .build(), info::Builder { - id: 65usize, + id: 72usize, name: Str::new("path_secret_map_uninitialized.lifetime\0"), units: Units::Duration, } .build(), info::Builder { - id: 66usize, + id: 73usize, name: Str::new("path_secret_map_background_handshake_requested\0"), units: Units::None, } .build(), info::Builder { - id: 67usize, + id: 74usize, name: Str::new("path_secret_map_background_handshake_requested.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 68usize, + id: 75usize, name: Str::new("path_secret_map_entry_inserted\0"), units: Units::None, } .build(), info::Builder { - id: 69usize, + id: 76usize, name: Str::new("path_secret_map_entry_inserted.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 70usize, + id: 77usize, name: Str::new("path_secret_map_entry_ready\0"), units: Units::None, } .build(), info::Builder { - id: 71usize, + id: 78usize, name: Str::new("path_secret_map_entry_ready.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 72usize, + id: 79usize, name: Str::new("path_secret_map_entry_replaced\0"), units: Units::None, } .build(), info::Builder { - id: 73usize, + id: 80usize, name: Str::new("path_secret_map_entry_replaced.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 74usize, + id: 81usize, name: Str::new("unknown_path_secret_packet_sent\0"), units: Units::None, } .build(), info::Builder { - id: 75usize, + id: 82usize, name: Str::new("unknown_path_secret_packet_sent.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 76usize, + id: 83usize, name: Str::new("unknown_path_secret_packet_received\0"), units: Units::None, } .build(), info::Builder { - id: 77usize, + id: 84usize, name: Str::new("unknown_path_secret_packet_received.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 78usize, + id: 85usize, name: Str::new("unknown_path_secret_packet_accepted\0"), units: Units::None, } .build(), info::Builder { - id: 79usize, + id: 86usize, name: Str::new("unknown_path_secret_packet_accepted.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 80usize, + id: 87usize, name: Str::new("unknown_path_secret_packet_rejected\0"), units: Units::None, } .build(), info::Builder { - id: 81usize, + id: 88usize, name: Str::new("unknown_path_secret_packet_rejected.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 82usize, + id: 89usize, name: Str::new("unknown_path_secret_packet_dropped\0"), units: Units::None, } .build(), info::Builder { - id: 83usize, + id: 90usize, name: Str::new("unknown_path_secret_packet_dropped.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 84usize, + id: 91usize, name: Str::new("key_accepted\0"), units: Units::None, } .build(), info::Builder { - id: 85usize, + id: 92usize, name: Str::new("key_accepted.gap\0"), units: Units::None, } .build(), info::Builder { - id: 86usize, + id: 93usize, name: Str::new("key_accepted.forward_shift\0"), units: Units::None, } .build(), info::Builder { - id: 87usize, + id: 94usize, name: Str::new("replay_definitely_detected\0"), units: Units::None, } .build(), info::Builder { - id: 88usize, + id: 95usize, name: Str::new("replay_potentially_detected\0"), units: Units::None, } .build(), info::Builder { - id: 89usize, + id: 96usize, name: Str::new("replay_potentially_detected.gap\0"), units: Units::None, } .build(), info::Builder { - id: 90usize, + id: 97usize, name: Str::new("replay_detected_packet_sent\0"), units: Units::None, } .build(), info::Builder { - id: 91usize, + id: 98usize, name: Str::new("replay_detected_packet_sent.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 92usize, + id: 99usize, name: Str::new("replay_detected_packet_received\0"), units: Units::None, } .build(), info::Builder { - id: 93usize, + id: 100usize, name: Str::new("replay_detected_packet_received.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 94usize, + id: 101usize, name: Str::new("replay_detected_packet_accepted\0"), units: Units::None, } .build(), info::Builder { - id: 95usize, + id: 102usize, name: Str::new("replay_detected_packet_accepted.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 96usize, + id: 103usize, name: Str::new("replay_detected_packet_rejected\0"), units: Units::None, } .build(), info::Builder { - id: 97usize, + id: 104usize, name: Str::new("replay_detected_packet_rejected.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 98usize, + id: 105usize, name: Str::new("replay_detected_packet_dropped\0"), units: Units::None, } .build(), info::Builder { - id: 99usize, + id: 106usize, name: Str::new("replay_detected_packet_dropped.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 100usize, + id: 107usize, name: Str::new("stale_key_packet_sent\0"), units: Units::None, } .build(), info::Builder { - id: 101usize, + id: 108usize, name: Str::new("stale_key_packet_sent.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 102usize, + id: 109usize, name: Str::new("stale_key_packet_received\0"), units: Units::None, } .build(), info::Builder { - id: 103usize, + id: 110usize, name: Str::new("stale_key_packet_received.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 104usize, + id: 111usize, name: Str::new("stale_key_packet_accepted\0"), units: Units::None, } .build(), info::Builder { - id: 105usize, + id: 112usize, name: Str::new("stale_key_packet_accepted.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 106usize, + id: 113usize, name: Str::new("stale_key_packet_rejected\0"), units: Units::None, } .build(), info::Builder { - id: 107usize, + id: 114usize, name: Str::new("stale_key_packet_rejected.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 108usize, + id: 115usize, name: Str::new("stale_key_packet_dropped\0"), units: Units::None, } .build(), info::Builder { - id: 109usize, + id: 116usize, name: Str::new("stale_key_packet_dropped.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 110usize, + id: 117usize, name: Str::new("path_secret_map_address_cache_accessed\0"), units: Units::None, } .build(), info::Builder { - id: 111usize, + id: 118usize, name: Str::new("path_secret_map_address_cache_accessed.peer_address.protocol\0"), units: Units::None, } .build(), info::Builder { - id: 112usize, + id: 119usize, name: Str::new("path_secret_map_address_cache_accessed.hit\0"), units: Units::None, } .build(), info::Builder { - id: 113usize, + id: 120usize, name: Str::new("path_secret_map_id_cache_accessed\0"), units: Units::None, } .build(), info::Builder { - id: 114usize, + id: 121usize, name: Str::new("path_secret_map_id_cache_accessed.hit\0"), units: Units::None, } .build(), info::Builder { - id: 115usize, + id: 122usize, name: Str::new("path_secret_map_cleaner_cycled\0"), units: Units::None, } .build(), info::Builder { - id: 116usize, + id: 123usize, name: Str::new("path_secret_map_cleaner_cycled.entries.id\0"), units: Units::None, } .build(), info::Builder { - id: 117usize, + id: 124usize, name: Str::new("path_secret_map_cleaner_cycled.entries.id.retired\0"), units: Units::None, } .build(), info::Builder { - id: 118usize, + id: 125usize, name: Str::new("path_secret_map_cleaner_cycled.entries.id.utilization\0"), units: Units::Percent, } .build(), info::Builder { - id: 119usize, + id: 126usize, name: Str::new("path_secret_map_cleaner_cycled.entries.id.utilization.initial\0"), units: Units::Percent, } .build(), info::Builder { - id: 120usize, + id: 127usize, name: Str::new("path_secret_map_cleaner_cycled.entries.address\0"), units: Units::None, } .build(), info::Builder { - id: 121usize, + id: 128usize, name: Str::new("path_secret_map_cleaner_cycled.entries.address.retired\0"), units: Units::None, } .build(), info::Builder { - id: 122usize, + id: 129usize, name: Str::new("path_secret_map_cleaner_cycled.entries.address.utilization\0"), units: Units::Percent, } .build(), info::Builder { - id: 123usize, + id: 130usize, name: Str::new("path_secret_map_cleaner_cycled.entries.address.utilization.initial\0"), units: Units::Percent, } .build(), info::Builder { - id: 124usize, + id: 131usize, name: Str::new("path_secret_map_cleaner_cycled.handshake_requests\0"), units: Units::None, } .build(), info::Builder { - id: 125usize, + id: 132usize, name: Str::new("path_secret_map_cleaner_cycled.handshake_requests.retired\0"), units: Units::None, } @@ -777,15 +819,15 @@ pub struct ConnectionContext { } pub struct Subscriber { #[allow(dead_code)] - counters: Box<[R::Counter; 50usize]>, + counters: Box<[R::Counter; 52usize]>, #[allow(dead_code)] - bool_counters: Box<[R::BoolCounter; 10usize]>, + bool_counters: Box<[R::BoolCounter; 14usize]>, #[allow(dead_code)] nominal_counters: Box<[R::NominalCounter]>, #[allow(dead_code)] nominal_counter_offsets: Box<[usize; 26usize]>, #[allow(dead_code)] - measures: Box<[R::Measure; 33usize]>, + measures: Box<[R::Measure; 34usize]>, #[allow(dead_code)] gauges: Box<[R::Gauge; 0usize]>, #[allow(dead_code)] @@ -812,11 +854,11 @@ impl Subscriber { #[allow(unused_mut)] #[inline] pub fn new(registry: R) -> Self { - let mut counters = Vec::with_capacity(50usize); - let mut bool_counters = Vec::with_capacity(10usize); + let mut counters = Vec::with_capacity(52usize); + let mut bool_counters = Vec::with_capacity(14usize); let mut nominal_counters = Vec::with_capacity(26usize); let mut nominal_counter_offsets = Vec::with_capacity(26usize); - let mut measures = Vec::with_capacity(33usize); + let mut measures = Vec::with_capacity(34usize); let mut gauges = Vec::with_capacity(0usize); let mut timers = Vec::with_capacity(7usize); let mut nominal_timers = Vec::with_capacity(0usize); @@ -841,46 +883,52 @@ impl Subscriber { counters.push(registry.register_counter(&INFO[45usize])); counters.push(registry.register_counter(&INFO[47usize])); counters.push(registry.register_counter(&INFO[49usize])); - counters.push(registry.register_counter(&INFO[51usize])); - counters.push(registry.register_counter(&INFO[53usize])); + counters.push(registry.register_counter(&INFO[52usize])); counters.push(registry.register_counter(&INFO[55usize])); + counters.push(registry.register_counter(&INFO[57usize])); counters.push(registry.register_counter(&INFO[60usize])); counters.push(registry.register_counter(&INFO[62usize])); - counters.push(registry.register_counter(&INFO[66usize])); - counters.push(registry.register_counter(&INFO[68usize])); - counters.push(registry.register_counter(&INFO[70usize])); - counters.push(registry.register_counter(&INFO[72usize])); - counters.push(registry.register_counter(&INFO[74usize])); - counters.push(registry.register_counter(&INFO[76usize])); - counters.push(registry.register_counter(&INFO[78usize])); - counters.push(registry.register_counter(&INFO[80usize])); - counters.push(registry.register_counter(&INFO[82usize])); - counters.push(registry.register_counter(&INFO[84usize])); + counters.push(registry.register_counter(&INFO[67usize])); + counters.push(registry.register_counter(&INFO[69usize])); + counters.push(registry.register_counter(&INFO[73usize])); + counters.push(registry.register_counter(&INFO[75usize])); + counters.push(registry.register_counter(&INFO[77usize])); + counters.push(registry.register_counter(&INFO[79usize])); + counters.push(registry.register_counter(&INFO[81usize])); + counters.push(registry.register_counter(&INFO[83usize])); + counters.push(registry.register_counter(&INFO[85usize])); counters.push(registry.register_counter(&INFO[87usize])); - counters.push(registry.register_counter(&INFO[88usize])); - counters.push(registry.register_counter(&INFO[90usize])); - counters.push(registry.register_counter(&INFO[92usize])); + counters.push(registry.register_counter(&INFO[89usize])); + counters.push(registry.register_counter(&INFO[91usize])); counters.push(registry.register_counter(&INFO[94usize])); - counters.push(registry.register_counter(&INFO[96usize])); - counters.push(registry.register_counter(&INFO[98usize])); - counters.push(registry.register_counter(&INFO[100usize])); - counters.push(registry.register_counter(&INFO[102usize])); - counters.push(registry.register_counter(&INFO[104usize])); - counters.push(registry.register_counter(&INFO[106usize])); - counters.push(registry.register_counter(&INFO[108usize])); - counters.push(registry.register_counter(&INFO[110usize])); + counters.push(registry.register_counter(&INFO[95usize])); + counters.push(registry.register_counter(&INFO[97usize])); + counters.push(registry.register_counter(&INFO[99usize])); + counters.push(registry.register_counter(&INFO[101usize])); + counters.push(registry.register_counter(&INFO[103usize])); + counters.push(registry.register_counter(&INFO[105usize])); + counters.push(registry.register_counter(&INFO[107usize])); + counters.push(registry.register_counter(&INFO[109usize])); + counters.push(registry.register_counter(&INFO[111usize])); counters.push(registry.register_counter(&INFO[113usize])); counters.push(registry.register_counter(&INFO[115usize])); + counters.push(registry.register_counter(&INFO[117usize])); + counters.push(registry.register_counter(&INFO[120usize])); + counters.push(registry.register_counter(&INFO[122usize])); bool_counters.push(registry.register_bool_counter(&INFO[19usize])); bool_counters.push(registry.register_bool_counter(&INFO[20usize])); bool_counters.push(registry.register_bool_counter(&INFO[34usize])); bool_counters.push(registry.register_bool_counter(&INFO[35usize])); bool_counters.push(registry.register_bool_counter(&INFO[36usize])); bool_counters.push(registry.register_bool_counter(&INFO[37usize])); - bool_counters.push(registry.register_bool_counter(&INFO[58usize])); + bool_counters.push(registry.register_bool_counter(&INFO[51usize])); + bool_counters.push(registry.register_bool_counter(&INFO[54usize])); bool_counters.push(registry.register_bool_counter(&INFO[59usize])); - bool_counters.push(registry.register_bool_counter(&INFO[112usize])); - bool_counters.push(registry.register_bool_counter(&INFO[114usize])); + bool_counters.push(registry.register_bool_counter(&INFO[61usize])); + bool_counters.push(registry.register_bool_counter(&INFO[65usize])); + bool_counters.push(registry.register_bool_counter(&INFO[66usize])); + bool_counters.push(registry.register_bool_counter(&INFO[119usize])); + bool_counters.push(registry.register_bool_counter(&INFO[121usize])); { #[allow(unused_imports)] use api::*; @@ -933,7 +981,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[56usize], variant)); + .push(registry.register_nominal_counter(&INFO[63usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -944,7 +992,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[57usize], variant)); + .push(registry.register_nominal_counter(&INFO[64usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -955,7 +1003,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[67usize], variant)); + .push(registry.register_nominal_counter(&INFO[74usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -966,7 +1014,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[69usize], variant)); + .push(registry.register_nominal_counter(&INFO[76usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -977,7 +1025,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[71usize], variant)); + .push(registry.register_nominal_counter(&INFO[78usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -988,7 +1036,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[73usize], variant)); + .push(registry.register_nominal_counter(&INFO[80usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -999,7 +1047,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[75usize], variant)); + .push(registry.register_nominal_counter(&INFO[82usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1010,7 +1058,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[77usize], variant)); + .push(registry.register_nominal_counter(&INFO[84usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1021,7 +1069,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[79usize], variant)); + .push(registry.register_nominal_counter(&INFO[86usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1032,7 +1080,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[81usize], variant)); + .push(registry.register_nominal_counter(&INFO[88usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1043,7 +1091,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[83usize], variant)); + .push(registry.register_nominal_counter(&INFO[90usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1054,7 +1102,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[91usize], variant)); + .push(registry.register_nominal_counter(&INFO[98usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1065,7 +1113,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[93usize], variant)); + .push(registry.register_nominal_counter(&INFO[100usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1076,7 +1124,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[95usize], variant)); + .push(registry.register_nominal_counter(&INFO[102usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1087,7 +1135,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[97usize], variant)); + .push(registry.register_nominal_counter(&INFO[104usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1098,7 +1146,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[99usize], variant)); + .push(registry.register_nominal_counter(&INFO[106usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1109,7 +1157,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[101usize], variant)); + .push(registry.register_nominal_counter(&INFO[108usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1120,7 +1168,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[103usize], variant)); + .push(registry.register_nominal_counter(&INFO[110usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1131,7 +1179,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[105usize], variant)); + .push(registry.register_nominal_counter(&INFO[112usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1142,7 +1190,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[107usize], variant)); + .push(registry.register_nominal_counter(&INFO[114usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1153,7 +1201,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[109usize], variant)); + .push(registry.register_nominal_counter(&INFO[116usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1164,7 +1212,7 @@ impl Subscriber { let mut count = 0; for variant in ::VARIANTS.iter() { nominal_counters - .push(registry.register_nominal_counter(&INFO[111usize], variant)); + .push(registry.register_nominal_counter(&INFO[118usize], variant)); count += 1; } debug_assert_ne!(count, 0, "field type needs at least one variant"); @@ -1185,25 +1233,26 @@ impl Subscriber { measures.push(registry.register_measure(&INFO[33usize])); measures.push(registry.register_measure(&INFO[48usize])); measures.push(registry.register_measure(&INFO[50usize])); - measures.push(registry.register_measure(&INFO[52usize])); - measures.push(registry.register_measure(&INFO[54usize])); - measures.push(registry.register_measure(&INFO[61usize])); - measures.push(registry.register_measure(&INFO[63usize])); - measures.push(registry.register_measure(&INFO[64usize])); - measures.push(registry.register_measure(&INFO[65usize])); - measures.push(registry.register_measure(&INFO[85usize])); - measures.push(registry.register_measure(&INFO[86usize])); - measures.push(registry.register_measure(&INFO[89usize])); - measures.push(registry.register_measure(&INFO[116usize])); - measures.push(registry.register_measure(&INFO[117usize])); - measures.push(registry.register_measure(&INFO[118usize])); - measures.push(registry.register_measure(&INFO[119usize])); - measures.push(registry.register_measure(&INFO[120usize])); - measures.push(registry.register_measure(&INFO[121usize])); - measures.push(registry.register_measure(&INFO[122usize])); + measures.push(registry.register_measure(&INFO[53usize])); + measures.push(registry.register_measure(&INFO[56usize])); + measures.push(registry.register_measure(&INFO[58usize])); + measures.push(registry.register_measure(&INFO[68usize])); + measures.push(registry.register_measure(&INFO[70usize])); + measures.push(registry.register_measure(&INFO[71usize])); + measures.push(registry.register_measure(&INFO[72usize])); + measures.push(registry.register_measure(&INFO[92usize])); + measures.push(registry.register_measure(&INFO[93usize])); + measures.push(registry.register_measure(&INFO[96usize])); measures.push(registry.register_measure(&INFO[123usize])); measures.push(registry.register_measure(&INFO[124usize])); measures.push(registry.register_measure(&INFO[125usize])); + measures.push(registry.register_measure(&INFO[126usize])); + measures.push(registry.register_measure(&INFO[127usize])); + measures.push(registry.register_measure(&INFO[128usize])); + measures.push(registry.register_measure(&INFO[129usize])); + measures.push(registry.register_measure(&INFO[130usize])); + measures.push(registry.register_measure(&INFO[131usize])); + measures.push(registry.register_measure(&INFO[132usize])); timers.push(registry.register_timer(&INFO[5usize])); timers.push(registry.register_timer(&INFO[15usize])); timers.push(registry.register_timer(&INFO[21usize])); @@ -1265,36 +1314,38 @@ impl Subscriber { 17usize => (&INFO[45usize], entry), 18usize => (&INFO[47usize], entry), 19usize => (&INFO[49usize], entry), - 20usize => (&INFO[51usize], entry), - 21usize => (&INFO[53usize], entry), - 22usize => (&INFO[55usize], entry), + 20usize => (&INFO[52usize], entry), + 21usize => (&INFO[55usize], entry), + 22usize => (&INFO[57usize], entry), 23usize => (&INFO[60usize], entry), 24usize => (&INFO[62usize], entry), - 25usize => (&INFO[66usize], entry), - 26usize => (&INFO[68usize], entry), - 27usize => (&INFO[70usize], entry), - 28usize => (&INFO[72usize], entry), - 29usize => (&INFO[74usize], entry), - 30usize => (&INFO[76usize], entry), - 31usize => (&INFO[78usize], entry), - 32usize => (&INFO[80usize], entry), - 33usize => (&INFO[82usize], entry), - 34usize => (&INFO[84usize], entry), - 35usize => (&INFO[87usize], entry), - 36usize => (&INFO[88usize], entry), - 37usize => (&INFO[90usize], entry), - 38usize => (&INFO[92usize], entry), - 39usize => (&INFO[94usize], entry), - 40usize => (&INFO[96usize], entry), - 41usize => (&INFO[98usize], entry), - 42usize => (&INFO[100usize], entry), - 43usize => (&INFO[102usize], entry), - 44usize => (&INFO[104usize], entry), - 45usize => (&INFO[106usize], entry), - 46usize => (&INFO[108usize], entry), - 47usize => (&INFO[110usize], entry), - 48usize => (&INFO[113usize], entry), - 49usize => (&INFO[115usize], entry), + 25usize => (&INFO[67usize], entry), + 26usize => (&INFO[69usize], entry), + 27usize => (&INFO[73usize], entry), + 28usize => (&INFO[75usize], entry), + 29usize => (&INFO[77usize], entry), + 30usize => (&INFO[79usize], entry), + 31usize => (&INFO[81usize], entry), + 32usize => (&INFO[83usize], entry), + 33usize => (&INFO[85usize], entry), + 34usize => (&INFO[87usize], entry), + 35usize => (&INFO[89usize], entry), + 36usize => (&INFO[91usize], entry), + 37usize => (&INFO[94usize], entry), + 38usize => (&INFO[95usize], entry), + 39usize => (&INFO[97usize], entry), + 40usize => (&INFO[99usize], entry), + 41usize => (&INFO[101usize], entry), + 42usize => (&INFO[103usize], entry), + 43usize => (&INFO[105usize], entry), + 44usize => (&INFO[107usize], entry), + 45usize => (&INFO[109usize], entry), + 46usize => (&INFO[111usize], entry), + 47usize => (&INFO[113usize], entry), + 48usize => (&INFO[115usize], entry), + 49usize => (&INFO[117usize], entry), + 50usize => (&INFO[120usize], entry), + 51usize => (&INFO[122usize], entry), _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -1318,10 +1369,14 @@ impl Subscriber { 3usize => (&INFO[35usize], entry), 4usize => (&INFO[36usize], entry), 5usize => (&INFO[37usize], entry), - 6usize => (&INFO[58usize], entry), - 7usize => (&INFO[59usize], entry), - 8usize => (&INFO[112usize], entry), - 9usize => (&INFO[114usize], entry), + 6usize => (&INFO[51usize], entry), + 7usize => (&INFO[54usize], entry), + 8usize => (&INFO[59usize], entry), + 9usize => (&INFO[61usize], entry), + 10usize => (&INFO[65usize], entry), + 11usize => (&INFO[66usize], entry), + 12usize => (&INFO[119usize], entry), + 13usize => (&INFO[121usize], entry), _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -1370,133 +1425,133 @@ impl Subscriber { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[56usize], entries, variants) + (&INFO[63usize], entries, variants) } 5usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[57usize], entries, variants) + (&INFO[64usize], entries, variants) } 6usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[67usize], entries, variants) + (&INFO[74usize], entries, variants) } 7usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[69usize], entries, variants) + (&INFO[76usize], entries, variants) } 8usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[71usize], entries, variants) + (&INFO[78usize], entries, variants) } 9usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[73usize], entries, variants) + (&INFO[80usize], entries, variants) } 10usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[75usize], entries, variants) + (&INFO[82usize], entries, variants) } 11usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[77usize], entries, variants) + (&INFO[84usize], entries, variants) } 12usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[79usize], entries, variants) + (&INFO[86usize], entries, variants) } 13usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[81usize], entries, variants) + (&INFO[88usize], entries, variants) } 14usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[83usize], entries, variants) + (&INFO[90usize], entries, variants) } 15usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[91usize], entries, variants) + (&INFO[98usize], entries, variants) } 16usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[93usize], entries, variants) + (&INFO[100usize], entries, variants) } 17usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[95usize], entries, variants) + (&INFO[102usize], entries, variants) } 18usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[97usize], entries, variants) + (&INFO[104usize], entries, variants) } 19usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[99usize], entries, variants) + (&INFO[106usize], entries, variants) } 20usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[101usize], entries, variants) + (&INFO[108usize], entries, variants) } 21usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[103usize], entries, variants) + (&INFO[110usize], entries, variants) } 22usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[105usize], entries, variants) + (&INFO[112usize], entries, variants) } 23usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[107usize], entries, variants) + (&INFO[114usize], entries, variants) } 24usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[109usize], entries, variants) + (&INFO[116usize], entries, variants) } 25usize => { let offset = *entry; let variants = ::VARIANTS; let entries = &self.nominal_counters[offset..offset + variants.len()]; - (&INFO[111usize], entries, variants) + (&INFO[118usize], entries, variants) } _ => unsafe { core::hint::unreachable_unchecked() }, }) @@ -1530,25 +1585,26 @@ impl Subscriber { 11usize => (&INFO[33usize], entry), 12usize => (&INFO[48usize], entry), 13usize => (&INFO[50usize], entry), - 14usize => (&INFO[52usize], entry), - 15usize => (&INFO[54usize], entry), - 16usize => (&INFO[61usize], entry), - 17usize => (&INFO[63usize], entry), - 18usize => (&INFO[64usize], entry), - 19usize => (&INFO[65usize], entry), - 20usize => (&INFO[85usize], entry), - 21usize => (&INFO[86usize], entry), - 22usize => (&INFO[89usize], entry), - 23usize => (&INFO[116usize], entry), - 24usize => (&INFO[117usize], entry), - 25usize => (&INFO[118usize], entry), - 26usize => (&INFO[119usize], entry), - 27usize => (&INFO[120usize], entry), - 28usize => (&INFO[121usize], entry), - 29usize => (&INFO[122usize], entry), - 30usize => (&INFO[123usize], entry), - 31usize => (&INFO[124usize], entry), - 32usize => (&INFO[125usize], entry), + 14usize => (&INFO[53usize], entry), + 15usize => (&INFO[56usize], entry), + 16usize => (&INFO[58usize], entry), + 17usize => (&INFO[68usize], entry), + 18usize => (&INFO[70usize], entry), + 19usize => (&INFO[71usize], entry), + 20usize => (&INFO[72usize], entry), + 21usize => (&INFO[92usize], entry), + 22usize => (&INFO[93usize], entry), + 23usize => (&INFO[96usize], entry), + 24usize => (&INFO[123usize], entry), + 25usize => (&INFO[124usize], entry), + 26usize => (&INFO[125usize], entry), + 27usize => (&INFO[126usize], entry), + 28usize => (&INFO[127usize], entry), + 29usize => (&INFO[128usize], entry), + 30usize => (&INFO[129usize], entry), + 31usize => (&INFO[130usize], entry), + 32usize => (&INFO[131usize], entry), + 33usize => (&INFO[132usize], entry), _ => unsafe { core::hint::unreachable_unchecked() }, }) } @@ -1851,11 +1907,11 @@ impl event::Subscriber for Subscriber { let _ = meta; } #[inline] - fn on_application_write( + fn on_stream_write( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationWrite, + event: &api::StreamWrite, ) { #[allow(unused_imports)] use api::*; @@ -1863,23 +1919,56 @@ impl event::Subscriber for Subscriber { self.measure(48usize, 12usize, event.total_len); self.count(49usize, 19usize, event.write_len); self.measure(50usize, 13usize, event.write_len); + self.count_bool(51usize, 6usize, event.ready); let _ = context; let _ = meta; let _ = event; } #[inline] - fn on_application_read( + fn on_stream_write_shutdown( &self, context: &Self::ConnectionContext, meta: &api::ConnectionMeta, - event: &api::ApplicationRead, + event: &api::StreamWriteShutdown, ) { #[allow(unused_imports)] use api::*; - self.count(51usize, 20usize, 1usize); - self.measure(52usize, 14usize, event.capacity); - self.count(53usize, 21usize, event.read_len); - self.measure(54usize, 15usize, event.read_len); + self.count(52usize, 20usize, 1usize); + self.measure(53usize, 14usize, event.buffer_len); + self.count_bool(54usize, 7usize, event.background); + let _ = context; + let _ = meta; + let _ = event; + } + #[inline] + fn on_stream_read( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamRead, + ) { + #[allow(unused_imports)] + use api::*; + self.count(55usize, 21usize, 1usize); + self.measure(56usize, 15usize, event.capacity); + self.count(57usize, 22usize, event.read_len); + self.measure(58usize, 16usize, event.read_len); + self.count_bool(59usize, 8usize, event.ready); + let _ = context; + let _ = meta; + let _ = event; + } + #[inline] + fn on_stream_read_shutdown( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::StreamReadShutdown, + ) { + #[allow(unused_imports)] + use api::*; + self.count(60usize, 23usize, 1usize); + self.count_bool(61usize, 9usize, event.background); let _ = context; let _ = meta; let _ = event; @@ -1888,11 +1977,11 @@ impl event::Subscriber for Subscriber { fn on_endpoint_initialized(&self, meta: &api::EndpointMeta, event: &api::EndpointInitialized) { #[allow(unused_imports)] use api::*; - self.count(55usize, 22usize, 1usize); - self.count_nominal(56usize, 4usize, &event.acceptor_addr); - self.count_nominal(57usize, 5usize, &event.handshake_addr); - self.count_bool(58usize, 6usize, event.tcp); - self.count_bool(59usize, 7usize, event.udp); + self.count(62usize, 24usize, 1usize); + self.count_nominal(63usize, 4usize, &event.acceptor_addr); + self.count_nominal(64usize, 5usize, &event.handshake_addr); + self.count_bool(65usize, 10usize, event.tcp); + self.count_bool(66usize, 11usize, event.udp); let _ = event; let _ = meta; } @@ -1904,8 +1993,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(60usize, 23usize, 1usize); - self.measure(61usize, 16usize, event.capacity); + self.count(67usize, 25usize, 1usize); + self.measure(68usize, 17usize, event.capacity); let _ = event; let _ = meta; } @@ -1917,10 +2006,10 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(62usize, 24usize, 1usize); - self.measure(63usize, 17usize, event.capacity); - self.measure(64usize, 18usize, event.entries); - self.measure(65usize, 19usize, event.lifetime); + self.count(69usize, 26usize, 1usize); + self.measure(70usize, 18usize, event.capacity); + self.measure(71usize, 19usize, event.entries); + self.measure(72usize, 20usize, event.lifetime); let _ = event; let _ = meta; } @@ -1932,8 +2021,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(66usize, 25usize, 1usize); - self.count_nominal(67usize, 6usize, &event.peer_address); + self.count(73usize, 27usize, 1usize); + self.count_nominal(74usize, 6usize, &event.peer_address); let _ = event; let _ = meta; } @@ -1945,8 +2034,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(68usize, 26usize, 1usize); - self.count_nominal(69usize, 7usize, &event.peer_address); + self.count(75usize, 28usize, 1usize); + self.count_nominal(76usize, 7usize, &event.peer_address); let _ = event; let _ = meta; } @@ -1958,8 +2047,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(70usize, 27usize, 1usize); - self.count_nominal(71usize, 8usize, &event.peer_address); + self.count(77usize, 29usize, 1usize); + self.count_nominal(78usize, 8usize, &event.peer_address); let _ = event; let _ = meta; } @@ -1971,8 +2060,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(72usize, 28usize, 1usize); - self.count_nominal(73usize, 9usize, &event.peer_address); + self.count(79usize, 30usize, 1usize); + self.count_nominal(80usize, 9usize, &event.peer_address); let _ = event; let _ = meta; } @@ -1984,8 +2073,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(74usize, 29usize, 1usize); - self.count_nominal(75usize, 10usize, &event.peer_address); + self.count(81usize, 31usize, 1usize); + self.count_nominal(82usize, 10usize, &event.peer_address); let _ = event; let _ = meta; } @@ -1997,8 +2086,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(76usize, 30usize, 1usize); - self.count_nominal(77usize, 11usize, &event.peer_address); + self.count(83usize, 32usize, 1usize); + self.count_nominal(84usize, 11usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2010,8 +2099,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(78usize, 31usize, 1usize); - self.count_nominal(79usize, 12usize, &event.peer_address); + self.count(85usize, 33usize, 1usize); + self.count_nominal(86usize, 12usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2023,8 +2112,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(80usize, 32usize, 1usize); - self.count_nominal(81usize, 13usize, &event.peer_address); + self.count(87usize, 34usize, 1usize); + self.count_nominal(88usize, 13usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2036,8 +2125,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(82usize, 33usize, 1usize); - self.count_nominal(83usize, 14usize, &event.peer_address); + self.count(89usize, 35usize, 1usize); + self.count_nominal(90usize, 14usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2045,9 +2134,9 @@ impl event::Subscriber for Subscriber { fn on_key_accepted(&self, meta: &api::EndpointMeta, event: &api::KeyAccepted) { #[allow(unused_imports)] use api::*; - self.count(84usize, 34usize, 1usize); - self.measure(85usize, 20usize, event.gap); - self.measure(86usize, 21usize, event.forward_shift); + self.count(91usize, 36usize, 1usize); + self.measure(92usize, 21usize, event.gap); + self.measure(93usize, 22usize, event.forward_shift); let _ = event; let _ = meta; } @@ -2059,7 +2148,7 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(87usize, 35usize, 1usize); + self.count(94usize, 37usize, 1usize); let _ = event; let _ = meta; } @@ -2071,8 +2160,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(88usize, 36usize, 1usize); - self.measure(89usize, 22usize, event.gap); + self.count(95usize, 38usize, 1usize); + self.measure(96usize, 23usize, event.gap); let _ = event; let _ = meta; } @@ -2084,8 +2173,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(90usize, 37usize, 1usize); - self.count_nominal(91usize, 15usize, &event.peer_address); + self.count(97usize, 39usize, 1usize); + self.count_nominal(98usize, 15usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2097,8 +2186,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(92usize, 38usize, 1usize); - self.count_nominal(93usize, 16usize, &event.peer_address); + self.count(99usize, 40usize, 1usize); + self.count_nominal(100usize, 16usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2110,8 +2199,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(94usize, 39usize, 1usize); - self.count_nominal(95usize, 17usize, &event.peer_address); + self.count(101usize, 41usize, 1usize); + self.count_nominal(102usize, 17usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2123,8 +2212,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(96usize, 40usize, 1usize); - self.count_nominal(97usize, 18usize, &event.peer_address); + self.count(103usize, 42usize, 1usize); + self.count_nominal(104usize, 18usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2136,8 +2225,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(98usize, 41usize, 1usize); - self.count_nominal(99usize, 19usize, &event.peer_address); + self.count(105usize, 43usize, 1usize); + self.count_nominal(106usize, 19usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2145,8 +2234,8 @@ impl event::Subscriber for Subscriber { fn on_stale_key_packet_sent(&self, meta: &api::EndpointMeta, event: &api::StaleKeyPacketSent) { #[allow(unused_imports)] use api::*; - self.count(100usize, 42usize, 1usize); - self.count_nominal(101usize, 20usize, &event.peer_address); + self.count(107usize, 44usize, 1usize); + self.count_nominal(108usize, 20usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2158,8 +2247,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(102usize, 43usize, 1usize); - self.count_nominal(103usize, 21usize, &event.peer_address); + self.count(109usize, 45usize, 1usize); + self.count_nominal(110usize, 21usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2171,8 +2260,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(104usize, 44usize, 1usize); - self.count_nominal(105usize, 22usize, &event.peer_address); + self.count(111usize, 46usize, 1usize); + self.count_nominal(112usize, 22usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2184,8 +2273,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(106usize, 45usize, 1usize); - self.count_nominal(107usize, 23usize, &event.peer_address); + self.count(113usize, 47usize, 1usize); + self.count_nominal(114usize, 23usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2197,8 +2286,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(108usize, 46usize, 1usize); - self.count_nominal(109usize, 24usize, &event.peer_address); + self.count(115usize, 48usize, 1usize); + self.count_nominal(116usize, 24usize, &event.peer_address); let _ = event; let _ = meta; } @@ -2210,9 +2299,9 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(110usize, 47usize, 1usize); - self.count_nominal(111usize, 25usize, &event.peer_address); - self.count_bool(112usize, 8usize, event.hit); + self.count(117usize, 49usize, 1usize); + self.count_nominal(118usize, 25usize, &event.peer_address); + self.count_bool(119usize, 12usize, event.hit); let _ = event; let _ = meta; } @@ -2224,8 +2313,8 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(113usize, 48usize, 1usize); - self.count_bool(114usize, 9usize, event.hit); + self.count(120usize, 50usize, 1usize); + self.count_bool(121usize, 13usize, event.hit); let _ = event; let _ = meta; } @@ -2237,17 +2326,17 @@ impl event::Subscriber for Subscriber { ) { #[allow(unused_imports)] use api::*; - self.count(115usize, 49usize, 1usize); - self.measure(116usize, 23usize, event.id_entries); - self.measure(117usize, 24usize, event.id_entries_retired); - self.measure(118usize, 25usize, event.id_entries_utilization); - self.measure(119usize, 26usize, event.id_entries_initial_utilization); - self.measure(120usize, 27usize, event.address_entries); - self.measure(121usize, 28usize, event.address_entries_retired); - self.measure(122usize, 29usize, event.address_entries_utilization); - self.measure(123usize, 30usize, event.address_entries_initial_utilization); - self.measure(124usize, 31usize, event.handshake_requests); - self.measure(125usize, 32usize, event.handshake_requests_retired); + self.count(122usize, 51usize, 1usize); + self.measure(123usize, 24usize, event.id_entries); + self.measure(124usize, 25usize, event.id_entries_retired); + self.measure(125usize, 26usize, event.id_entries_utilization); + self.measure(126usize, 27usize, event.id_entries_initial_utilization); + self.measure(127usize, 28usize, event.address_entries); + self.measure(128usize, 29usize, event.address_entries_retired); + self.measure(129usize, 30usize, event.address_entries_utilization); + self.measure(130usize, 31usize, event.address_entries_initial_utilization); + self.measure(131usize, 32usize, event.handshake_requests); + self.measure(132usize, 33usize, event.handshake_requests_retired); let _ = event; let _ = meta; } diff --git a/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs b/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs index 74c1c460b..c44511318 100644 --- a/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs +++ b/dc/s2n-quic-dc/src/event/generated/metrics/probe.rs @@ -35,38 +35,40 @@ mod counter { 41usize => Self(acceptor_udp_io_error), 42usize => Self(acceptor_stream_pruned), 45usize => Self(acceptor_stream_dequeued), - 47usize => Self(application_write), - 49usize => Self(application_write__committed__total), - 51usize => Self(application_read), - 53usize => Self(application_read__committed__total), - 55usize => Self(endpoint_initialized), - 60usize => Self(path_secret_map_initialized), - 62usize => Self(path_secret_map_uninitialized), - 66usize => Self(path_secret_map_background_handshake_requested), - 68usize => Self(path_secret_map_entry_inserted), - 70usize => Self(path_secret_map_entry_ready), - 72usize => Self(path_secret_map_entry_replaced), - 74usize => Self(unknown_path_secret_packet_sent), - 76usize => Self(unknown_path_secret_packet_received), - 78usize => Self(unknown_path_secret_packet_accepted), - 80usize => Self(unknown_path_secret_packet_rejected), - 82usize => Self(unknown_path_secret_packet_dropped), - 84usize => Self(key_accepted), - 87usize => Self(replay_definitely_detected), - 88usize => Self(replay_potentially_detected), - 90usize => Self(replay_detected_packet_sent), - 92usize => Self(replay_detected_packet_received), - 94usize => Self(replay_detected_packet_accepted), - 96usize => Self(replay_detected_packet_rejected), - 98usize => Self(replay_detected_packet_dropped), - 100usize => Self(stale_key_packet_sent), - 102usize => Self(stale_key_packet_received), - 104usize => Self(stale_key_packet_accepted), - 106usize => Self(stale_key_packet_rejected), - 108usize => Self(stale_key_packet_dropped), - 110usize => Self(path_secret_map_address_cache_accessed), - 113usize => Self(path_secret_map_id_cache_accessed), - 115usize => Self(path_secret_map_cleaner_cycled), + 47usize => Self(stream_write), + 49usize => Self(stream_write__committed__total), + 52usize => Self(stream_write_shutdown), + 55usize => Self(stream_read), + 57usize => Self(stream_read__committed__total), + 60usize => Self(stream_read_shutdown), + 62usize => Self(endpoint_initialized), + 67usize => Self(path_secret_map_initialized), + 69usize => Self(path_secret_map_uninitialized), + 73usize => Self(path_secret_map_background_handshake_requested), + 75usize => Self(path_secret_map_entry_inserted), + 77usize => Self(path_secret_map_entry_ready), + 79usize => Self(path_secret_map_entry_replaced), + 81usize => Self(unknown_path_secret_packet_sent), + 83usize => Self(unknown_path_secret_packet_received), + 85usize => Self(unknown_path_secret_packet_accepted), + 87usize => Self(unknown_path_secret_packet_rejected), + 89usize => Self(unknown_path_secret_packet_dropped), + 91usize => Self(key_accepted), + 94usize => Self(replay_definitely_detected), + 95usize => Self(replay_potentially_detected), + 97usize => Self(replay_detected_packet_sent), + 99usize => Self(replay_detected_packet_received), + 101usize => Self(replay_detected_packet_accepted), + 103usize => Self(replay_detected_packet_rejected), + 105usize => Self(replay_detected_packet_dropped), + 107usize => Self(stale_key_packet_sent), + 109usize => Self(stale_key_packet_received), + 111usize => Self(stale_key_packet_accepted), + 113usize => Self(stale_key_packet_rejected), + 115usize => Self(stale_key_packet_dropped), + 117usize => Self(path_secret_map_address_cache_accessed), + 120usize => Self(path_secret_map_id_cache_accessed), + 122usize => Self(path_secret_map_cleaner_cycled), _ => unreachable!("invalid info: {info:?}"), } } @@ -114,14 +116,18 @@ mod counter { fn acceptor_stream_pruned(value: u64); # [link_name = s2n_quic_dc__event__counter__acceptor_stream_dequeued] fn acceptor_stream_dequeued(value: u64); - # [link_name = s2n_quic_dc__event__counter__application_write] - fn application_write(value: u64); - # [link_name = s2n_quic_dc__event__counter__application_write__committed__total] - fn application_write__committed__total(value: u64); - # [link_name = s2n_quic_dc__event__counter__application_read] - fn application_read(value: u64); - # [link_name = s2n_quic_dc__event__counter__application_read__committed__total] - fn application_read__committed__total(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_write] + fn stream_write(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_write__committed__total] + fn stream_write__committed__total(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_write_shutdown] + fn stream_write_shutdown(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_read] + fn stream_read(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_read__committed__total] + fn stream_read__committed__total(value: u64); + # [link_name = s2n_quic_dc__event__counter__stream_read_shutdown] + fn stream_read_shutdown(value: u64); # [link_name = s2n_quic_dc__event__counter__endpoint_initialized] fn endpoint_initialized(value: u64); # [link_name = s2n_quic_dc__event__counter__path_secret_map_initialized] @@ -193,10 +199,14 @@ mod counter { 35usize => Self(acceptor_udp_packet_received__is_retransmisson), 36usize => Self(acceptor_udp_packet_received__is_fin), 37usize => Self(acceptor_udp_packet_received__is_fin_known), - 58usize => Self(endpoint_initialized__tcp), - 59usize => Self(endpoint_initialized__udp), - 112usize => Self(path_secret_map_address_cache_accessed__hit), - 114usize => Self(path_secret_map_id_cache_accessed__hit), + 51usize => Self(stream_write__ready), + 54usize => Self(stream_write_shutdown__background), + 59usize => Self(stream_read__ready), + 61usize => Self(stream_read_shutdown__background), + 65usize => Self(endpoint_initialized__tcp), + 66usize => Self(endpoint_initialized__udp), + 119usize => Self(path_secret_map_address_cache_accessed__hit), + 121usize => Self(path_secret_map_id_cache_accessed__hit), _ => unreachable!("invalid info: {info:?}"), } } @@ -220,6 +230,14 @@ mod counter { fn acceptor_udp_packet_received__is_fin(value: bool); # [link_name = s2n_quic_dc__event__counter__bool__acceptor_udp_packet_received__is_fin_known] fn acceptor_udp_packet_received__is_fin_known(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__stream_write__ready] + fn stream_write__ready(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__stream_write_shutdown__background] + fn stream_write_shutdown__background(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__stream_read__ready] + fn stream_read__ready(value: bool); + # [link_name = s2n_quic_dc__event__counter__bool__stream_read_shutdown__background] + fn stream_read_shutdown__background(value: bool); # [link_name = s2n_quic_dc__event__counter__bool__endpoint_initialized__tcp] fn endpoint_initialized__tcp(value: bool); # [link_name = s2n_quic_dc__event__counter__bool__endpoint_initialized__udp] @@ -243,30 +261,30 @@ mod counter { 23usize => Self(acceptor_tcp_packet_dropped__reason), 39usize => Self(acceptor_udp_packet_dropped__reason), 44usize => Self(acceptor_stream_pruned__reason), - 56usize => Self(endpoint_initialized__acceptor__protocol), - 57usize => Self(endpoint_initialized__handshake__protocol), - 67usize => { + 63usize => Self(endpoint_initialized__acceptor__protocol), + 64usize => Self(endpoint_initialized__handshake__protocol), + 74usize => { Self(path_secret_map_background_handshake_requested__peer_address__protocol) } - 69usize => Self(path_secret_map_entry_inserted__peer_address__protocol), - 71usize => Self(path_secret_map_entry_ready__peer_address__protocol), - 73usize => Self(path_secret_map_entry_replaced__peer_address__protocol), - 75usize => Self(unknown_path_secret_packet_sent__peer_address__protocol), - 77usize => Self(unknown_path_secret_packet_received__peer_address__protocol), - 79usize => Self(unknown_path_secret_packet_accepted__peer_address__protocol), - 81usize => Self(unknown_path_secret_packet_rejected__peer_address__protocol), - 83usize => Self(unknown_path_secret_packet_dropped__peer_address__protocol), - 91usize => Self(replay_detected_packet_sent__peer_address__protocol), - 93usize => Self(replay_detected_packet_received__peer_address__protocol), - 95usize => Self(replay_detected_packet_accepted__peer_address__protocol), - 97usize => Self(replay_detected_packet_rejected__peer_address__protocol), - 99usize => Self(replay_detected_packet_dropped__peer_address__protocol), - 101usize => Self(stale_key_packet_sent__peer_address__protocol), - 103usize => Self(stale_key_packet_received__peer_address__protocol), - 105usize => Self(stale_key_packet_accepted__peer_address__protocol), - 107usize => Self(stale_key_packet_rejected__peer_address__protocol), - 109usize => Self(stale_key_packet_dropped__peer_address__protocol), - 111usize => { + 76usize => Self(path_secret_map_entry_inserted__peer_address__protocol), + 78usize => Self(path_secret_map_entry_ready__peer_address__protocol), + 80usize => Self(path_secret_map_entry_replaced__peer_address__protocol), + 82usize => Self(unknown_path_secret_packet_sent__peer_address__protocol), + 84usize => Self(unknown_path_secret_packet_received__peer_address__protocol), + 86usize => Self(unknown_path_secret_packet_accepted__peer_address__protocol), + 88usize => Self(unknown_path_secret_packet_rejected__peer_address__protocol), + 90usize => Self(unknown_path_secret_packet_dropped__peer_address__protocol), + 98usize => Self(replay_detected_packet_sent__peer_address__protocol), + 100usize => Self(replay_detected_packet_received__peer_address__protocol), + 102usize => Self(replay_detected_packet_accepted__peer_address__protocol), + 104usize => Self(replay_detected_packet_rejected__peer_address__protocol), + 106usize => Self(replay_detected_packet_dropped__peer_address__protocol), + 108usize => Self(stale_key_packet_sent__peer_address__protocol), + 110usize => Self(stale_key_packet_received__peer_address__protocol), + 112usize => Self(stale_key_packet_accepted__peer_address__protocol), + 114usize => Self(stale_key_packet_rejected__peer_address__protocol), + 116usize => Self(stale_key_packet_dropped__peer_address__protocol), + 118usize => { Self(path_secret_map_address_cache_accessed__peer_address__protocol) } _ => unreachable!("invalid info: {info:?}"), @@ -465,29 +483,30 @@ mod measure { 27usize => Self(acceptor_tcp_stream_enqueued__blocked_count), 31usize => Self(acceptor_udp_datagram_received__len), 33usize => Self(acceptor_udp_packet_received__payload_len), - 48usize => Self(application_write__provided), - 50usize => Self(application_write__committed), - 52usize => Self(application_read__capacity), - 54usize => Self(application_read__committed), - 61usize => Self(path_secret_map_initialized__capacity), - 63usize => Self(path_secret_map_uninitialized__capacity), - 64usize => Self(path_secret_map_uninitialized__entries), - 65usize => Self(path_secret_map_uninitialized__lifetime), - 85usize => Self(key_accepted__gap), - 86usize => Self(key_accepted__forward_shift), - 89usize => Self(replay_potentially_detected__gap), - 116usize => Self(path_secret_map_cleaner_cycled__entries__id), - 117usize => Self(path_secret_map_cleaner_cycled__entries__id__retired), - 118usize => Self(path_secret_map_cleaner_cycled__entries__id__utilization), - 119usize => Self(path_secret_map_cleaner_cycled__entries__id__utilization__initial), - 120usize => Self(path_secret_map_cleaner_cycled__entries__address), - 121usize => Self(path_secret_map_cleaner_cycled__entries__address__retired), - 122usize => Self(path_secret_map_cleaner_cycled__entries__address__utilization), - 123usize => { + 48usize => Self(stream_write__provided), + 50usize => Self(stream_write__committed), + 53usize => Self(stream_write_shutdown__buffer_len), + 56usize => Self(stream_read__capacity), + 58usize => Self(stream_read__committed), + 68usize => Self(path_secret_map_initialized__capacity), + 70usize => Self(path_secret_map_uninitialized__capacity), + 71usize => Self(path_secret_map_uninitialized__entries), + 72usize => Self(path_secret_map_uninitialized__lifetime), + 92usize => Self(key_accepted__gap), + 93usize => Self(key_accepted__forward_shift), + 96usize => Self(replay_potentially_detected__gap), + 123usize => Self(path_secret_map_cleaner_cycled__entries__id), + 124usize => Self(path_secret_map_cleaner_cycled__entries__id__retired), + 125usize => Self(path_secret_map_cleaner_cycled__entries__id__utilization), + 126usize => Self(path_secret_map_cleaner_cycled__entries__id__utilization__initial), + 127usize => Self(path_secret_map_cleaner_cycled__entries__address), + 128usize => Self(path_secret_map_cleaner_cycled__entries__address__retired), + 129usize => Self(path_secret_map_cleaner_cycled__entries__address__utilization), + 130usize => { Self(path_secret_map_cleaner_cycled__entries__address__utilization__initial) } - 124usize => Self(path_secret_map_cleaner_cycled__handshake_requests), - 125usize => Self(path_secret_map_cleaner_cycled__handshake_requests__retired), + 131usize => Self(path_secret_map_cleaner_cycled__handshake_requests), + 132usize => Self(path_secret_map_cleaner_cycled__handshake_requests__retired), _ => unreachable!("invalid info: {info:?}"), } } @@ -523,14 +542,16 @@ mod measure { fn acceptor_udp_datagram_received__len(value: u64); # [link_name = s2n_quic_dc__event__measure__acceptor_udp_packet_received__payload_len] fn acceptor_udp_packet_received__payload_len(value: u64); - # [link_name = s2n_quic_dc__event__measure__application_write__provided] - fn application_write__provided(value: u64); - # [link_name = s2n_quic_dc__event__measure__application_write__committed] - fn application_write__committed(value: u64); - # [link_name = s2n_quic_dc__event__measure__application_read__capacity] - fn application_read__capacity(value: u64); - # [link_name = s2n_quic_dc__event__measure__application_read__committed] - fn application_read__committed(value: u64); + # [link_name = s2n_quic_dc__event__measure__stream_write__provided] + fn stream_write__provided(value: u64); + # [link_name = s2n_quic_dc__event__measure__stream_write__committed] + fn stream_write__committed(value: u64); + # [link_name = s2n_quic_dc__event__measure__stream_write_shutdown__buffer_len] + fn stream_write_shutdown__buffer_len(value: u64); + # [link_name = s2n_quic_dc__event__measure__stream_read__capacity] + fn stream_read__capacity(value: u64); + # [link_name = s2n_quic_dc__event__measure__stream_read__committed] + fn stream_read__committed(value: u64); # [link_name = s2n_quic_dc__event__measure__path_secret_map_initialized__capacity] fn path_secret_map_initialized__capacity(value: u64); # [link_name = s2n_quic_dc__event__measure__path_secret_map_uninitialized__capacity] diff --git a/dc/s2n-quic-dc/src/stream/recv/application.rs b/dc/s2n-quic-dc/src/stream/recv/application.rs index 8dff47381..5b51c6998 100644 --- a/dc/s2n-quic-dc/src/stream/recv/application.rs +++ b/dc/s2n-quic-dc/src/stream/recv/application.rs @@ -3,7 +3,8 @@ use crate::{ clock::Timer, - event, msg, + event::{self, ConnectionPublisher as _}, + msg, stream::{recv, runtime, shared::ArcShared, socket}, }; use core::{ @@ -143,7 +144,9 @@ where where S: buffer::writer::Storage, { - waker::debug_assert_contract(cx, |cx| { + let capacity = out_buf.remaining_capacity(); + + let res = waker::debug_assert_contract(cx, |cx| { let mut out_buf = out_buf.track_write(); let res = self.0.poll_read_into(cx, &mut out_buf); @@ -161,7 +164,25 @@ where res?; Ok(out_buf.written_len()).into() - }) + }); + + let read_len = match &res { + Poll::Ready(Ok(len)) => *len, + Poll::Ready(Err(_)) => 0, + Poll::Pending => 0, + }; + + self.0 + .shared + .common + .publisher() + .on_stream_read(event::builder::StreamRead { + capacity, + read_len, + ready: res.is_ready(), + }); + + res } } @@ -308,10 +329,24 @@ where #[inline] fn shutdown(mut self: Box) { + // If the application never read from the stream try to do so now + if let LocalState::Ready = self.local_state { + let mut storage = buffer::writer::storage::Empty; + let waker = s2n_quic_core::task::waker::noop(); + let mut cx = core::task::Context::from_waker(&waker); + let _ = self.poll_read_into(&mut cx, &mut storage.track_write()); + } + + let background = matches!(self.local_state, LocalState::Ready); + + self.shared + .publisher() + .on_stream_read_shutdown(event::builder::StreamReadShutdown { background }); + // If we haven't exited the `Ready` state then spawn a task to do it for the application // // This is important for processing any secret control packets that the server sends us - if let LocalState::Ready = self.local_state { + if background { tracing::debug!("spawning task to read server's response"); let runtime = self.runtime.clone(); let handle = Shutdown(self); diff --git a/dc/s2n-quic-dc/src/stream/send/application.rs b/dc/s2n-quic-dc/src/stream/send/application.rs index b846d4b4e..9a8d89204 100644 --- a/dc/s2n-quic-dc/src/stream/send/application.rs +++ b/dc/s2n-quic-dc/src/stream/send/application.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - clock, event, msg, + clock, + event::{self, ConnectionPublisher}, + msg, stream::{ pacer, runtime, send::{flow, queue}, @@ -90,7 +92,9 @@ where where S: buffer::reader::storage::Infallible, { - waker::debug_assert_contract(cx, |cx| { + let total_len = buf.buffered_len(); + + let res = waker::debug_assert_contract(cx, |cx| { // if we've already shut down the stream then return early if !self.0.open { ensure!( @@ -111,7 +115,25 @@ where } res.into() - }) + }); + + let write_len = match &res { + Poll::Ready(Ok(len)) => *len, + Poll::Ready(Err(_)) => 0, + Poll::Pending => 0, + }; + + self.0 + .shared + .common + .publisher() + .on_stream_write(event::builder::StreamWrite { + total_len, + write_len, + ready: res.is_ready(), + }); + + res } /// Shutdown the stream for writing. @@ -283,16 +305,34 @@ where self.sockets.write_application().send_finish()?; } + let buffer_len = queue.accepted_len(); + // pass things to the worker if we need to gracefully shut down if !self.sockets.write_application().features().is_stream() { + self.shared + .publisher() + .on_stream_write_shutdown(event::builder::StreamWriteShutdown { + background: false, + buffer_len, + }); + let is_panicking = matches!(ty, ShutdownType::Drop { is_panicking: true }); self.shared.sender.shutdown(queue, is_panicking); return Ok(()); } + let background = !queue.is_empty(); + + self.shared + .publisher() + .on_stream_write_shutdown(event::builder::StreamWriteShutdown { + background, + buffer_len, + }); + // if we're using TCP and we get blocked from writing a final offset then spawn a task // to do it for us - if !queue.is_empty() { + if background { let shared = self.shared.clone(); let sockets = self.sockets.clone(); self.runtime.spawn_send_shutdown(Shutdown { diff --git a/dc/s2n-quic-dc/src/stream/shared.rs b/dc/s2n-quic-dc/src/stream/shared.rs index a1f25f04f..64e75222f 100644 --- a/dc/s2n-quic-dc/src/stream/shared.rs +++ b/dc/s2n-quic-dc/src/stream/shared.rs @@ -192,9 +192,24 @@ where ); Ok(()) } +} + +impl Common +where + Sub: event::Subscriber, + Clock: ?Sized + s2n_quic_core::time::Clock, +{ + #[inline] + pub fn publisher(&self) -> event::ConnectionPublisherSubscriber { + let now = self.clock.get_time(); + self.publisher_with_time(now) + } #[inline] - pub fn publisher(&self, timestamp: Timestamp) -> event::ConnectionPublisherSubscriber { + pub fn publisher_with_time( + &self, + timestamp: Timestamp, + ) -> event::ConnectionPublisherSubscriber { event::ConnectionPublisherSubscriber::new( event::builder::ConnectionMeta { id: 0, // TODO