Skip to content

Commit

Permalink
switch to sync Mutex
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 25, 2024
1 parent 74c8bea commit de61cc2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
8 changes: 4 additions & 4 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.;

pub struct HealthChecker {
/// Links the RPC context to the P2P context to handle health check responses.
pub response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
pub response_handler: Mutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks.
pub ddos_shield: AsyncMutex<ExpirableMap<String, ()>>,
pub ddos_shield: Mutex<ExpirableMap<String, ()>>,
pub config: HealthcheckConfig,
}

Expand Down Expand Up @@ -226,8 +226,8 @@ impl MmCtx {
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
health_checker: HealthChecker {
response_handler: AsyncMutex::new(ExpirableMap::default()),
ddos_shield: AsyncMutex::new(ExpirableMap::default()),
response_handler: Mutex::new(ExpirableMap::default()),
ddos_shield: Mutex::new(ExpirableMap::default()),
config: HealthcheckConfig::default(),
},
}
Expand Down
13 changes: 7 additions & 6 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,11 @@ pub async fn peer_connection_healthcheck_rpc(

let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel();

let mut book = ctx.health_checker.response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_id.to_string(), tx, address_record_exp);
drop(book);
{
let mut book = ctx.health_checker.response_handler.lock().unwrap();
book.clear_expired_entries();
book.insert(target_peer_id.to_string(), tx, address_record_exp);
}

broadcast_p2p_msg(&ctx, peer_healthcheck_topic(&target_peer_id), encoded_message, None);

Expand Down Expand Up @@ -307,7 +308,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li

let sender_peer = data.sender_peer().to_owned();

let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await;
let mut ddos_shield = ctx.health_checker.ddos_shield.lock().unwrap();
ddos_shield.clear_expired_entries();
if ddos_shield
.insert(
Expand Down Expand Up @@ -352,7 +353,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
broadcast_p2p_msg(&ctx_c, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx_c.health_checker.response_handler.lock().await;
let mut response_handler = ctx_c.health_checker.response_handler.lock().unwrap();
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
Expand Down

0 comments on commit de61cc2

Please sign in to comment.