Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use LruHashMap over HashMap for pending messages + Other Fixes #673

Merged
merged 3 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions dkg-gadget/src/gossip_engine/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use sc_network_common::{
};
use sp_runtime::traits::{Block, NumberFor};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::{HashMap, HashSet},
hash::Hash,
iter,
marker::PhantomData,
Expand Down Expand Up @@ -123,7 +123,9 @@ impl NetworkGossipEngineBuilder {
protocol_name: self.protocol_name.clone(),
to_receiver: message_channel_tx,
incoming_messages_stream: Arc::new(Mutex::new(Some(handler_channel_rx))),
pending_messages_peers: Arc::new(RwLock::new(HashMap::new())),
pending_messages_peers: Arc::new(RwLock::new(LruHashMap::new(
NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"),
))),
authority_id_to_peer_id: Arc::new(RwLock::new(HashMap::new())),
gossip_enabled: gossip_enabled.clone(),
service,
Expand Down Expand Up @@ -263,7 +265,7 @@ pub struct GossipHandler<B: Block + 'static> {
/// these peers using the message hash while the message is
/// received. This prevents that we receive the same message
/// multiple times concurrently.
pending_messages_peers: Arc<RwLock<HashMap<B::Hash, HashSet<PeerId>>>>,
pending_messages_peers: Arc<RwLock<LruHashMap<B::Hash, HashSet<PeerId>>>>,
/// Network service to use to send messages and manage peers.
service: Arc<NetworkService<B, B::Hash>>,
// All connected peers
Expand Down Expand Up @@ -554,8 +556,8 @@ impl<B: Block + 'static> GossipHandler<B> {
);
}
};
match pending_messages_peers.entry(message_hash) {
Entry::Vacant(entry) => {
match pending_messages_peers.inner.entry(message_hash) {
linked_hash_map::Entry::Vacant(entry) => {
self.logger.debug(format!("NEW DKG MESSAGE FROM {who}"));
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_new_signed_messages.inc();
Expand All @@ -566,7 +568,7 @@ impl<B: Block + 'static> GossipHandler<B> {
// we should add some good reputation to them.
self.service.report_peer(who, rep::GOOD_MESSAGE);
},
Entry::Occupied(mut entry) => {
linked_hash_map::Entry::Occupied(mut entry) => {
self.logger.debug(format!("OLD DKG MESSAGE FROM {who}"));
if let Some(metrics) = self.metrics.as_ref() {
metrics.dkg_old_signed_messages.inc();
Expand Down
10 changes: 5 additions & 5 deletions dkg-gadget/src/gossip_messages/misbehaviour_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ where

// Try to store reports offchain
if try_store_offchain(dkg_worker, &reports).await? {
let mut lock = dkg_worker.aggregated_misbehaviour_reports.write();
// remove the report from the queue
dkg_worker.aggregated_misbehaviour_reports.write().remove(&(
report.misbehaviour_type,
report.session_id,
report.offender,
));
lock.remove(&(report.misbehaviour_type, report.session_id, report.offender));

// Also, remove any misbehavior reports that are older than the session_id
lock.retain(|k, _| k.1 >= report.session_id);
}
Ok(())
} else {
Expand Down
15 changes: 10 additions & 5 deletions dkg-test-orchestrator/src/dummy_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use dkg_runtime_primitives::{
};
use hash_db::HashDB;
use parking_lot::RwLock;
use rand::Rng;
use sp_api::{ApiExt, AsTrieBackend, BlockT, StateBackend, *};
use sp_core::bounded_vec::BoundedVec;
use sp_runtime::{testing::H256, traits::BlakeTwo256, Permill};
Expand Down Expand Up @@ -40,24 +39,29 @@ pub struct DummyApiInner {
>,
pub should_execute_keygen: bool,
pub blocks_per_session: u64,
pub incrementing_batch_id: u32,
}

impl MutableBlockchain for DummyApi {
fn set_unsigned_proposals(
&self,
propos: Vec<(UnsignedProposal<dkg_runtime_primitives::CustomU32Getter<10000>>, u64)>,
) {
// use a random batch_id to avoid collision
let num = rand::thread_rng().gen_range(0..100);
// Use an incremented batch ID to avoid collision
let mut lock = self.inner.write();
let batch_id = lock.incrementing_batch_id;
lock.incrementing_batch_id += 1;

let batches = propos
.iter()
.map(|prop| StoredUnsignedProposalBatch {
proposals: vec![prop.clone().0].try_into().unwrap(),
batch_id: num,
batch_id,
timestamp: 0,
})
.collect::<Vec<_>>();
self.inner.write().unsigned_proposals = batches;

lock.unsigned_proposals = batches;
}

fn set_pub_key(&self, block_id: u64, key: Vec<u8>) {
Expand Down Expand Up @@ -96,6 +100,7 @@ impl DummyApi {
unsigned_proposals: vec![],
should_execute_keygen: false,
blocks_per_session,
incrementing_batch_id: 0,
})),
logger,
}
Expand Down
43 changes: 43 additions & 0 deletions scripts/monitor_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import psutil
import sys
import matplotlib.pyplot as plt
from matplotlib import animation
from collections import deque
import numpy as np

if len(sys.argv) < 2:
print("Error: PID not specified")
sys.exit(1)

pid = int(sys.argv[1])

try:
process = psutil.Process(pid)
except psutil.NoSuchProcess:
print(f"No process found with PID: {pid}")
sys.exit(1)

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
xs = deque(maxlen=10000)
ys = deque(maxlen=10000)


def animate(i):
mem = process.memory_info().rss / (1024**2)

xs.append(i)
ys.append(mem)

ax.clear()
ax.plot(xs, ys)

if len(xs) > 1:
m, b = np.polyfit(xs, ys, 1)
print(f"Slope: {m}")
reg_line = [m*x + b for x in xs]
ax.plot(xs, reg_line, color='red')


ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()
Loading