diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index b9bc8ae3a7..54b1b0fb6b 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -6,6 +6,7 @@ use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner}, graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture}, expirable_map::ExpirableMap}; use futures::channel::oneshot; +use futures::lock::Mutex as AsyncMutex; use gstuff::{try_s, Constructible, ERR, ERRL}; use lazy_static::lazy_static; use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration}; @@ -39,7 +40,6 @@ cfg_native! { use std::net::{IpAddr, SocketAddr, AddrParseError}; use std::path::{Path, PathBuf}; use std::sync::MutexGuard; - use futures::lock::Mutex as AsyncMutex; } /// Default interval to export and record metrics to log. @@ -47,9 +47,9 @@ const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.; pub struct HealthChecker { /// Links the RPC context to the P2P context to handle health check responses. - pub response_handler: Mutex>>, + pub response_handler: AsyncMutex>>, /// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks. - pub ddos_shield: Mutex>, + pub ddos_shield: AsyncMutex>, pub config: HealthcheckConfig, } @@ -226,8 +226,8 @@ impl MmCtx { #[cfg(not(target_arch = "wasm32"))] async_sqlite_connection: Constructible::default(), health_checker: HealthChecker { - response_handler: Mutex::new(ExpirableMap::default()), - ddos_shield: Mutex::new(ExpirableMap::default()), + response_handler: AsyncMutex::new(ExpirableMap::default()), + ddos_shield: AsyncMutex::new(ExpirableMap::default()), config: HealthcheckConfig::default(), }, } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 663b57a76d..be1f80daa7 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -308,7 +308,7 @@ pub async fn peer_connection_healthcheck_rpc( let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel(); { - let mut book = ctx.health_checker.response_handler.lock().unwrap(); + let mut book = ctx.health_checker.response_handler.lock().await; book.clear_expired_entries(); book.insert(target_peer_address.to_string(), tx, address_record_exp); } @@ -357,7 +357,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li return; }; - let mut ddos_shield = ctx.health_checker.ddos_shield.lock().unwrap(); + let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await; ddos_shield.clear_expired_entries(); if ddos_shield .insert( @@ -390,7 +390,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li broadcast_p2p_msg(&ctx, topic, encoded_msg, None); } else { // The requested peer is healthy; signal the response channel. - let mut response_handler = ctx.health_checker.response_handler.lock().unwrap(); + let mut response_handler = ctx.health_checker.response_handler.lock().await; if let Some(tx) = response_handler.remove(&sender_peer.to_string()) { if tx.send(()).is_err() { log::error!("Result channel isn't present for peer '{sender_peer}'.");