From 18a430ed589d8032aed5a475030b0d3f20056ceb Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Wed, 18 Sep 2024 14:32:38 +0300 Subject: [PATCH] don't disconnect immediately for the fall back case. But delegate whether to disconnect or not to the connection manager. this helps with concurrent requests disconnecting for no reason (so no error) and breaking other requests --- .../utxo/rpc_clients/electrum_rpc/client.rs | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs index 301de2fef9..ad21262b75 100644 --- a/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs +++ b/mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs @@ -309,22 +309,21 @@ impl ElectrumClient { let connections = self.connection_manager.get_active_connections().await; // Maximum number of connections to establish or use in request concurrently. Could be up to connections.len(). let concurrency = if send_to_all { connections.len() as u32 } else { 1 }; - let disconnect_immediately = false; match self - .send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately) + .send_request_using(&request, connections, send_to_all, concurrency) .await { Ok(response) => Ok(response), // If we failed the request using only the active connections, try again using all connections. Err(_) => { + warn!("Failed to send the request using active connections, trying all connections."); let connections = self.connection_manager.get_all_connections(); // The concurrency here must be `1`, because we are trying out connections that aren't maintained // which means we might break the max connections rule. // We will at most we will break this rule by `1` (have `max_connected + 1` open connections). let concurrency = 1; - let disconnect_immediately = true; match self - .send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately) + .send_request_using(&request, connections, send_to_all, concurrency) .await { Ok(response) => Ok(response), @@ -344,16 +343,17 @@ impl ElectrumClient { ) -> Result { // Whether to force the connection to be established (if not) before sending the request. let force_connect = !matches!(request, JsonRpcRequestEnum::Single(ref req) if NO_FORCE_CONNECT_METHODS.contains(&req.method.as_str())); + let json = json::to_string(&request).map_err(|err| JsonRpcErrorType::InvalidRequest(err.to_string()))?; + let connection = self .connection_manager .get_connection_by_address(&to_addr, force_connect) .await .map_err(|err| JsonRpcErrorType::Internal(err.to_string()))?; - let json = json::to_string(&request).map_err(|err| JsonRpcErrorType::InvalidRequest(err.to_string()))?; let response = connection .electrum_request(json, request.rpc_id(), ELECTRUM_REQUEST_TIMEOUT) - .await?; + .await; // If the request was not forcefully connected, we shouldn't inform the connection manager that it's // not needed anymore, as we didn't force spawn it in the first place. // This fixes dropping the connection after the version check request, as we don't mark the connection @@ -363,7 +363,7 @@ impl ElectrumClient { self.connection_manager.not_needed(&to_addr).await; } - Ok(response) + response } /// Sends a JSONRPC request to all the given connections in parallel and returns @@ -377,7 +377,6 @@ impl ElectrumClient { connections: Vec>, send_to_all: bool, max_concurrency: u32, - disconnect_immediately: bool, ) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), Vec<(JsonRpcRemoteAddr, JsonRpcErrorType)>> { let max_concurrency = max_concurrency.max(1) as usize; // Create the request @@ -405,7 +404,7 @@ impl ElectrumClient { } })) }); - let event_handlers = self.event_handlers.clone(); + let client = self.clone(); let mut final_response = None; let mut errors = Vec::new(); // Iterate over the request chunks sequentially. @@ -418,14 +417,7 @@ impl ElectrumClient { if final_response.is_none() { final_response = Some((address, response)); } - if disconnect_immediately { - connection - .disconnect(Some(ElectrumConnectionErr::Temporary( - "Was used as a fallback and not needed now.".to_string(), - ))) - .await; - event_handlers.on_disconnected(connection.address()).ok(); - } + client.connection_manager.not_needed(connection.address()).await; if !send_to_all && final_response.is_some() { return Ok(final_response.unwrap()); } @@ -437,7 +429,7 @@ impl ElectrumClient { "Forcefully disconnected for erroring: {e:?}." )))) .await; - event_handlers.on_disconnected(connection.address()).ok(); + client.event_handlers.on_disconnected(connection.address()).ok(); errors.push((address, e)) }, }