Skip to content

Commit

Permalink
suggestion from omar - 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mariocynicys committed Sep 12, 2024
1 parent eead6a8 commit ffd2052
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
22 changes: 17 additions & 5 deletions mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,11 @@ impl ElectrumClient {
let request = (req_id, request);
// Use the active connections for this request.
let connections = self.connection_manager.get_active_connections().await;
// Maximum number of connections to establish or use in request concurrently.
// Could be up to connections.len().
let concurrency = 1;
// Maximum number of connections to establish or use in request concurrently. Could be up to connections.len().
let concurrency = if send_to_all { connections.len() as u32 } else { 1 };
let disconnect_immediately = false;
match self
.send_request_using(&request, connections, send_to_all, concurrency)
.send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately)
.await
{
Ok(response) => Ok(response),
Expand All @@ -322,8 +322,9 @@ impl ElectrumClient {
// which means we might break the max connections rule.
// We will at most we will break this rule by `1` (have `max_connected + 1` open connections).
let concurrency = 1;
let disconnect_immediately = true;
match self
.send_request_using(&request, connections, send_to_all, concurrency)
.send_request_using(&request, connections, send_to_all, concurrency, disconnect_immediately)
.await
{
Ok(response) => Ok(response),
Expand Down Expand Up @@ -353,6 +354,8 @@ impl ElectrumClient {
let response = connection
.electrum_request(json, request.rpc_id(), ELECTRUM_REQUEST_TIMEOUT)
.await?;
// Inform the connection manager that the connection was queried and no longer needed now.
self.connection_manager.not_needed(&to_addr).await;

Ok(response)
}
Expand All @@ -368,6 +371,7 @@ impl ElectrumClient {
connections: Vec<Arc<ElectrumConnection>>,
send_to_all: bool,
max_concurrency: u32,
disconnect_immediately: bool,
) -> Result<(JsonRpcRemoteAddr, JsonRpcResponseEnum), Vec<(JsonRpcRemoteAddr, JsonRpcErrorType)>> {
let max_concurrency = max_concurrency.max(1) as usize;
// Create the request
Expand Down Expand Up @@ -408,6 +412,14 @@ impl ElectrumClient {
if final_response.is_none() {
final_response = Some((address, response));
}
if disconnect_immediately {
connection
.disconnect(Some(ElectrumConnectionErr::Temporary(
"Was used as a fallback and not needed now.".to_string(),
)))
.await;
event_handlers.on_disconnected(connection.address()).ok();
}
if !send_to_all && final_response.is_some() {
return Ok(final_response.unwrap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl ConnectionManager {
}
}

// Handles the connection event.
/// Handles the connection event.
pub fn on_connected(&self, server_address: &str) {
let all_connections = self.read_connections();
let connection_ctx = unwrap_or_return!(all_connections.get(server_address));
Expand All @@ -275,7 +275,7 @@ impl ConnectionManager {
connection_ctx.connected();
}

// Handles the disconnection event from an Electrum server.
/// Handles the disconnection event from an Electrum server.
pub fn on_disconnected(&self, server_address: &str) {
let all_connections = self.read_connections();
let connection_ctx = unwrap_or_return!(all_connections.get(server_address));
Expand All @@ -296,6 +296,24 @@ impl ConnectionManager {
client.subscribe_addresses(abandoned_subs).ok();
}

/// A method that should be called after using a specific server for some request.
///
/// Instead of disconnecting the connection right away, this method will only disconnect it
/// if it's not in the maintained connections set.
pub async fn not_needed(&self, server_address: &str) {
let (id, connection) = {
let all_connections = self.read_connections();
let connection_ctx = unwrap_or_return!(all_connections.get(server_address));
(connection_ctx.id, connection_ctx.connection.clone())
};
if !self.read_maintained_connections().contains_key(&id) {
connection
.disconnect(Some(ElectrumConnectionErr::Temporary("Not needed anymore".to_string())))
.await;
self.on_disconnected(connection.address());
}
}

/// Remove a connection from the connection manager by its address.
// TODO(feat): Add the ability to add a connection during runtime.
pub async fn remove_connection(
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/rpc_clients/electrum_rpc/constants.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// The timeout for the electrum server to respond to a request.
pub const ELECTRUM_REQUEST_TIMEOUT: f64 = 60.;
pub const ELECTRUM_REQUEST_TIMEOUT: f64 = 20.;
/// The default (can be overridden) maximum timeout to establish a connection with the electrum server.
/// This included connecting to the server and querying the server version.
pub const DEFAULT_CONNECTION_ESTABLISHMENT_TIMEOUT: f64 = 60.;
Expand Down
3 changes: 0 additions & 3 deletions mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ pub enum UtxoCoinBuildError {
)]
CoinDoesntSupportTrezor,
BlockHeaderStorageError(BlockHeaderStorageError),
// TODO: this isn't used
#[display(fmt = "Error {} on getting the height of the latest block from rpc!", _0)]
CantGetBlockCount(String),
#[display(fmt = "Internal error: {}", _0)]
Internal(String),
#[display(fmt = "SPV params verificaiton failed. Error: {_0}")]
Expand Down

0 comments on commit ffd2052

Please sign in to comment.