Skip to content

Commit

Permalink
suggestion from omar - 2 - (un)maintain methods
Browse files Browse the repository at this point in the history
removed write_maintained_connections and added maintain & unmaintain methods.
  • Loading branch information
mariocynicys committed Sep 12, 2024
1 parent 950b69b commit db4cfe7
Showing 1 changed file with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,7 @@ impl ConnectionManager {
let all_connections = self.read_connections();
let connection_ctx = unwrap_or_return!(all_connections.get(server_address));

if self.read_maintained_connections().contains_key(&connection_ctx.id) {
// If the connection was maintained, remove it from the maintained connections.
let mut maintained_connections = self.write_maintained_connections();
maintained_connections.remove(&connection_ctx.id);
// And notify the background task if we fell below the `min_connected` threshold.
if (maintained_connections.len() as u32) < self.config().min_connected {
self.notify_below_min_connected()
}
}
self.unmaintain(connection_ctx.id);

let abandoned_subs = connection_ctx.disconnected();
// Re-subscribe the abandoned addresses using the client.
Expand Down Expand Up @@ -407,8 +399,6 @@ impl ConnectionManager {
let client = unwrap_or_return!(self.get_client());
// Sort the candidate connections by their priority/ID.
candidate_connections.sort_by_key(|(_, priority)| *priority);

// Pick each chunk of connections and establish them concurrently.
for (connection, _) in candidate_connections {
let address = connection.address().to_string();
let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id;
Expand All @@ -434,12 +424,7 @@ impl ConnectionManager {
}
continue;
}
let mut maintained_connections = self.write_maintained_connections();
maintained_connections.insert(connection_id, address);
// If we have reached the `max_connected` threshold then remove the lowest priority connection.
if !maintained_connections_size < self.config().max_connected {
maintained_connections.remove(&lowest_priority_connection_id);
}
self.maintain(connection_id, address);
} else {
// If any of the two conditions on the `if` statement above are not met, there is nothing to do.
// At this point we have already collected `max_connected` connections and also the current connection
Expand Down Expand Up @@ -493,8 +478,29 @@ impl ConnectionManager {
}

#[inline]
fn write_maintained_connections(&self) -> RwLockWriteGuard<BTreeMap<ID, String>> {
self.0.maintained_connections.write().unwrap()
fn maintain(&self, id: ID, server_address: String) {
let mut maintained_connections = self.0.maintained_connections.write().unwrap();
maintained_connections.insert(id, server_address);
// If we have reached the `max_connected` threshold then remove the lowest priority connection.
if (maintained_connections.len() as u32) > self.config().max_connected {
let lowest_priority_connection_id = *maintained_connections.keys().next_back().unwrap_or(&u32::MAX);
maintained_connections.remove(&lowest_priority_connection_id);
}
}

#[inline]
fn unmaintain(&self, id: ID) {
// To avoid write locking the maintained connections, just make sure the connection is actually maintained first.
let is_maintained = self.read_maintained_connections().contains_key(&id);
if is_maintained {
// If the connection was maintained, remove it from the maintained connections.
let mut maintained_connections = self.0.maintained_connections.write().unwrap();
maintained_connections.remove(&id);
// And notify the background task if we fell below the `min_connected` threshold.
if (maintained_connections.len() as u32) < self.config().min_connected {
self.notify_below_min_connected()
}
}
}

#[inline]
Expand Down

0 comments on commit db4cfe7

Please sign in to comment.