Skip to content

Commit

Permalink
Improve reconnect randomization
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Mar 8, 2023
1 parent f7a1680 commit f432fa4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
1 change: 1 addition & 0 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ lazy_static = "1.4"
base64 = "0.13"
tokio-retry = "0.3"
ring = "0.16"
rand = "0.8"

[dev-dependencies]
criterion = { version = "0.3", features = ["async_tokio"]}
Expand Down
10 changes: 8 additions & 2 deletions async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::ToServerAddrs;
use crate::LANG;
use crate::VERSION;
use bytes::BytesMut;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::cmp;
use std::collections::HashMap;
use std::io;
Expand Down Expand Up @@ -99,8 +101,13 @@ impl Connector {

pub(crate) async fn try_connect(&mut self) -> Result<(ServerInfo, Connection), io::Error> {
let mut error = None;
let server_addrs = {
let mut rng = thread_rng();
let mut server_addrs: Vec<ServerAddr> = self.servers.keys().cloned().collect();
server_addrs.shuffle(&mut rng);
server_addrs
};

let server_addrs: Vec<ServerAddr> = self.servers.keys().cloned().collect();
for server_addr in server_addrs {
let server_attempts = self.servers.get_mut(&server_addr).unwrap();
let duration = if *server_attempts == 0 {
Expand All @@ -111,7 +118,6 @@ impl Connector {

cmp::min(Duration::from_millis(2_u64.saturating_pow(exp)), max)
};

*server_attempts += 1;
sleep(duration).await;

Expand Down
42 changes: 42 additions & 0 deletions async-nats/tests/jwt_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,48 @@ mod client {
.await
.expect("published");
}

#[cfg(not(target_os = "windows"))]
#[tokio::test]
async fn jwt_lame_duck_reconnect() {
use async_nats::Event;
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let c = nats_server::run_cluster("tests/configs/jwt.conf");

let (tx_recconect, mut rx_reconnect) = tokio::sync::mpsc::channel(10);

let client = async_nats::ConnectOptions::with_credentials_file(
path.join("tests/configs/TestUser.creds"),
)
.await
.unwrap()
.event_callback({
let tx = tx_recconect.clone();
move |event| {
let tx = tx.clone();
async move {
if event == Event::Connected {
tx.send(()).await.unwrap();
}
}
}
})
.connect(c.client_url())
.await
.unwrap();

let mut subscriber = client.subscribe("test".into()).await.unwrap();
for i in 0..2 {
rx_reconnect.recv().await;
let mut subscribe = client.subscribe("test".into()).await.unwrap();
client.publish("test".into(), "data".into()).await.unwrap();
subscribe.next().await.unwrap();
client.flush().await.unwrap();
assert!(subscriber.next().await.is_some());
nats_server::set_lame_duck_mode(&c.servers[i]);
}
}

#[tokio::test]
async fn jwt_reconnect() {
use async_nats::ServerAddr;
Expand Down

0 comments on commit f432fa4

Please sign in to comment.