diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index 5b4fef056d..a431ba159e 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -307,11 +307,11 @@ impl ElectrumClient { let request = (req_id, request); // Use the active connections for this request. let connections = self.connection_manager.get_active_connections().await; - // Maximum number of connections to establish or use in request concurrently. - // Could be up to connections.len(). - let concurrency = 1; + // Maximum number of connections to establish or use in request concurrently. Could be up to connections.len(). + let concurrency = if send_to_all { connections.len() as u32 } else { 1 }; + let disconnect_immediately = false; match self - .send_request_using(&request, connections, send_to_all, concurrency) + .send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately) .await { Ok(response) => Ok(response), @@ -322,8 +322,9 @@ impl ElectrumClient { // which means we might break the max connections rule. // We will at most we will break this rule by `1` (have `max_connected + 1` open connections). let concurrency = 1; + let disconnect_immediately = true; match self - .send_request_using(&request, connections, send_to_all, concurrency) + .send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately) .await { Ok(response) => Ok(response), @@ -353,6 +354,8 @@ impl ElectrumClient { let response = connection .electrum_request(json, request.rpc_id(), ELECTRUM_REQUEST_TIMEOUT) .await?; + // Inform the connection manager that the connection was queried and no longer needed now. + self.connection_manager.not_needed(&to_addr).await; Ok(response) } @@ -368,6 +371,7 @@ impl ElectrumClient { connections: Vec>, send_to_all: bool, max_concurrency: u32, + disconnect_immediately: bool, ) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), Vec<(JsonRpcRemoteAddr, JsonRpcErrorType)>> { let max_concurrency = max_concurrency.max(1) as usize; // Create the request @@ -408,6 +412,14 @@ impl ElectrumClient { if final_response.is_none() { final_response = Some((address, response)); } + if disconnect_immediately { + connection + .disconnect(Some(ElectrumConnectionErr::Temporary( + "Was used as a fallback and not needed now.".to_string(), + ))) + .await; + event_handlers.on_disconnected(connection.address()).ok(); + } if !send_to_all { return final_response.ok_or(errors); } diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs index 271daad331..5f583f37c3 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/connection_manager/manager.rs @@ -266,7 +266,7 @@ impl ConnectionManager { } } - // Handles the connection event. + /// Handles the connection event. pub fn on_connected(&self, server_address: &str) { let all_connections = self.read_connections(); let connection_ctx = unwrap_or_return!(all_connections.get(server_address)); @@ -275,7 +275,7 @@ impl ConnectionManager { connection_ctx.connected(); } - // Handles the disconnection event from an Electrum server. + /// Handles the disconnection event from an Electrum server. pub fn on_disconnected(&self, server_address: &str) { let all_connections = self.read_connections(); let connection_ctx = unwrap_or_return!(all_connections.get(server_address)); @@ -296,6 +296,24 @@ impl ConnectionManager { client.subscribe_addresses(abandoned_subs).ok(); } + /// A method that should be called after using a specific server for some request. + /// + /// Instead of disconnecting the connection right away, this method will only disconnect it + /// if it's not in the maintained connections set. + pub async fn not_needed(&self, server_address: &str) { + let (id, connection) = { + let all_connections = self.read_connections(); + let connection_ctx = unwrap_or_return!(all_connections.get(server_address)); + (connection_ctx.id, connection_ctx.connection.clone()) + }; + if !self.read_maintained_connections().contains_key(&id) { + connection + .disconnect(Some(ElectrumConnectionErr::Temporary("Not needed anymore".to_string()))) + .await; + self.on_disconnected(connection.address()); + } + } + /// Remove a connection from the connection manager by its address. // TODO(feat): Add the ability to add a connection during runtime. pub async fn remove_connection( diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/constants.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/constants.rs index 4001e3481c..13c661e2d8 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/constants.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/constants.rs @@ -1,5 +1,5 @@ /// The timeout for the electrum server to respond to a request. -pub const ELECTRUM_REQUEST_TIMEOUT: f64 = 60.; +pub const ELECTRUM_REQUEST_TIMEOUT: f64 = 20.; /// The default (can be overridden) maximum timeout to establish a connection with the electrum server. /// This included connecting to the server and querying the server version. pub const DEFAULT_CONNECTION_ESTABLISHMENT_TIMEOUT: f64 = 60.; diff --git a/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs b/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs index 39a19167e7..98813e5c7d 100644 --- a/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs +++ b/mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs @@ -68,9 +68,6 @@ pub enum UtxoCoinBuildError { )] CoinDoesntSupportTrezor, BlockHeaderStorageError(BlockHeaderStorageError), - // TODO: this isn't used - #[display(fmt = "Error {} on getting the height of the latest block from rpc!", _0)] - CantGetBlockCount(String), #[display(fmt = "Internal error: {}", _0)] Internal(String), #[display(fmt = "SPV params verificaiton failed. Error: {_0}")]