From 982f2feb510fef4def7f34fa75c53780b4e87167 Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Wed, 12 Jul 2023 08:43:01 -0400 Subject: [PATCH] Use LruHashMap over HashMap for pending messages --- dkg-gadget/src/gossip_engine/network.rs | 14 ++++---- scripts/monitor_memory.py | 43 +++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 scripts/monitor_memory.py diff --git a/dkg-gadget/src/gossip_engine/network.rs b/dkg-gadget/src/gossip_engine/network.rs index f15c8fc8d..4ddb4c5df 100644 --- a/dkg-gadget/src/gossip_engine/network.rs +++ b/dkg-gadget/src/gossip_engine/network.rs @@ -54,7 +54,7 @@ use sc_network_common::{ }; use sp_runtime::traits::{Block, NumberFor}; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, hash::Hash, iter, marker::PhantomData, @@ -123,7 +123,9 @@ impl NetworkGossipEngineBuilder { protocol_name: self.protocol_name.clone(), to_receiver: message_channel_tx, incoming_messages_stream: Arc::new(Mutex::new(Some(handler_channel_rx))), - pending_messages_peers: Arc::new(RwLock::new(HashMap::new())), + pending_messages_peers: Arc::new(RwLock::new(LruHashMap::new( + NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"), + ))), authority_id_to_peer_id: Arc::new(RwLock::new(HashMap::new())), gossip_enabled: gossip_enabled.clone(), service, @@ -263,7 +265,7 @@ pub struct GossipHandler { /// these peers using the message hash while the message is /// received. This prevents that we receive the same message /// multiple times concurrently. - pending_messages_peers: Arc>>>, + pending_messages_peers: Arc>>>, /// Network service to use to send messages and manage peers. service: Arc>, // All connected peers @@ -554,8 +556,8 @@ impl GossipHandler { ); } }; - match pending_messages_peers.entry(message_hash) { - Entry::Vacant(entry) => { + match pending_messages_peers.inner.entry(message_hash) { + linked_hash_map::Entry::Vacant(entry) => { self.logger.debug(format!("NEW DKG MESSAGE FROM {who}")); if let Some(metrics) = self.metrics.as_ref() { metrics.dkg_new_signed_messages.inc(); @@ -566,7 +568,7 @@ impl GossipHandler { // we should add some good reputation to them. self.service.report_peer(who, rep::GOOD_MESSAGE); }, - Entry::Occupied(mut entry) => { + linked_hash_map::Entry::Occupied(mut entry) => { self.logger.debug(format!("OLD DKG MESSAGE FROM {who}")); if let Some(metrics) = self.metrics.as_ref() { metrics.dkg_old_signed_messages.inc(); diff --git a/scripts/monitor_memory.py b/scripts/monitor_memory.py new file mode 100644 index 000000000..eea62a534 --- /dev/null +++ b/scripts/monitor_memory.py @@ -0,0 +1,43 @@ +import psutil +import sys +import matplotlib.pyplot as plt +from matplotlib import animation +from collections import deque +import numpy as np + +if len(sys.argv) < 2: + print("Error: PID not specified") + sys.exit(1) + +pid = int(sys.argv[1]) + +try: + process = psutil.Process(pid) +except psutil.NoSuchProcess: + print(f"No process found with PID: {pid}") + sys.exit(1) + +fig = plt.figure() +ax = fig.add_subplot(1, 1, 1) +xs = deque(maxlen=10000) +ys = deque(maxlen=10000) + + +def animate(i): + mem = process.memory_info().rss / (1024**2) + + xs.append(i) + ys.append(mem) + + ax.clear() + ax.plot(xs, ys) + + if len(xs) > 1: + m, b = np.polyfit(xs, ys, 1) + print(f"Slope: {m}") + reg_line = [m*x + b for x in xs] + ax.plot(xs, reg_line, color='red') + + +ani = animation.FuncAnimation(fig, animate, interval=1000) +plt.show()