Skip to content

Commit

Permalink
nit fixes
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 25, 2024
1 parent 0d46b36 commit 74c8bea
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
24 changes: 8 additions & 16 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ cfg_native! {
/// Default interval to export and record metrics to log.
const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.;

mod healthcheck_defaults {
pub(crate) const fn default_healthcheck_blocking_ms() -> u64 { 750 }

pub(crate) const fn default_healthcheck_message_expiration_secs() -> i64 { 10 }

pub(crate) const fn default_timeout_secs() -> u64 { 10 }
}

pub struct Healthcheck {
pub struct HealthChecker {
/// Links the RPC context to the P2P context to handle health check responses.
pub response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks.
Expand All @@ -68,17 +60,17 @@ pub struct HealthcheckConfig {
pub blocking_ms_for_per_address: u64,
/// Lifetime of the message.
/// Do not change this unless you know what you are doing.
pub message_expiration: i64,
pub message_expiration: u64,
/// Maximum time (milliseconds) to wait for healthcheck response.
pub timeout_secs: u64,
}

impl Default for HealthcheckConfig {
fn default() -> Self {
Self {
blocking_ms_for_per_address: healthcheck_defaults::default_healthcheck_blocking_ms(),
message_expiration: healthcheck_defaults::default_healthcheck_message_expiration_secs(),
timeout_secs: healthcheck_defaults::default_timeout_secs(),
blocking_ms_for_per_address: 750,
message_expiration: 10,
timeout_secs: 10,
}
}
}
Expand Down Expand Up @@ -183,7 +175,7 @@ pub struct MmCtx {
/// asynchronous handle for rusqlite connection.
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
pub healthcheck: Healthcheck,
pub health_checker: HealthChecker,
}

impl MmCtx {
Expand Down Expand Up @@ -233,7 +225,7 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck: Healthcheck {
health_checker: HealthChecker {
response_handler: AsyncMutex::new(ExpirableMap::default()),
ddos_shield: AsyncMutex::new(ExpirableMap::default()),
config: HealthcheckConfig::default(),
Expand Down Expand Up @@ -828,7 +820,7 @@ impl MmCtxBuilder {
if !healthcheck_config.is_null() {
let healthcheck_config: HealthcheckConfig =
json::from_value(healthcheck_config.clone()).expect("Invalid json value in 'healthcheck_config'.");
ctx.healthcheck.config = healthcheck_config;
ctx.health_checker.config = healthcheck_config;
}
}

Expand Down
45 changes: 26 additions & 19 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, Pe
use mm2_net::p2p::P2PContext;
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
use std::num::TryFromIntError;
use std::str::FromStr;
use std::sync::OnceLock;

use crate::lp_network::broadcast_p2p_msg;

Expand Down Expand Up @@ -228,14 +229,15 @@ where
pub enum HealthcheckRpcError {
MessageGenerationFailed { reason: String },
MessageEncodingFailed { reason: String },
Internal { reason: String },
}

impl HttpStatusCode for HealthcheckRpcError {
fn status_code(&self) -> common::StatusCode {
match self {
HealthcheckRpcError::MessageGenerationFailed { .. } | HealthcheckRpcError::MessageEncodingFailed { .. } => {
StatusCode::INTERNAL_SERVER_ERROR
},
HealthcheckRpcError::MessageGenerationFailed { .. }
| HealthcheckRpcError::Internal { .. }
| HealthcheckRpcError::MessageEncodingFailed { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
Expand All @@ -244,12 +246,9 @@ pub async fn peer_connection_healthcheck_rpc(
ctx: MmArc,
req: RequestPayload,
) -> Result<bool, MmError<HealthcheckRpcError>> {
/// When things go awry, we want records to clear themselves to keep the memory clean of unused data.
/// This is unrelated to the timeout logic.
static ADDRESS_RECORD_EXPIRATION: OnceLock<Duration> = OnceLock::new();

let address_record_exp =
ADDRESS_RECORD_EXPIRATION.get_or_init(|| Duration::from_secs(ctx.healthcheck.config.timeout_secs));
// When things go awry, we want records to clear themselves to keep the memory clean of unused data.
// This is unrelated to the timeout logic.
let address_record_exp = Duration::from_secs(ctx.health_checker.config.timeout_secs);

let target_peer_id = req.peer_id;

Expand All @@ -259,24 +258,32 @@ pub async fn peer_connection_healthcheck_rpc(
return Ok(true);
}

let message =
HealthcheckMessage::generate_message(&ctx, target_peer_id, false, ctx.healthcheck.config.message_expiration)
.map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?;
let message = HealthcheckMessage::generate_message(
&ctx,
target_peer_id,
false,
ctx.health_checker
.config
.message_expiration
.try_into()
.map_err(|e: TryFromIntError| HealthcheckRpcError::Internal { reason: e.to_string() })?,
)
.map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?;

let encoded_message = message
.encode()
.map_err(|e| HealthcheckRpcError::MessageEncodingFailed { reason: e.to_string() })?;

let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel();

let mut book = ctx.healthcheck.response_handler.lock().await;
let mut book = ctx.health_checker.response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_id.to_string(), tx, *address_record_exp);
book.insert(target_peer_id.to_string(), tx, address_record_exp);
drop(book);

broadcast_p2p_msg(&ctx, peer_healthcheck_topic(&target_peer_id), encoded_message, None);

let timeout_duration = Duration::from_secs(ctx.healthcheck.config.timeout_secs);
let timeout_duration = Duration::from_secs(ctx.health_checker.config.timeout_secs);
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

Expand All @@ -300,13 +307,13 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li

let sender_peer = data.sender_peer().to_owned();

let mut ddos_shield = ctx.healthcheck.ddos_shield.lock().await;
let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await;
ddos_shield.clear_expired_entries();
if ddos_shield
.insert(
sender_peer.to_string(),
(),
Duration::from_millis(ctx.healthcheck.config.blocking_ms_for_per_address),
Duration::from_millis(ctx.health_checker.config.blocking_ms_for_per_address),
)
.is_some()
{
Expand Down Expand Up @@ -345,7 +352,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
broadcast_p2p_msg(&ctx_c, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx_c.healthcheck.response_handler.lock().await;
let mut response_handler = ctx_c.health_checker.response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
Expand Down

0 comments on commit 74c8bea

Please sign in to comment.