Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved KAD Put Records operation handling into Metrics state #685

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use configuration::LibP2PConfig;
use libp2p::{
autonat, dcutr, identify,
identity::{self, ed25519, Keypair},
kad::{self, Mode, PeerRecord},
kad::{self, Mode, PeerRecord, QueryStats, Record, RecordKey},
mdns, noise, ping, relay,
swarm::NetworkBehaviour,
tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
Expand Down Expand Up @@ -61,19 +61,31 @@ Bootstrap node list must not be empty.
Either use a '--network' flag or add a list of bootstrap nodes in the configuration file.
"#;

// NOTE(review): this listing looks like a rendered diff in which removed and added
// lines are interleaved — the two `derive` attributes and the two `PutRecord`
// variants below are presumably the old and new sides of one change. Confirm
// against the real file before relying on this definition.
/// Events published through the event loop's `event_sender` broadcast channel.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum OutputEvent {
IncomingGetRecord,
IncomingPutRecord,
KadModeChange(Mode),
/// Sent by `handle_put_result` once the remaining-record counter of a tracked
/// block reaches zero; `success_rate` is successes divided by the total record
/// count and `duration` is the last recorded query duration in whole seconds.
PutRecord { success_rate: f64, duration: f64 },

Ping(Duration),
IncomingConnection,
IncomingConnectionError,
MultiaddressUpdate(Multiaddr),
EstablishedConnection,
OutgoingConnectionError,
Count,
/// Sent from the client's put-record command after every record of the batch
/// has been passed to Kademlia's `put_record`.
PutRecord {
block_num: u32,
records: Vec<Record>,
},
/// Sent when a `QueryResult::PutRecord(Ok(..))` result is handled; carries the
/// key of the stored record and the statistics of the finished query.
PutRecordSuccess {
record_key: RecordKey,
query_stats: QueryStats,
},
/// Sent when a PUT query ends with `PutRecordError::QuorumFailed` or
/// `PutRecordError::Timeout`; carries the affected key and the query statistics.
PutRecordFailed {
record_key: RecordKey,
query_stats: QueryStats,
},
}

#[derive(Clone)]
Expand Down
25 changes: 8 additions & 17 deletions core/src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{debug, info, trace};

use super::{
event_loop::{BlockStat, ConnectionEstablishedInfo},
is_global, is_multiaddr_global, Command, EventLoop, MultiAddressInfo, PeerInfo, QueryChannel,
event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop,
MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel,
};
use crate::types::MultiaddrConfig;

