Skip to content

Commit

Permalink
Fix benchmark linter error
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Feb 14, 2024
1 parent cb49bea commit 9c5c32e
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions async-nats/benches/jetstream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::future::IntoFuture;

use async_nats::jetstream::stream;
use async_nats::jetstream::{context::PublishAckFuture, stream};
use bytes::Bytes;
use criterion::{criterion_group, Criterion};

Expand Down Expand Up @@ -199,16 +197,17 @@ async fn publish_sync_batch(context: async_nats::jetstream::Context, msg: Bytes,

async fn publish_async_batch(context: async_nats::jetstream::Context, msg: Bytes, amount: u64) {
// This acts as a semaphore that does not allow for more than 10 publish acks awaiting.
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let (tx, mut rx) = tokio::sync::mpsc::channel(amount as usize);

let handle = tokio::task::spawn(async move {
for _ in 0..amount {
rx.recv().await.unwrap();
let ack: PublishAckFuture = rx.recv().await.unwrap();
ack.await.unwrap();
}
});
for _ in 0..amount {
let ack = context.publish("bench", msg.clone()).await.unwrap();
tx.send(ack.into_future()).await.unwrap();
tx.send(ack).await.unwrap();
}
handle.await.unwrap();
}
Expand Down

0 comments on commit 9c5c32e

Please sign in to comment.