Skip to content

Commit

Permalink
pack healthcheck related Ctx fields
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 18, 2024
1 parent 5b39e75 commit 7fccb15
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
25 changes: 15 additions & 10 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ mod healthcheck_defaults {
pub(crate) const fn default_timeout_secs() -> u64 { 10 }
}

pub struct Healthcheck {
/// Links the RPC context to the P2P context to handle health check responses.
pub response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent brute-force attacks.
pub bruteforce_shield: AsyncMutex<ExpirableMap<String, ()>>,
pub config: HealthcheckConfig,
}

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct HealthcheckConfig {
Expand Down Expand Up @@ -175,12 +183,7 @@ pub struct MmCtx {
/// asynchronous handle for rusqlite connection.
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent brute-force attacks.
pub healthcheck_bruteforce_shield: AsyncMutex<ExpirableMap<String, ()>>,
/// Global configuration of healthchecks.
pub healthcheck_config: HealthcheckConfig,
pub healthcheck: Healthcheck,
}

impl MmCtx {
Expand Down Expand Up @@ -230,9 +233,11 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
healthcheck_bruteforce_shield: AsyncMutex::new(ExpirableMap::default()),
healthcheck_config: HealthcheckConfig::default(),
healthcheck: Healthcheck {
response_handler: AsyncMutex::new(ExpirableMap::default()),
bruteforce_shield: AsyncMutex::new(ExpirableMap::default()),
config: HealthcheckConfig::default(),
},
}
}

Expand Down Expand Up @@ -807,7 +812,7 @@ impl MmCtxBuilder {
if !healthcheck_config.is_null() {
let healthcheck_config: HealthcheckConfig =
json::from_value(healthcheck_config.clone()).expect("Invalid json value in 'healthcheck_config'.");
ctx.healthcheck_config = healthcheck_config;
ctx.healthcheck.config = healthcheck_config;
}
}

Expand Down
8 changes: 4 additions & 4 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub async fn peer_connection_healthcheck_rpc(
static ADDRESS_RECORD_EXPIRATION: OnceLock<Duration> = OnceLock::new();

let address_record_exp =
ADDRESS_RECORD_EXPIRATION.get_or_init(|| Duration::from_secs(ctx.healthcheck_config.timeout_secs));
ADDRESS_RECORD_EXPIRATION.get_or_init(|| Duration::from_secs(ctx.healthcheck.config.timeout_secs));

let target_peer_id = PeerId::from_str(&req.peer_id)
.map_err(|e| HealthcheckRpcError::InvalidPeerAddress { reason: e.to_string() })?;
Expand All @@ -174,7 +174,7 @@ pub async fn peer_connection_healthcheck_rpc(
}

let message =
HealthcheckMessage::generate_message(&ctx, target_peer_id, false, ctx.healthcheck_config.message_expiration)
HealthcheckMessage::generate_message(&ctx, target_peer_id, false, ctx.healthcheck.config.message_expiration)
.map_err(|reason| HealthcheckRpcError::MessageGenerationFailed { reason })?;

let encoded_message = message
Expand All @@ -183,14 +183,14 @@ pub async fn peer_connection_healthcheck_rpc(

let (tx, rx): (Sender<()>, Receiver<()>) = oneshot::channel();

let mut book = ctx.healthcheck_response_handler.lock().await;
let mut book = ctx.healthcheck.response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_id.to_string(), tx, *address_record_exp);
drop(book);

broadcast_p2p_msg(&ctx, peer_healthcheck_topic(&target_peer_id), encoded_message, None);

let timeout_duration = Duration::from_secs(ctx.healthcheck_config.timeout_secs);
let timeout_duration = Duration::from_secs(ctx.healthcheck.config.timeout_secs);
Ok(rx.timeout(timeout_duration).await == Ok(Ok(())))
}

Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ async fn process_p2p_message(

let sender_peer = data.sender_peer().to_owned();

let mut bruteforce_shield = ctx.healthcheck_bruteforce_shield.lock().await;
let mut bruteforce_shield = ctx.healthcheck.bruteforce_shield.lock().await;
bruteforce_shield.clear_expired_entries();
if bruteforce_shield
.insert(
sender_peer.clone(),
(),
Duration::from_millis(ctx.healthcheck_config.blocking_ms_for_per_address),
Duration::from_millis(ctx.healthcheck.config.blocking_ms_for_per_address),
)
.is_some()
{
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn process_p2p_message(
broadcast_p2p_msg(&ctx, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.healthcheck_response_handler.lock().await;
let mut response_handler = ctx.healthcheck.response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
Expand Down

0 comments on commit 7fccb15

Please sign in to comment.