Skip to content

Commit

Permalink
don't disconnect immediately
Browse files Browse the repository at this point in the history
for the fall back case. But delegate whether to disconnect or not to the connection manager.
this helps with concurrent requests disconnecting for no reason (so no error) and breaking other requests
  • Loading branch information
mariocynicys committed Sep 18, 2024
1 parent c58da8f commit 18a430e
Showing 1 changed file with 10 additions and 18 deletions.
28 changes: 10 additions & 18 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,21 @@ impl ElectrumClient {
let connections = self.connection_manager.get_active_connections().await;
// Maximum number of connections to establish or use in request concurrently. Could be up to connections.len().
let concurrency = if send_to_all { connections.len() as u32 } else { 1 };
let disconnect_immediately = false;
match self
.send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately)
.send_request_using(&request, connections, send_to_all, concurrency)
.await
{
Ok(response) => Ok(response),
// If we failed the request using only the active connections, try again using all connections.
Err(_) => {
warn!("Failed to send the request using active connections, trying all connections.");
let connections = self.connection_manager.get_all_connections();
// The concurrency here must be `1`, because we are trying out connections that aren't maintained
// which means we might break the max connections rule.
// We will at most we will break this rule by `1` (have `max_connected + 1` open connections).
let concurrency = 1;
let disconnect_immediately = true;
match self
.send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately)
.send_request_using(&request, connections, send_to_all, concurrency)
.await
{
Ok(response) => Ok(response),
Expand All @@ -344,16 +343,17 @@ impl ElectrumClient {
) -> Result<JsonRpcResponseEnum, JsonRpcErrorType> {
// Whether to force the connection to be established (if not) before sending the request.
let force_connect = !matches!(request, JsonRpcRequestEnum::Single(ref req) if NO_FORCE_CONNECT_METHODS.contains(&req.method.as_str()));
let json = json::to_string(&request).map_err(|err| JsonRpcErrorType::InvalidRequest(err.to_string()))?;

let connection = self
.connection_manager
.get_connection_by_address(&to_addr, force_connect)
.await
.map_err(|err| JsonRpcErrorType::Internal(err.to_string()))?;

let json = json::to_string(&request).map_err(|err| JsonRpcErrorType::InvalidRequest(err.to_string()))?;
let response = connection
.electrum_request(json, request.rpc_id(), ELECTRUM_REQUEST_TIMEOUT)
.await?;
.await;
// If the request was not forcefully connected, we shouldn't inform the connection manager that it's
// not needed anymore, as we didn't force spawn it in the first place.
// This fixes dropping the connection after the version check request, as we don't mark the connection
Expand All @@ -363,7 +363,7 @@ impl ElectrumClient {
self.connection_manager.not_needed(&to_addr).await;
}

Ok(response)
response
}

/// Sends a JSONRPC request to all the given connections in parallel and returns
Expand All @@ -377,7 +377,6 @@ impl ElectrumClient {
connections: Vec<Arc<ElectrumConnection>>,
send_to_all: bool,
max_concurrency: u32,
disconnect_immediately: bool,
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), Vec<(JsonRpcRemoteAddr, JsonRpcErrorType)>> {
let max_concurrency = max_concurrency.max(1) as usize;
// Create the request
Expand Down Expand Up @@ -405,7 +404,7 @@ impl ElectrumClient {
}
}))
});
let event_handlers = self.event_handlers.clone();
let client = self.clone();
let mut final_response = None;
let mut errors = Vec::new();
// Iterate over the request chunks sequentially.
Expand All @@ -418,14 +417,7 @@ impl ElectrumClient {
if final_response.is_none() {
final_response = Some((address, response));
}
if disconnect_immediately {
connection
.disconnect(Some(ElectrumConnectionErr::Temporary(
"Was used as a fallback and not needed now.".to_string(),
)))
.await;
event_handlers.on_disconnected(connection.address()).ok();
}
client.connection_manager.not_needed(connection.address()).await;
if !send_to_all && final_response.is_some() {
return Ok(final_response.unwrap());
}
Expand All @@ -437,7 +429,7 @@ impl ElectrumClient {
"Forcefully disconnected for erroring: {e:?}."
))))
.await;
event_handlers.on_disconnected(connection.address()).ok();
client.event_handlers.on_disconnected(connection.address()).ok();
errors.push((address, e))
},
}
Expand Down

0 comments on commit 18a430e

Please sign in to comment.