Skip to content

Commit

Permalink
separate the healthcheck processing logic and improve the performance
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 19, 2024
1 parent 1777559 commit 13fe925
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 70 deletions.
78 changes: 78 additions & 0 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_std::prelude::FutureExt;
use chrono::Utc;
use common::executor::SpawnFuture;
use common::{log, HttpStatusCode, StatusCode};
use derive_more::Display;
use futures::channel::oneshot::{self, Receiver, Sender};
Expand Down Expand Up @@ -279,6 +280,83 @@ pub async fn peer_connection_healthcheck_rpc(
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) {
macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
Ok(t) => t,
Err(e) => {
log::error!("{}, error: {e:?}", $msg);
return;
},
}
};
}

let data = try_or_return!(
HealthcheckMessage::decode(&message.data),
"Couldn't decode healthcheck message"
);

let sender_peer = data.sender_peer().to_owned();

let mut bruteforce_shield = ctx.healthcheck.bruteforce_shield.lock().await;
bruteforce_shield.clear_expired_entries();
if bruteforce_shield
.insert(
sender_peer.to_string(),
(),
Duration::from_millis(ctx.healthcheck.config.blocking_ms_for_per_address),
)
.is_some()
{
log::warn!("Peer '{sender_peer}' exceeded the healthcheck blocking time, skipping their message.");
return;
}
drop(bruteforce_shield);

let ctx_c = ctx.clone();

// Pass the remaining work to another thread to free up this one as soon as possible,
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
let my_peer_id = P2PContext::fetch_from_mm_arc(&ctx_c).peer_id();
if !data.is_received_message_valid(my_peer_id) {
log::error!("Received an invalid healthcheck message.");
log::debug!("Message context: {:?}", data);
return;
};

if data.should_reply() {
// Reply the message so they know we are healthy.

let topic = peer_healthcheck_topic(&sender_peer);

let msg = try_or_return!(
HealthcheckMessage::generate_message(&ctx_c, sender_peer, true, 10),
"Couldn't generate the healthcheck message, this is very unusual!"
);

let encoded_msg = try_or_return!(
msg.encode(),
"Couldn't encode healthcheck message, this is very unusual!"
);

broadcast_p2p_msg(&ctx_c, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx_c.healthcheck.response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
};
} else {
log::info!("Peer '{sender_peer}' isn't recorded in the healthcheck response handler.");
};
}
});
}

#[cfg(any(test, target_arch = "wasm32"))]
mod tests {
use super::*;
Expand Down
72 changes: 2 additions & 70 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common::executor::SpawnFuture;
use common::{log, Future01CompatExt};
use derive_more::Display;
use futures::{channel::oneshot, StreamExt};
use instant::{Duration, Instant};
use instant::Instant;
use keys::KeyPair;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
Expand All @@ -38,7 +38,6 @@ use mm2_net::p2p::P2PContext;
use serde::de;
use std::net::ToSocketAddrs;

use crate::lp_healthcheck::{peer_healthcheck_topic, HealthcheckMessage};
use crate::{lp_healthcheck, lp_ordermatch, lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;
Expand Down Expand Up @@ -217,74 +216,7 @@ async fn process_p2p_message(
}
},
Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => {
macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
Ok(t) => t,
Err(e) => {
log::error!("{}, error: {e:?}", $msg);
return;
},
}
};
}

let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
let data = try_or_return!(
HealthcheckMessage::decode(&message.data),
"Couldn't decode healthcheck message"
);

let sender_peer = data.sender_peer().to_owned();

let mut bruteforce_shield = ctx.healthcheck.bruteforce_shield.lock().await;
bruteforce_shield.clear_expired_entries();
if bruteforce_shield
.insert(
sender_peer.to_string(),
(),
Duration::from_millis(ctx.healthcheck.config.blocking_ms_for_per_address),
)
.is_some()
{
log::warn!("Peer '{sender_peer}' exceeded the healthcheck blocking time, skipping their message.");
return;
}
drop(bruteforce_shield);

if !data.is_received_message_valid(p2p_ctx.peer_id()) {
log::error!("Received an invalid healthcheck message.");
log::debug!("Message context: {:?}", data);
return;
};

if data.should_reply() {
// Reply the message so they know we are healthy.

let topic = peer_healthcheck_topic(&sender_peer);

let msg = try_or_return!(
HealthcheckMessage::generate_message(&ctx, sender_peer, true, 10),
"Couldn't generate the healthcheck message, this is very unusual!"
);

let encoded_msg = try_or_return!(
msg.encode(),
"Couldn't encode healthcheck message, this is very unusual!"
);

broadcast_p2p_msg(&ctx, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.healthcheck.response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
};
} else {
log::info!("Peer '{sender_peer}' isn't recorded in the healthcheck response handler.");
};
}
lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await
},
None | Some(_) => (),
}
Expand Down

0 comments on commit 13fe925

Please sign in to comment.