From 74c8bea8ddbf057d62c6def473ac5dd763ced926 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 25 Sep 2024 07:51:24 +0300 Subject: [PATCH] nit fixes Signed-off-by: onur-ozkan --- mm2src/mm2_core/src/mm_ctx.rs | 24 +++++--------- mm2src/mm2_main/src/lp_healthcheck.rs | 45 ++++++++++++++++----------- 2 files changed, 34 insertions(+), 35 deletions(-) diff --git a/mm2src/mm2_core/src/mm_ctx.rs b/mm2src/mm2_core/src/mm_ctx.rs index 98864df40f..8d5969332f 100644 --- a/mm2src/mm2_core/src/mm_ctx.rs +++ b/mm2src/mm2_core/src/mm_ctx.rs @@ -45,15 +45,7 @@ cfg_native! { /// Default interval to export and record metrics to log. const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.; -mod healthcheck_defaults { - pub(crate) const fn default_healthcheck_blocking_ms() -> u64 { 750 } - - pub(crate) const fn default_healthcheck_message_expiration_secs() -> i64 { 10 } - - pub(crate) const fn default_timeout_secs() -> u64 { 10 } -} - -pub struct Healthcheck { +pub struct HealthChecker { /// Links the RPC context to the P2P context to handle health check responses. pub response_handler: AsyncMutex>>, /// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks. @@ -68,7 +60,7 @@ pub struct HealthcheckConfig { pub blocking_ms_for_per_address: u64, /// Lifetime of the message. /// Do not change this unless you know what you are doing. - pub message_expiration: i64, + pub message_expiration: u64, /// Maximum time (milliseconds) to wait for healthcheck response. pub timeout_secs: u64, } @@ -76,9 +68,9 @@ pub struct HealthcheckConfig { impl Default for HealthcheckConfig { fn default() -> Self { Self { - blocking_ms_for_per_address: healthcheck_defaults::default_healthcheck_blocking_ms(), - message_expiration: healthcheck_defaults::default_healthcheck_message_expiration_secs(), - timeout_secs: healthcheck_defaults::default_timeout_secs(), + blocking_ms_for_per_address: 750, + message_expiration: 10, + timeout_secs: 10, } } } @@ -183,7 +175,7 @@ pub struct MmCtx { /// asynchronous handle for rusqlite connection. #[cfg(not(target_arch = "wasm32"))] pub async_sqlite_connection: Constructible>>, - pub healthcheck: Healthcheck, + pub health_checker: HealthChecker, } impl MmCtx { @@ -233,7 +225,7 @@ impl MmCtx { nft_ctx: Mutex::new(None), #[cfg(not(target_arch = "wasm32"))] async_sqlite_connection: Constructible::default(), - healthcheck: Healthcheck { + health_checker: HealthChecker { response_handler: AsyncMutex::new(ExpirableMap::default()), ddos_shield: AsyncMutex::new(ExpirableMap::default()), config: HealthcheckConfig::default(), @@ -828,7 +820,7 @@ impl MmCtxBuilder { if !healthcheck_config.is_null() { let healthcheck_config: HealthcheckConfig = json::from_value(healthcheck_config.clone()).expect("Invalid json value in 'healthcheck_config'."); - ctx.healthcheck.config = healthcheck_config; + ctx.health_checker.config = healthcheck_config; } } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index b7eddb3d70..833e43938e 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -11,8 +11,9 @@ use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, Pe use mm2_net::p2p::P2PContext; use ser_error_derive::SerializeErrorType; use serde::{Deserialize, Serialize}; +use std::convert::TryInto; +use std::num::TryFromIntError; use std::str::FromStr; -use std::sync::OnceLock; use crate::lp_network::broadcast_p2p_msg; @@ -228,14 +229,15 @@ where pub enum HealthcheckRpcError { MessageGenerationFailed { reason: String }, MessageEncodingFailed { reason: String }, + Internal { reason: String }, } impl HttpStatusCode for HealthcheckRpcError { fn status_code(&self) -> common::StatusCode { match self { - HealthcheckRpcError::MessageGenerationFailed { .. } | HealthcheckRpcError::MessageEncodingFailed { .. } => { - StatusCode::INTERNAL_SERVER_ERROR - }, + HealthcheckRpcError::MessageGenerationFailed { .. } + | HealthcheckRpcError::Internal { .. } + | HealthcheckRpcError::MessageEncodingFailed { .. } => StatusCode::INTERNAL_SERVER_ERROR, } } } @@ -244,12 +246,9 @@ pub async fn peer_connection_healthcheck_rpc( ctx: MmArc, req: RequestPayload, ) -> Result> { - /// When things go awry, we want records to clear themselves to keep the memory clean of unused data. - /// This is unrelated to the timeout logic. - static ADDRESS_RECORD_EXPIRATION: OnceLock = OnceLock::new(); - - let address_record_exp = - ADDRESS_RECORD_EXPIRATION.get_or_init(|| Duration::from_secs(ctx.healthcheck.config.timeout_secs)); + // When things go awry, we want records to clear themselves to keep the memory clean of unused data. + // This is unrelated to the timeout logic. + let address_record_exp = Duration::from_secs(ctx.health_checker.config.timeout_secs); let target_peer_id = req.peer_id; @@ -259,9 +258,17 @@ pub async fn peer_connection_healthcheck_rpc( return Ok(true); } - let message = - HealthcheckMessage::generate_message(&ctx, target_peer_id, false, ctx.healthcheck.config.message_expiration) - .map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?; + let message = HealthcheckMessage::generate_message( + &ctx, + target_peer_id, + false, + ctx.health_checker + .config + .message_expiration + .try_into() + .map_err(|e: TryFromIntError| HealthcheckRpcError::Internal { reason: e.to_string() })?, + ) + .map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?; let encoded_message = message .encode() @@ -269,14 +276,14 @@ pub async fn peer_connection_healthcheck_rpc( let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel(); - let mut book = ctx.healthcheck.response_handler.lock().await; + let mut book = ctx.health_checker.response_handler.lock().await; book.clear_expired_entries(); - book.insert(target_peer_id.to_string(), tx, *address_record_exp); + book.insert(target_peer_id.to_string(), tx, address_record_exp); drop(book); broadcast_p2p_msg(&ctx, peer_healthcheck_topic(&target_peer_id), encoded_message, None); - let timeout_duration = Duration::from_secs(ctx.healthcheck.config.timeout_secs); + let timeout_duration = Duration::from_secs(ctx.health_checker.config.timeout_secs); Ok(rx.timeout(timeout_duration).await == Ok(Ok(()))) } @@ -300,13 +307,13 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li let sender_peer = data.sender_peer().to_owned(); - let mut ddos_shield = ctx.healthcheck.ddos_shield.lock().await; + let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await; ddos_shield.clear_expired_entries(); if ddos_shield .insert( sender_peer.to_string(), (), - Duration::from_millis(ctx.healthcheck.config.blocking_ms_for_per_address), + Duration::from_millis(ctx.health_checker.config.blocking_ms_for_per_address), ) .is_some() { @@ -345,7 +352,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li broadcast_p2p_msg(&ctx_c, topic, encoded_msg, None); } else { // The requested peer is healthy; signal the response channel. - let mut response_handler = ctx_c.healthcheck.response_handler.lock().await; + let mut response_handler = ctx_c.health_checker.response_handler.lock().await; if let Some(tx) = response_handler.remove(&sender_peer.to_string()) { if tx.send(()).is_err() { log::error!("Result channel isn't present for peer '{sender_peer}'.");