Skip to content

Commit

Permalink
Add stream_by_subject
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Jul 26, 2024
1 parent ee03622 commit dcbd68a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
76 changes: 74 additions & 2 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use crate::jetstream::account::Account;
use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::subject::ToSubject;
use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode};
use crate::{
header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{Future, TryFutureExt};
use futures::{Future, StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{self, json};
Expand Down Expand Up @@ -498,6 +500,52 @@ impl Context {
}
}

/// Looks up Stream that contains provided subject.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let stream_name = jetstream.stream_by_subject("foo.>");
/// # Ok(())
/// # }
/// ```
pub async fn stream_by_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<String, GetStreamByNameError> {
let subject = subject.into();
if !is_valid_subject(subject.as_str()) {
return Err(GetStreamByNameError::new(
GetStreamByNameErrorKind::InvalidSubject,
));
}
let mut names = StreamNames {
context: self.clone(),
offset: 0,
page_request: None,
streams: Vec::new(),
subject: Some(subject),
done: false,
};
match names.next().await {
Some(name) => match name {
Ok(name) => Ok(name),
Err(err) => Err(GetStreamByNameError::with_source(
GetStreamByNameErrorKind::Request,
err,
)),
},
None => Err(GetStreamByNameError::new(
GetStreamByNameErrorKind::NotFound,
)),
}
}

/// Lists names of all streams for current context.
///
/// # Examples
Expand All @@ -521,6 +569,7 @@ impl Context {
offset: 0,
page_request: None,
streams: Vec::new(),
subject: None,
done: false,
}
}
Expand Down Expand Up @@ -1297,6 +1346,7 @@ pub struct StreamNames {
context: Context,
offset: usize,
page_request: Option<PageRequest>,
subject: Option<String>,
streams: Vec<String>,
done: bool,
}
Expand Down Expand Up @@ -1339,12 +1389,14 @@ impl futures::Stream for StreamNames {
}
let context = self.context.clone();
let offset = self.offset;
let subject = self.subject.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
"STREAM.NAMES",
&json!({
"offset": offset,
"subject": subject
}),
)
.await?
Expand Down Expand Up @@ -1611,7 +1663,27 @@ impl Display for GetStreamErrorKind {
}
}

#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
Request,
NotFound,
InvalidSubject,
JetStream(super::errors::Error),
}

impl Display for GetStreamByNameErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Request => write!(f, "request error"),
Self::NotFound => write!(f, "stream not found"),
Self::InvalidSubject => write!(f, "invalid subject"),
Self::JetStream(err) => write!(f, "jetstream error: {}", err),
}
}
}

pub type GetStreamError = Error<GetStreamErrorKind>;
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

pub type UpdateStreamError = CreateStreamError;
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
Expand Down
25 changes: 24 additions & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod jetstream {
self, push, AckPolicy, DeliverPolicy, Info, OrderedPullConsumer, OrderedPushConsumer,
PullConsumer, PushConsumer, ReplayPolicy,
};
use async_nats::jetstream::context::{Publish, PublishErrorKind};
use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind};
use async_nats::jetstream::response::Response;
use async_nats::jetstream::stream::{
self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DiscardPolicy, StorageType,
Expand Down Expand Up @@ -3654,4 +3654,27 @@ mod jetstream {
.await
.expect_err("should fail but not panic because of lack of server info");
}

#[tokio::test]
async fn test_stream_by_subject() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
..Default::default()
})
.await
.unwrap();

let stream_name = jetstream.stream_by_subject("events.>").await.unwrap();
assert_eq!(stream_name, stream.cached_info().config.name);

let err = jetstream.stream_by_subject("foo").await.unwrap_err();
assert_eq!(err.kind(), GetStreamByNameErrorKind::NotFound);
}
}

0 comments on commit dcbd68a

Please sign in to comment.