Skip to content

Commit

Permalink
Fixed reconnect_to_initial_nodes not to replace the current connectio…
Browse files Browse the repository at this point in the history
…n map but to extend it
  • Loading branch information
barshaul committed Aug 14, 2024
1 parent b43a07e commit da6e639
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
39 changes: 39 additions & 0 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ where
}
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
other_connection_map: ConnectionsMap<Connection>,
) {
self.connection_map.extend(other_connection_map.0);
}

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
Expand Down Expand Up @@ -177,6 +185,7 @@ where

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
println!("slot_map_value={slot_map_value:?}");
let addrs = &slot_map_value.addrs;
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
Expand Down Expand Up @@ -270,6 +279,7 @@ where
}

pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
println!("removing node {address}");
self.connection_map
.remove(address)
.map(|(_key, value)| value)
Expand Down Expand Up @@ -841,4 +851,33 @@ mod tests {

assert!(!container.is_primary(&address));
}

#[test]
fn test_extend_connection_map() {
let mut container = create_container();
let mut current_addresses: Vec<_> = container
.all_node_connections()
.map(|conn| conn.0)
.collect();

let new_node = "new_primary1".to_string();
// Check that `new_node` not exists in the current
assert!(container.connection_for_address(&new_node).is_none());
// Create new connection map
let new_connection_map = DashMap::new();
new_connection_map.insert(new_node.clone(), create_cluster_node(1, false));

// Extend the current connection map
container.extend_connection_map(ConnectionsMap(new_connection_map));

// Check that the new addresses vector contains both the new node and all previous nodes
let mut new_addresses: Vec<_> = container
.all_node_connections()
.map(|conn| conn.0)
.collect();
current_addresses.push(new_node);
current_addresses.sort();
new_addresses.sort();
assert_eq!(current_addresses, new_addresses);
}
}
7 changes: 1 addition & 6 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,12 +1132,7 @@ where
}
};
let mut write_lock = inner.conn_lock.write().await;
*write_lock = ConnectionsContainer::new(
Default::default(),
connection_map,
inner.cluster_params.read_from_replicas,
0,
);
write_lock.extend_connection_map(connection_map);
drop(write_lock);
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
Expand Down

0 comments on commit da6e639

Please sign in to comment.