Skip to content

Commit

Permalink
properly respect the max_connected threshold
Browse files Browse the repository at this point in the history
let the background thread of the connection manager respect the
`max_connected` threshold (not entirely really, we can reach up to
`max_connected + 1`).

This is done by refactoring the connection establishment to not
establish all connections but only connections that we know we will
maintain for sure. This also is lighter overall than the old approach.
but it's also slower since connections are tried sequentially, we don't
need the speed here anyways since if an electrum request fails we fail
back to try all connections which is a logic that isn't controlled by
the connection manager in any way.
  • Loading branch information
mariocynicys committed Sep 12, 2024
1 parent 1c0a179 commit 5c011ae
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 44 deletions.
13 changes: 11 additions & 2 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ impl ElectrumClient {
let request = (req_id, request);
// Use the active connections for this request.
let connections = self.connection_manager.get_active_connections().await;
let concurrency = connections.len() as u32;
// Maximum number of connections to establish or use in request concurrently.
// Could be up to connections.len().
let concurrency = 1;
match self
.send_request_using(&request, connections, send_to_all, concurrency)
.await
Expand All @@ -316,7 +318,14 @@ impl ElectrumClient {
// If we failed the request using only the active connections, try again using all connections.
Err(_) => {
let connections = self.connection_manager.get_all_connections();
match self.send_request_using(&request, connections, send_to_all, 1).await {
// The concurrency here must be `1`, because we are trying out connections that aren't maintained
// which means we might break the max connections rule.
// We will at most we will break this rule by `1` (have `max_connected + 1` open connections).
let concurrency = 1;
match self
.send_request_using(&request, connections, send_to_all, concurrency)
.await
{
Ok(response) => Ok(response),
Err(err_vec) => Err(JsonRpcErrorType::Internal(format!("All servers errored: {err_vec:?}"))),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};

use super::super::client::{ElectrumClient, ElectrumClientImpl};
Expand All @@ -12,11 +11,10 @@ use common::executor::abortable_queue::AbortableQueue;
use common::executor::{AbortableSystem, SpawnFuture, Timer};
use common::notifier::{Notifiee, Notifier};
use common::now_ms;
use futures::stream::FuturesUnordered;
use keys::Address;

use futures::compat::Future01CompatExt;
use futures::{FutureExt, StreamExt};
use futures::FutureExt;

/// A macro to unwrap an option and *execute* some code if the option is None.
macro_rules! unwrap_or_else {
Expand Down Expand Up @@ -343,7 +341,7 @@ impl ConnectionManager {
let mut min_connected_notification = unwrap_or_return!(self.extract_below_min_connected_notifiee());
loop {
// Get the candidate connections that we will consider maintaining.
let (will_never_get_min_connected, candidate_connections, currently_connected) = {
let (will_never_get_min_connected, mut candidate_connections) = {
let all_connections = self.read_connections();
let maintained_connections = self.read_maintained_connections();
let currently_connected = maintained_connections.len() as u32;
Expand All @@ -353,13 +351,14 @@ impl ConnectionManager {
let all_candidate_connections: Vec<_> = all_connections
.iter()
.filter_map(|(_, conn_ctx)| {
(!maintained_connections.contains_key(&conn_ctx.id)).then(|| conn_ctx.connection.clone())
(!maintained_connections.contains_key(&conn_ctx.id))
.then(|| (conn_ctx.connection.clone(), conn_ctx.id))
})
.collect();
// The candidate connections from above, but further filtered by whether they are suspended or not.
let non_suspended_candidate_connections: Vec<_> = all_candidate_connections
.iter()
.filter(|connection| {
.filter(|(connection, _)| {
all_connections
.get(connection.address())
.map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till())
Expand All @@ -371,62 +370,62 @@ impl ConnectionManager {
if connections_needed > all_candidate_connections.len() as u32 {
// Not enough connections to cover the `min_connected` threshold.
// This means we will never be able to maintain `min_connected` active connections.
(true, all_candidate_connections, currently_connected)
(true, all_candidate_connections)
} else {
// If we consider all candidate connection (but some are suspended), we can cover the needed connections.
// We will consider the suspended ones since if we don't we will stay below `min_connected` threshold.
(false, all_candidate_connections, currently_connected)
(false, all_candidate_connections)
}
} else {
// Non suspended candidates are enough to cover the needed connections.
(false, non_suspended_candidate_connections, currently_connected)
(false, non_suspended_candidate_connections)
}
};

// Establish the connections to the selected candidates and alter the maintained connections set accordingly.
{
let client = unwrap_or_return!(self.get_client());
// This is the maximum connections we can have being established concurrently.
let allowed_concurrency =
self.config().max_connected.saturating_sub(currently_connected).max(1) as usize;
let connection_loop_chunks = candidate_connections.chunks(allowed_concurrency).map(|chunk| {
FuturesUnordered::from_iter(chunk.iter().map(|connection| {
let address = connection.address().to_string();
connection
.establish_connection_loop(client.clone())
.map(|res| (address, res))
}))
});
// Sort the candidate connections by their priority/ID.
candidate_connections.sort_by_key(|(_, priority)| *priority);

// Pick each chunk of connections and establish them concurrently.
for mut connection_loops in connection_loop_chunks {
while let Some((address, result)) = connection_loops.next().await {
if let Err(e) = result {
for (connection, _) in candidate_connections {
let address = connection.address().to_string();
let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id;
let (maintained_connections_size, lowest_priority_connection_id) = {
let maintained_connections = self.read_maintained_connections();
let maintained_connections_size = maintained_connections.len() as u32;
let lowest_priority_connection_id =
*maintained_connections.keys().next_back().unwrap_or(&u32::MAX);
(maintained_connections_size, lowest_priority_connection_id)
};

// We can only try to add the connection if:
// 1- We haven't reached the `max_connected` threshold.
// 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection.
if maintained_connections_size < self.config().max_connected
|| connection_id < lowest_priority_connection_id
{
// Now that we know the connection is good to be inserted, try to establish it.
if let Err(e) = connection.establish_connection_loop(client.clone()).await {
// Remove the connection if it's not recoverable.
if !e.is_recoverable() {
self.remove_connection(&address).await.ok();
}
continue;
}
let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id;
let maintained_connections = self.read_maintained_connections();
let maintained_connections_size = maintained_connections.len() as u32;
let lowest_priority_connection_id =
*maintained_connections.keys().next_back().unwrap_or(&u32::MAX);
// NOTE: Must drop to avoid deadlock with the write lock below.
drop(maintained_connections);
// We don't write-lock the maintained connections unless we know we will add this connection.
// That is, we can add it because we didn't hit the `max_connected` threshold,
// or we can add it because it is of a higher priority than the lowest priority connection.
if maintained_connections_size < self.config().max_connected
|| connection_id < lowest_priority_connection_id
{
let mut maintained_connections = self.write_maintained_connections();
maintained_connections.insert(connection_id, address);
// If we have reached the `max_connected` threshold then remove the lowest priority connection.
if !maintained_connections_size < self.config().max_connected {
maintained_connections.remove(&lowest_priority_connection_id);
}
let mut maintained_connections = self.write_maintained_connections();
maintained_connections.insert(connection_id, address);
// If we have reached the `max_connected` threshold then remove the lowest priority connection.
if !maintained_connections_size < self.config().max_connected {
maintained_connections.remove(&lowest_priority_connection_id);
}
} else {
// If any of the two conditions on the `if` statement above are not met, there is nothing to do.
// At this point we have already collected `max_connected` connections and also the current connection
// in the candidate list has a lower priority than the lowest priority maintained connection, and the next
// candidate connections as well since they are sorted by priority.
break;
}
}
}
Expand Down

0 comments on commit 5c011ae

Please sign in to comment.