From 5568bd96dc9c62279ab1d744b082e48cf8938072 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 11 Sep 2024 18:37:22 +0200 Subject: [PATCH 1/6] Add connection stats Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 8 ++++++ async-nats/src/connector.rs | 19 +++++++++++++- async-nats/src/header.rs | 4 +++ async-nats/src/lib.rs | 45 ++++++++++++++++++++++++++++++++ async-nats/tests/client_tests.rs | 18 +++++++++++++ 5 files changed, 93 insertions(+), 1 deletion(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 4ff1140c8..6ccf10b76 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -15,6 +15,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use crate::connection::State; +use crate::connector::ConnectionStats; use crate::subject::ToSubject; use crate::{PublishMessage, ServerInfo}; @@ -83,6 +84,7 @@ pub struct Client { inbox_prefix: Arc, request_timeout: Option, max_payload: Arc, + connection_stats: Arc, } impl Sink for Client { @@ -116,6 +118,7 @@ impl Client { inbox_prefix: String, request_timeout: Option, max_payload: Arc, + connection_stats: Arc, ) -> Client { let poll_sender = PollSender::new(sender.clone()); Client { @@ -128,6 +131,7 @@ impl Client { inbox_prefix: inbox_prefix.into(), request_timeout, max_payload, + connection_stats, } } @@ -649,6 +653,10 @@ impl Client { .await .map_err(Into::into) } + + pub async fn statistics(&self) -> &ConnectionStats { + &self.connection_stats + } } /// Used for building customized requests. diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index 87112c038..2c6d545a8 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -34,12 +34,14 @@ use crate::LANG; use crate::VERSION; use base64::engine::general_purpose::URL_SAFE_NO_PAD; use base64::engine::Engine; +use portable_atomic::AtomicU64; use rand::seq::SliceRandom; use rand::thread_rng; use std::cmp; use std::io; use std::path::PathBuf; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; @@ -65,11 +67,21 @@ pub(crate) struct ConnectorOptions { pub(crate) max_reconnects: Option, } +#[derive(Default, Debug)] +pub struct ConnectionStats { + pub in_bytes: AtomicU64, + pub out_bytes: AtomicU64, + pub in_msgs: AtomicU64, + pub out_msgs: AtomicU64, + pub reconnects: AtomicU64, +} + /// Maintains a list of servers and establishes connections. pub(crate) struct Connector { /// A map of servers and number of connect attempts. servers: Vec<(ServerAddr, usize)>, options: ConnectorOptions, + pub(crate) connect_stats: Arc, attempts: usize, pub(crate) events_tx: tokio::sync::mpsc::Sender, pub(crate) state_tx: tokio::sync::watch::Sender, @@ -93,6 +105,7 @@ impl Connector { events_tx: tokio::sync::mpsc::Sender, state_tx: tokio::sync::watch::Sender, max_payload: Arc, + connect_stats: Arc, ) -> Result { let servers = addrs.to_server_addrs()?.map(|addr| (addr, 0)).collect(); @@ -103,13 +116,17 @@ impl Connector { events_tx, state_tx, max_payload, + connect_stats, }) } pub(crate) async fn connect(&mut self) -> Result<(ServerInfo, Connection), ConnectError> { loop { match self.try_connect().await { - Ok(inner) => return Ok(inner), + Ok(inner) => { + self.connect_stats.reconnects.add(1, Ordering::Relaxed); + return Ok(inner); + } Err(error) => match error.kind() { ConnectErrorKind::MaxReconnects => { return Err(ConnectError::with_source( diff --git a/async-nats/src/header.rs b/async-nats/src/header.rs index 2dec615ed..cc595cb42 100644 --- a/async-nats/src/header.rs +++ b/async-nats/src/header.rs @@ -110,6 +110,10 @@ impl HeaderMap { pub fn is_empty(&self) -> bool { self.inner.is_empty() } + + pub fn len(&self) -> usize { + self.inner.len() + } } impl HeaderMap { diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 3981f172c..00850f9e9 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -194,6 +194,7 @@ #![deny(rustdoc::invalid_rust_codeblocks)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] +pub use connector::ConnectionStats; use thiserror::Error; use futures::stream::Stream; @@ -213,6 +214,7 @@ use std::pin::Pin; use std::slice; use std::str::{self, FromStr}; use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::ErrorKind; @@ -667,6 +669,15 @@ impl ConnectionHandler { description, length, } => { + self.connector + .connect_stats + .in_msgs + .add(1, Ordering::Relaxed); + self.connector + .connect_stats + .in_bytes + .add(length as u64, Ordering::Relaxed); + if let Some(subscription) = self.subscriptions.get_mut(&sid) { let message: Message = Message { subject, @@ -796,6 +807,15 @@ impl ConnectionHandler { } => { let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject"); + let header_len = headers + .as_ref() + .map(|headers| headers.len()) + .unwrap_or_default(); + + self.connector.connect_stats.out_bytes.add( + (payload.len() + respond.len() + subject.len() + header_len) as u64, + Ordering::Relaxed, + ); let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { @@ -814,6 +834,10 @@ impl ConnectionHandler { senders: HashMap::new(), }) }; + self.connector + .connect_stats + .out_msgs + .add(1, Ordering::Relaxed); multiplexer.senders.insert(token.to_owned(), sender); @@ -833,6 +857,24 @@ impl ConnectionHandler { reply: respond, headers, }) => { + self.connector + .connect_stats + .out_msgs + .add(1, Ordering::Relaxed); + + let header_len = headers + .as_ref() + .map(|headers| headers.len()) + .unwrap_or_default(); + + self.connector.connect_stats.out_bytes.add( + (payload.len() + + respond.as_ref().map_or_else(|| 0, |r| r.len()) + + subject.len() + + header_len) as u64, + Ordering::Relaxed, + ); + self.connection.enqueue_write_op(&ClientOp::Publish { subject, payload, @@ -907,6 +949,7 @@ pub async fn connect_with_options( let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending); // We're setting it to the default server payload size. let max_payload = Arc::new(AtomicUsize::new(1024 * 1024)); + let connection_stats = Arc::new(ConnectionStats::default()); let mut connector = Connector::new( addrs, @@ -931,6 +974,7 @@ pub async fn connect_with_options( events_tx, state_tx, max_payload.clone(), + connection_stats.clone(), ) .map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?; @@ -954,6 +998,7 @@ pub async fn connect_with_options( options.inbox_prefix, options.request_timeout, max_payload, + connection_stats, ); task::spawn(async move { diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index f125e9765..6c26b9800 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -931,4 +931,22 @@ mod client { .await .unwrap(); } + + #[tokio::test] + async fn client_statistics() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let stats = client.statistics().await; + println!("{:#?}", stats); + + let mut sub = client.subscribe("test").await.unwrap(); + client.publish("test", "data".into()).await.unwrap(); + client.publish("test", "data".into()).await.unwrap(); + sub.next().await.unwrap(); + sub.next().await.unwrap(); + + client.flush().await.unwrap(); + let stats = client.statistics().await; + println!("{:#?}", stats); + } } From fc0ece3e513b57608bc23c1f0db02cf22b5bb4a0 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 09:05:20 +0200 Subject: [PATCH 2/6] Decide to use atomics Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 41 ++++++++++++++++++++--- async-nats/src/connector.rs | 17 +++------- async-nats/src/lib.rs | 25 ++++++++------ async-nats/tests/client_tests.rs | 57 +++++++++++++++++++++++++++++--- 4 files changed, 106 insertions(+), 34 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 6ccf10b76..48a45dc50 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -15,7 +15,6 @@ use core::pin::Pin; use core::task::{Context, Poll}; use crate::connection::State; -use crate::connector::ConnectionStats; use crate::subject::ToSubject; use crate::{PublishMessage, ServerInfo}; @@ -84,7 +83,7 @@ pub struct Client { inbox_prefix: Arc, request_timeout: Option, max_payload: Arc, - connection_stats: Arc, + connection_stats: Arc, } impl Sink for Client { @@ -118,7 +117,7 @@ impl Client { inbox_prefix: String, request_timeout: Option, max_payload: Arc, - connection_stats: Arc, + connection_stats: Arc, ) -> Client { let poll_sender = PollSender::new(sender.clone()); Client { @@ -654,8 +653,24 @@ impl Client { .map_err(Into::into) } - pub async fn statistics(&self) -> &ConnectionStats { - &self.connection_stats + /// Returns struct representing statistics of the whole lifecycle of the client. + /// This includes number of bytes sent/received, number of messages sent/received, + /// and number of times the connection was established. + /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared + /// across threads. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let statistics = client.statistics(); + /// println!("client statistics: {:$?}", statistics); + /// # Ok(()) + /// # } + /// ``` + pub fn statistics(&self) -> Arc { + self.connection_stats.clone() } } @@ -834,3 +849,19 @@ impl Display for FlushErrorKind { } pub type FlushError = Error; + +/// Represents statistics for the instance of the client throughout its lifecycle. +#[derive(Default, Debug)] +pub struct Statistics { + /// Number of bytes received. This does not include the protocol overhead. + pub in_bytes: AtomicU64, + /// Number of bytes sent. This doe not include the protocol overhead. + pub out_bytes: AtomicU64, + /// Number of messages received. + pub in_messages: AtomicU64, + /// Number of messages sent. + pub out_messages: AtomicU64, + /// Number of times connection was established. + /// Initial connect will be counted as well, then all successful reconnects. + pub connects: AtomicU64, +} diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index 2c6d545a8..7bf54307a 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -12,6 +12,7 @@ // limitations under the License. use crate::auth::Auth; +use crate::client::Statistics; use crate::connection::Connection; use crate::connection::State; use crate::options::CallbackArg1; @@ -34,7 +35,6 @@ use crate::LANG; use crate::VERSION; use base64::engine::general_purpose::URL_SAFE_NO_PAD; use base64::engine::Engine; -use portable_atomic::AtomicU64; use rand::seq::SliceRandom; use rand::thread_rng; use std::cmp; @@ -67,21 +67,12 @@ pub(crate) struct ConnectorOptions { pub(crate) max_reconnects: Option, } -#[derive(Default, Debug)] -pub struct ConnectionStats { - pub in_bytes: AtomicU64, - pub out_bytes: AtomicU64, - pub in_msgs: AtomicU64, - pub out_msgs: AtomicU64, - pub reconnects: AtomicU64, -} - /// Maintains a list of servers and establishes connections. pub(crate) struct Connector { /// A map of servers and number of connect attempts. servers: Vec<(ServerAddr, usize)>, options: ConnectorOptions, - pub(crate) connect_stats: Arc, + pub(crate) connect_stats: Arc, attempts: usize, pub(crate) events_tx: tokio::sync::mpsc::Sender, pub(crate) state_tx: tokio::sync::watch::Sender, @@ -105,7 +96,7 @@ impl Connector { events_tx: tokio::sync::mpsc::Sender, state_tx: tokio::sync::watch::Sender, max_payload: Arc, - connect_stats: Arc, + connect_stats: Arc, ) -> Result { let servers = addrs.to_server_addrs()?.map(|addr| (addr, 0)).collect(); @@ -124,7 +115,6 @@ impl Connector { loop { match self.try_connect().await { Ok(inner) => { - self.connect_stats.reconnects.add(1, Ordering::Relaxed); return Ok(inner); } Err(error) => match error.kind() { @@ -301,6 +291,7 @@ impl Connector { Some(_) => { tracing::debug!("connected to {}", server_info.port); self.attempts = 0; + self.connect_stats.connects.add(1, Ordering::Relaxed); self.events_tx.send(Event::Connected).await.ok(); self.state_tx.send(State::Connected).ok(); self.max_payload.store( diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 00850f9e9..920d3c33f 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -194,7 +194,6 @@ #![deny(rustdoc::invalid_rust_codeblocks)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] -pub use connector::ConnectionStats; use thiserror::Error; use futures::stream::Stream; @@ -253,7 +252,9 @@ mod connector; mod options; pub use auth::Auth; -pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError}; +pub use client::{ + Client, PublishError, Request, RequestError, RequestErrorKind, Statistics, SubscribeError, +}; pub use options::{AuthError, ConnectOptions}; mod crypto; @@ -671,7 +672,7 @@ impl ConnectionHandler { } => { self.connector .connect_stats - .in_msgs + .in_messages .add(1, Ordering::Relaxed); self.connector .connect_stats @@ -812,10 +813,6 @@ impl ConnectionHandler { .map(|headers| headers.len()) .unwrap_or_default(); - self.connector.connect_stats.out_bytes.add( - (payload.len() + respond.len() + subject.len() + header_len) as u64, - Ordering::Relaxed, - ); let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { @@ -836,15 +833,21 @@ impl ConnectionHandler { }; self.connector .connect_stats - .out_msgs + .out_messages .add(1, Ordering::Relaxed); multiplexer.senders.insert(token.to_owned(), sender); + let respond: Subject = format!("{}{}", multiplexer.prefix, token).into(); + + self.connector.connect_stats.out_bytes.add( + (payload.len() + respond.len() + subject.len() + header_len) as u64, + Ordering::Relaxed, + ); let pub_op = ClientOp::Publish { subject, payload, - respond: Some(format!("{}{}", multiplexer.prefix, token).into()), + respond: Some(respond), headers, }; @@ -859,7 +862,7 @@ impl ConnectionHandler { }) => { self.connector .connect_stats - .out_msgs + .out_messages .add(1, Ordering::Relaxed); let header_len = headers @@ -949,7 +952,7 @@ pub async fn connect_with_options( let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending); // We're setting it to the default server payload size. let max_payload = Arc::new(AtomicUsize::new(1024 * 1024)); - let connection_stats = Arc::new(ConnectionStats::default()); + let connection_stats = Arc::new(Statistics::default()); let mut connector = Connector::new( addrs, diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index 6c26b9800..b7275a24f 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -22,6 +22,7 @@ mod client { use futures::stream::StreamExt; use std::path::PathBuf; use std::str::FromStr; + use std::sync::atomic::Ordering; use std::time::Duration; #[tokio::test] @@ -935,9 +936,45 @@ mod client { #[tokio::test] async fn client_statistics() { let server = nats_server::run_basic_server(); - let client = async_nats::connect(server.client_url()).await.unwrap(); - let stats = client.statistics().await; - println!("{:#?}", stats); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let client = async_nats::ConnectOptions::new() + .event_callback(move |event| { + let tx = tx.clone(); + async move { + if let Event::Connected = event { + tx.send(()).await.unwrap(); + } + } + }) + .connect(server.client_url()) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .unwrap() + .unwrap(); + let stats = client.statistics(); + + assert_eq!(stats.in_messages.load(Ordering::Relaxed), 0); + assert_eq!(stats.out_messages.load(Ordering::Relaxed), 0); + assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 0); + assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 0); + assert_eq!(stats.connects.load(Ordering::Relaxed), 1); + + let mut responder = client.subscribe("request").await.unwrap(); + tokio::task::spawn({ + let client = client.clone(); + async move { + let msg = responder.next().await.unwrap(); + client + .publish(msg.reply.unwrap(), "response".into()) + .await + .unwrap(); + } + }); + client.request("request", "data".into()).await.unwrap(); let mut sub = client.subscribe("test").await.unwrap(); client.publish("test", "data".into()).await.unwrap(); @@ -946,7 +983,17 @@ mod client { sub.next().await.unwrap(); client.flush().await.unwrap(); - let stats = client.statistics().await; - println!("{:#?}", stats); + client.force_reconnect().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .unwrap() + .unwrap(); + + assert_eq!(stats.in_messages.load(Ordering::Relaxed), 4); + assert_eq!(stats.out_messages.load(Ordering::Relaxed), 4); + assert_eq!(stats.in_bytes.load(Ordering::Relaxed), 139); + assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139); + assert_eq!(stats.connects.load(Ordering::Relaxed), 2); } } From 88de99d9e31b667f30fcef4f1f3fca1e2cbcbfac Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 09:09:42 +0200 Subject: [PATCH 3/6] Add clippy exception Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 1 + async-nats/src/ext.rs | 169 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 async-nats/src/ext.rs diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 48a45dc50..6e70afce0 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -109,6 +109,7 @@ impl Sink for Client { } impl Client { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( info: tokio::sync::watch::Receiver, state: tokio::sync::watch::Receiver, diff --git a/async-nats/src/ext.rs b/async-nats/src/ext.rs new file mode 100644 index 000000000..94804bdc5 --- /dev/null +++ b/async-nats/src/ext.rs @@ -0,0 +1,169 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use serde::de::DeserializeOwned; + +pub trait SubscribeExt: Stream +where + M: MessageTrait, +{ + fn for_type(self) -> TypedStream + where + Self: Sized, + T: DeserializeOwned, + { + TypedStream::new(self) + } +} + +impl SubscribeExt for S +where + S: Stream, + M: MessageTrait, +{ +} + +pin_project_lite::pin_project! { + pub struct TypedStream { + #[pin] + stream: S, + _phantom: std::marker::PhantomData, + } +} + +impl TypedStream { + fn new(stream: S) -> Self { + Self { + stream, + _phantom: std::marker::PhantomData, + } + } +} + +pub trait MessageTrait { + // fn payload(&self) -> Bytes; + // fn subject(&self) -> Subject; + // fn reply(&self) -> Option; + // fn headers(&self) -> Option; + // fn status(&self) -> Option; + // fn description(&self) -> Option; + // fn length(&self) -> usize; + fn payload(&self) -> &[u8]; +} + +impl MessageTrait for crate::Message { + fn payload(&self) -> &[u8] { + self.payload.as_ref() + } +} + +impl MessageTrait for crate::PublishMessage { + fn payload(&self) -> &[u8] { + self.payload.as_ref() + } +} + +impl MessageTrait for crate::jetstream::message::Message { + fn payload(&self) -> &[u8] { + self.payload.as_ref() + } +} + +impl Stream for TypedStream +where + S: Stream, + T: DeserializeOwned, + M: MessageTrait, +{ + type Item = serde_json::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.stream.poll_next(cx) { + Poll::Ready(message) => match message { + Some(message) => { + let message = message.payload(); + Poll::Ready(Some(serde_json::from_slice(&message))) + } + None => Poll::Ready(None), + }, + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod test { + use futures::StreamExt; + use futures::TryStreamExt; + use serde::Serialize; + + use super::SubscribeExt; + use crate::PublishMessage; + + #[tokio::test] + async fn for_type() { + use futures::stream; + use serde::Deserialize; + + #[derive(Serialize, Deserialize, Debug, PartialEq)] + struct Test { + a: i32, + b: String, + } + + struct OtherTest { + data: (i32, String), + } + + // Prepare some messages + let messages = vec![ + PublishMessage { + subject: "test".into(), + payload: serde_json::to_vec(&Test { + a: 1, + b: "a".to_string(), + }) + .unwrap() + .into(), + reply: None, + headers: Default::default(), + }, + PublishMessage { + subject: "test".into(), + payload: serde_json::to_vec(&Test { + a: 2, + b: "b".to_string(), + }) + .unwrap() + .into(), + reply: None, + headers: Default::default(), + }, + ]; + + // Simulate a stream of messages + let stream = stream::iter(messages); + + // first deserialize into a concrete type + let stream = stream + .for_type::() + // and then transform into another type + .and_then(|item| async move { + Ok(OtherTest { + data: (item.a, item.b), + }) + }); + + // Don't worry, that is just Rust bs about pinning data. + let mut stream = Box::pin(stream); + + // see that it works. + assert_eq!(stream.next().await.unwrap().unwrap().data.0, 1); + assert_eq!(stream.next().await.unwrap().unwrap().data.0, 2); + } +} From 3b9e00c94d4479d5d2216e0d92967a8090986cd0 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 10:46:34 +0200 Subject: [PATCH 4/6] Update dic Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.config/nats.dic b/.config/nats.dic index fec16ee97..52c7cfea1 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -156,3 +156,5 @@ create_consumer_strict_on_stream leafnodes get_stream get_stream_no_info +lifecycle +AtomicU64 From f89a309ee9eeff6f7cb182acc93784f63257ef43 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 11:22:48 +0200 Subject: [PATCH 5/6] fix doc example Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 2 +- async-nats/src/ext.rs | 169 --------------------------------------- 2 files changed, 1 insertion(+), 170 deletions(-) delete mode 100644 async-nats/src/ext.rs diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 6e70afce0..ce2c0f83f 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -666,7 +666,7 @@ impl Client { /// # async fn main() -> Result<(), async_nats::Error> { /// let client = async_nats::connect("demo.nats.io").await?; /// let statistics = client.statistics(); - /// println!("client statistics: {:$?}", statistics); + /// println!("client statistics: {:#?}", statistics); /// # Ok(()) /// # } /// ``` diff --git a/async-nats/src/ext.rs b/async-nats/src/ext.rs deleted file mode 100644 index 94804bdc5..000000000 --- a/async-nats/src/ext.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures::Stream; -use serde::de::DeserializeOwned; - -pub trait SubscribeExt: Stream -where - M: MessageTrait, -{ - fn for_type(self) -> TypedStream - where - Self: Sized, - T: DeserializeOwned, - { - TypedStream::new(self) - } -} - -impl SubscribeExt for S -where - S: Stream, - M: MessageTrait, -{ -} - -pin_project_lite::pin_project! { - pub struct TypedStream { - #[pin] - stream: S, - _phantom: std::marker::PhantomData, - } -} - -impl TypedStream { - fn new(stream: S) -> Self { - Self { - stream, - _phantom: std::marker::PhantomData, - } - } -} - -pub trait MessageTrait { - // fn payload(&self) -> Bytes; - // fn subject(&self) -> Subject; - // fn reply(&self) -> Option; - // fn headers(&self) -> Option; - // fn status(&self) -> Option; - // fn description(&self) -> Option; - // fn length(&self) -> usize; - fn payload(&self) -> &[u8]; -} - -impl MessageTrait for crate::Message { - fn payload(&self) -> &[u8] { - self.payload.as_ref() - } -} - -impl MessageTrait for crate::PublishMessage { - fn payload(&self) -> &[u8] { - self.payload.as_ref() - } -} - -impl MessageTrait for crate::jetstream::message::Message { - fn payload(&self) -> &[u8] { - self.payload.as_ref() - } -} - -impl Stream for TypedStream -where - S: Stream, - T: DeserializeOwned, - M: MessageTrait, -{ - type Item = serde_json::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - match this.stream.poll_next(cx) { - Poll::Ready(message) => match message { - Some(message) => { - let message = message.payload(); - Poll::Ready(Some(serde_json::from_slice(&message))) - } - None => Poll::Ready(None), - }, - Poll::Pending => Poll::Pending, - } - } -} - -#[cfg(test)] -mod test { - use futures::StreamExt; - use futures::TryStreamExt; - use serde::Serialize; - - use super::SubscribeExt; - use crate::PublishMessage; - - #[tokio::test] - async fn for_type() { - use futures::stream; - use serde::Deserialize; - - #[derive(Serialize, Deserialize, Debug, PartialEq)] - struct Test { - a: i32, - b: String, - } - - struct OtherTest { - data: (i32, String), - } - - // Prepare some messages - let messages = vec![ - PublishMessage { - subject: "test".into(), - payload: serde_json::to_vec(&Test { - a: 1, - b: "a".to_string(), - }) - .unwrap() - .into(), - reply: None, - headers: Default::default(), - }, - PublishMessage { - subject: "test".into(), - payload: serde_json::to_vec(&Test { - a: 2, - b: "b".to_string(), - }) - .unwrap() - .into(), - reply: None, - headers: Default::default(), - }, - ]; - - // Simulate a stream of messages - let stream = stream::iter(messages); - - // first deserialize into a concrete type - let stream = stream - .for_type::() - // and then transform into another type - .and_then(|item| async move { - Ok(OtherTest { - data: (item.a, item.b), - }) - }); - - // Don't worry, that is just Rust bs about pinning data. - let mut stream = Box::pin(stream); - - // see that it works. - assert_eq!(stream.next().await.unwrap().unwrap().data.0, 1); - assert_eq!(stream.next().await.unwrap().unwrap().data.0, 2); - } -} From 916d5f4b6db3abca226823f2ae8fb64cdcbc5af3 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 12 Sep 2024 20:32:18 +0200 Subject: [PATCH 6/6] Uber nitpicking Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 4 ++-- async-nats/src/lib.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index ce2c0f83f..f64b11717 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -118,7 +118,7 @@ impl Client { inbox_prefix: String, request_timeout: Option, max_payload: Arc, - connection_stats: Arc, + statistics: Arc, ) -> Client { let poll_sender = PollSender::new(sender.clone()); Client { @@ -131,7 +131,7 @@ impl Client { inbox_prefix: inbox_prefix.into(), request_timeout, max_payload, - connection_stats, + connection_stats: statistics, } } diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 920d3c33f..b6079c005 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -952,7 +952,7 @@ pub async fn connect_with_options( let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending); // We're setting it to the default server payload size. let max_payload = Arc::new(AtomicUsize::new(1024 * 1024)); - let connection_stats = Arc::new(Statistics::default()); + let statistics = Arc::new(Statistics::default()); let mut connector = Connector::new( addrs, @@ -977,7 +977,7 @@ pub async fn connect_with_options( events_tx, state_tx, max_payload.clone(), - connection_stats.clone(), + statistics.clone(), ) .map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?; @@ -1001,7 +1001,7 @@ pub async fn connect_with_options( options.inbox_prefix, options.request_timeout, max_payload, - connection_stats, + statistics, ); task::spawn(async move {