Skip to content

Commit

Permalink
Improved p2p level logging (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh3ll3x3c authored Mar 1, 2024
1 parent ed094bb commit c8c015d
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "avail-light"
version = "1.7.8"
version = "1.8.0"
authors = ["Avail Team"]
default-run = "avail-light"
edition = "2021"
Expand Down
1 change: 0 additions & 1 deletion src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ mod tests {
mock_metrics.expect_count().returning(|_| ());
mock_metrics.expect_record().returning(|_| Ok(()));
mock_metrics.expect_set_multiaddress().returning(|_| ());
mock_metrics.expect_set_ip().returning(|_| ());
process_block(
&mock_client,
&mock_network_client,
Expand Down
20 changes: 15 additions & 5 deletions src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use color_eyre::{eyre::WrapErr, Result};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{error, info};
use tracing::{debug, error, info};

use crate::{
network::p2p::Client as P2pClient,
Expand Down Expand Up @@ -42,15 +42,25 @@ pub async fn process_block(
.await
.wrap_err("Unable to get Kademlia map size")?;

if let Ok((multiaddr, ip)) = p2p_client.get_multiaddress_and_ip().await {
metrics.set_multiaddress(multiaddr).await;
metrics.set_ip(ip).await;
// Get last confirmed external multiaddress
if let Ok(multiaddrs) = p2p_client.get_multiaddress_and_ip().await {
debug!("Confirmed external multiaddresses: {:?}", multiaddrs);
if let Some(last_confirmed_ma) = multiaddrs.last() {
metrics
.set_multiaddress(last_confirmed_ma.to_string())
.await;
}
}

let peers_num = p2p_client.count_dht_entries().await?;
let peers_num_metric = MetricValue::KadRoutingPeerNum(peers_num);
info!("Number of connected peers: {peers_num}");

let connected_peers = p2p_client.list_connected_peers().await?;
debug!("Connected peers: {:?}", connected_peers);

let peers_num_metric = MetricValue::ConnectedPeersNum(peers_num);
metrics.record(peers_num_metric).await?;

metrics
.record(MetricValue::BlockConfidenceTreshold(
static_config_params.block_confidence_treshold,
Expand Down
3 changes: 3 additions & 0 deletions src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::sync::{
mpsc::{self},
oneshot,
};
use tracing::info;

#[cfg(feature = "network-analysis")]
pub mod analyzer;
Expand Down Expand Up @@ -162,6 +163,8 @@ fn build_swarm(
})
.build();

info!("Local peerID: {}", swarm.local_peer_id());

// Setting the mode this way disables automatic mode changes.
//
// Because the identify protocol doesn't allow us to change
Expand Down
84 changes: 59 additions & 25 deletions src/network/p2p/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use kate_recovery::{
};
use libp2p::{
kad::{PeerRecord, Quorum, Record, RecordKey},
multiaddr::Protocol,
swarm::dial_opts::DialOpts,
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -239,23 +238,50 @@ impl Command for PutKadRecord {
fn abort(&mut self, _: Report) {}
}

struct CountDHTPeers {
struct CountConnectedPeers {
response_sender: Option<oneshot::Sender<Result<usize>>>,
}

impl Command for CountDHTPeers {
fn run(&mut self, mut entries: EventLoopEntries) -> Result<()> {
let mut total_peers: usize = 0;
for bucket in entries.behavior_mut().kademlia.kbuckets() {
total_peers += bucket.num_entries();
}
impl Command for CountConnectedPeers {
fn run(&mut self, entries: EventLoopEntries) -> Result<()> {
// send result back
// TODO: consider what to do if this results with None
self.response_sender
.take()
.unwrap()
.send(Ok(entries.swarm.network_info().num_peers()))
.expect("CountDHTPeers receiver dropped");
Ok(())
}

fn abort(&mut self, error: Report) {
// TODO: consider what to do if this results with None
self.response_sender
.take()
.unwrap()
.send(Err(error))
.expect("CountDHTPeers receiver dropped");
}
}

struct ListConnectedPeers {
response_sender: Option<oneshot::Sender<Result<Vec<String>>>>,
}

impl Command for ListConnectedPeers {
fn run(&mut self, entries: EventLoopEntries) -> Result<()> {
let connected_peer_list = entries
.swarm
.connected_peers()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>();

// send result back
// TODO: consider what to do if this results with None
self.response_sender
.take()
.unwrap()
.send(Ok(total_peers))
.send(Ok(connected_peer_list))
.expect("CountDHTPeers receiver dropped");
Ok(())
}
Expand Down Expand Up @@ -319,23 +345,23 @@ impl Command for GetCellsInDHTPerBlock {
}

struct GetMultiaddress {
response_sender: Option<oneshot::Sender<Result<Multiaddr>>>,
response_sender: Option<oneshot::Sender<Result<Vec<Multiaddr>>>>,
}

impl Command for GetMultiaddress {
fn run(&mut self, mut entries: EventLoopEntries) -> Result<()> {
let last_address = entries
.swarm()
.external_addresses()
.last()
.ok_or_else(|| eyre!("The last_address should exist"))?;
.cloned()
.collect::<Vec<_>>();

// send result back
// TODO: consider what to do if this results with None
self.response_sender
.take()
.unwrap()
.send(Ok(last_address.clone()))
.send(Ok(last_address))
.expect("GetMultiaddress receiver dropped");
Ok(())
}
Expand Down Expand Up @@ -568,14 +594,23 @@ impl Client {

pub async fn count_dht_entries(&self) -> Result<usize> {
self.execute_sync(|response_sender| {
Box::new(CountDHTPeers {
Box::new(CountConnectedPeers {
response_sender: Some(response_sender),
})
})
.await
}

pub async fn list_connected_peers(&self) -> Result<Vec<String>> {
self.execute_sync(|response_sender| {
Box::new(ListConnectedPeers {
response_sender: Some(response_sender),
})
})
.await
}

async fn get_multiaddress(&self) -> Result<Multiaddr> {
async fn get_multiaddress(&self) -> Result<Vec<Multiaddr>> {
self.execute_sync(|response_sender| {
Box::new(GetMultiaddress {
response_sender: Some(response_sender),
Expand Down Expand Up @@ -773,15 +808,14 @@ impl Client {
self.insert_into_dht(records, block).await
}

pub async fn get_multiaddress_and_ip(&self) -> Result<(String, String)> {
let addr = self.get_multiaddress().await?;
for protocol in &addr {
match protocol {
Protocol::Ip4(ip) => return Ok((addr.to_string(), ip.to_string())),
Protocol::Ip6(ip) => return Ok((addr.to_string(), ip.to_string())),
_ => continue,
}
}
Err(eyre!("No IP Address was present in Multiaddress"))
pub async fn get_multiaddress_and_ip(&self) -> Result<Vec<String>> {
let addr = self
.get_multiaddress()
.await?
.into_iter()
.map(|addr| addr.to_string())
.collect::<Vec<_>>();

Ok(addr)
}
}
25 changes: 14 additions & 11 deletions src/network/p2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,20 +404,17 @@ impl EventLoop {
},
SwarmEvent::Behaviour(BehaviourEvent::AutoNat(event)) => match event {
autonat::Event::InboundProbe(e) => {
trace!("AutoNat Inbound Probe: {:#?}", e);
trace!("[AutoNat] Inbound Probe: {:#?}", e);
},
autonat::Event::OutboundProbe(e) => {
trace!("AutoNat Outbound Probe: {:#?}", e);
trace!("[AutoNat] Outbound Probe: {:#?}", e);
},
autonat::Event::StatusChanged { old, new } => {
debug!(
"AutoNat Old status: {:#?}. AutoNat New status: {:#?}",
old, new
);
debug!("[AutoNat] Old status: {:#?}. New status: {:#?}", old, new);
// check if went private or are private
// if so, create reservation request with relay
if new == NatStatus::Private || old == NatStatus::Private {
info!("Autonat says we're still private.");
info!("[AutoNat] Autonat says we're still private.");
// Fat clients should always be in Kademlia client mode, no need to do NAT traversal
if !self.event_loop_config.is_fat_client {
// select a relay, try to dial it
Expand Down Expand Up @@ -447,16 +444,16 @@ impl EventLoop {
},
SwarmEvent::Behaviour(BehaviourEvent::Upnp(event)) => match event {
upnp::Event::NewExternalAddr(addr) => {
info!("New external address: {addr}");
trace!("[UPnP] New external address: {addr}");
},
upnp::Event::GatewayNotFound => {
trace!("Gateway does not support UPnP");
trace!("[UPnP] Gateway does not support UPnP");
},
upnp::Event::NonRoutableGateway => {
trace!("Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address.");
trace!("[UPnP] Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address.");
},
upnp::Event::ExpiredExternalAddr(addr) => {
trace!("Gateway address expired: {addr}");
trace!("[UPnP] Gateway address expired: {addr}");
},
},
swarm_event => {
Expand Down Expand Up @@ -484,6 +481,12 @@ impl EventLoop {
SwarmEvent::IncomingConnectionError { .. } => {
metrics.count(MetricCounter::IncomingConnectionError).await;
},
SwarmEvent::ExternalAddrConfirmed { address } => {
info!(
"External reachability confirmed on address: {}",
address.to_string()
);
},
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
metrics.count(MetricCounter::ConnectionEstablished).await;
// Notify the connections we're waiting on that we've connected successfully
Expand Down
3 changes: 1 addition & 2 deletions src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub enum MetricValue {
RPCCallDuration(f64),
DHTPutDuration(f64),
DHTPutSuccess(f64),
KadRoutingPeerNum(usize),
ConnectedPeersNum(usize),
HealthCheck(),
BlockProcessingDelay(f64),
PingLatency(f64),
Expand All @@ -87,5 +87,4 @@ pub trait Metrics {
async fn count(&self, counter: MetricCounter);
async fn record(&self, value: MetricValue) -> Result<()>;
async fn set_multiaddress(&self, multiaddr: String);
async fn set_ip(&self, ip: String);
}
16 changes: 3 additions & 13 deletions src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::RwLock;

use super::MetricCounter;

const ATTRIBUTE_NUMBER: usize = 9;
const ATTRIBUTE_NUMBER: usize = 8;

#[derive(Debug)]
pub struct Metrics {
Expand Down Expand Up @@ -43,7 +43,6 @@ impl Metrics {
"multiaddress",
self.attributes.multiaddress.read().await.clone(),
),
KeyValue::new("ip", self.attributes.ip.read().await.clone()),
KeyValue::new("avail_address", self.attributes.avail_address.clone()),
KeyValue::new("partition_size", self.attributes.partition_size.clone()),
KeyValue::new("operating_mode", self.attributes.operating_mode.clone()),
Expand Down Expand Up @@ -74,11 +73,6 @@ impl Metrics {
let mut m = self.attributes.multiaddress.write().await;
*m = multiaddr;
}

async fn set_ip(&self, ip: String) {
let mut i = self.attributes.ip.write().await;
*i = ip;
}
}

#[async_trait]
Expand Down Expand Up @@ -122,8 +116,8 @@ impl super::Metrics for Metrics {
super::MetricValue::DHTPutSuccess(number) => {
self.record_f64("dht_put_success", number).await?;
},
super::MetricValue::KadRoutingPeerNum(number) => {
self.record_u64("kad_routing_table_peer_num", number as u64)
super::MetricValue::ConnectedPeersNum(number) => {
self.record_u64("connected_peers_num", number as u64)
.await?;
},
super::MetricValue::HealthCheck() => {
Expand Down Expand Up @@ -160,10 +154,6 @@ impl super::Metrics for Metrics {
async fn set_multiaddress(&self, multiaddr: String) {
self.set_multiaddress(multiaddr).await;
}

async fn set_ip(&self, ip: String) {
self.set_ip(ip).await;
}
}

pub fn initialize(endpoint: String, attributes: MetricAttributes) -> Result<Metrics> {
Expand Down

0 comments on commit c8c015d

Please sign in to comment.