Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Client stats #1314

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,5 @@ create_consumer_strict_on_stream
leafnodes
get_stream
get_stream_no_info
lifecycle
AtomicU64
40 changes: 40 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Client {
inbox_prefix: Arc<str>,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
connection_stats: Arc<Statistics>,
}

impl Sink<PublishMessage> for Client {
Expand All @@ -108,6 +109,7 @@ impl Sink<PublishMessage> for Client {
}

impl Client {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
info: tokio::sync::watch::Receiver<ServerInfo>,
state: tokio::sync::watch::Receiver<State>,
Expand All @@ -116,6 +118,7 @@ impl Client {
inbox_prefix: String,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
connection_stats: Arc<Statistics>,
Jarema marked this conversation as resolved.
Show resolved Hide resolved
) -> Client {
let poll_sender = PollSender::new(sender.clone());
Client {
Expand All @@ -128,6 +131,7 @@ impl Client {
inbox_prefix: inbox_prefix.into(),
request_timeout,
max_payload,
connection_stats,
}
}

Expand Down Expand Up @@ -649,6 +653,26 @@ impl Client {
.await
.map_err(Into::into)
}

/// Returns struct representing statistics of the whole lifecycle of the client.
/// This includes number of bytes sent/received, number of messages sent/received,
/// and number of times the connection was established.
/// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
/// across threads.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let statistics = client.statistics();
/// println!("client statistics: {:#?}", statistics);
/// # Ok(())
/// # }
/// ```
pub fn statistics(&self) -> Arc<Statistics> {
self.connection_stats.clone()
}
}

/// Used for building customized requests.
Expand Down Expand Up @@ -826,3 +850,19 @@ impl Display for FlushErrorKind {
}

pub type FlushError = Error<FlushErrorKind>;

/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
/// Number of bytes received. This does not include the protocol overhead.
pub in_bytes: AtomicU64,
/// Number of bytes sent. This doe not include the protocol overhead.
pub out_bytes: AtomicU64,
/// Number of messages received.
pub in_messages: AtomicU64,
/// Number of messages sent.
pub out_messages: AtomicU64,
/// Number of times connection was established.
/// Initial connect will be counted as well, then all successful reconnects.
pub connects: AtomicU64,
}
10 changes: 9 additions & 1 deletion async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

use crate::auth::Auth;
use crate::client::Statistics;
use crate::connection::Connection;
use crate::connection::State;
use crate::options::CallbackArg1;
Expand Down Expand Up @@ -40,6 +41,7 @@ use std::cmp;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -70,6 +72,7 @@ pub(crate) struct Connector {
/// A map of servers and number of connect attempts.
servers: Vec<(ServerAddr, usize)>,
options: ConnectorOptions,
pub(crate) connect_stats: Arc<Statistics>,
attempts: usize,
pub(crate) events_tx: tokio::sync::mpsc::Sender<Event>,
pub(crate) state_tx: tokio::sync::watch::Sender<State>,
Expand All @@ -93,6 +96,7 @@ impl Connector {
events_tx: tokio::sync::mpsc::Sender<Event>,
state_tx: tokio::sync::watch::Sender<State>,
max_payload: Arc<AtomicUsize>,
connect_stats: Arc<Statistics>,
) -> Result<Connector, io::Error> {
let servers = addrs.to_server_addrs()?.map(|addr| (addr, 0)).collect();

Expand All @@ -103,13 +107,16 @@ impl Connector {
events_tx,
state_tx,
max_payload,
connect_stats,
})
}

pub(crate) async fn connect(&mut self) -> Result<(ServerInfo, Connection), ConnectError> {
loop {
match self.try_connect().await {
Ok(inner) => return Ok(inner),
Ok(inner) => {
return Ok(inner);
}
Err(error) => match error.kind() {
ConnectErrorKind::MaxReconnects => {
return Err(ConnectError::with_source(
Expand Down Expand Up @@ -284,6 +291,7 @@ impl Connector {
Some(_) => {
tracing::debug!("connected to {}", server_info.port);
self.attempts = 0;
self.connect_stats.connects.add(1, Ordering::Relaxed);
self.events_tx.send(Event::Connected).await.ok();
self.state_tx.send(State::Connected).ok();
self.max_payload.store(
Expand Down
4 changes: 4 additions & 0 deletions async-nats/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl HeaderMap {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

pub fn len(&self) -> usize {
self.inner.len()
}
}

impl HeaderMap {
Expand Down
52 changes: 50 additions & 2 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ use std::pin::Pin;
use std::slice;
use std::str::{self, FromStr};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::ErrorKind;
Expand Down Expand Up @@ -251,7 +252,9 @@ mod connector;
mod options;

pub use auth::Auth;
pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError};
pub use client::{
Client, PublishError, Request, RequestError, RequestErrorKind, Statistics, SubscribeError,
};
pub use options::{AuthError, ConnectOptions};

mod crypto;
Expand Down Expand Up @@ -667,6 +670,15 @@ impl ConnectionHandler {
description,
length,
} => {
self.connector
.connect_stats
.in_messages
.add(1, Ordering::Relaxed);
self.connector
.connect_stats
.in_bytes
.add(length as u64, Ordering::Relaxed);

if let Some(subscription) = self.subscriptions.get_mut(&sid) {
let message: Message = Message {
subject,
Expand Down Expand Up @@ -796,6 +808,11 @@ impl ConnectionHandler {
} => {
let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject");

let header_len = headers
.as_ref()
.map(|headers| headers.len())
.unwrap_or_default();

let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
multiplexer
} else {
Expand All @@ -814,13 +831,23 @@ impl ConnectionHandler {
senders: HashMap::new(),
})
};
self.connector
.connect_stats
.out_messages
.add(1, Ordering::Relaxed);

multiplexer.senders.insert(token.to_owned(), sender);

let respond: Subject = format!("{}{}", multiplexer.prefix, token).into();

self.connector.connect_stats.out_bytes.add(
(payload.len() + respond.len() + subject.len() + header_len) as u64,
Ordering::Relaxed,
);
let pub_op = ClientOp::Publish {
subject,
payload,
respond: Some(format!("{}{}", multiplexer.prefix, token).into()),
respond: Some(respond),
headers,
};

Expand All @@ -833,6 +860,24 @@ impl ConnectionHandler {
reply: respond,
headers,
}) => {
self.connector
.connect_stats
.out_messages
.add(1, Ordering::Relaxed);

let header_len = headers
.as_ref()
.map(|headers| headers.len())
.unwrap_or_default();

self.connector.connect_stats.out_bytes.add(
(payload.len()
+ respond.as_ref().map_or_else(|| 0, |r| r.len())
+ subject.len()
+ header_len) as u64,
Ordering::Relaxed,
);

self.connection.enqueue_write_op(&ClientOp::Publish {
subject,
payload,
Expand Down Expand Up @@ -907,6 +952,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending);
// We're setting it to the default server payload size.
let max_payload = Arc::new(AtomicUsize::new(1024 * 1024));
let connection_stats = Arc::new(Statistics::default());

let mut connector = Connector::new(
addrs,
Expand All @@ -931,6 +977,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
events_tx,
state_tx,
max_payload.clone(),
connection_stats.clone(),
)
.map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?;

Expand All @@ -954,6 +1001,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
options.inbox_prefix,
options.request_timeout,
max_payload,
connection_stats,
);

task::spawn(async move {
Expand Down
65 changes: 65 additions & 0 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod client {
use futures::stream::StreamExt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::time::Duration;

#[tokio::test]
Expand Down Expand Up @@ -931,4 +932,68 @@ mod client {
.await
.unwrap();
}

#[tokio::test]
async fn client_statistics() {
let server = nats_server::run_basic_server();

let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let client = async_nats::ConnectOptions::new()
.event_callback(move |event| {
let tx = tx.clone();
async move {
if let Event::Connected = event {
tx.send(()).await.unwrap();
}
}
})
.connect(server.client_url())
.await
.unwrap();

tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();
let stats = client.statistics();

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0);
assert_eq!(stats.connects.load(Ordering::Relaxed), 1);

let mut responder = client.subscribe("request").await.unwrap();
tokio::task::spawn({
let client = client.clone();
async move {
let msg = responder.next().await.unwrap();
client
.publish(msg.reply.unwrap(), "response".into())
.await
.unwrap();
}
});
client.request("request", "data".into()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();
client.publish("test", "data".into()).await.unwrap();
client.publish("test", "data".into()).await.unwrap();
sub.next().await.unwrap();
sub.next().await.unwrap();

client.flush().await.unwrap();
client.force_reconnect().await.unwrap();

tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.unwrap()
.unwrap();

assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4);
assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139);
assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139);
assert_eq!(stats.connects.load(Ordering::Relaxed), 2);
}
}
Loading