Skip to content

Commit

Permalink
Add seen_current to kv
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Aug 22, 2024
1 parent f044e06 commit ed856c9
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
12 changes: 12 additions & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ impl Store {
created,
operation,
delta: 0,
seen_current: false,
};
Ok(Some(entry))
}
Expand Down Expand Up @@ -596,6 +597,7 @@ impl Store {
})?,
prefix: self.prefix.clone(),
bucket: self.name.clone(),
seen_current: false,
})
}

Expand Down Expand Up @@ -1056,6 +1058,7 @@ impl Store {

/// A structure representing a watch on a key-value bucket, yielding values whenever there are changes.
pub struct Watch {
seen_current: bool,
subscription: super::consumer::push::Ordered,
prefix: String,
bucket: String,
Expand Down Expand Up @@ -1088,6 +1091,10 @@ impl futures::Stream for Watch {
.map(|s| s.to_string())
.unwrap();

if !self.seen_current && info.pending == 0 {
self.seen_current = true;
}

Poll::Ready(Some(Ok(Entry {
bucket: self.bucket.clone(),
key,
Expand All @@ -1096,6 +1103,7 @@ impl futures::Stream for Watch {
created: info.published,
delta: info.pending,
operation,
seen_current: self.seen_current,
})))
}
},
Expand Down Expand Up @@ -1157,6 +1165,7 @@ impl futures::Stream for History {
created: info.published,
delta: info.pending,
operation,
seen_current: self.done,
})))
}
},
Expand Down Expand Up @@ -1218,6 +1227,9 @@ pub struct Entry {
pub created: OffsetDateTime,
/// The kind of operation that caused this entry.
pub operation: Operation,
/// Set to true after all historical messages have been received, and
/// now all Entries are the new ones.
pub seen_current: bool,
}

#[derive(Clone, Debug, PartialEq)]
Expand Down
63 changes: 63 additions & 0 deletions async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,69 @@ mod kv {
}
}

#[tokio::test]
async fn watch_seen_current() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|event| async move { println!("event: {event:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "history".into(),
description: "test_description".into(),
history: 15,
storage: StorageType::File,
num_replicas: 1,
..Default::default()
})
.await
.unwrap();

for i in 0..10 {
kv.put(format!("key.{i}"), i.to_string().into())
.await
.unwrap();
}

let mut watcher = kv
.watch_with_history("key.>")
.await
.unwrap()
.enumerate()
.take(20);

tokio::task::spawn({
let kv = kv.clone();
async move {
for i in 10..20 {
tokio::time::sleep(Duration::from_millis(50)).await;
kv.put(format!("key.{i}"), i.to_string().into())
.await
.unwrap();
}
}
});

while let Some((i, entry)) = watcher.next().await {
let entry = entry.unwrap();
assert_eq!(entry.key, format!("key.{i}"));
assert_eq!(
i,
from_utf8(&entry.value).unwrap().parse::<usize>().unwrap()
);
if i >= 9 {
assert!(entry.seen_current);
} else {
assert!(!entry.seen_current);
}
}
}

#[tokio::test]
async fn watch_with_history() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit ed856c9

Please sign in to comment.