From a17334c68e51040729b71c461d9635fa07c8a9c1 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 4 Sep 2023 14:35:45 +0200 Subject: [PATCH] Add stream subject mappings Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/stream.rs | 55 ++++++++++++++++++++++++--- async-nats/tests/jetstream_tests.rs | 59 +++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 6 deletions(-) diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 0283753b0..c5334693a 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -32,7 +32,7 @@ use base64::engine::general_purpose::STANDARD; use base64::engine::Engine; use bytes::Bytes; use futures::{future::BoxFuture, TryFutureExt}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use serde_json::json; use time::{serde::rfc3339, OffsetDateTime}; @@ -1153,16 +1153,21 @@ pub enum StorageType { /// Shows config and current state for this stream. #[derive(Debug, Deserialize, Clone)] pub struct Info { - /// The configuration associated with this stream + /// The configuration associated with this stream. pub config: Config, - /// The time that this stream was created + /// The time that this stream was created. #[serde(with = "rfc3339")] pub created: time::OffsetDateTime, - /// Various metrics associated with this stream + /// Various metrics associated with this stream. pub state: State, - - ///information about leader and replicas + /// Information about leader and replicas. pub cluster: Option, + /// Information about mirror config if present. + #[serde(default)] + pub mirror: Option, + /// Information about sources configs if present. + #[serde(default)] + pub sources: Vec, } #[derive(Deserialize)] @@ -1374,6 +1379,40 @@ pub struct PeerInfo { pub lag: Option, } +#[derive(Debug, Clone, Deserialize)] +pub struct SourceInfo { + /// Source name. + pub name: String, + /// Number of messages this source is lagging behind. + pub lag: u64, + /// Last time the source was seen active. + #[serde(deserialize_with = "negative_duration_as_none")] + pub active: Option, + /// Filtering for the source. + #[serde(default)] + pub filter_subject: Option, + /// Source destination subject. + #[serde(default)] + pub subject_transform_dest: Option, + /// List of transforms. + #[serde(default)] + pub subject_transforms: Vec, +} + +fn negative_duration_as_none<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let n = i64::deserialize(deserializer)?; + if n.is_negative() { + Ok(None) + } else { + Ok(Some(std::time::Duration::from_nanos(n as u64))) + } +} + /// The response generated by trying to purge a stream. #[derive(Debug, Deserialize, Clone, Copy)] pub struct PurgeResponse { @@ -1430,6 +1469,10 @@ pub struct Source { skip_serializing_if = "is_default" )] pub subject_transform_destination: Option, + /// Subject transforms for Stream. + #[cfg(feature = "server_2_10")] + #[serde(default, skip_serializing_if = "is_default")] + pub subject_transforms: Vec, } #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)] diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 5797f7d7b..2a7ab5549 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2733,6 +2733,65 @@ mod jetstream { ); } + #[tokio::test] + #[cfg(feature = "server_2_10")] + async fn stream_subject_transforms() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client); + + let subject_transform = stream::SubjectTransform { + source: "foo".to_string(), + destination: "bar".to_string(), + }; + + let source = stream::Source { + name: "source".to_string(), + filter_subject: Some("stream1.foo".to_string()), + subject_transform_destination: Some("foo".to_string()), + ..Default::default() + }; + + let sources = vec![ + source.clone(), + stream::Source { + name: "multi_source".to_string(), + subject_transforms: vec![stream::SubjectTransform { + source: "stream2.foo.>".to_string(), + destination: "foo.>".to_string(), + }], + ..Default::default() + }, + ]; + + let mut stream = context + .create_stream(stream::Config { + name: "filtered".to_string(), + subject_transform: Some(subject_transform.clone()), + sources: Some(sources.clone()), + ..Default::default() + }) + .await + .unwrap(); + + let info = stream.info().await.unwrap(); + assert_eq!(info.config.sources, Some(sources.clone())); + assert_eq!(info.config.subject_transform, Some(subject_transform)); + + let mut stream = context + .create_stream(stream::Config { + name: "mirror".to_string(), + mirror: Some(source.clone()), + ..Default::default() + }) + .await + .unwrap(); + + let info = stream.info().await.unwrap(); + + assert_eq!(info.config.mirror, Some(source)); + } + #[tokio::test] async fn pull_by_bytes() { let server = nats_server::run_server("tests/configs/jetstream.conf");