Skip to content

Commit

Permalink
Use LruHashMap over HashMap for pending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Braun committed Jul 12, 2023
1 parent ca02f61 commit 982f2fe
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
14 changes: 8 additions & 6 deletions dkg-gadget/src/gossip_engine/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use sc_network_common::{
};
use sp_runtime::traits::{Block, NumberFor};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{HashMap, HashSet},
hash::Hash,
iter,
marker::PhantomData,
Expand Down Expand Up @@ -123,7 +123,9 @@ impl NetworkGossipEngineBuilder {
protocol_name: self.protocol_name.clone(),
to_receiver: message_channel_tx,
incoming_messages_stream: Arc::new(Mutex::new(Some(handler_channel_rx))),
pending_messages_peers: Arc::new(RwLock::new(HashMap::new())),
pending_messages_peers: Arc::new(RwLock::new(LruHashMap::new(
NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"),
))),
authority_id_to_peer_id: Arc::new(RwLock::new(HashMap::new())),
gossip_enabled: gossip_enabled.clone(),
service,
Expand Down Expand Up @@ -263,7 +265,7 @@ pub struct GossipHandler<B: Block + 'static> {
/// these peers using the message hash while the message is
/// received. This prevents that we receive the same message
/// multiple times concurrently.
pending_messages_peers: Arc<RwLock<HashMap<B::Hash, HashSet<PeerId>>>>,
pending_messages_peers: Arc<RwLock<LruHashMap<B::Hash, HashSet<PeerId>>>>,
/// Network service to use to send messages and manage peers.
service: Arc<NetworkService<B, B::Hash>>,
// All connected peers
Expand Down Expand Up @@ -554,8 +556,8 @@ impl<B: Block + 'static> GossipHandler<B> {
);
}
};
match pending_messages_peers.entry(message_hash) {
Entry::Vacant(entry) => {
match pending_messages_peers.inner.entry(message_hash) {
linked_hash_map::Entry::Vacant(entry) => {
self.logger.debug(format!("NEW DKG MESSAGE FROM {who}"));
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_new_signed_messages.inc();
Expand All @@ -566,7 +568,7 @@ impl<B: Block + 'static> GossipHandler<B> {
// we should add some good reputation to them.
self.service.report_peer(who, rep::GOOD_MESSAGE);
},
Entry::Occupied(mut entry) => {
linked_hash_map::Entry::Occupied(mut entry) => {
self.logger.debug(format!("OLD DKG MESSAGE FROM {who}"));
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_old_signed_messages.inc();
Expand Down
43 changes: 43 additions & 0 deletions scripts/monitor_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import psutil
import sys
import matplotlib.pyplot as plt
from matplotlib import animation
from collections import deque
import numpy as np

if len(sys.argv) < 2:
print("Error: PID not specified")
sys.exit(1)

pid = int(sys.argv[1])

try:
process = psutil.Process(pid)
except psutil.NoSuchProcess:
print(f"No process found with PID: {pid}")
sys.exit(1)

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
xs = deque(maxlen=10000)
ys = deque(maxlen=10000)


def animate(i):
mem = process.memory_info().rss / (1024**2)

xs.append(i)
ys.append(mem)

ax.clear()
ax.plot(xs, ys)

if len(xs) > 1:
m, b = np.polyfit(xs, ys, 1)
print(f"Slope: {m}")
reg_line = [m*x + b for x in xs]
ax.plot(xs, reg_line, color='red')


ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()

0 comments on commit 982f2fe

Please sign in to comment.