Skip to content

Commit

Permalink
fixed close connection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Jul 4, 2024
1 parent 4571c91 commit 701705d
Showing 1 changed file with 14 additions and 22 deletions.
36 changes: 14 additions & 22 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ use backoff_tokio::{Error as BackoffError, ExponentialBackoff};
use dispose::{Disposable, Dispose};
use futures::{future::BoxFuture, prelude::*, ready};
use pin_project_lite::pin_project;
use std::sync::atomic::AtomicBool;
use tokio::sync::{
mpsc,
oneshot::{self, Receiver},
Expand Down Expand Up @@ -393,13 +392,13 @@ pub(crate) struct ClusterConnInner<C> {
#[allow(clippy::complexity)]
in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
refresh_error: Option<RedisError>,
// A flag indicating the connection's closure and the requirement to shut down all related tasks.
shutdown_flag: Arc<AtomicBool>,
// Handler of the periodic check task.
periodic_checks_handler: Option<tokio::task::JoinHandle<()>>,
}

impl<C> Dispose for ClusterConnInner<C> {
fn dispose(self) {
self.shutdown_flag.store(true, Ordering::Relaxed);
self.periodic_checks_handler.map(|handle| handle.abort());
}
}

Expand Down Expand Up @@ -913,13 +912,12 @@ where
),
subscriptions_by_address: RwLock::new(Default::default()),
});
let shutdown_flag = Arc::new(AtomicBool::new(false));
let connection = ClusterConnInner {
let mut connection = ClusterConnInner {
inner,
in_flight_requests: Default::default(),
refresh_error: None,
state: ConnectionState::PollComplete,
shutdown_flag: shutdown_flag.clone(),
periodic_checks_handler: None,
};
Self::refresh_slots_and_subscriptions_with_retries(
connection.inner.clone(),
Expand All @@ -928,15 +926,16 @@ where
.await?;

if let Some(duration) = topology_checks_interval {
let periodic_task = ClusterConnInner::periodic_topology_check(
connection.inner.clone(),
duration,
shutdown_flag,
);
let periodic_task =
ClusterConnInner::periodic_topology_check(connection.inner.clone(), duration);
#[cfg(feature = "tokio-comp")]
tokio::spawn(periodic_task);
{
connection.periodic_checks_handler = Some(tokio::spawn(periodic_task));
}
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
AsyncStd::spawn(periodic_task);
{
connection.periodic_checks_handler = Some(AsyncStd::spawn(periodic_task));
}
}

Ok(Disposable::new(connection))
Expand Down Expand Up @@ -1307,15 +1306,8 @@ where
topology_changed
}

async fn periodic_topology_check(
inner: Arc<InnerCore<C>>,
interval_duration: Duration,
shutdown_flag: Arc<AtomicBool>,
) {
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
loop {
if shutdown_flag.load(Ordering::Relaxed) {
return;
}
let _ = boxed_sleep(interval_duration).await;
let topology_changed =
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)
Expand Down

0 comments on commit 701705d

Please sign in to comment.