Skip to content

Commit

Permalink
feat(new-RPC): connection healthcheck implementation for peers (#2194)
Browse files Browse the repository at this point in the history
  • Loading branch information
onur-ozkan authored Oct 3, 2024
1 parent 76d4342 commit 1f3dffa
Show file tree
Hide file tree
Showing 10 changed files with 629 additions and 18 deletions.
22 changes: 18 additions & 4 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,26 @@ pub struct ExpirableEntry<V> {
}

impl<V> ExpirableEntry<V> {
#[inline(always)]
pub fn new(v: V, exp: Duration) -> Self {
Self {
expires_at: Instant::now() + exp,
value: v,
}
}

#[inline(always)]
pub fn get_element(&self) -> &V { &self.value }

#[inline(always)]
pub fn update_value(&mut self, v: V) { self.value = v }

#[inline(always)]
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }

/// Checks whether entry has longer ttl than the given one.
#[inline(always)]
pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl }
}

impl<K: Eq + Hash, V> Default for ExpirableMap<K, V> {
Expand Down Expand Up @@ -47,10 +64,7 @@ impl<K: Eq + Hash, V> ExpirableMap<K, V> {
/// If a value already exists for the given key, it will be updated and then
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
let entry = ExpirableEntry {
expires_at: Instant::now() + exp,
value: v,
};
let entry = ExpirableEntry::new(v, exp);

self.0.insert(k, entry).map(|v| v.value)
}
Expand Down
11 changes: 8 additions & 3 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#[cfg(feature = "track-ctx-pointer")]
use common::executor::Timer;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture};
use common::log::{self, LogLevel, LogOnError, LogState};
use common::{cfg_native, cfg_wasm32, small_rng};
use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture},
expirable_map::ExpirableMap};
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
Expand All @@ -30,7 +33,6 @@ cfg_wasm32! {
cfg_native! {
use db_common::async_sql_conn::AsyncConnection;
use db_common::sqlite::rusqlite::Connection;
use futures::lock::Mutex as AsyncMutex;
use rustls::ServerName;
use mm2_metrics::prometheus;
use mm2_metrics::MmMetricsError;
Expand Down Expand Up @@ -142,6 +144,8 @@ pub struct MmCtx {
/// asynchronous handle for rusqlite connection.
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down Expand Up @@ -191,6 +195,7 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
}
}

Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bitcrypto = { path = "../mm2_bitcoin/crypto" }
blake2 = "0.10.6"
bytes = "0.4"
chain = { path = "../mm2_bitcoin/chain" }
chrono = "0.4"
cfg-if = "1.0"
coins = { path = "../coins" }
coins_activation = { path = "../coins_activation" }
Expand Down
Loading

0 comments on commit 1f3dffa

Please sign in to comment.