diff --git a/dkg-gadget/src/gossip_engine/network.rs b/dkg-gadget/src/gossip_engine/network.rs
index f15c8fc8d..4ddb4c5df 100644
--- a/dkg-gadget/src/gossip_engine/network.rs
+++ b/dkg-gadget/src/gossip_engine/network.rs
@@ -54,7 +54,7 @@ use sc_network_common::{
 };
 use sp_runtime::traits::{Block, NumberFor};
 use std::{
-	collections::{hash_map::Entry, HashMap, HashSet},
+	collections::{HashMap, HashSet},
 	hash::Hash,
 	iter,
 	marker::PhantomData,
@@ -123,7 +123,9 @@ impl NetworkGossipEngineBuilder {
 			protocol_name: self.protocol_name.clone(),
 			to_receiver: message_channel_tx,
 			incoming_messages_stream: Arc::new(Mutex::new(Some(handler_channel_rx))),
-			pending_messages_peers: Arc::new(RwLock::new(HashMap::new())),
+			pending_messages_peers: Arc::new(RwLock::new(LruHashMap::new(
+				NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"),
+			))),
 			authority_id_to_peer_id: Arc::new(RwLock::new(HashMap::new())),
 			gossip_enabled: gossip_enabled.clone(),
 			service,
@@ -263,7 +265,7 @@ pub struct GossipHandler {
 	/// these peers using the message hash while the message is
 	/// received. This prevents that we receive the same message
 	/// multiple times concurrently.
-	pending_messages_peers: Arc<RwLock<HashMap<B::Hash, Vec<PeerId>>>>,
+	pending_messages_peers: Arc<RwLock<LruHashMap<B::Hash, Vec<PeerId>>>>,
 	/// Network service to use to send messages and manage peers.
 	service: Arc<NetworkService<B, B::Hash>>,
 	// All connected peers
@@ -554,8 +556,8 @@ impl GossipHandler {
 				);
 			}
 		};
-		match pending_messages_peers.entry(message_hash) {
-			Entry::Vacant(entry) => {
+		match pending_messages_peers.inner.entry(message_hash) {
+			linked_hash_map::Entry::Vacant(entry) => {
 				self.logger.debug(format!("NEW DKG MESSAGE FROM {who}"));
 				if let Some(metrics) = self.metrics.as_ref() {
 					metrics.dkg_new_signed_messages.inc();
@@ -566,7 +568,7 @@
 				// we should add some good reputation to them.
 				self.service.report_peer(who, rep::GOOD_MESSAGE);
 			},
-			Entry::Occupied(mut entry) => {
+			linked_hash_map::Entry::Occupied(mut entry) => {
 				self.logger.debug(format!("OLD DKG MESSAGE FROM {who}"));
 				if let Some(metrics) = self.metrics.as_ref() {
 					metrics.dkg_old_signed_messages.inc();
diff --git a/dkg-gadget/src/gossip_messages/misbehaviour_report.rs b/dkg-gadget/src/gossip_messages/misbehaviour_report.rs
index 066200a6a..244cf446a 100644
--- a/dkg-gadget/src/gossip_messages/misbehaviour_report.rs
+++ b/dkg-gadget/src/gossip_messages/misbehaviour_report.rs
@@ -205,12 +205,12 @@ where
 		// Try to store reports offchain
 		if try_store_offchain(dkg_worker, &reports).await?
 		{
+			let mut lock = dkg_worker.aggregated_misbehaviour_reports.write();
 			// remove the report from the queue
-			dkg_worker.aggregated_misbehaviour_reports.write().remove(&(
-				report.misbehaviour_type,
-				report.session_id,
-				report.offender,
-			));
+			lock.remove(&(report.misbehaviour_type, report.session_id, report.offender));
+
+			// Also, remove any misbehavior reports that are older than the session_id
+			lock.retain(|k, _| k.1 >= report.session_id);
 		}
 		Ok(())
 	} else {
diff --git a/dkg-test-orchestrator/src/dummy_api.rs b/dkg-test-orchestrator/src/dummy_api.rs
index e2fedf7b1..d672c933f 100644
--- a/dkg-test-orchestrator/src/dummy_api.rs
+++ b/dkg-test-orchestrator/src/dummy_api.rs
@@ -6,7 +6,6 @@ use dkg_runtime_primitives::{
 };
 use hash_db::HashDB;
 use parking_lot::RwLock;
-use rand::Rng;
 use sp_api::{ApiExt, AsTrieBackend, BlockT, StateBackend, *};
 use sp_core::bounded_vec::BoundedVec;
 use sp_runtime::{testing::H256, traits::BlakeTwo256, Permill};
@@ -40,6 +39,7 @@ pub struct DummyApiInner {
 	>,
 	pub should_execute_keygen: bool,
 	pub blocks_per_session: u64,
+	pub incrementing_batch_id: u32,
 }
 
 impl MutableBlockchain for DummyApi {
@@ -47,17 +47,21 @@
 		&self,
 		propos: Vec<(UnsignedProposal<MaxProposalLength>, u64)>,
 	) {
-		// use a random batch_id to avoid collision
-		let num = rand::thread_rng().gen_range(0..100);
+		// Use an incremented batch ID to avoid collision
+		let mut lock = self.inner.write();
+		let batch_id = lock.incrementing_batch_id;
+		lock.incrementing_batch_id += 1;
+
 		let batches = propos
 			.iter()
 			.map(|prop| StoredUnsignedProposalBatch {
 				proposals: vec![prop.clone().0].try_into().unwrap(),
-				batch_id: num,
+				batch_id,
 				timestamp: 0,
 			})
 			.collect::<Vec<_>>();
-		self.inner.write().unsigned_proposals = batches;
+
+		lock.unsigned_proposals = batches;
 	}
 
 	fn set_pub_key(&self, block_id: u64, key: Vec<u8>) {
@@ -96,6 +100,7 @@ impl DummyApi {
 				unsigned_proposals: vec![],
 				should_execute_keygen: false,
 				blocks_per_session,
+				incrementing_batch_id: 0,
 			})),
 			logger,
 		}
diff --git a/scripts/monitor_memory.py b/scripts/monitor_memory.py
new file mode 100644
index 000000000..eea62a534
--- /dev/null
+++ b/scripts/monitor_memory.py
@@ -0,0 +1,43 @@
+import psutil
+import sys
+import matplotlib.pyplot as plt
+from matplotlib import animation
+from collections import deque
+import numpy as np
+
+if len(sys.argv) < 2:
+    print("Error: PID not specified")
+    sys.exit(1)
+
+pid = int(sys.argv[1])
+
+try:
+    process = psutil.Process(pid)
+except psutil.NoSuchProcess:
+    print(f"No process found with PID: {pid}")
+    sys.exit(1)
+
+fig = plt.figure()
+ax = fig.add_subplot(1, 1, 1)
+xs = deque(maxlen=10000)
+ys = deque(maxlen=10000)
+
+
+def animate(i):
+    mem = process.memory_info().rss / (1024**2)
+
+    xs.append(i)
+    ys.append(mem)
+
+    ax.clear()
+    ax.plot(xs, ys)
+
+    if len(xs) > 1:
+        m, b = np.polyfit(xs, ys, 1)
+        print(f"Slope: {m}")
+        reg_line = [m*x + b for x in xs]
+        ax.plot(xs, reg_line, color='red')
+
+
+ani = animation.FuncAnimation(fig, animate, interval=1000)
+plt.show()
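
Note on the core change, appended after the patch for context rather than as part of it: replacing the unbounded HashMap behind pending_messages_peers with an LruHashMap capped at MAX_KNOWN_MESSAGES puts a hard ceiling on how many message hashes the gossip handler tracks, so the dedup map can no longer grow indefinitely. The sketch below is a minimal, std-only illustration of that idea (a bounded, insertion-ordered map with oldest-first eviction); it is not the gadget's actual LruHashMap type, and the names BoundedPeerMap and insert_if_new are invented for the example.

// A bounded, insertion-ordered map: once `max_entries` keys are tracked,
// the oldest key is evicted before a new one is inserted. This mirrors the
// memory cap that the LruHashMap swap gives `pending_messages_peers`.
use std::collections::{HashMap, VecDeque};

struct BoundedPeerMap<K: std::hash::Hash + Eq + Clone, V> {
	map: HashMap<K, V>,
	order: VecDeque<K>,
	max_entries: usize,
}

impl<K: std::hash::Hash + Eq + Clone, V> BoundedPeerMap<K, V> {
	fn new(max_entries: usize) -> Self {
		assert!(max_entries > 0);
		Self { map: HashMap::new(), order: VecDeque::new(), max_entries }
	}

	/// Returns true if the key was new (first time this message hash was seen),
	/// evicting the oldest tracked hash once the bound is reached.
	fn insert_if_new(&mut self, key: K, value: V) -> bool {
		if self.map.contains_key(&key) {
			return false;
		}
		if self.order.len() == self.max_entries {
			if let Some(oldest) = self.order.pop_front() {
				self.map.remove(&oldest);
			}
		}
		self.order.push_back(key.clone());
		self.map.insert(key, value);
		true
	}
}

fn main() {
	let mut seen: BoundedPeerMap<u64, Vec<&'static str>> = BoundedPeerMap::new(2);
	assert!(seen.insert_if_new(1, vec!["peer-a"])); // new message hash
	assert!(!seen.insert_if_new(1, vec!["peer-b"])); // duplicate, ignored
	assert!(seen.insert_if_new(2, vec!["peer-b"]));
	assert!(seen.insert_if_new(3, vec!["peer-c"])); // bound hit: hash 1 evicted
	assert!(seen.insert_if_new(1, vec!["peer-a"])); // evicted earlier, so new again
}

The accompanying scripts/monitor_memory.py helper (invoked as python scripts/monitor_memory.py <pid>) plots the target process's resident set size once per second and prints the slope of a linear fit, which gives a quick way to check that memory no longer trends upward after these changes.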