From 9190ca55118424e202af53e9927bf60d0560c8b2 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 21:25:06 +0200 Subject: [PATCH] Add Client stats Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 2 + async-nats/src/client.rs | 40 ++++++++++++++++++++ async-nats/src/connector.rs | 10 ++++- async-nats/src/header.rs | 4 ++ async-nats/src/lib.rs | 52 ++++++++++++++++++++++++- async-nats/tests/client_tests.rs | 65 ++++++++++++++++++++++++++++++++ 6 files changed, 170 insertions(+), 3 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index fec16ee97..52c7cfea1 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -156,3 +156,5 @@ create_consumer_strict_on_stream leafnodes get_stream get_stream_no_info +lifecycle +AtomicU64 diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 4ff1140c8..f64b11717 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -83,6 +83,7 @@ pub struct Client { inbox_prefix: Arc, request_timeout: Option, max_payload: Arc, + connection_stats: Arc, } impl Sink for Client { @@ -108,6 +109,7 @@ impl Sink for Client { } impl Client { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( info: tokio::sync::watch::Receiver, state: tokio::sync::watch::Receiver, @@ -116,6 +118,7 @@ impl Client { inbox_prefix: String, request_timeout: Option, max_payload: Arc, + statistics: Arc, ) -> Client { let poll_sender = PollSender::new(sender.clone()); Client { @@ -128,6 +131,7 @@ impl Client { inbox_prefix: inbox_prefix.into(), request_timeout, max_payload, + connection_stats: statistics, } } @@ -649,6 +653,26 @@ impl Client { .await .map_err(Into::into) } + + /// Returns struct representing statistics of the whole lifecycle of the client. + /// This includes number of bytes sent/received, number of messages sent/received, + /// and number of times the connection was established. + /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared + /// across threads. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let statistics = client.statistics(); + /// println!("client statistics: {:#?}", statistics); + /// # Ok(()) + /// # } + /// ``` + pub fn statistics(&self) -> Arc { + self.connection_stats.clone() + } } /// Used for building customized requests. @@ -826,3 +850,19 @@ impl Display for FlushErrorKind { } pub type FlushError = Error; + +/// Represents statistics for the instance of the client throughout its lifecycle. +#[derive(Default, Debug)] +pub struct Statistics { + /// Number of bytes received. This does not include the protocol overhead. + pub in_bytes: AtomicU64, + /// Number of bytes sent. This doe not include the protocol overhead. + pub out_bytes: AtomicU64, + /// Number of messages received. + pub in_messages: AtomicU64, + /// Number of messages sent. + pub out_messages: AtomicU64, + /// Number of times connection was established. + /// Initial connect will be counted as well, then all successful reconnects. + pub connects: AtomicU64, +} diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index 87112c038..7bf54307a 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -12,6 +12,7 @@ // limitations under the License. use crate::auth::Auth; +use crate::client::Statistics; use crate::connection::Connection; use crate::connection::State; use crate::options::CallbackArg1; @@ -40,6 +41,7 @@ use std::cmp; use std::io; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; @@ -70,6 +72,7 @@ pub(crate) struct Connector { /// A map of servers and number of connect attempts. servers: Vec<(ServerAddr, usize)>, options: ConnectorOptions, + pub(crate) connect_stats: Arc, attempts: usize, pub(crate) events_tx: tokio::sync::mpsc::Sender, pub(crate) state_tx: tokio::sync::watch::Sender, @@ -93,6 +96,7 @@ impl Connector { events_tx: tokio::sync::mpsc::Sender, state_tx: tokio::sync::watch::Sender, max_payload: Arc, + connect_stats: Arc, ) -> Result { let servers = addrs.to_server_addrs()?.map(|addr| (addr, 0)).collect(); @@ -103,13 +107,16 @@ impl Connector { events_tx, state_tx, max_payload, + connect_stats, }) } pub(crate) async fn connect(&mut self) -> Result<(ServerInfo, Connection), ConnectError> { loop { match self.try_connect().await { - Ok(inner) => return Ok(inner), + Ok(inner) => { + return Ok(inner); + } Err(error) => match error.kind() { ConnectErrorKind::MaxReconnects => { return Err(ConnectError::with_source( @@ -284,6 +291,7 @@ impl Connector { Some(_) => { tracing::debug!("connected to {}", server_info.port); self.attempts = 0; + self.connect_stats.connects.add(1, Ordering::Relaxed); self.events_tx.send(Event::Connected).await.ok(); self.state_tx.send(State::Connected).ok(); self.max_payload.store( diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 2dec615ed..cc595cb42 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -110,6 +110,10 @@ impl HeaderMap { pub fn is_empty(&self) -> bool { self.inner.is_empty() } + + pub fn len(&self) -> usize { + self.inner.len() + } } impl HeaderMap { diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 3981f172c..b6079c005 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -213,6 +213,7 @@ use std::pin::Pin; use std::slice; use std::str::{self, FromStr}; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::ErrorKind; @@ -251,7 +252,9 @@ mod connector; mod options; pub use auth::Auth; -pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError}; +pub use client::{ + Client, PublishError, Request, RequestError, RequestErrorKind, Statistics, SubscribeError, +}; pub use options::{AuthError, ConnectOptions}; mod crypto; @@ -667,6 +670,15 @@ impl ConnectionHandler { description, length, } => { + self.connector + .connect_stats + .in_messages + .add(1, Ordering::Relaxed); + self.connector + .connect_stats + .in_bytes + .add(length as u64, Ordering::Relaxed); + if let Some(subscription) = self.subscriptions.get_mut(&sid) { let message: Message = Message { subject, @@ -796,6 +808,11 @@ impl ConnectionHandler { } => { let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject"); + let header_len = headers + .as_ref() + .map(|headers| headers.len()) + .unwrap_or_default(); + let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { @@ -814,13 +831,23 @@ impl ConnectionHandler { senders: HashMap::new(), }) }; + self.connector + .connect_stats + .out_messages + .add(1, Ordering::Relaxed); multiplexer.senders.insert(token.to_owned(), sender); + let respond: Subject = format!("{}{}", multiplexer.prefix, token).into(); + + self.connector.connect_stats.out_bytes.add( + (payload.len() + respond.len() + subject.len() + header_len) as u64, + Ordering::Relaxed, + ); let pub_op = ClientOp::Publish { subject, payload, - respond: Some(format!("{}{}", multiplexer.prefix, token).into()), + respond: Some(respond), headers, }; @@ -833,6 +860,24 @@ impl ConnectionHandler { reply: respond, headers, }) => { + self.connector + .connect_stats + .out_messages + .add(1, Ordering::Relaxed); + + let header_len = headers + .as_ref() + .map(|headers| headers.len()) + .unwrap_or_default(); + + self.connector.connect_stats.out_bytes.add( + (payload.len() + + respond.as_ref().map_or_else(|| 0, |r| r.len()) + + subject.len() + + header_len) as u64, + Ordering::Relaxed, + ); + self.connection.enqueue_write_op(&ClientOp::Publish { subject, payload, @@ -907,6 +952,7 @@ pub async fn connect_with_options( let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending); // We're setting it to the default server payload size. let max_payload = Arc::new(AtomicUsize::new(1024 * 1024)); + let statistics = Arc::new(Statistics::default()); let mut connector = Connector::new( addrs, @@ -931,6 +977,7 @@ pub async fn connect_with_options( events_tx, state_tx, max_payload.clone(), + statistics.clone(), ) .map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?; @@ -954,6 +1001,7 @@ pub async fn connect_with_options( options.inbox_prefix, options.request_timeout, max_payload, + statistics, ); task::spawn(async move { diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index f125e9765..b7275a24f 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -22,6 +22,7 @@ mod client { use futures::stream::StreamExt; use std::path::PathBuf; use std::str::FromStr; + use std::sync::atomic::Ordering; use std::time::Duration; #[tokio::test] @@ -931,4 +932,68 @@ mod client { .await .unwrap(); } + + #[tokio::test] + async fn client_statistics() { + let server = nats_server::run_basic_server(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let client = async_nats::ConnectOptions::new() + .event_callback(move |event| { + let tx = tx.clone(); + async move { + if let Event::Connected = event { + tx.send(()).await.unwrap(); + } + } + }) + .connect(server.client_url()) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .unwrap() + .unwrap(); + let stats = client.statistics(); + + assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0); + assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0); + assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0); + assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0); + assert_eq!(stats.connects.load(Ordering::Relaxed), 1); + + let mut responder = client.subscribe("request").await.unwrap(); + tokio::task::spawn({ + let client = client.clone(); + async move { + let msg = responder.next().await.unwrap(); + client + .publish(msg.reply.unwrap(), "response".into()) + .await + .unwrap(); + } + }); + client.request("request", "data".into()).await.unwrap(); + + let mut sub = client.subscribe("test").await.unwrap(); + client.publish("test", "data".into()).await.unwrap(); + client.publish("test", "data".into()).await.unwrap(); + sub.next().await.unwrap(); + sub.next().await.unwrap(); + + client.flush().await.unwrap(); + client.force_reconnect().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4); + assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4); + assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139); + assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139); + assert_eq!(stats.connects.load(Ordering::Relaxed), 2); + } }