Skip to content

Commit

Permalink
Merge pull request #171 from avifenesh/bigfix/closing_connection
Browse files Browse the repository at this point in the history
fixed close connection logic
  • Loading branch information
avifenesh authored Jul 4, 2024
2 parents b954618 + 7218271 commit 062e727
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ mod connections_logic;
pub mod testing {
pub use super::connections_logic::*;
}
use crate::{
cluster_slotmap::SlotMap,
cluster_topology::SLOT_SIZE,
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict,
};
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use async_std::task::{spawn, JoinHandle};
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use futures::executor::block_on;
use std::{
collections::{HashMap, HashSet},
fmt, io, mem,
Expand All @@ -40,14 +51,8 @@ use std::{
task::{self, Poll},
time::SystemTime,
};

use crate::{
cluster_slotmap::SlotMap,
cluster_topology::SLOT_SIZE,
cmd,
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
FromRedisValue, InfoDict,
};
#[cfg(feature = "tokio-comp")]
use tokio::task::JoinHandle;

use crate::{
aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection, Runtime},
Expand Down Expand Up @@ -88,7 +93,6 @@ use backoff_tokio::{Error as BackoffError, ExponentialBackoff};
use dispose::{Disposable, Dispose};
use futures::{future::BoxFuture, prelude::*, ready};
use pin_project_lite::pin_project;
use std::sync::atomic::AtomicBool;
use tokio::sync::{
mpsc,
oneshot::{self, Receiver},
Expand Down Expand Up @@ -393,13 +397,18 @@ pub(crate) struct ClusterConnInner<C> {
#[allow(clippy::complexity)]
in_flight_requests: stream::FuturesUnordered<Pin<Box<Request<C>>>>,
refresh_error: Option<RedisError>,
// A flag indicating the connection's closure and the requirement to shut down all related tasks.
shutdown_flag: Arc<AtomicBool>,
// Handler of the periodic check task.
periodic_checks_handler: Option<JoinHandle<()>>,
}

impl<C> Dispose for ClusterConnInner<C> {
fn dispose(self) {
self.shutdown_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.periodic_checks_handler {
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
block_on(handle.cancel());
#[cfg(feature = "tokio-comp")]
handle.abort()
}
}
}

Expand Down Expand Up @@ -913,13 +922,12 @@ where
),
subscriptions_by_address: RwLock::new(Default::default()),
});
let shutdown_flag = Arc::new(AtomicBool::new(false));
let connection = ClusterConnInner {
let mut connection = ClusterConnInner {
inner,
in_flight_requests: Default::default(),
refresh_error: None,
state: ConnectionState::PollComplete,
shutdown_flag: shutdown_flag.clone(),
periodic_checks_handler: None,
};
Self::refresh_slots_and_subscriptions_with_retries(
connection.inner.clone(),
Expand All @@ -928,15 +936,16 @@ where
.await?;

if let Some(duration) = topology_checks_interval {
let periodic_task = ClusterConnInner::periodic_topology_check(
connection.inner.clone(),
duration,
shutdown_flag,
);
let periodic_task =
ClusterConnInner::periodic_topology_check(connection.inner.clone(), duration);
#[cfg(feature = "tokio-comp")]
tokio::spawn(periodic_task);
{
connection.periodic_checks_handler = Some(tokio::spawn(periodic_task));
}
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
AsyncStd::spawn(periodic_task);
{
connection.periodic_checks_handler = Some(spawn(periodic_task));
}
}

Ok(Disposable::new(connection))
Expand Down Expand Up @@ -1307,15 +1316,8 @@ where
topology_changed
}

async fn periodic_topology_check(
inner: Arc<InnerCore<C>>,
interval_duration: Duration,
shutdown_flag: Arc<AtomicBool>,
) {
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
loop {
if shutdown_flag.load(Ordering::Relaxed) {
return;
}
let _ = boxed_sleep(interval_duration).await;
let topology_changed =
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)
Expand Down

0 comments on commit 062e727

Please sign in to comment.