diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index de1ca2aa51..c04692a493 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -307,7 +307,9 @@ impl ElectrumClient { let request = (req_id, request); // Use the active connections for this request. let connections = self.connection_manager.get_active_connections().await; - let concurrency = connections.len() as u32; + // Maximum number of connections to establish or use in request concurrently. + // Could be up to connections.len(). + let concurrency = 1; match self .send_request_using(&request, connections, send_to_all, concurrency) .await @@ -316,7 +318,14 @@ impl ElectrumClient { // If we failed the request using only the active connections, try again using all connections. Err(_) => { let connections = self.connection_manager.get_all_connections(); - match self.send_request_using(&request, connections, send_to_all, 1).await { + // The concurrency here must be `1`, because we are trying out connections that aren't maintained + // which means we might break the max connections rule. + // We will at most we will break this rule by `1` (have `max_connected + 1` open connections). + let concurrency = 1; + match self + .send_request_using(&request, connections, send_to_all, concurrency) + .await + { Ok(response) => Ok(response), Err(err_vec) => Err(JsonRpcErrorType::Internal(format!("All servers errored: {err_vec:?}"))), } diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index 1de26f0695..271daad331 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -1,5 +1,4 @@ use std::collections::{BTreeMap, HashMap}; -use std::iter::FromIterator; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; use super::super::client::{ElectrumClient, ElectrumClientImpl}; @@ -12,11 +11,10 @@ use common::executor::abortable_queue::AbortableQueue; use common::executor::{AbortableSystem, SpawnFuture, Timer}; use common::notifier::{Notifiee, Notifier}; use common::now_ms; -use futures::stream::FuturesUnordered; use keys::Address; use futures::compat::Future01CompatExt; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; /// A macro to unwrap an option and *execute* some code if the option is None. macro_rules! unwrap_or_else { @@ -343,7 +341,7 @@ impl ConnectionManager { let mut min_connected_notification = unwrap_or_return!(self.extract_below_min_connected_notifiee()); loop { // Get the candidate connections that we will consider maintaining. - let (will_never_get_min_connected, candidate_connections, currently_connected) = { + let (will_never_get_min_connected, mut candidate_connections) = { let all_connections = self.read_connections(); let maintained_connections = self.read_maintained_connections(); let currently_connected = maintained_connections.len() as u32; @@ -353,13 +351,14 @@ impl ConnectionManager { let all_candidate_connections: Vec<_> = all_connections .iter() .filter_map(|(_, conn_ctx)| { - (!maintained_connections.contains_key(&conn_ctx.id)).then(|| conn_ctx.connection.clone()) + (!maintained_connections.contains_key(&conn_ctx.id)) + .then(|| (conn_ctx.connection.clone(), conn_ctx.id)) }) .collect(); // The candidate connections from above, but further filtered by whether they are suspended or not. let non_suspended_candidate_connections: Vec<_> = all_candidate_connections .iter() - .filter(|connection| { + .filter(|(connection, _)| { all_connections .get(connection.address()) .map_or(false, |conn_ctx| now_ms() > conn_ctx.suspended_till()) @@ -371,62 +370,62 @@ impl ConnectionManager { if connections_needed > all_candidate_connections.len() as u32 { // Not enough connections to cover the `min_connected` threshold. // This means we will never be able to maintain `min_connected` active connections. - (true, all_candidate_connections, currently_connected) + (true, all_candidate_connections) } else { // If we consider all candidate connection (but some are suspended), we can cover the needed connections. // We will consider the suspended ones since if we don't we will stay below `min_connected` threshold. - (false, all_candidate_connections, currently_connected) + (false, all_candidate_connections) } } else { // Non suspended candidates are enough to cover the needed connections. - (false, non_suspended_candidate_connections, currently_connected) + (false, non_suspended_candidate_connections) } }; // Establish the connections to the selected candidates and alter the maintained connections set accordingly. { let client = unwrap_or_return!(self.get_client()); - // This is the maximum connections we can have being established concurrently. - let allowed_concurrency = - self.config().max_connected.saturating_sub(currently_connected).max(1) as usize; - let connection_loop_chunks = candidate_connections.chunks(allowed_concurrency).map(|chunk| { - FuturesUnordered::from_iter(chunk.iter().map(|connection| { - let address = connection.address().to_string(); - connection - .establish_connection_loop(client.clone()) - .map(|res| (address, res)) - })) - }); + // Sort the candidate connections by their priority/ID. + candidate_connections.sort_by_key(|(_, priority)| *priority); + // Pick each chunk of connections and establish them concurrently. - for mut connection_loops in connection_loop_chunks { - while let Some((address, result)) = connection_loops.next().await { - if let Err(e) = result { + for (connection, _) in candidate_connections { + let address = connection.address().to_string(); + let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id; + let (maintained_connections_size, lowest_priority_connection_id) = { + let maintained_connections = self.read_maintained_connections(); + let maintained_connections_size = maintained_connections.len() as u32; + let lowest_priority_connection_id = + *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); + (maintained_connections_size, lowest_priority_connection_id) + }; + + // We can only try to add the connection if: + // 1- We haven't reached the `max_connected` threshold. + // 2- We have reached the `max_connected` threshold but the connection has a higher priority than the lowest priority connection. + if maintained_connections_size < self.config().max_connected + || connection_id < lowest_priority_connection_id + { + // Now that we know the connection is good to be inserted, try to establish it. + if let Err(e) = connection.establish_connection_loop(client.clone()).await { // Remove the connection if it's not recoverable. if !e.is_recoverable() { self.remove_connection(&address).await.ok(); } continue; } - let connection_id = unwrap_or_continue!(self.read_connections().get(&address)).id; - let maintained_connections = self.read_maintained_connections(); - let maintained_connections_size = maintained_connections.len() as u32; - let lowest_priority_connection_id = - *maintained_connections.keys().next_back().unwrap_or(&u32::MAX); - // NOTE: Must drop to avoid deadlock with the write lock below. - drop(maintained_connections); - // We don't write-lock the maintained connections unless we know we will add this connection. - // That is, we can add it because we didn't hit the `max_connected` threshold, - // or we can add it because it is of a higher priority than the lowest priority connection. - if maintained_connections_size < self.config().max_connected - || connection_id < lowest_priority_connection_id - { - let mut maintained_connections = self.write_maintained_connections(); - maintained_connections.insert(connection_id, address); - // If we have reached the `max_connected` threshold then remove the lowest priority connection. - if !maintained_connections_size < self.config().max_connected { - maintained_connections.remove(&lowest_priority_connection_id); - } + let mut maintained_connections = self.write_maintained_connections(); + maintained_connections.insert(connection_id, address); + // If we have reached the `max_connected` threshold then remove the lowest priority connection. + if !maintained_connections_size < self.config().max_connected { + maintained_connections.remove(&lowest_priority_connection_id); } + } else { + // If any of the two conditions on the `if` statement above are not met, there is nothing to do. + // At this point we have already collected `max_connected` connections and also the current connection + // in the candidate list has a lower priority than the lowest priority maintained connection, and the next + // candidate connections as well since they are sorted by priority. + break; } } }