Skip to content

Commit

Permalink
switch back to async mutexa again
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 26, 2024
1 parent b4d63f6 commit 8727e52
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
10 changes: 5 additions & 5 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture},
expirable_map::ExpirableMap};
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
Expand Down Expand Up @@ -39,17 +40,16 @@ cfg_native! {
use std::net::{IpAddr, SocketAddr, AddrParseError};
use std::path::{Path, PathBuf};
use std::sync::MutexGuard;
use futures::lock::Mutex as AsyncMutex;
}

/// Default interval to export and record metrics to log.
const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.;

pub struct HealthChecker {
/// Links the RPC context to the P2P context to handle health check responses.
pub response_handler: Mutex<ExpirableMap<String, oneshot::Sender<()>>>,
pub response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks.
pub ddos_shield: Mutex<ExpirableMap<String, ()>>,
pub ddos_shield: AsyncMutex<ExpirableMap<String, ()>>,
pub config: HealthcheckConfig,
}

Expand Down Expand Up @@ -226,8 +226,8 @@ impl MmCtx {
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
health_checker: HealthChecker {
response_handler: Mutex::new(ExpirableMap::default()),
ddos_shield: Mutex::new(ExpirableMap::default()),
response_handler: AsyncMutex::new(ExpirableMap::default()),
ddos_shield: AsyncMutex::new(ExpirableMap::default()),
config: HealthcheckConfig::default(),
},
}
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ pub async fn peer_connection_healthcheck_rpc(
let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel();

{
let mut book = ctx.health_checker.response_handler.lock().unwrap();
let mut book = ctx.health_checker.response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_address.to_string(), tx, address_record_exp);
}
Expand Down Expand Up @@ -357,7 +357,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
return;
};

let mut ddos_shield = ctx.health_checker.ddos_shield.lock().unwrap();
let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await;
ddos_shield.clear_expired_entries();
if ddos_shield
.insert(
Expand Down Expand Up @@ -390,7 +390,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
broadcast_p2p_msg(&ctx, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.health_checker.response_handler.lock().unwrap();
let mut response_handler = ctx.health_checker.response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
Expand Down

0 comments on commit 8727e52

Please sign in to comment.