From 40cab0bd3a7300f44ce7b696c03feb7e202d35c7 Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Mon, 20 Jan 2025 23:29:35 -0700 Subject: [PATCH] feat: add uTP duration metrics (#1646) --- crates/metrics/src/overlay.rs | 30 ++++++++++++++-- crates/portalnet/src/lib.rs | 2 +- crates/portalnet/src/overlay/protocol.rs | 2 +- crates/portalnet/src/overlay/request.rs | 9 +++-- crates/portalnet/src/overlay/service.rs | 17 +++++---- crates/portalnet/src/put_content.rs | 2 +- .../{utp_controller.rs => utp/controller.rs} | 35 ++++++++++++++----- crates/portalnet/src/utp/mod.rs | 2 ++ crates/portalnet/src/utp/timed_semaphore.rs | 17 +++++++++ 9 files changed, 88 insertions(+), 28 deletions(-) rename crates/portalnet/src/{utp_controller.rs => utp/controller.rs} (87%) create mode 100644 crates/portalnet/src/utp/mod.rs create mode 100644 crates/portalnet/src/utp/timed_semaphore.rs diff --git a/crates/metrics/src/overlay.rs b/crates/metrics/src/overlay.rs index 94b0036c7..44680594a 100644 --- a/crates/metrics/src/overlay.rs +++ b/crates/metrics/src/overlay.rs @@ -2,12 +2,16 @@ use ethportal_api::types::portal_wire::{Request, Response}; use prometheus_exporter::{ self, prometheus::{ - opts, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, + histogram_opts, opts, register_histogram_vec_with_registry, + register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry, }, }; -use crate::labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel}; +use crate::{ + labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel}, + timer::DiscardOnDropHistogramTimer, +}; /// Contains metrics reporters for use in the overlay network /// (eg. `portalnet/src/overlay.rs` & `portalnet/src/overlay_service.rs`). 
@@ -18,6 +22,7 @@ pub struct OverlayMetrics { pub message_total: IntCounterVec, pub utp_outcome_total: IntCounterVec, pub utp_active_gauge: IntGaugeVec, + pub utp_connection_duration: HistogramVec, pub validation_total: IntCounterVec, } @@ -47,6 +52,14 @@ impl OverlayMetrics { &["protocol", "direction"], registry )?; + let utp_connection_duration = register_histogram_vec_with_registry!( + histogram_opts!( + "trin_utp_connection_duration", + "the time taken to complete a utp transfer" + ), + &["protocol", "direction"], + registry + )?; let validation_total = register_int_counter_vec_with_registry!( opts!( "trin_validation_total", @@ -59,6 +72,7 @@ impl OverlayMetrics { message_total, utp_outcome_total, utp_active_gauge, + utp_connection_duration, validation_total, }) } @@ -157,6 +171,18 @@ impl OverlayMetricsReporter { .dec(); } + pub fn start_utp_process_timer( + &self, + direction: UtpDirectionLabel, + ) -> DiscardOnDropHistogramTimer { + DiscardOnDropHistogramTimer::new( + self.overlay_metrics + .utp_connection_duration + .with_label_values(&[&self.protocol, direction.into()]) + .clone(), + ) + } + // // Validations // diff --git a/crates/portalnet/src/lib.rs b/crates/portalnet/src/lib.rs index 5f3e23da3..a0749a0be 100644 --- a/crates/portalnet/src/lib.rs +++ b/crates/portalnet/src/lib.rs @@ -13,4 +13,4 @@ pub mod put_content; pub mod socket; pub mod types; pub mod utils; -pub mod utp_controller; +pub mod utp; diff --git a/crates/portalnet/src/overlay/protocol.rs b/crates/portalnet/src/overlay/protocol.rs index 03d49085e..1a2b3003c 100644 --- a/crates/portalnet/src/overlay/protocol.rs +++ b/crates/portalnet/src/overlay/protocol.rs @@ -59,7 +59,7 @@ use crate::{ kbucket::{Entry, SharedKBucketsTable}, node::Node, }, - utp_controller::UtpController, + utp::controller::UtpController, }; /// Overlay protocol is a layer on top of discv5 that handles all requests from the overlay networks diff --git a/crates/portalnet/src/overlay/request.rs 
b/crates/portalnet/src/overlay/request.rs index 57ba71cd8..1dfef8f2e 100644 --- a/crates/portalnet/src/overlay/request.rs +++ b/crates/portalnet/src/overlay/request.rs @@ -6,10 +6,9 @@ use ethportal_api::types::{ portal_wire::{Request, Response}, }; use futures::channel::oneshot; -use tokio::sync::OwnedSemaphorePermit; use super::errors::OverlayRequestError; -use crate::find::query_pool::QueryId; +use crate::{find::query_pool::QueryId, utp::timed_semaphore::OwnedTimedSemaphorePermit}; /// An incoming or outgoing request. #[derive(Debug, PartialEq)] @@ -44,7 +43,7 @@ pub struct OverlayRequest { /// Will be None for requests that are not associated with a query. pub query_id: Option, /// An optional permit to allow for transfer caps - pub request_permit: Option, + pub request_permit: Option, } impl OverlayRequest { @@ -54,7 +53,7 @@ impl OverlayRequest { direction: RequestDirection, responder: Option, query_id: Option, - request_permit: Option, + request_permit: Option, ) -> Self { OverlayRequest { id: rand::random(), @@ -77,7 +76,7 @@ pub struct ActiveOutgoingRequest { /// An optional QueryID for the query that this request is associated with. pub query_id: Option, /// An optional permit to allow for transfer caps - pub request_permit: Option, + pub request_permit: Option, } /// A response for a particular overlay request. 
diff --git a/crates/portalnet/src/overlay/service.rs b/crates/portalnet/src/overlay/service.rs index f7ac2909b..dbc1ba2cb 100644 --- a/crates/portalnet/src/overlay/service.rs +++ b/crates/portalnet/src/overlay/service.rs @@ -44,7 +44,6 @@ use tokio::{ sync::{ broadcast, mpsc::{self, UnboundedReceiver, UnboundedSender}, - OwnedSemaphorePermit, }, task::JoinHandle, }; @@ -85,7 +84,7 @@ use crate::{ node::Node, }, utils::portal_wire, - utp_controller::UtpController, + utp::{controller::UtpController, timed_semaphore::OwnedTimedSemaphorePermit}, }; pub const FIND_NODES_MAX_NODES: usize = 32; @@ -1031,7 +1030,7 @@ impl< let utp = Arc::clone(&self.utp_controller); tokio::spawn(async move { utp.accept_outbound_stream(cid, &content).await; - drop(permit); + permit.drop(); }); // Connection id is sent as BE because uTP header values are stored also as BE @@ -1218,7 +1217,7 @@ impl< }) .collect(); let _ = join_all(handles).await; - drop(permit); + permit.drop(); return; } }; @@ -1242,7 +1241,7 @@ impl< }) .collect(); let _ = join_all(handles).await; - drop(permit); + permit.drop(); return; } }; @@ -1302,7 +1301,7 @@ impl< Some(utp_processing.utp_controller), ); // explicitly drop semaphore permit in thread so the permit is moved into the thread - drop(permit); + permit.drop(); }); let accept = Accept { @@ -1433,7 +1432,7 @@ impl< source: Enr, request: Request, query_id: Option, - request_permit: Option, + request_permit: Option, ) { // If the node is present in the routing table, but the node is not connected, then // use the existing entry's value and direction. 
Otherwise, build a new entry from @@ -1493,7 +1492,7 @@ impl< response: Accept, enr: Enr, offer: Request, - request_permit: Option, + request_permit: Option, ) -> anyhow::Result { // Check that a valid triggering request was sent let mut gossip_result_tx = None; @@ -1593,7 +1592,7 @@ impl< } // explicitly drop permit in the thread so the permit is included in the thread if let Some(permit) = request_permit { - drop(permit); + permit.drop(); } }); diff --git a/crates/portalnet/src/put_content.rs b/crates/portalnet/src/put_content.rs index c66076699..30c8f72c5 100644 --- a/crates/portalnet/src/put_content.rs +++ b/crates/portalnet/src/put_content.rs @@ -22,7 +22,7 @@ use crate::{ request::{OverlayRequest, RequestDirection}, }, types::kbucket::SharedKBucketsTable, - utp_controller::UtpController, + utp::controller::UtpController, }; /// Datatype to store the result of a put content request. diff --git a/crates/portalnet/src/utp_controller.rs b/crates/portalnet/src/utp/controller.rs similarity index 87% rename from crates/portalnet/src/utp_controller.rs rename to crates/portalnet/src/utp/controller.rs index 988f93c54..dfe29e556 100644 --- a/crates/portalnet/src/utp_controller.rs +++ b/crates/portalnet/src/utp/controller.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::anyhow; use bytes::Bytes; use lazy_static::lazy_static; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::Semaphore; use tracing::debug; use trin_metrics::{ labels::{UtpDirectionLabel, UtpOutcomeLabel}, @@ -11,6 +11,7 @@ use trin_metrics::{ }; use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket}; +use super::timed_semaphore::OwnedTimedSemaphorePermit; use crate::discovery::UtpEnr; /// UtpController is meant to be a container which contains all code related to/for managing uTP /// streams We are implementing this because we want the utils of controlling uTP connection to be @@ -65,29 +66,45 @@ impl UtpController { } /// Non-blocking method to 
try and acquire a permit for an outbound uTP transfer. - // `try_acquire_owned()` isn't blocking and will instantly return with - // `Some(TryAcquireError::NoPermits)` error if there isn't a permit available - pub fn get_outbound_semaphore(&self) -> Option { + /// `try_acquire_owned()` isn't blocking and will instantly return with + /// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available + pub fn get_outbound_semaphore(&self) -> Option { match self .outbound_utp_transfer_semaphore .clone() .try_acquire_owned() { - Ok(permit) => Some(permit), + Ok(permit) => { + let histogram_timer = self + .metrics + .start_utp_process_timer(UtpDirectionLabel::Outbound); + Some(OwnedTimedSemaphorePermit { + permit, + histogram_timer, + }) + } Err(_) => None, } } /// Non-blocking method to try and acquire a permit for an inbound uTP transfer. - // `try_acquire_owned()` isn't blocking and will instantly return with - // `Some(TryAcquireError::NoPermits)` error if there isn't a permit available - pub fn get_inbound_semaphore(&self) -> Option { + /// `try_acquire_owned()` isn't blocking and will instantly return with + /// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available + pub fn get_inbound_semaphore(&self) -> Option { match self .inbound_utp_transfer_semaphore .clone() .try_acquire_owned() { - Ok(permit) => Some(permit), + Ok(permit) => { + let histogram_timer = self + .metrics + .start_utp_process_timer(UtpDirectionLabel::Inbound); + Some(OwnedTimedSemaphorePermit { + permit, + histogram_timer, + }) + } Err(_) => None, } } diff --git a/crates/portalnet/src/utp/mod.rs b/crates/portalnet/src/utp/mod.rs new file mode 100644 index 000000000..3b5c7e84c --- /dev/null +++ b/crates/portalnet/src/utp/mod.rs @@ -0,0 +1,2 @@ +pub mod controller; +pub mod timed_semaphore; diff --git a/crates/portalnet/src/utp/timed_semaphore.rs b/crates/portalnet/src/utp/timed_semaphore.rs new file mode 100644 index 000000000..ca478117c --- /dev/null +++ 
b/crates/portalnet/src/utp/timed_semaphore.rs @@ -0,0 +1,17 @@ +use tokio::sync::OwnedSemaphorePermit; +use trin_metrics::timer::DiscardOnDropHistogramTimer; + +/// An owned semaphore permit which records the time it has been alive from initialization to when +/// drop() is called. +#[derive(Debug)] +pub struct OwnedTimedSemaphorePermit { + pub permit: OwnedSemaphorePermit, + pub histogram_timer: DiscardOnDropHistogramTimer, +} + +impl OwnedTimedSemaphorePermit { + pub fn drop(self) { + self.histogram_timer.stop_and_record(); + drop(self.permit); + } +}