Skip to content

Commit

Permalink
Improve ack test
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed May 20, 2024
1 parent 9540407 commit ec6331c
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2378,30 +2378,55 @@ mod jetstream {
let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap();
client.flush().await.unwrap();

// TODO: when rtt() is available, use it here.
tokio::time::sleep(Duration::from_millis(400)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 10);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 10 {
return Err(format!("expected {}, got {}", 10, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// standard ack
if let Some(message) = iter.next().await {
message.unwrap().ack().await.unwrap();
}
client.flush().await.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 9);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 9 {
return Err(format!("expected {}, got {}", 9, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// double ack
if let Some(message) = iter.next().await {
message.unwrap().double_ack().await.unwrap();
}
client.flush().await.unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 8);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 8 {
return Err(format!("expected {}, got {}", 8, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// in progress
if let Some(message) = iter.next().await {
Expand Down

0 comments on commit ec6331c

Please sign in to comment.