From c236ade8ef58825b4d39ede995cb02184772598b Mon Sep 17 00:00:00 2001 From: momosh Date: Tue, 27 Aug 2024 16:43:46 +0200 Subject: [PATCH] putting kad records is completely handled as part of metrics state --- core/src/network/p2p.rs | 18 ++- core/src/network/p2p/client.rs | 25 ++-- core/src/network/p2p/event_loop.rs | 101 ++++------------ core/src/telemetry/otlp.rs | 182 ++++++++++++++++++++++++++--- 4 files changed, 211 insertions(+), 115 deletions(-) diff --git a/core/src/network/p2p.rs b/core/src/network/p2p.rs index 231ccddc5..b04b8dc41 100644 --- a/core/src/network/p2p.rs +++ b/core/src/network/p2p.rs @@ -4,7 +4,7 @@ use configuration::LibP2PConfig; use libp2p::{ autonat, dcutr, identify, identity::{self, ed25519, Keypair}, - kad::{self, Mode, PeerRecord}, + kad::{self, Mode, PeerRecord, QueryStats, Record, RecordKey}, mdns, noise, ping, relay, swarm::NetworkBehaviour, tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, @@ -61,12 +61,12 @@ Bootstrap node list must not be empty. Either use a '--network' flag or add a list of bootstrap nodes in the configuration file. "#; -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum OutputEvent { IncomingGetRecord, IncomingPutRecord, KadModeChange(Mode), - PutRecord { success_rate: f64, duration: f64 }, + Ping(Duration), IncomingConnection, IncomingConnectionError, @@ -74,6 +74,18 @@ pub enum OutputEvent { EstablishedConnection, OutgoingConnectionError, Count, + PutRecord { + block_num: u32, + records: Vec, + }, + PutRecordSuccess { + record_key: RecordKey, + query_stats: QueryStats, + }, + PutRecordFailed { + record_key: RecordKey, + query_stats: QueryStats, + }, } #[derive(Clone)] diff --git a/core/src/network/p2p/client.rs b/core/src/network/p2p/client.rs index 732a305c0..60ded539f 100644 --- a/core/src/network/p2p/client.rs +++ b/core/src/network/p2p/client.rs @@ -20,8 +20,8 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::{debug, info, trace}; use super::{ - event_loop::{BlockStat, ConnectionEstablishedInfo}, - is_global, is_multiaddr_global, Command, EventLoop, MultiAddressInfo, PeerInfo, QueryChannel, + event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop, + MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel, }; use crate::types::MultiaddrConfig; @@ -203,27 +203,18 @@ impl Client { ) -> Result<()> { self.command_sender .send(Box::new(move |context: &mut EventLoop| { - context - .active_blocks - .entry(block_num) - // increase the total cell count we monitor if the block entry already exists - .and_modify(|block| block.increase_block_stat_counters(records.len())) - // initiate counting for the new block if the block doesn't exist - .or_insert(BlockStat { - total_count: records.len(), - remaining_counter: records.len(), - success_counter: 0, - error_counter: 0, - time_stat: 0, - }); - - for record in records { + for record in records.clone() { let _ = context .swarm .behaviour_mut() .kademlia .put_record(record, quorum); } + + context + .event_sender + .send(OutputEvent::PutRecord { block_num, records })?; + Ok(()) })) .map_err(|_| eyre!("Failed to send the Put Kad Record Command to the EventLoop")) diff --git a/core/src/network/p2p/event_loop.rs b/core/src/network/p2p/event_loop.rs index 86a04619d..9b67b3933 100644 --- a/core/src/network/p2p/event_loop.rs +++ b/core/src/network/p2p/event_loop.rs @@ -7,7 +7,7 @@ use libp2p::{ identify::{self, Info}, kad::{ self, store::RecordStore, BootstrapOk, GetRecordOk, InboundRequest, Mode, PutRecordOk, - QueryId, QueryResult, QueryStats, RecordKey, + QueryId, QueryResult, RecordKey, }, mdns, multiaddr::Protocol, @@ -35,22 +35,6 @@ use crate::{ types::TimeToLive, }; -#[derive(Debug)] -pub struct BlockStat { - pub total_count: usize, - pub remaining_counter: usize, - pub success_counter: usize, - pub error_counter: usize, - pub time_stat: u64, -} - -impl BlockStat { - pub fn increase_block_stat_counters(&mut self, cell_number: usize) { - self.total_count += cell_number; - self.remaining_counter += cell_number; - } -} - // RelayState keeps track of all things relay related struct RelayState { // id of the selected Relay that needs to be connected @@ -141,15 +125,13 @@ impl EventCounter { pub struct EventLoop { pub swarm: Swarm, command_receiver: UnboundedReceiver, - pub event_sender: broadcast::Sender, + pub(crate) event_sender: broadcast::Sender, // Tracking Kademlia events pub pending_kad_queries: HashMap, // Tracking swarm events (i.e. peer dialing) pub pending_swarm_events: HashMap>>, relay: RelayState, bootstrap: BootstrapState, - /// Blocks we monitor for PUT success rate - pub active_blocks: HashMap, shutdown: Controller, event_loop_config: EventLoopConfig, pub kad_mode: Mode, @@ -208,7 +190,6 @@ impl EventLoop { is_startup_done: false, timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval), }, - active_blocks: Default::default(), shutdown, event_loop_config: EventLoopConfig { is_fat_client, @@ -346,13 +327,18 @@ impl EventLoop { }; match error { - kad::PutRecordError::QuorumFailed { key, .. } => { - self.handle_put_result(key.clone(), stats.clone(), true) - .await; - }, - kad::PutRecordError::Timeout { key, .. } => { - self.handle_put_result(key.clone(), stats.clone(), true) - .await; + kad::PutRecordError::QuorumFailed { key, .. } + | kad::PutRecordError::Timeout { key, .. } => { + // Remove local records for fat clients (memory optimization) + if self.event_loop_config.is_fat_client { + debug!("Pruning local records on fat client"); + self.swarm.behaviour_mut().kademlia.remove_record(&key); + } + + _ = self.event_sender.send(OutputEvent::PutRecordFailed { + record_key: key, + query_stats: stats, + }); }, } }, @@ -360,8 +346,16 @@ impl EventLoop { QueryResult::PutRecord(Ok(PutRecordOk { key })) => { _ = self.pending_kad_queries.remove(&id); - self.handle_put_result(key.clone(), stats.clone(), false) - .await; + // Remove local records for fat clients (memory optimization) + if self.event_loop_config.is_fat_client { + debug!("Pruning local records on fat client"); + self.swarm.behaviour_mut().kademlia.remove_record(&key); + } + + _ = self.event_sender.send(OutputEvent::PutRecordSuccess { + record_key: key, + query_stats: stats, + }); }, QueryResult::Bootstrap(result) => match result { Ok(BootstrapOk { @@ -673,53 +667,6 @@ impl EventLoop { }, } } - - async fn handle_put_result(&mut self, key: RecordKey, stats: QueryStats, is_error: bool) { - let block_num = match key.clone().try_into() { - Ok(DHTKey::Cell(block_num, _, _)) => block_num, - Ok(DHTKey::Row(block_num, _)) => block_num, - Err(error) => { - warn!("Unable to cast Kademlia key to DHT key: {error}"); - return; - }, - }; - if let Some(block) = self.active_blocks.get_mut(&block_num) { - // Decrement record counter for this block - block.remaining_counter -= 1; - if is_error { - block.error_counter += 1; - } else { - block.success_counter += 1; - } - - block.time_stat = stats - .duration() - .as_ref() - .map(Duration::as_secs) - .unwrap_or_default(); - - if block.remaining_counter == 0 { - let success_rate = block.success_counter as f64 / block.total_count as f64; - info!( - "Cell upload success rate for block {block_num}: {}/{}. Duration: {}", - block.success_counter, block.total_count, block.time_stat - ); - - _ = self.event_sender.send(OutputEvent::PutRecord { - success_rate, - duration: block.time_stat as f64, - }); - } - - if self.event_loop_config.is_fat_client { - // Remove local records for fat clients (memory optimization) - debug!("Pruning local records on fat client"); - self.swarm.behaviour_mut().kademlia.remove_record(&key); - } - } else { - debug!("Can't find block in the active blocks list") - } - } } #[cfg(test)] diff --git a/core/src/telemetry/otlp.rs b/core/src/telemetry/otlp.rs index f3e874387..ff57a62fc 100644 --- a/core/src/telemetry/otlp.rs +++ b/core/src/telemetry/otlp.rs @@ -2,8 +2,10 @@ use super::{metric, MetricCounter, MetricValue}; use crate::{network::p2p::OutputEvent, telemetry::MetricName, types::Origin}; use async_trait::async_trait; use color_eyre::{eyre::eyre, Result}; -use futures::stream::TryStreamExt; -use libp2p::{kad::Mode, Multiaddr}; +use libp2p::{ + kad::{Mode, QueryStats, RecordKey}, + Multiaddr, +}; use opentelemetry::{ global, metrics::{Counter, Meter}, @@ -13,7 +15,74 @@ use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig}; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{broadcast, Mutex, RwLock}; -use tokio_stream::wrappers::BroadcastStream; +use tracing::{debug, info, warn}; + +#[derive(Debug)] +pub struct BlockStat { + pub total_count: usize, + pub remaining_counter: usize, + pub success_counter: usize, + pub error_counter: usize, + pub time_stat: u64, +} + +impl BlockStat { + pub fn increase_block_stat_counters(&mut self, cell_number: usize) { + self.total_count += cell_number; + self.remaining_counter += cell_number; + } + + fn increment_success_counter(&mut self) { + self.success_counter += 1; + } + + fn increment_error_counter(&mut self) { + self.error_counter += 1; + } + + fn decrement_remaining_counter(&mut self) { + self.remaining_counter -= 1; + } + + fn is_completed(&self) -> bool { + self.remaining_counter == 0 + } + + fn update_time_stat(&mut self, stats: &QueryStats) { + self.time_stat = stats + .duration() + .as_ref() + .map(Duration::as_secs) + .unwrap_or_default(); + } + + fn success_rate(&self) -> f64 { + self.success_counter as f64 / self.total_count as f64 + } +} + +#[derive(PartialEq, Debug)] +enum DHTKey { + Cell(u32, u32, u32), + Row(u32, u32), +} + +impl TryFrom for DHTKey { + type Error = color_eyre::Report; + + fn try_from(key: RecordKey) -> std::result::Result { + match *String::from_utf8(key.to_vec())? + .split(':') + .map(str::parse::) + .collect::, _>>()? + .as_slice() + { + [block_num, row_num] => Ok(DHTKey::Row(block_num, row_num)), + [block_num, row_num, col_num] => Ok(DHTKey::Cell(block_num, row_num, col_num)), + _ => Err(eyre!("Invalid DHT key")), + } + } +} // NOTE: Buffers are less space efficient, as opposed to the solution with in place compute. // That can be optimized by using dedicated data structure with proper bounds. @@ -69,6 +138,36 @@ impl Metrics { let mut multiaddress = self.multiaddress.write().await; *multiaddress = value; } + + fn extract_block_num(&self, key: &RecordKey) -> Result { + match key.clone().try_into() { + Ok(DHTKey::Cell(block_num, _, _)) | Ok(DHTKey::Row(block_num, _)) => Ok(block_num), + Err(error) => { + warn!("Unable to cast KAD key to DHT key: {error}"); + Err(eyre!("Invalid key: {error}")) + }, + } + } + + async fn log_and_record_block_completion( + &self, + block_num: u32, + block: &BlockStat, + ) -> Result<()> { + if block.is_completed() { + // log Block stats + let success_rate = block.success_rate(); + info!( + "Cell upload success rate for block {}: {}. Duration: {}", + block_num, success_rate, block.time_stat + ); + // record metric values + super::Metrics::record(self, MetricValue::DHTPutSuccess(success_rate)).await; + super::Metrics::record(self, MetricValue::DHTPutDuration(block.time_stat as f64)).await; + } + + Ok(()) + } } #[derive(Debug)] @@ -212,12 +311,15 @@ impl super::Metrics for Metrics { async fn handle_event_stream( &self, - event_receiver: broadcast::Receiver, + mut event_receiver: broadcast::Receiver, ) -> Result<()> { - let events = BroadcastStream::new(event_receiver.resubscribe()); - events - .try_for_each(|event| async move { - match event { + // blocks we monitor for PUT success rate + // TODO: will live here until Metrics is free of Arc + let mut active_blocks: HashMap = Default::default(); + + loop { + match event_receiver.recv().await { + Ok(event) => match event { OutputEvent::Count => { self.count(MetricCounter::EventLoopEvent).await; }, @@ -249,19 +351,63 @@ impl super::Metrics for Metrics { OutputEvent::OutgoingConnectionError => { self.count(MetricCounter::OutgoingConnectionErrors).await; }, - OutputEvent::PutRecord { - success_rate, - duration, + OutputEvent::PutRecord { block_num, records } => { + active_blocks + .entry(block_num) + .and_modify(|block| block.increase_block_stat_counters(records.len())) + .or_insert(BlockStat { + total_count: records.len(), + remaining_counter: records.len(), + success_counter: 0, + error_counter: 0, + time_stat: 0, + }); + }, + OutputEvent::PutRecordSuccess { + record_key, + query_stats, } => { - self.record(MetricValue::DHTPutSuccess(success_rate)).await; - self.record(MetricValue::DHTPutDuration(duration)).await; + let block_num = self.extract_block_num(&record_key)?; + if let Some(block) = active_blocks.get_mut(&block_num) { + block.increment_success_counter(); + block.decrement_remaining_counter(); + block.update_time_stat(&query_stats); + + self.log_and_record_block_completion(block_num, block) + .await?; + } else { + debug!("Cant't find block: {} in active block list", block_num); + }; }, - } + OutputEvent::PutRecordFailed { + record_key, + query_stats, + } => { + let block_num = self.extract_block_num(&record_key)?; + if let Some(block) = active_blocks.get_mut(&block_num) { + block.increment_error_counter(); + block.decrement_remaining_counter(); + block.update_time_stat(&query_stats); + + self.log_and_record_block_completion(block_num, block) + .await?; + } else { + debug!("Cant't find block: {} in active block list", block_num); + }; + }, + }, + Err(broadcast::error::RecvError::Closed) => { + // channel closed, exit the loop + break; + }, + Err(broadcast::error::RecvError::Lagged(missed_events)) => { + warn!("Missed {} events due to channel lag", missed_events); + continue; + }, + } + } - Ok(()) - }) - .await - .map_err(|err| eyre!("Error receiving the P2P Output Event: {err}")) + Ok(()) } }