diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 7f31ca814..f4bcec82b 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -451,6 +451,7 @@ impl Store { created, operation, delta: 0, + seen_current: false, }; Ok(Some(entry)) } @@ -596,6 +597,7 @@ impl Store { })?, prefix: self.prefix.clone(), bucket: self.name.clone(), + seen_current: false, }) } @@ -1056,6 +1058,7 @@ impl Store { /// A structure representing a watch on a key-value bucket, yielding values whenever there are changes. pub struct Watch { + seen_current: bool, subscription: super::consumer::push::Ordered, prefix: String, bucket: String, @@ -1088,6 +1091,10 @@ impl futures::Stream for Watch { .map(|s| s.to_string()) .unwrap(); + if !self.seen_current && info.pending == 0 { + self.seen_current = true; + } + Poll::Ready(Some(Ok(Entry { bucket: self.bucket.clone(), key, @@ -1096,6 +1103,7 @@ impl futures::Stream for Watch { created: info.published, delta: info.pending, operation, + seen_current: self.seen_current, }))) } }, @@ -1157,6 +1165,7 @@ impl futures::Stream for History { created: info.published, delta: info.pending, operation, + seen_current: self.done, }))) } }, @@ -1218,6 +1227,9 @@ pub struct Entry { pub created: OffsetDateTime, /// The kind of operation that caused this entry. pub operation: Operation, + /// Set to true after all historical messages have been received, and + /// now all Entries are the new ones. + pub seen_current: bool, } #[derive(Clone, Debug, PartialEq)] diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 670c19127..0d66e47a0 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -581,6 +581,69 @@ mod kv { } } + #[tokio::test] + async fn watch_seen_current() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|event| async move { println!("event: {event:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "history".into(), + description: "test_description".into(), + history: 15, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + + for i in 0..10 { + kv.put(format!("key.{i}"), i.to_string().into()) + .await + .unwrap(); + } + + let mut watcher = kv + .watch_with_history("key.>") + .await + .unwrap() + .enumerate() + .take(20); + + tokio::task::spawn({ + let kv = kv.clone(); + async move { + for i in 10..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + kv.put(format!("key.{i}"), i.to_string().into()) + .await + .unwrap(); + } + } + }); + + while let Some((i, entry)) = watcher.next().await { + let entry = entry.unwrap(); + assert_eq!(entry.key, format!("key.{i}")); + assert_eq!( + i, + from_utf8(&entry.value).unwrap().parse::().unwrap() + ); + if i >= 9 { + assert!(entry.seen_current); + } else { + assert!(!entry.seen_current); + } + } + } + #[tokio::test] async fn watch_with_history() { let server = nats_server::run_server("tests/configs/jetstream.conf");