Skip to content

Commit

Permalink
feat(s2n-quic-dc): implement connection-level metric aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Dec 3, 2024
1 parent 0dfeff7 commit daf7802
Show file tree
Hide file tree
Showing 14 changed files with 2,448 additions and 1,795 deletions.
27 changes: 27 additions & 0 deletions dc/s2n-quic-dc/events/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#[event("stream:write_flushed")]
#[checkpoint("latency")]
#[measure_counter("conn")]
pub struct StreamWriteFlushed {
/// The number of bytes that the application tried to write
#[measure("provided", Bytes)]
Expand All @@ -11,17 +12,20 @@ pub struct StreamWriteFlushed {
/// The amount that was written
#[measure("committed", Bytes)]
#[counter("committed.total", Bytes)]
#[measure_counter("committed.conn", Bytes)]
committed_len: usize,

/// The amount of time it took to process the write request
///
/// Note that this includes both any syscall and encryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

#[event("stream:write_fin_flushed")]
#[checkpoint("latency")]
#[measure_counter("conn")]
pub struct StreamWriteFinFlushed {
/// The number of bytes that the application tried to write
#[measure("provided", Bytes)]
Expand All @@ -30,17 +34,20 @@ pub struct StreamWriteFinFlushed {
/// The amount that was written
#[measure("committed", Bytes)]
#[counter("committed.total", Bytes)]
#[measure_counter("committed.conn", Bytes)]
committed_len: usize,

/// The amount of time it took to process the write request
///
/// Note that this includes both any syscall and encryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

#[event("stream:write_blocked")]
#[checkpoint("latency")]
#[measure_counter("conn.stream.write.blocked")]
pub struct StreamWriteBlocked {
/// The number of bytes that the application tried to write
#[measure("provided", Bytes)]
Expand All @@ -53,6 +60,7 @@ pub struct StreamWriteBlocked {
///
/// Note that this includes both any syscall and encryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

Expand All @@ -70,6 +78,7 @@ pub struct StreamWriteErrored {
///
/// Note that this includes both any syscall and encryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,

/// The system `errno` from the returned error
Expand All @@ -89,6 +98,7 @@ pub struct StreamWriteShutdown {
}

#[event("stream:write_socket_flushed")]
#[measure_counter("conn")]
pub struct StreamWriteSocketFlushed {
/// The number of bytes that the stream tried to write to the socket
#[measure("provided", Bytes)]
Expand All @@ -97,10 +107,12 @@ pub struct StreamWriteSocketFlushed {
/// The amount that was written
#[measure("committed", Bytes)]
#[counter("committed.total", Bytes)]
#[measure_counter("committed.conn", Bytes)]
committed_len: usize,
}

#[event("stream:write_socket_blocked")]
#[measure_counter("conn.stream.write.socket.blocked")]
pub struct StreamWriteSocketBlocked {
/// The number of bytes that the stream tried to write to the socket
#[measure("provided", Bytes)]
Expand All @@ -119,6 +131,7 @@ pub struct StreamWriteSocketErrored {

#[event("stream:read_flushed")]
#[checkpoint("latency")]
#[measure_counter("conn")]
pub struct StreamReadFlushed {
/// The number of bytes that the application tried to read
#[measure("capacity", Bytes)]
Expand All @@ -127,17 +140,20 @@ pub struct StreamReadFlushed {
/// The amount that was read into the provided buffer
#[measure("committed", Bytes)]
#[counter("committed.total", Bytes)]
#[measure_counter("committed.conn", Bytes)]
committed_len: usize,

/// The amount of time it took to process the read request
///
/// Note that this includes both any syscall and decryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

#[event("stream:read_fin_flushed")]
#[checkpoint("latency")]
#[measure_counter("conn")]
pub struct StreamReadFinFlushed {
/// The number of bytes that the application tried to read
#[measure("capacity", Bytes)]
Expand All @@ -147,6 +163,7 @@ pub struct StreamReadFinFlushed {
///
/// Note that this includes both any syscall and decryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

Expand All @@ -161,6 +178,7 @@ pub struct StreamReadBlocked {
///
/// Note that this includes both any syscall and decryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,
}

Expand All @@ -175,6 +193,7 @@ pub struct StreamReadErrored {
///
/// Note that this includes both any syscall and decryption overhead
#[measure("processing_duration", Duration)]
#[measure_counter("processing_duration.conn", Duration)]
processing_duration: core::time::Duration,

/// The system `errno` from the returned error
Expand All @@ -190,6 +209,7 @@ pub struct StreamReadShutdown {
}

#[event("stream:read_socket_flushed")]
#[measure_counter("conn")]
pub struct StreamReadSocketFlushed {
/// The number of bytes that the stream tried to read from the socket
#[measure("capacity", Bytes)]
Expand All @@ -198,10 +218,12 @@ pub struct StreamReadSocketFlushed {
/// The amount that was read into the provided buffer
#[measure("committed", Bytes)]
#[counter("committed.total", Bytes)]
#[measure_counter("committed.conn", Bytes)]
committed_len: usize,
}

#[event("stream:read_socket_blocked")]
#[measure_counter("conn")]
pub struct StreamReadSocketBlocked {
/// The number of bytes that the stream tried to read from the socket
#[measure("capacity", Bytes)]
Expand All @@ -217,3 +239,8 @@ pub struct StreamReadSocketErrored {
/// The system `errno` from the returned error
errno: Option<i32>,
}

// NOTE - This event MUST come last, since connection-level aggregation depends on it
#[event("connection:closed")]
// #[checkpoint("latency")]
pub struct ConnectionClosed {}
Loading

0 comments on commit daf7802

Please sign in to comment.