Expand Down Expand Up @@ -203,27 +203,18 @@ impl Client {
) -> Result<()> {
self.command_sender
.send(Box::new(move |context: &mut EventLoop| {
context
.active_blocks
.entry(block_num)
// increase the total cell count we monitor if the block entry already exists
.and_modify(|block| block.increase_block_stat_counters(records.len()))
// initiate counting for the new block if the block doesn't exist
.or_insert(BlockStat {
total_count: records.len(),
remaining_counter: records.len(),
success_counter: 0,
error_counter: 0,
time_stat: 0,
});

for record in records {
for record in records.clone() {
let _ = context
.swarm
.behaviour_mut()
.kademlia
.put_record(record, quorum);
}

context
.event_sender
.send(OutputEvent::PutRecord { block_num, records })?;

Ok(())
}))
.map_err(|_| eyre!("Failed to send the Put Kad Record Command to the EventLoop"))
Expand Down
101 changes: 24 additions & 77 deletions core/src/network/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::{
identify::{self, Info},
kad::{
self, store::RecordStore, BootstrapOk, GetRecordOk, InboundRequest, Mode, PutRecordOk,
QueryId, QueryResult, QueryStats, RecordKey,
QueryId, QueryResult, RecordKey,
},
mdns,
multiaddr::Protocol,
Expand Down Expand Up @@ -35,22 +35,6 @@ use crate::{
types::TimeToLive,
};

/// Per-block bookkeeping for Kademlia PUT operations.
///
/// One entry is kept for every block whose record uploads are being monitored;
/// the counters are updated as individual PUT queries finish.
#[derive(Debug)]
pub struct BlockStat {
    /// Number of records submitted for this block so far.
    pub total_count: usize,
    /// Number of submitted records whose PUT query has not finished yet.
    pub remaining_counter: usize,
    /// Number of PUT queries that finished successfully.
    pub success_counter: usize,
    /// Number of PUT queries that finished with an error.
    pub error_counter: usize,
    /// Duration, in whole seconds, taken from the most recently handled query result.
    pub time_stat: u64,
}

impl BlockStat {
    /// Registers `cell_number` additional records for an already tracked block.
    ///
    /// Both the overall total and the number of still-pending records grow by
    /// `cell_number`; the success, error and timing fields are left untouched.
    pub fn increase_block_stat_counters(&mut self, cell_number: usize) {
        let Self {
            total_count,
            remaining_counter,
            ..
        } = self;

        *total_count += cell_number;
        *remaining_counter += cell_number;
    }
}

// RelayState keeps track of all things relay related
struct RelayState {
// id of the selected Relay that needs to be connected
Expand Down Expand Up @@ -141,15 +125,13 @@ impl EventCounter {
pub struct EventLoop {
pub swarm: Swarm<Behaviour>,
command_receiver: UnboundedReceiver<Command>,
pub event_sender: broadcast::Sender<OutputEvent>,
pub(crate) event_sender: broadcast::Sender<OutputEvent>,
// Tracking Kademlia events
pub pending_kad_queries: HashMap<QueryId, QueryChannel>,
// Tracking swarm events (i.e. peer dialing)
pub pending_swarm_events: HashMap<PeerId, oneshot::Sender<Result<ConnectionEstablishedInfo>>>,
relay: RelayState,
bootstrap: BootstrapState,
/// Blocks we monitor for PUT success rate
pub active_blocks: HashMap<u32, BlockStat>,
shutdown: Controller<String>,
event_loop_config: EventLoopConfig,
pub kad_mode: Mode,
Expand Down Expand Up @@ -208,7 +190,6 @@ impl EventLoop {
is_startup_done: false,
timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval),
},
active_blocks: Default::default(),
shutdown,
event_loop_config: EventLoopConfig {
is_fat_client,
Expand Down Expand Up @@ -346,22 +327,35 @@ impl EventLoop {
};

match error {
kad::PutRecordError::QuorumFailed { key, .. } => {
self.handle_put_result(key.clone(), stats.clone(), true)
.await;
},
kad::PutRecordError::Timeout { key, .. } => {
self.handle_put_result(key.clone(), stats.clone(), true)
.await;
kad::PutRecordError::QuorumFailed { key, .. }
| kad::PutRecordError::Timeout { key, .. } => {
// Remove local records for fat clients (memory optimization)
if self.event_loop_config.is_fat_client {
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}

_ = self.event_sender.send(OutputEvent::PutRecordFailed {
record_key: key,
query_stats: stats,
});
},
}
},

QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
_ = self.pending_kad_queries.remove(&id);

self.handle_put_result(key.clone(), stats.clone(), false)
.await;
// Remove local records for fat clients (memory optimization)
if self.event_loop_config.is_fat_client {
debug!("Pruning local records on fat client");
self.swarm.behaviour_mut().kademlia.remove_record(&key);
}

_ = self.event_sender.send(OutputEvent::PutRecordSuccess {
record_key: key,
query_stats: stats,
});
},
QueryResult::Bootstrap(result) => match result {
Ok(BootstrapOk {
Expand Down Expand Up @@ -673,53 +667,6 @@ impl EventLoop {
},
}
}

/// Folds the outcome of one finished Kademlia PUT query into the statistics of
/// the block the record belongs to.
///
/// * `key` - Kademlia key of the record; it is converted to a `DHTKey` to find
///   the block number. If the conversion fails a warning is logged and nothing
///   else happens.
/// * `stats` - statistics of the finished query; its duration (whole seconds,
///   `0` when unavailable) overwrites the block's `time_stat`.
/// * `is_error` - selects whether the error or the success counter is bumped.
///
/// When the block's remaining counter reaches zero, the success rate is logged
/// and an `OutputEvent::PutRecord` is sent (the send result is ignored). Blocks
/// that are not present in `active_blocks` are only reported at debug level.
async fn handle_put_result(&mut self, key: RecordKey, stats: QueryStats, is_error: bool) {
    // Both DHT key kinds carry the block number as their first component.
    let block_num = match key.clone().try_into() {
        Ok(DHTKey::Cell(block_num, ..)) | Ok(DHTKey::Row(block_num, ..)) => block_num,
        Err(error) => {
            warn!("Unable to cast Kademlia key to DHT key: {error}");
            return;
        },
    };

    let Some(block) = self.active_blocks.get_mut(&block_num) else {
        debug!("Can't find block in the active blocks list");
        return;
    };

    // One more record of this block has been resolved, successfully or not.
    block.remaining_counter -= 1;
    let outcome_counter = if is_error {
        &mut block.error_counter
    } else {
        &mut block.success_counter
    };
    *outcome_counter += 1;

    // Keep the duration of the latest result, falling back to zero seconds.
    block.time_stat = stats.duration().as_ref().map_or(0, Duration::as_secs);

    let block_finished = block.remaining_counter == 0;
    if block_finished {
        let (succeeded, total, elapsed_secs) =
            (block.success_counter, block.total_count, block.time_stat);
        let success_rate = succeeded as f64 / total as f64;
        info!(
            "Cell upload success rate for block {block_num}: {}/{}. Duration: {}",
            succeeded, total, elapsed_secs
        );

        _ = self.event_sender.send(OutputEvent::PutRecord {
            success_rate,
            duration: elapsed_secs as f64,
        });
    }

    // Remove local records for fat clients (memory optimization)
    if self.event_loop_config.is_fat_client {
        debug!("Pruning local records on fat client");
        self.swarm.behaviour_mut().kademlia.remove_record(&key);
    }
}
}

#[cfg(test)]
Expand Down
Loading
Loading