Skip to content

Commit

Permalink
Merge pull request #163 from amazon-contributing/improve_pubsub
Browse files Browse the repository at this point in the history
1. Fixes and optimizes pubsub refresh mechanic by refreshing only for the existing addresses and utilizing read lock. 2. Adds error log to refresh_connections() and improves code comments
  • Loading branch information
ikolomi authored Jun 27, 2024
2 parents ea3d906 + e219e2e commit 709796a
Showing 1 changed file with 46 additions and 27 deletions.
73 changes: 46 additions & 27 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,17 @@ where
push_sender.clone(),
)
.await;
if let Ok(node) = node {
connections_container.replace_or_add_connection_for_address(address, node);
match node {
Ok(node) => {
connections_container
.replace_or_add_connection_for_address(address, node);
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address, err
);
}
}
connections_container
},
Expand Down Expand Up @@ -1224,6 +1233,10 @@ where
let _ = boxed_sleep(interval_duration).await;
let topology_changed = Self::check_topology_and_refresh_if_diff(inner.clone()).await;
if !topology_changed {
// This serves as a safety measure for validating the pubsub subscriptions state in case it has drifted
// while the topology stayed the same.
// For example, a failed attempt to refresh a connection, which is triggered from refresh_pubsub_subscriptions(),
// might leave a node unconnected indefinitely if the topology is stable and no requests are attempted to this node.
Self::refresh_pubsub_subscriptions(inner.clone()).await;
}
}
Expand All @@ -1234,30 +1247,33 @@ where
return;
}

// validate active subscriptions location
let mut addrs_to_refresh: HashSet<ArcStr> = HashSet::new();
// lock the subscriptions and connections for writing - might be possible to optimize
let mut subs_by_address_guard = inner.subscriptions_by_address.write().await;
let mut unassigned_subs_guard = inner.unassigned_subscriptions.write().await;
let conns_read_guard = inner.conn_lock.read().await;

let mut conns_write_guard = inner.conn_lock.write().await;

subs_by_address_guard.retain(|addr, addr_subs| {
addr_subs.retain(|kind, channels_patterns| {
// validate active subscriptions location
subs_by_address_guard.retain(|current_address, address_subs| {
address_subs.retain(|kind, channels_patterns| {
channels_patterns.retain(|channel_pattern| {
let new_slot = get_slot(channel_pattern);
let mut valid = false;
if let Some((new_address, _)) = conns_write_guard
if let Some((new_address, _)) = conns_read_guard
.connection_for_route(&Route::new(new_slot, SlotAddr::Master))
{
if *new_address == *addr {
if *new_address == *current_address {
valid = true;
}
}
// no new address or the new address differs - move to unassigned and store this address for connection reset
if !valid {
addrs_to_refresh.insert(addr.clone()); // need to drop the original connection for clearing the subscription in the server, avoiding possible double-receivers
conns_write_guard.remove_node(addr);
// need to drop the original connection for clearing the subscription in the server, avoiding possible double-receivers
if conns_read_guard
.connection_for_address(current_address)
.is_some()
{
addrs_to_refresh.insert(current_address.clone());
}

unassigned_subs_guard
.entry(*kind)
Expand All @@ -1270,15 +1286,15 @@ where
});
!channels_patterns.is_empty()
});
!addr_subs.is_empty()
!address_subs.is_empty()
});

// try to assign new addresses
unassigned_subs_guard.retain(|kind: &PubSubSubscriptionKind, channels_patterns| {
channels_patterns.retain(|channel_pattern| {
let new_slot = get_slot(channel_pattern);
if let Some((new_address, _)) =
conns_write_guard.connection_for_route(&Route::new(new_slot, SlotAddr::Master))
conns_read_guard.connection_for_route(&Route::new(new_slot, SlotAddr::Master))
{
// need to drop the new connection so the subscription will be picked up in setup_connection()
addrs_to_refresh.insert(new_address.clone());
Expand All @@ -1298,22 +1314,25 @@ where
!channels_patterns.is_empty()
});

// have to remove the node, otherwise refresh_connections won't trigger node recreation
for addr_to_refresh in addrs_to_refresh.iter() {
conns_write_guard.remove_node(addr_to_refresh);
}

drop(conns_write_guard);
drop(conns_read_guard);
drop(unassigned_subs_guard);
drop(subs_by_address_guard);

// immediately trigger connection reestablishment
Self::refresh_connections(
inner.clone(),
addrs_to_refresh.into_iter().collect(),
RefreshConnectionType::AllConnections,
)
.await;
if !addrs_to_refresh.is_empty() {
let mut conns_write_guard = inner.conn_lock.write().await;
// have to remove the node, otherwise refresh_connections won't trigger node recreation
for addr_to_refresh in addrs_to_refresh.iter() {
conns_write_guard.remove_node(addr_to_refresh);
}
drop(conns_write_guard);
// immediately trigger connection reestablishment
Self::refresh_connections(
inner.clone(),
addrs_to_refresh.into_iter().collect(),
RefreshConnectionType::AllConnections,
)
.await;
}
}

/// Queries log2n nodes (where n represents the number of cluster nodes) to determine whether their
Expand Down

0 comments on commit 709796a

Please sign in to comment.