From c8c015d5763aa00d23ac1d629f5b06ee1a499dab Mon Sep 17 00:00:00 2001 From: Ljubisa Isakovic <93769705+sh3ll3x3c@users.noreply.github.com> Date: Fri, 1 Mar 2024 08:22:33 +0100 Subject: [PATCH] Improved p2p level logging (#467) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/light_client.rs | 1 - src/maintenance.rs | 20 ++++++--- src/network/p2p.rs | 3 ++ src/network/p2p/client.rs | 84 ++++++++++++++++++++++++----------- src/network/p2p/event_loop.rs | 25 ++++++----- src/telemetry/mod.rs | 3 +- src/telemetry/otlp.rs | 16 ++----- 9 files changed, 97 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d61c8bec0..d4f5f77e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,7 +584,7 @@ dependencies = [ [[package]] name = "avail-light" -version = "1.7.8" +version = "1.8.0" dependencies = [ "async-std", "async-stream", diff --git a/Cargo.toml b/Cargo.toml index c1fdc41a8..c7c529a48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "avail-light" -version = "1.7.8" +version = "1.8.0" authors = ["Avail Team"] default-run = "avail-light" edition = "2021" diff --git a/src/light_client.rs b/src/light_client.rs index 212c9f5b2..9d445d534 100644 --- a/src/light_client.rs +++ b/src/light_client.rs @@ -380,7 +380,6 @@ mod tests { mock_metrics.expect_count().returning(|_| ()); mock_metrics.expect_record().returning(|_| Ok(())); mock_metrics.expect_set_multiaddress().returning(|_| ()); - mock_metrics.expect_set_ip().returning(|_| ()); process_block( &mock_client, &mock_network_client, diff --git a/src/maintenance.rs b/src/maintenance.rs index 595917bc4..9c16625d0 100644 --- a/src/maintenance.rs +++ b/src/maintenance.rs @@ -1,7 +1,7 @@ use color_eyre::{eyre::WrapErr, Result}; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::{error, info}; +use tracing::{debug, error, info}; use crate::{ network::p2p::Client as P2pClient, @@ -42,15 +42,25 @@ pub async fn process_block( .await .wrap_err("Unable to get Kademlia map size")?; - if let Ok((multiaddr, ip)) = p2p_client.get_multiaddress_and_ip().await { - metrics.set_multiaddress(multiaddr).await; - metrics.set_ip(ip).await; + // Get last confirmed external multiaddress + if let Ok(multiaddrs) = p2p_client.get_multiaddress_and_ip().await { + debug!("Confirmed external multiaddresses: {:?}", multiaddrs); + if let Some(last_confirmed_ma) = multiaddrs.last() { + metrics + .set_multiaddress(last_confirmed_ma.to_string()) + .await; + } } let peers_num = p2p_client.count_dht_entries().await?; - let peers_num_metric = MetricValue::KadRoutingPeerNum(peers_num); + info!("Number of connected peers: {peers_num}"); + + let connected_peers = p2p_client.list_connected_peers().await?; + debug!("Connected peers: {:?}", connected_peers); + let peers_num_metric = MetricValue::ConnectedPeersNum(peers_num); metrics.record(peers_num_metric).await?; + metrics .record(MetricValue::BlockConfidenceTreshold( static_config_params.block_confidence_treshold, diff --git a/src/network/p2p.rs b/src/network/p2p.rs index 8f792aea8..28062602f 100644 --- a/src/network/p2p.rs +++ b/src/network/p2p.rs @@ -13,6 +13,7 @@ use tokio::sync::{ mpsc::{self}, oneshot, }; +use tracing::info; #[cfg(feature = "network-analysis")] pub mod analyzer; @@ -162,6 +163,8 @@ fn build_swarm( }) .build(); + info!("Local peerID: {}", swarm.local_peer_id()); + // Setting the mode this way disables automatic mode changes. // // Because the identify protocol doesn't allow us to change diff --git a/src/network/p2p/client.rs b/src/network/p2p/client.rs index 60e298d85..ce8b5b6b5 100644 --- a/src/network/p2p/client.rs +++ b/src/network/p2p/client.rs @@ -11,7 +11,6 @@ use kate_recovery::{ }; use libp2p::{ kad::{PeerRecord, Quorum, Record, RecordKey}, - multiaddr::Protocol, swarm::dial_opts::DialOpts, Multiaddr, PeerId, }; @@ -239,23 +238,50 @@ impl Command for PutKadRecord { fn abort(&mut self, _: Report) {} } -struct CountDHTPeers { +struct CountConnectedPeers { response_sender: Option>>, } -impl Command for CountDHTPeers { - fn run(&mut self, mut entries: EventLoopEntries) -> Result<()> { - let mut total_peers: usize = 0; - for bucket in entries.behavior_mut().kademlia.kbuckets() { - total_peers += bucket.num_entries(); - } +impl Command for CountConnectedPeers { + fn run(&mut self, entries: EventLoopEntries) -> Result<()> { + // send result back + // TODO: consider what to do if this results with None + self.response_sender + .take() + .unwrap() + .send(Ok(entries.swarm.network_info().num_peers())) + .expect("CountDHTPeers receiver dropped"); + Ok(()) + } + + fn abort(&mut self, error: Report) { + // TODO: consider what to do if this results with None + self.response_sender + .take() + .unwrap() + .send(Err(error)) + .expect("CountDHTPeers receiver dropped"); + } +} + +struct ListConnectedPeers { + response_sender: Option>>>, +} + +impl Command for ListConnectedPeers { + fn run(&mut self, entries: EventLoopEntries) -> Result<()> { + let connected_peer_list = entries + .swarm + .connected_peers() + .map(|peer_id| peer_id.to_string()) + .collect::>(); // send result back // TODO: consider what to do if this results with None self.response_sender .take() .unwrap() - .send(Ok(total_peers)) + .send(Ok(connected_peer_list)) .expect("CountDHTPeers receiver dropped"); Ok(()) } @@ -319,7 +345,7 @@ impl Command for GetCellsInDHTPerBlock { } struct GetMultiaddress { - response_sender: Option>>, + response_sender: Option>>>, } impl Command for GetMultiaddress { @@ -327,15 +353,15 @@ impl Command for GetMultiaddress { let last_address = entries .swarm() .external_addresses() - .last() - .ok_or_else(|| eyre!("The last_address should exist"))?; + .cloned() + .collect::>(); // send result back // TODO: consider what to do if this results with None self.response_sender .take() .unwrap() - .send(Ok(last_address.clone())) + .send(Ok(last_address)) .expect("GetMultiaddress receiver dropped"); Ok(()) } @@ -568,14 +594,23 @@ impl Client { pub async fn count_dht_entries(&self) -> Result { self.execute_sync(|response_sender| { - Box::new(CountDHTPeers { + Box::new(CountConnectedPeers { + response_sender: Some(response_sender), + }) + }) + .await + } + + pub async fn list_connected_peers(&self) -> Result> { + self.execute_sync(|response_sender| { + Box::new(ListConnectedPeers { response_sender: Some(response_sender), }) }) .await } - async fn get_multiaddress(&self) -> Result { + async fn get_multiaddress(&self) -> Result> { self.execute_sync(|response_sender| { Box::new(GetMultiaddress { response_sender: Some(response_sender), @@ -773,15 +808,14 @@ impl Client { self.insert_into_dht(records, block).await } - pub async fn get_multiaddress_and_ip(&self) -> Result<(String, String)> { - let addr = self.get_multiaddress().await?; - for protocol in &addr { - match protocol { - Protocol::Ip4(ip) => return Ok((addr.to_string(), ip.to_string())), - Protocol::Ip6(ip) => return Ok((addr.to_string(), ip.to_string())), - _ => continue, - } - } - Err(eyre!("No IP Address was present in Multiaddress")) + pub async fn get_multiaddress_and_ip(&self) -> Result> { + let addr = self + .get_multiaddress() + .await? + .into_iter() + .map(|addr| addr.to_string()) + .collect::>(); + + Ok(addr) } } diff --git a/src/network/p2p/event_loop.rs b/src/network/p2p/event_loop.rs index c2ba4ba1c..36d0db41e 100644 --- a/src/network/p2p/event_loop.rs +++ b/src/network/p2p/event_loop.rs @@ -404,20 +404,17 @@ impl EventLoop { }, SwarmEvent::Behaviour(BehaviourEvent::AutoNat(event)) => match event { autonat::Event::InboundProbe(e) => { - trace!("AutoNat Inbound Probe: {:#?}", e); + trace!("[AutoNat] Inbound Probe: {:#?}", e); }, autonat::Event::OutboundProbe(e) => { - trace!("AutoNat Outbound Probe: {:#?}", e); + trace!("[AutoNat] Outbound Probe: {:#?}", e); }, autonat::Event::StatusChanged { old, new } => { - debug!( - "AutoNat Old status: {:#?}. AutoNat New status: {:#?}", - old, new - ); + debug!("[AutoNat] Old status: {:#?}. New status: {:#?}", old, new); // check if went private or are private // if so, create reservation request with relay if new == NatStatus::Private || old == NatStatus::Private { - info!("Autonat says we're still private."); + info!("[AutoNat] Autonat says we're still private."); // Fat clients should always be in Kademlia client mode, no need to do NAT traversal if !self.event_loop_config.is_fat_client { // select a relay, try to dial it @@ -447,16 +444,16 @@ impl EventLoop { }, SwarmEvent::Behaviour(BehaviourEvent::Upnp(event)) => match event { upnp::Event::NewExternalAddr(addr) => { - info!("New external address: {addr}"); + trace!("[UPnP] New external address: {addr}"); }, upnp::Event::GatewayNotFound => { - trace!("Gateway does not support UPnP"); + trace!("[UPnP] Gateway does not support UPnP"); }, upnp::Event::NonRoutableGateway => { - trace!("Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address."); + trace!("[UPnP] Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address."); }, upnp::Event::ExpiredExternalAddr(addr) => { - trace!("Gateway address expired: {addr}"); + trace!("[UPnP] Gateway address expired: {addr}"); }, }, swarm_event => { @@ -484,6 +481,12 @@ impl EventLoop { SwarmEvent::IncomingConnectionError { .. } => { metrics.count(MetricCounter::IncomingConnectionError).await; }, + SwarmEvent::ExternalAddrConfirmed { address } => { + info!( + "External reachability confirmed on address: {}", + address.to_string() + ); + }, SwarmEvent::ConnectionEstablished { peer_id, .. } => { metrics.count(MetricCounter::ConnectionEstablished).await; // Notify the connections we're waiting on that we've connected successfully diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 3e56d3b96..0c16eb4d0 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -67,7 +67,7 @@ pub enum MetricValue { RPCCallDuration(f64), DHTPutDuration(f64), DHTPutSuccess(f64), - KadRoutingPeerNum(usize), + ConnectedPeersNum(usize), HealthCheck(), BlockProcessingDelay(f64), PingLatency(f64), @@ -87,5 +87,4 @@ pub trait Metrics { async fn count(&self, counter: MetricCounter); async fn record(&self, value: MetricValue) -> Result<()>; async fn set_multiaddress(&self, multiaddr: String); - async fn set_ip(&self, ip: String); } diff --git a/src/telemetry/otlp.rs b/src/telemetry/otlp.rs index befc6dbe4..0995fad45 100644 --- a/src/telemetry/otlp.rs +++ b/src/telemetry/otlp.rs @@ -11,7 +11,7 @@ use tokio::sync::RwLock; use super::MetricCounter; -const ATTRIBUTE_NUMBER: usize = 9; +const ATTRIBUTE_NUMBER: usize = 8; #[derive(Debug)] pub struct Metrics { @@ -43,7 +43,6 @@ impl Metrics { "multiaddress", self.attributes.multiaddress.read().await.clone(), ), - KeyValue::new("ip", self.attributes.ip.read().await.clone()), KeyValue::new("avail_address", self.attributes.avail_address.clone()), KeyValue::new("partition_size", self.attributes.partition_size.clone()), KeyValue::new("operating_mode", self.attributes.operating_mode.clone()), @@ -74,11 +73,6 @@ impl Metrics { let mut m = self.attributes.multiaddress.write().await; *m = multiaddr; } - - async fn set_ip(&self, ip: String) { - let mut i = self.attributes.ip.write().await; - *i = ip; - } } #[async_trait] @@ -122,8 +116,8 @@ impl super::Metrics for Metrics { super::MetricValue::DHTPutSuccess(number) => { self.record_f64("dht_put_success", number).await?; }, - super::MetricValue::KadRoutingPeerNum(number) => { - self.record_u64("kad_routing_table_peer_num", number as u64) + super::MetricValue::ConnectedPeersNum(number) => { + self.record_u64("connected_peers_num", number as u64) .await?; }, super::MetricValue::HealthCheck() => { @@ -160,10 +154,6 @@ impl super::Metrics for Metrics { async fn set_multiaddress(&self, multiaddr: String) { self.set_multiaddress(multiaddr).await; } - - async fn set_ip(&self, ip: String) { - self.set_ip(ip).await; - } } pub fn initialize(endpoint: String, attributes: MetricAttributes) -> Result {