From abc52f9a955e464c2757168071716c633448f433 Mon Sep 17 00:00:00 2001 From: ikolomi Date: Wed, 26 Jun 2024 12:55:46 +0300 Subject: [PATCH] 1. Fixes and optimizes pubsub refresh mechanic by refreshing only for the existing addresses and utilizing read lock. 2. Adds error log to refresh_connections() and improves code comments --- redis/src/cluster_async/mod.rs | 73 +++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 75dbc566e..594ee4cbe 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1070,8 +1070,17 @@ where push_sender.clone(), ) .await; - if let Ok(node) = node { - connections_container.replace_or_add_connection_for_address(address, node); + match node { + Ok(node) => { + connections_container + .replace_or_add_connection_for_address(address, node); + } + Err(err) => { + warn!( + "Failed to refresh connection for node {}. Error: `{:?}`", + address, err + ); + } } connections_container }, @@ -1210,6 +1219,10 @@ where if Self::check_for_topology_diff(inner.clone()).await { let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone()).await; } else { + // This serves as a safety measure for validating pubsub subscriptions state in case it has drifted + // while topology stayed the same. + // For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(), + // might leave a node unconnected indefinitely in case topology is stable and no requests are attempted to this node.
Self::refresh_pubsub_subscriptions(inner.clone()).await; } } @@ -1220,30 +1233,33 @@ where return; } - // validate active subscriptions location let mut addrs_to_refresh: HashSet = HashSet::new(); - // lock the subscriptions and connections for writing - might be possible to optimize let mut subs_by_address_guard = inner.subscriptions_by_address.write().await; let mut unassigned_subs_guard = inner.unassigned_subscriptions.write().await; + let conns_read_guard = inner.conn_lock.read().await; - let mut conns_write_guard = inner.conn_lock.write().await; - - subs_by_address_guard.retain(|addr, addr_subs| { - addr_subs.retain(|kind, channels_patterns| { + // validate active subscriptions location + subs_by_address_guard.retain(|current_address, address_subs| { + address_subs.retain(|kind, channels_patterns| { channels_patterns.retain(|channel_pattern| { let new_slot = get_slot(channel_pattern); let mut valid = false; - if let Some((new_address, _)) = conns_write_guard + if let Some((new_address, _)) = conns_read_guard .connection_for_route(&Route::new(new_slot, SlotAddr::Master)) { - if *new_address == *addr { + if *new_address == *current_address { valid = true; } } // no new address or new address differ - move to unassigned and store this address for connection reset if !valid { - addrs_to_refresh.insert(addr.clone()); // need to drop the original connection for clearing the subscription in the server, avoiding possible double-receivers - conns_write_guard.remove_node(addr); + // need to drop the original connection for clearing the subscription in the server, avoiding possible double-receivers + if conns_read_guard + .connection_for_address(current_address) + .is_some() + { + addrs_to_refresh.insert(current_address.clone()); + } unassigned_subs_guard .entry(*kind) @@ -1256,7 +1272,7 @@ where }); !channels_patterns.is_empty() }); - !addr_subs.is_empty() + !address_subs.is_empty() }); // try to assign new addresses @@ -1264,7 +1280,7 @@ where 
channels_patterns.retain(|channel_pattern| { let new_slot = get_slot(channel_pattern); if let Some((new_address, _)) = - conns_write_guard.connection_for_route(&Route::new(new_slot, SlotAddr::Master)) + conns_read_guard.connection_for_route(&Route::new(new_slot, SlotAddr::Master)) { // need to drop the new connection so the subscription will be picked up in setup_connection() addrs_to_refresh.insert(new_address.clone()); @@ -1284,22 +1300,25 @@ where !channels_patterns.is_empty() }); - // have to remove or otherwise the refresh_connection wont trigger node recreation - for addr_to_refresh in addrs_to_refresh.iter() { - conns_write_guard.remove_node(addr_to_refresh); - } - - drop(conns_write_guard); + drop(conns_read_guard); drop(unassigned_subs_guard); drop(subs_by_address_guard); - // immediately trigger connection reestablishment - Self::refresh_connections( - inner.clone(), - addrs_to_refresh.into_iter().collect(), - RefreshConnectionType::AllConnections, - ) - .await; + if !addrs_to_refresh.is_empty() { + let mut conns_write_guard = inner.conn_lock.write().await; + // have to remove or otherwise the refresh_connection wont trigger node recreation + for addr_to_refresh in addrs_to_refresh.iter() { + conns_write_guard.remove_node(addr_to_refresh); + } + drop(conns_write_guard); + // immediately trigger connection reestablishment + Self::refresh_connections( + inner.clone(), + addrs_to_refresh.into_iter().collect(), + RefreshConnectionType::AllConnections, + ) + .await; + } } /// Queries log2n nodes (where n represents the number of cluster nodes) to determine whether their