Skip to content

Commit

Permalink
set max limit for expiration time logic
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 25, 2024
1 parent de61cc2 commit 93efaa2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
4 changes: 2 additions & 2 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct HealthcheckConfig {
pub blocking_ms_for_per_address: u64,
/// Lifetime of the message.
/// Do not change this unless you know what you are doing.
pub message_expiration: u64,
pub message_expiration_secs: u64,
/// Maximum time (milliseconds) to wait for healthcheck response.
pub timeout_secs: u64,
}
Expand All @@ -69,7 +69,7 @@ impl Default for HealthcheckConfig {
fn default() -> Self {
Self {
blocking_ms_for_per_address: 750,
message_expiration: 10,
message_expiration_secs: 10,
timeout_secs: 10,
}
}
Expand Down
34 changes: 22 additions & 12 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use common::{log, HttpStatusCode, StatusCode};
use derive_more::Display;
use futures::channel::oneshot::{self, Receiver, Sender};
use instant::Duration;
use mm2_core::mm_ctx::MmArc;
use mm2_core::mm_ctx::{HealthcheckConfig, MmArc};
use mm2_err_handle::prelude::MmError;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerId, TopicPrefix};
use mm2_net::p2p::P2PContext;
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::convert::TryInto;
use std::num::TryFromIntError;
use std::str::FromStr;
Expand Down Expand Up @@ -52,14 +53,23 @@ impl HealthcheckMessage {
Ok(Self { signature, data })
}

pub(crate) fn is_received_message_valid(&self, my_peer_id: PeerId) -> bool {
pub(crate) fn is_received_message_valid(&self, my_peer_id: PeerId, healthcheck_config: &HealthcheckConfig) -> bool {
let now = Utc::now().timestamp();
if now > self.data.expires_at {
let remaining_expiration_seconds = u64::try_from(self.data.expires_at - now).unwrap_or(0);

if remaining_expiration_seconds == 0 {
log::debug!(
"Healthcheck message is expired. Current time in UTC: {now}, healthcheck `expires_at` in UTC: {}",
self.data.expires_at
);
return false;
} else if remaining_expiration_seconds > healthcheck_config.message_expiration_secs {
log::debug!(
"Healthcheck message have too high expiration time.\nMax allowed expiration seconds: {}\nReceived message expiration seconds: {}",
self.data.expires_at,
remaining_expiration_seconds,
);
return false;
}

if self.data.target_peer != my_peer_id {
Expand Down Expand Up @@ -264,7 +274,7 @@ pub async fn peer_connection_healthcheck_rpc(
false,
ctx.health_checker
.config
.message_expiration
.message_expiration_secs
.try_into()
.map_err(|e: TryFromIntError| HealthcheckRpcError::Internal { reason: e.to_string() })?,
)
Expand Down Expand Up @@ -329,7 +339,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
let my_peer_id = P2PContext::fetch_from_mm_arc(&ctx_c).peer_id();
if !data.is_received_message_valid(my_peer_id) {
if !data.is_received_message_valid(my_peer_id, &ctx_c.health_checker.config) {
log::error!("Received an invalid healthcheck message.");
log::debug!("Message context: {:?}", data);
return;
Expand Down Expand Up @@ -403,7 +413,7 @@ mod tests {
let ctx = ctx();
let target_peer = create_test_peer_id();
let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
assert!(message.is_received_message_valid(target_peer));
assert!(message.is_received_message_valid(target_peer, &ctx.health_checker.config));
});

cross_test!(test_corrupted_messages, {
Expand All @@ -412,29 +422,29 @@ mod tests {

let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
message.data.expires_at += 1;
assert!(!message.is_received_message_valid(target_peer));
assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config));

let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
message.data.is_a_reply = !message.data.is_a_reply;
assert!(!message.is_received_message_valid(target_peer));
assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config));

let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
message.data.sender_peer = message.data.target_peer;
assert!(!message.is_received_message_valid(target_peer));
assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config));

let mut message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
message.data.target_peer = message.data.sender_peer;
assert!(!message.is_received_message_valid(target_peer));
assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config));

let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, 5).unwrap();
assert!(!message.is_received_message_valid(message.data.sender_peer));
assert!(!message.is_received_message_valid(message.data.sender_peer, &ctx.health_checker.config));
});

cross_test!(test_expired_message, {
let ctx = ctx();
let target_peer = create_test_peer_id();
let message = HealthcheckMessage::generate_message(&ctx, target_peer, false, -1).unwrap();
assert!(!message.is_received_message_valid(target_peer));
assert!(!message.is_received_message_valid(target_peer, &ctx.health_checker.config));
});

cross_test!(test_encode_decode, {
Expand Down

0 comments on commit 93efaa2

Please sign in to comment.