From ec6331cb05b81e6b3c41bd75762ddab18f256ecd Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 20 May 2024 14:30:42 +0200 Subject: [PATCH] Improve ack test Signed-off-by: Tomasz Pietrek --- async-nats/tests/jetstream_tests.rs | 47 ++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 0f56c5c30..2e007c95a 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2378,10 +2378,18 @@ mod jetstream { let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap(); client.flush().await.unwrap(); - // TODO: when rtt() is available, use it here. - tokio::time::sleep(Duration::from_millis(400)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 10); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 10 { + return Err(format!("expected {}, got {}", 10, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // standard ack if let Some(message) = iter.next().await { @@ -2389,19 +2397,36 @@ mod jetstream { } client.flush().await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 9); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 9 { + return Err(format!("expected {}, got {}", 9, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // double ack if let Some(message) = iter.next().await { message.unwrap().double_ack().await.unwrap(); } - client.flush().await.unwrap(); - tokio::time::sleep(Duration::from_millis(500)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 8); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 8 { + return Err(format!("expected {}, got {}", 8, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // in progress if let Some(message) = iter.next().await {