Skip to content

Commit

Permalink
Add stream subject mappings
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 5, 2023
1 parent 8661572 commit a17334c
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 6 deletions.
55 changes: 49 additions & 6 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use base64::engine::general_purpose::STANDARD;
use base64::engine::Engine;
use bytes::Bytes;
use futures::{future::BoxFuture, TryFutureExt};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};

Expand Down Expand Up @@ -1153,16 +1153,21 @@ pub enum StorageType {
/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone)]
pub struct Info {
/// The configuration associated with this stream
/// The configuration associated with this stream.
pub config: Config,
/// The time that this stream was created
/// The time that this stream was created.
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
/// Various metrics associated with this stream
/// Various metrics associated with this stream.
pub state: State,

///information about leader and replicas
/// Information about leader and replicas.
pub cluster: Option<ClusterInfo>,
/// Information about mirror config if present.
#[serde(default)]
pub mirror: Option<SourceInfo>,
/// Information about sources configs if present.
#[serde(default)]
pub sources: Vec<SourceInfo>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -1374,6 +1379,40 @@ pub struct PeerInfo {
pub lag: Option<u64>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SourceInfo {
/// Source name.
pub name: String,
/// Number of messages this source is lagging behind.
pub lag: u64,
/// Last time the source was seen active.
#[serde(deserialize_with = "negative_duration_as_none")]
pub active: Option<std::time::Duration>,
/// Filtering for the source.
#[serde(default)]
pub filter_subject: Option<String>,
/// Source destination subject.
#[serde(default)]
pub subject_transform_dest: Option<String>,
/// List of transforms.
#[serde(default)]
pub subject_transforms: Vec<SubjectTransform>,
}

fn negative_duration_as_none<'de, D>(
deserializer: D,
) -> Result<Option<std::time::Duration>, D::Error>
where
D: Deserializer<'de>,
{
let n = i64::deserialize(deserializer)?;
if n.is_negative() {
Ok(None)
} else {
Ok(Some(std::time::Duration::from_nanos(n as u64)))
}
}

/// The response generated by trying to purge a stream.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
Expand Down Expand Up @@ -1430,6 +1469,10 @@ pub struct Source {
skip_serializing_if = "is_default"
)]
pub subject_transform_destination: Option<String>,
/// Subject transforms for Stream.
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub subject_transforms: Vec<SubjectTransform>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
Expand Down
59 changes: 59 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2733,6 +2733,65 @@ mod jetstream {
);
}

#[tokio::test]
#[cfg(feature = "server_2_10")]
async fn stream_subject_transforms() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let subject_transform = stream::SubjectTransform {
source: "foo".to_string(),
destination: "bar".to_string(),
};

let source = stream::Source {
name: "source".to_string(),
filter_subject: Some("stream1.foo".to_string()),
subject_transform_destination: Some("foo".to_string()),
..Default::default()
};

let sources = vec![
source.clone(),
stream::Source {
name: "multi_source".to_string(),
subject_transforms: vec![stream::SubjectTransform {
source: "stream2.foo.>".to_string(),
destination: "foo.>".to_string(),
}],
..Default::default()
},
];

let mut stream = context
.create_stream(stream::Config {
name: "filtered".to_string(),
subject_transform: Some(subject_transform.clone()),
sources: Some(sources.clone()),
..Default::default()
})
.await
.unwrap();

let info = stream.info().await.unwrap();
assert_eq!(info.config.sources, Some(sources.clone()));
assert_eq!(info.config.subject_transform, Some(subject_transform));

let mut stream = context
.create_stream(stream::Config {
name: "mirror".to_string(),
mirror: Some(source.clone()),
..Default::default()
})
.await
.unwrap();

let info = stream.info().await.unwrap();

assert_eq!(info.config.mirror, Some(source));
}

#[tokio::test]
async fn pull_by_bytes() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit a17334c

Please sign in to comment.