Skip to content

Commit

Permalink
Improve kv::Watcher without messages
Browse files Browse the repository at this point in the history
Until now, if underlying watcher for given consumer did not have any pending
messages, it would indefinitely wait for the first one.
This commit improves it by checking message pending count on initial
consumer info, and returning `None` if there are no messages.

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 23, 2024
1 parent ea4c372 commit 6cbe847
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
7 changes: 7 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ impl Store {
_ => WatchError::with_source(WatchErrorKind::Other, err),
})?;

let no_messages = consumer.cached_info().num_pending == 0;

Ok(Watch {
subscription: consumer.messages().await.map_err(|err| match err.kind() {
crate::jetstream::consumer::StreamErrorKind::TimedOut => {
Expand All @@ -612,6 +614,7 @@ impl Store {
prefix: self.prefix.clone(),
bucket: self.name.clone(),
seen_current: false,
no_messages,
})
}

Expand Down Expand Up @@ -1072,6 +1075,7 @@ impl Store {

/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
no_messages: bool,
seen_current: bool,
subscription: super::consumer::push::Ordered,
prefix: String,
Expand All @@ -1085,6 +1089,9 @@ impl futures::Stream for Watch {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.no_messages {
return Poll::Ready(None);
}
match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => Poll::Ready(None),
Expand Down
23 changes: 23 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,29 @@ mod kv {
}
}
}

#[tokio::test]
async fn watch_no_messages() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let context = async_nats::jetstream::new(client);
let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "history".to_string(),
description: "test_description".to_string(),
history: 15,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

let mut watcher = kv.watch("foo").await.unwrap();
assert!(watcher.next().await.is_none());
}

#[tokio::test]
async fn watch() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 6cbe847

Please sign in to comment.