From f9d2622fff5c8bda9d6374978fa82ac877db718e Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 31 Jul 2024 14:26:32 +0000 Subject: [PATCH] Refactored the connections map from HashMap to DashMap to minimize write lock contention on the connection container --- Cargo.lock | 15 ++++ redis/Cargo.toml | 5 +- .../cluster_async/connections_container.rs | 85 ++++++++++--------- redis/src/cluster_async/mod.rs | 21 ++--- 4 files changed, 76 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce94dfeaa1..a6a8c9b7bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -652,6 +652,20 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1637,6 +1651,7 @@ dependencies = [ "combine", "crc16", "criterion", + "dashmap", "derivative", "dispose", "fast-math", diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 0d836a6989..d9711e6955 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -63,6 +63,9 @@ crc16 = { version = "0.4", optional = true } rand = { version = "0.8", optional = true } derivative = { version = "2.2.0", optional = true } +# Only needed for async cluster +dashmap = { version = "6.0.1", optional = true } + # Only needed for async_std support async-std = { version = "1.8.0", optional = true } async-trait = { version = "0.1.24", optional = true } @@ -124,7 +127,7 @@ tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"] tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"] connection-manager = ["futures", "aio", "tokio-retry"] streams = [] -cluster-async = ["cluster", "futures", "futures-util"] +cluster-async = ["cluster", "futures", "futures-util", "dashmap"] keep-alive = ["socket2"] sentinel = ["rand"] tcp_nodelay = [] diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index 2a438aa770..8e8e83bbe6 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -1,14 +1,13 @@ use crate::cluster_async::ConnectionFuture; +use crate::cluster_routing::{Route, SlotAddr}; +use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; +use crate::cluster_topology::TopologyHash; use arcstr::ArcStr; +use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; -use std::collections::HashMap; use std::net::IpAddr; -use crate::cluster_routing::{Route, SlotAddr}; -use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue}; -use crate::cluster_topology::TopologyHash; - /// A struct that encapsulates a network connection along with its associated IP address. #[derive(Clone, Eq, PartialEq, Debug)] pub struct ConnectionWithIp { @@ -86,11 +85,12 @@ pub(crate) enum ConnectionType { PreferManagement, } -pub(crate) struct ConnectionsMap(pub(crate) HashMap>); +pub(crate) struct ConnectionsMap(pub(crate) DashMap>); impl std::fmt::Display for ConnectionsMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for (address, node) in self.0.iter() { + for item in self.0.iter() { + let (address, node) = (item.key(), item.value()); match node.user_connection.ip { Some(ip) => writeln!(f, "{address} - {ip}")?, None => writeln!(f, "{address}")?, @@ -101,7 +101,7 @@ impl std::fmt::Display for ConnectionsMap { } pub(crate) struct ConnectionsContainer { - connection_map: HashMap>, + connection_map: DashMap>, pub(crate) slot_map: SlotMap, read_from_replica_strategy: ReadFromReplicaStrategy, topology_hash: TopologyHash, @@ -213,9 +213,10 @@ where pub(crate) fn all_node_connections( &self, ) -> impl Iterator> + '_ { - self.connection_map - .iter() - .map(move |(address, node)| (address.clone(), node.user_connection.conn.clone())) + self.connection_map.iter().map(move |item| { + let (node, address) = (item.key(), item.value()); + (node.clone(), address.user_connection.conn.clone()) + }) } pub(crate) fn all_primary_connections( @@ -228,16 +229,19 @@ where } pub(crate) fn node_for_address(&self, address: &str) -> Option> { - self.connection_map.get(address).cloned() + self.connection_map + .get(address) + .map(|item| item.value().clone()) } pub(crate) fn connection_for_address( &self, address: &str, ) -> Option> { - self.connection_map - .get_key_value(address) - .map(|(address, conn)| (address.clone(), conn.user_connection.conn.clone())) + self.connection_map.get(address).map(|item| { + let (address, conn) = (item.key(), item.value()); + (address.clone(), conn.user_connection.conn.clone()) + }) } pub(crate) fn random_connections( @@ -249,14 +253,15 @@ where .iter() .choose_multiple(&mut rand::thread_rng(), amount) .into_iter() - .map(move |(address, node)| { + .map(move |item| { + let (address, node) = (item.key(), item.value()); let conn = node.get_connection(&conn_type); (address.clone(), conn) }) } pub(crate) fn replace_or_add_connection_for_address( - &mut self, + &self, address: impl Into, node: ClusterNode, ) -> ArcStr { @@ -265,8 +270,10 @@ where address } - pub(crate) fn remove_node(&mut self, address: &ArcStr) -> Option> { - self.connection_map.remove(address) + pub(crate) fn remove_node(&self, address: &ArcStr) -> Option> { + self.connection_map + .remove(address) + .map(|(_key, value)| value) } pub(crate) fn len(&self) -> usize { @@ -302,13 +309,13 @@ mod tests { } } } - fn remove_nodes(container: &mut ConnectionsContainer, addresss: &[&str]) { + fn remove_nodes(container: &ConnectionsContainer, addresss: &[&str]) { for address in addresss { container.remove_node(&(*address).into()); } } - fn remove_all_connections(container: &mut ConnectionsContainer) { + fn remove_all_connections(container: &ConnectionsContainer) { remove_nodes( container, &[ @@ -366,7 +373,7 @@ mod tests { ], ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy. ); - let mut connection_map = HashMap::new(); + let connection_map = DashMap::new(); connection_map.insert( "primary1".into(), create_cluster_node(1, use_management_connections), @@ -514,7 +521,7 @@ mod tests { #[test] fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"replica3-2".into()); assert_eq!( @@ -540,8 +547,8 @@ mod tests { #[test] fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() { - let mut container = create_container(); - remove_nodes(&mut container, &["replica2-1", "replica3-1", "replica3-2"]); + let container = create_container(); + remove_nodes(&container, &["replica2-1", "replica3-1", "replica3-2"]); assert_eq!( 2, @@ -593,7 +600,7 @@ mod tests { #[test] fn get_connection_by_address_returns_none_if_connection_was_removed() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); assert!(container.connection_for_address("primary1").is_none()); @@ -601,7 +608,7 @@ mod tests { #[test] fn get_connection_by_address_returns_added_connection() { - let mut container = create_container(); + let container = create_container(); let address = container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -630,8 +637,8 @@ mod tests { #[test] fn get_random_connections_returns_none_if_all_connections_were_removed() { - let mut container = create_container(); - remove_all_connections(&mut container); + let container = create_container(); + remove_all_connections(&container); assert_eq!( 0, @@ -643,8 +650,8 @@ mod tests { #[test] fn get_random_connections_returns_added_connection() { - let mut container = create_container(); - remove_all_connections(&mut container); + let container = create_container(); + remove_all_connections(&container); let address = container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -694,7 +701,7 @@ mod tests { #[test] fn get_all_user_connections_returns_added_connection() { - let mut container = create_container(); + let container = create_container(); container.replace_or_add_connection_for_address( "foobar", ClusterNode::new_only_with_user_conn(4), @@ -711,7 +718,7 @@ mod tests { #[test] fn get_all_user_connections_does_not_return_removed_connection() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); let mut connections: Vec<_> = container @@ -738,7 +745,7 @@ mod tests { #[test] fn get_all_primaries_does_not_return_removed_connection() { - let mut container = create_container(); + let container = create_container(); container.remove_node(&"primary1".into()); let mut connections: Vec<_> = container @@ -752,7 +759,7 @@ mod tests { #[test] fn len_is_adjusted_on_removals_and_additions() { - let mut container = create_container(); + let container = create_container(); assert_eq!(container.len(), 6); @@ -769,7 +776,7 @@ mod tests { #[test] fn len_is_not_adjusted_on_removals_of_nonexisting_connections_or_additions_of_existing_connections( ) { - let mut container = create_container(); + let container = create_container(); assert_eq!(container.len(), 6); @@ -785,7 +792,7 @@ mod tests { #[test] fn remove_node_returns_connection_if_it_exists() { - let mut container = create_container(); + let container = create_container(); let connection = container.remove_node(&"primary1".into()); assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1))); @@ -796,7 +803,7 @@ mod tests { #[test] fn test_is_empty() { - let mut container = create_container(); + let container = create_container(); assert!(!container.is_empty()); container.remove_node(&"primary1".into()); @@ -829,7 +836,7 @@ mod tests { #[test] fn is_primary_returns_false_for_removed_node() { - let mut container = create_container(); + let container = create_container(); let address = "primary1".into(); container.remove_node(&address); diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 0a4d56e998..5aa0ea7466 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -39,6 +39,7 @@ use crate::{ }; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use async_std::task::{spawn, JoinHandle}; +use dashmap::DashMap; #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] use futures::executor::block_on; use std::{ @@ -1090,10 +1091,10 @@ where .buffer_unordered(initial_nodes.len()) .fold( ( - ConnectionsMap(HashMap::with_capacity(initial_nodes.len())), + ConnectionsMap(DashMap::with_capacity(initial_nodes.len())), None, ), - |mut connections: (ConnectionMap, Option), addr_conn_res| async move { + |connections: (ConnectionMap, Option), addr_conn_res| async move { match addr_conn_res { Ok((addr, node)) => { connections.0 .0.insert(addr.into(), node); @@ -1156,14 +1157,14 @@ where conn_type: RefreshConnectionType, ) { info!("Started refreshing connections to {:?}", addresses); - let mut connections_container = inner.conn_lock.write().await; + let connections_container = inner.conn_lock.read().await; let cluster_params = &inner.cluster_params; let subscriptions_by_address = &inner.subscriptions_by_address; let push_sender = &inner.push_sender; stream::iter(addresses.into_iter()) .fold( - &mut *connections_container, + &*connections_container, |connections_container, address| async move { let node_option = connections_container.remove_node(&address); @@ -1476,12 +1477,12 @@ where drop(subs_by_address_guard); if !addrs_to_refresh.is_empty() { - let mut conns_write_guard = inner.conn_lock.write().await; + let conns_read_guard = inner.conn_lock.read().await; // have to remove or otherwise the refresh_connection wont trigger node recreation for addr_to_refresh in addrs_to_refresh.iter() { - conns_write_guard.remove_node(addr_to_refresh); + conns_read_guard.remove_node(addr_to_refresh); } - drop(conns_write_guard); + drop(conns_read_guard); // immediately trigger connection reestablishment Self::refresh_connections( inner.clone(), @@ -1609,8 +1610,8 @@ where .await; let new_connections: ConnectionMap = stream::iter(addresses_and_connections_iter) .fold( - ConnectionsMap(HashMap::with_capacity(nodes_len)), - |mut connections, (addr, node)| async { + ConnectionsMap(DashMap::with_capacity(nodes_len)), + |connections, (addr, node)| async { let mut cluster_params = inner.cluster_params.clone(); let subs_guard = inner.subscriptions_by_address.read().await; cluster_params.pubsub_subscriptions = @@ -1923,7 +1924,7 @@ where { Ok(node) => { let connection_clone = node.user_connection.conn.clone().await; - let mut connections = core.conn_lock.write().await; + let connections = core.conn_lock.write().await; let address = connections.replace_or_add_connection_for_address(addr, node); drop(connections); (address, connection_clone)