From 17fc73e7befde50daa0e59f96ba860ccdd4668e5 Mon Sep 17 00:00:00 2001
From: momosh
Date: Tue, 20 Aug 2024 13:08:38 +0200
Subject: [PATCH] output events from event loop

---
 client/src/main.rs                 |  50 +++--------
 core/src/network/p2p.rs            |  73 ++++++++++++--
 core/src/network/p2p/event_loop.rs | 128 +++++++++++++++----------
 crawler/src/main.rs                |  66 ++++++---------
 4 files changed, 189 insertions(+), 128 deletions(-)

diff --git a/client/src/main.rs b/client/src/main.rs
index d3a8ddc1d..a74853108 100644
--- a/client/src/main.rs
+++ b/client/src/main.rs
@@ -32,7 +32,7 @@ use color_eyre::{
 use kate_recovery::com::AppData;
 use kate_recovery::matrix::Partition;
 use std::{fs, path::Path, sync::Arc};
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::broadcast;
 use tracing::trace;
 use tracing::{error, info, span, warn, Level};
 
@@ -96,33 +96,19 @@ async fn run(
         .wrap_err("Unable to initialize OpenTelemetry service")?,
     );
 
-    // Create sender channel for P2P event loop commands
-    let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();
-
-    let p2p_event_loop = p2p::EventLoop::new(
+    let (p2p_client, p2p_event_loop, _) = p2p::init(
         cfg.libp2p.clone(),
+        id_keys,
         version,
         &cfg.genesis_hash,
-        &id_keys,
         cfg.is_fat_client(),
         shutdown.clone(),
         #[cfg(feature = "kademlia-rocksdb")]
         db.inner(),
-    );
+    )
+    .await?;
 
-    spawn_in_span(
-        shutdown.with_cancel(
-            p2p_event_loop
-                .await
-                .run(ot_metrics.clone(), p2p_event_loop_receiver),
-        ),
-    );
-
-    let p2p_client = p2p::Client::new(
-        p2p_event_loop_sender,
-        cfg.libp2p.dht_parallelization_limit,
-        cfg.libp2p.kademlia.kad_record_ttl,
-    );
+    spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));
 
     // Start listening on provided port
     p2p_client
@@ -381,33 +367,21 @@ async fn run_fat(
         .wrap_err("Unable to initialize OpenTelemetry service")?,
     );
 
-    // Create sender channel for P2P event loop commands
-    let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();
-
-    let p2p_event_loop = p2p::EventLoop::new(
+    let (p2p_client, p2p_event_loop, _) = p2p::init(
         cfg.libp2p.clone(),
+        id_keys,
         version,
         &cfg.genesis_hash,
-        &id_keys,
         cfg.is_fat_client(),
         shutdown.clone(),
         #[cfg(feature = "kademlia-rocksdb")]
         db.inner(),
-    );
+    )
+    .await?;
 
-    spawn_in_span(
-        shutdown.with_cancel(
-            p2p_event_loop
-                .await
-                .run(ot_metrics.clone(), p2p_event_loop_receiver),
-        ),
-    );
+    // Create sender channel for P2P event loop commands
 
-    let p2p_client = p2p::Client::new(
-        p2p_event_loop_sender,
-        cfg.libp2p.dht_parallelization_limit,
-        cfg.libp2p.kademlia.kad_record_ttl,
-    );
+    spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));
 
     // Start listening on provided port
     p2p_client
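Note: both `run` and `run_fat` above drop the third element of the `p2p::init` tuple, which (per the core changes below) is a `tokio::sync::broadcast::Receiver<OutputEvent>`. A minimal sketch of a subscriber that could consume it instead; the function name and the log lines are illustrative only, not part of this patch:

    // Illustrative subscriber; OutputEvent is introduced in core/src/network/p2p.rs below.
    async fn log_p2p_events(mut events: tokio::sync::broadcast::Receiver<p2p::OutputEvent>) {
        use p2p::OutputEvent;
        while let Ok(event) = events.recv().await {
            match event {
                OutputEvent::Ping(rtt) => tracing::trace!("DHT ping RTT: {rtt:?}"),
                OutputEvent::PutRecordOk { key, stats } => {
                    tracing::trace!("PUT record ok, key: {key:?}, stats: {stats:?}")
                },
                _ => {}, // remaining variants elided
            }
        }
        // recv() yields Err(Closed) once the event loop drops its sender, and
        // Err(Lagged(n)) if this receiver falls behind; `while let Ok` treats
        // both as a reason to exit, which a production consumer may refine.
    }

Such a task would be spawned next to the event loop, e.g. `spawn_in_span(shutdown.with_cancel(log_p2p_events(receiver)))`.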
diff --git a/core/src/network/p2p.rs b/core/src/network/p2p.rs
index c5bf85d34..74c116738 100644
--- a/core/src/network/p2p.rs
+++ b/core/src/network/p2p.rs
@@ -3,8 +3,8 @@ use color_eyre::{eyre::WrapErr, Report, Result};
 use configuration::LibP2PConfig;
 use libp2p::{
     autonat, dcutr, identify,
-    identity::{self, ed25519},
-    kad::{self, PeerRecord},
+    identity::{self, ed25519, Keypair},
+    kad::{self, Mode, PeerRecord, QueryStats},
     mdns, noise, ping, relay,
     swarm::NetworkBehaviour,
     tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
@@ -12,8 +12,8 @@ use libp2p::{
 use multihash::{self, Hasher};
 use semver::Version;
 use serde::{Deserialize, Serialize};
-use std::{fmt, net::Ipv4Addr, str::FromStr};
-use tokio::sync::oneshot;
+use std::{fmt, net::Ipv4Addr, str::FromStr, sync::Arc, time::Duration};
+use tokio::sync::{broadcast, mpsc, oneshot};
 use tracing::info;
 
 #[cfg(feature = "network-analysis")]
 pub mod analyzer;
@@ -26,6 +26,7 @@ mod kad_mem_store;
 mod kad_rocksdb_store;
 
 use crate::{
     data::{Database, P2PKeypairKey, RocksDB},
+    shutdown::Controller,
     types::SecretKey,
 };
 pub use client::Client;
@@ -49,6 +50,33 @@
 Bootstrap node list must not be empty. Either use a '--network' flag or add a list of bootstrap nodes in the configuration file.
 "#;
 
+#[derive(Clone)]
+pub enum OutputEvent {
+    IncomingGetRecord,
+    IncomingPutRecord,
+    KadModeChange(Mode),
+    PutRecordOk {
+        key: kad::RecordKey,
+        stats: QueryStats,
+    },
+    PutRecordQuorumFailed {
+        key: kad::RecordKey,
+        stats: QueryStats,
+    },
+    PutRecordTimeout {
+        key: kad::RecordKey,
+        stats: QueryStats,
+    },
+    NatStatusPrivate,
+    Ping(Duration),
+    IncomingConnection,
+    IncomingConnectionError,
+    MultiaddressUpdate(Multiaddr),
+    EstablishedConnection,
+    OutgoingConnectionError,
+    Count,
+}
+
 #[derive(Clone)]
 struct AgentVersion {
     pub base_version: String,
@@ -175,11 +203,46 @@ fn protocol_name(genesis_hash: &str) -> libp2p::StreamProtocol {
         .expect("Invalid Kademlia protocol name")
 }
 
+pub async fn init(
+    cfg: LibP2PConfig,
+    id_keys: Keypair,
+    version: &str,
+    genesis_hash: &str,
+    is_fat: bool,
+    shutdown: Controller<String>,
+    #[cfg(feature = "kademlia-rocksdb")] db: Arc<rocksdb::DB>,
+) -> Result<(Client, EventLoop, broadcast::Receiver<OutputEvent>)> {
+    // create sender channel for P2P event loop commands
+    let (command_sender, command_receiver) = mpsc::unbounded_channel();
+
+    // create P2P Client
+    let client = Client::new(
+        command_sender,
+        cfg.dht_parallelization_limit,
+        cfg.kademlia.kad_record_ttl,
+    );
+
+    // create Store
+    let store = Store::with_config(
+        id_keys.public().to_peer_id(),
+        (&cfg).into(),
+        #[cfg(feature = "kademlia-rocksdb")]
+        db,
+    );
+
+    // create Swarm
+    let swarm = build_swarm(&cfg, version, genesis_hash, &id_keys, store)
+        .await
+        .expect("Unable to build swarm.");
+
+    let (event_sender, event_receiver) = broadcast::channel(1000);
+
+    // create EventLoop
+    let event_loop = EventLoop::new(cfg, swarm, is_fat, command_receiver, event_sender, shutdown);
+
+    Ok((client, event_loop, event_receiver))
+}
+
 async fn build_swarm(
     cfg: &LibP2PConfig,
     version: &str,
     genesis_hash: &str,
-    id_keys: &libp2p::identity::Keypair,
+    id_keys: &Keypair,
     kad_store: Store,
 ) -> Result<Swarm<Behaviour>> {
     // create Identify Protocol Config
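The `broadcast` channel above (capacity 1000) fans out: every subscriber sees every event, and further receivers can be derived from the returned one via `resubscribe()`. The `TODO: move out from the loop` comments in the next file point at metrics counting becoming one such subscriber. A sketch of that direction, assuming the crate's existing `Metrics` trait and `MetricCounter` variants as used inside the event loop; this function is not part of the patch:

    use std::sync::Arc;
    use tokio::sync::broadcast::{self, error::RecvError};

    // Hypothetical follow-up: metrics counting as a broadcast subscriber
    // instead of inline calls in the event loop.
    async fn forward_metrics(
        mut events: broadcast::Receiver<OutputEvent>,
        metrics: Arc<impl Metrics>,
    ) {
        loop {
            match events.recv().await {
                Ok(OutputEvent::Count) => metrics.count(MetricCounter::EventLoopEvent).await,
                Ok(OutputEvent::IncomingGetRecord) => {
                    metrics.count(MetricCounter::IncomingGetRecord).await
                },
                Ok(OutputEvent::IncomingPutRecord) => {
                    metrics.count(MetricCounter::IncomingPutRecord).await
                },
                Ok(OutputEvent::KadModeChange(mode)) => metrics.update_operating_mode(mode).await,
                Ok(_) => {}, // remaining variants elided
                Err(RecvError::Lagged(_)) => continue, // events were skipped; counters undercount
                Err(RecvError::Closed) => break,       // event loop has shut down
            }
        }
    }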
diff --git a/core/src/network/p2p/event_loop.rs b/core/src/network/p2p/event_loop.rs
index 3169a8a73..8b8f08d82 100644
--- a/core/src/network/p2p/event_loop.rs
+++ b/core/src/network/p2p/event_loop.rs
@@ -5,10 +5,9 @@ use libp2p::{
     core::ConnectedPoint,
     dcutr,
     identify::{self, Info},
-    identity::Keypair,
     kad::{
-        self, store::RecordStore, BootstrapOk, GetRecordOk, InboundRequest, Mode, QueryId,
-        QueryResult, QueryStats, RecordKey,
+        self, store::RecordStore, BootstrapOk, GetRecordOk, InboundRequest, Mode, PutRecordOk,
+        QueryId, QueryResult, QueryStats, RecordKey,
     },
     mdns,
     multiaddr::Protocol,
@@ -22,14 +21,14 @@ use libp2p::{
 use rand::seq::SliceRandom;
 use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
 use tokio::{
-    sync::{mpsc::UnboundedReceiver, oneshot},
+    sync::{broadcast, mpsc::UnboundedReceiver, oneshot},
     time::{self, interval_at, Instant, Interval},
 };
 use tracing::{debug, error, info, trace, warn};
 
 use super::{
-    build_swarm, client::BlockStat, configuration::LibP2PConfig, Behaviour, BehaviourEvent,
-    Command, QueryChannel,
+    client::BlockStat, configuration::LibP2PConfig, Behaviour, BehaviourEvent, Command,
+    OutputEvent, QueryChannel,
 };
 use crate::{
     network::p2p::{is_multiaddr_global, AgentVersion},
@@ -127,6 +126,8 @@ impl EventCounter {
 
 pub struct EventLoop {
     pub swarm: Swarm<Behaviour>,
+    command_receiver: UnboundedReceiver<Command>,
+    event_sender: broadcast::Sender<OutputEvent>,
     // Tracking Kademlia events
     pub pending_kad_queries: HashMap<QueryId, QueryChannel>,
     // Tracking swarm events (i.e. peer dialing)
     pub pending_swarm_events:
@@ -163,38 +164,23 @@ impl TryFrom<RecordKey> for DHTKey {
     }
 }
 
-#[cfg(not(feature = "kademlia-rocksdb"))]
-type Store = super::kad_mem_store::MemoryStore;
-#[cfg(feature = "kademlia-rocksdb")]
-type Store = super::kad_rocksdb_store::RocksDBStore;
-
 impl EventLoop {
     #[allow(clippy::too_many_arguments)]
-    pub async fn new(
+    pub(crate) fn new(
         cfg: LibP2PConfig,
-        version: &str,
-        genesis_hash: &str,
-        id_keys: &Keypair,
+        swarm: Swarm<Behaviour>,
         is_fat_client: bool,
+        command_receiver: UnboundedReceiver<Command>,
+        event_sender: broadcast::Sender<OutputEvent>,
         shutdown: Controller<String>,
-        #[cfg(feature = "kademlia-rocksdb")] db: Arc<rocksdb::DB>,
     ) -> Self {
         let bootstrap_interval = cfg.bootstrap_period;
-        let peer_id = id_keys.public().to_peer_id();
-        let store = Store::with_config(
-            peer_id,
-            (&cfg).into(),
-            #[cfg(feature = "kademlia-rocksdb")]
-            db,
-        );
-
-        let swarm = build_swarm(&cfg, version, genesis_hash, id_keys, store)
-            .await
-            .expect("Unable to build swarm.");
-
         let relay_nodes = cfg.relays.iter().map(Into::into).collect();
+
         Self {
             swarm,
+            command_receiver,
+            event_sender,
             pending_kad_queries: Default::default(),
             pending_swarm_events: Default::default(),
             relay: RelayState {
@@ -217,11 +203,7 @@
         }
     }
 
-    pub async fn run(
-        mut self,
-        metrics: Arc<impl Metrics>,
-        mut command_receiver: UnboundedReceiver<Command>,
-    ) {
+    pub async fn run(mut self, metrics: Arc<impl Metrics>) {
         // shutdown will wait as long as this token is not dropped
         let _delay_token = self
             .shutdown
@@ -235,9 +217,12 @@
                 event = self.swarm.next() => {
                     self.handle_event(event.expect("Swarm stream should be infinite"), metrics.clone()).await;
                     event_counter.add_event();
+                    // TODO: remove from the loop
                     metrics.count(MetricCounter::EventLoopEvent).await;
+
+                    _ = self.event_sender.send(OutputEvent::Count);
                 },
-                command = command_receiver.recv() => match command {
+                command = self.command_receiver.recv() => match command {
                     Some(c) => _ = (c)(&mut self),
                     //
                     None => {
@@ -298,10 +283,15 @@
                 },
                 kad::Event::InboundRequest { request } => match request {
                     InboundRequest::GetRecord { .. } => {
+                        _ = self.event_sender.send(OutputEvent::IncomingGetRecord);
+                        // TODO: move out from the loop
                         metrics.count(MetricCounter::IncomingGetRecord).await;
                     },
                     InboundRequest::PutRecord { source, record, .. } => {
+                        _ = self.event_sender.send(OutputEvent::IncomingPutRecord);
+                        // TODO: move out from the loop
                         metrics.count(MetricCounter::IncomingPutRecord).await;
+
                         match record {
                             Some(mut record) => {
                                 let ttl = &self.event_loop_config.kad_record_ttl;
@@ -323,7 +313,10 @@
                     trace!("Kademlia mode changed: {new_mode:?}");
                     // This event should not be automatically triggered because the mode changes are handled explicitly through the LC logic
                     self.kad_mode = new_mode;
-                    metrics.update_operating_mode(new_mode).await
+                    // TODO: move out from the loop
+                    metrics.update_operating_mode(new_mode).await;
+
+                    _ = self.event_sender.send(OutputEvent::KadModeChange(new_mode));
                 },
                 kad::Event::OutboundQueryProgressed {
                     id, result, stats, ..
@@ -352,20 +345,45 @@
                         match error {
                            kad::PutRecordError::QuorumFailed { key, .. } => {
-                                self.handle_put_result(key, stats, true, metrics).await;
+                                // TODO: move out from the loop
+                                self.handle_put_result(
+                                    key.clone(),
+                                    stats.clone(),
+                                    true,
+                                    metrics,
+                                )
+                                .await;
+
+                                _ = self
+                                    .event_sender
+                                    .send(OutputEvent::PutRecordQuorumFailed { key, stats });
                             },
                             kad::PutRecordError::Timeout { key, .. } => {
-                                self.handle_put_result(key, stats, true, metrics).await;
+                                // TODO: move out from the loop
+                                self.handle_put_result(
+                                    key.clone(),
+                                    stats.clone(),
+                                    true,
+                                    metrics,
+                                )
+                                .await;
+
+                                _ = self
+                                    .event_sender
+                                    .send(OutputEvent::PutRecordTimeout { key, stats })
                             },
                         }
                     },
-                    QueryResult::PutRecord(Ok(record)) => {
-                        if self.pending_kad_queries.remove(&id).is_none() {
-                            return;
-                        };
-                        self.handle_put_result(record.key.clone(), stats, false, metrics)
+                    QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
+                        _ = self.pending_kad_queries.remove(&id);
+
+                        // TODO: move out from the loop
+                        self.handle_put_result(key.clone(), stats.clone(), false, metrics)
                             .await;
+
+                        _ = self
+                            .event_sender
+                            .send(OutputEvent::PutRecordOk { key, stats });
                     },
                     QueryResult::Bootstrap(result) => match result {
                         Ok(BootstrapOk {
@@ -490,10 +508,13 @@
                    if new == NatStatus::Private || old == NatStatus::Private {
                        info!("[AutoNat] Autonat says we're still private.");
                        // Fat clients should always be in Kademlia client mode, no need to do NAT traversal
+                       // TODO: remove from the loop
                        if !self.event_loop_config.is_fat_client {
                            // select a relay, try to dial it
                            self.select_and_dial_relay();
                        }
+
+                       _ = self.event_sender.send(OutputEvent::NatStatusPrivate);
                    };
                },
            },
@@ -511,9 +532,12 @@
            SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { result, .. })) => {
                if let Ok(rtt) = result {
+                   // TODO: remove from the loop
                    let _ = metrics
                        .record(MetricValue::DHTPingLatency(rtt.as_millis() as f64))
                        .await;
+
+                   _ = self.event_sender.send(OutputEvent::Ping(rtt));
                }
            },
            SwarmEvent::Behaviour(BehaviourEvent::Upnp(event)) => match event {
@@ -545,10 +569,16 @@
                trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num established: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address());
            },
            SwarmEvent::IncomingConnection { .. } => {
+               // TODO: remove from the loop
                metrics.count(MetricCounter::IncomingConnections).await;
+
+               _ = self.event_sender.send(OutputEvent::IncomingConnection);
            },
            SwarmEvent::IncomingConnectionError { .. } => {
+               // TODO: remove from the loop
                metrics.count(MetricCounter::IncomingConnectionErrors).await;
+
+               _ = self.event_sender.send(OutputEvent::IncomingConnectionError);
            },
            SwarmEvent::ExternalAddrConfirmed { address } => {
                info!(
@@ -561,7 +591,12 @@
                    address.to_string()
                );
                };
-               metrics.update_multiaddress(address).await;
+               // TODO: remove from the loop
+               metrics.update_multiaddress(address.clone()).await;
+
+               _ = self
+                   .event_sender
+                   .send(OutputEvent::MultiaddressUpdate(address));
            },
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
+               // TODO: remove from the loop
                metrics.count(MetricCounter::EstablishedConnections).await;
+
+               _ = self.event_sender.send(OutputEvent::EstablishedConnection);
+
                // Notify the connections we're waiting on that we've connected successfully
                if let Some(ch) = self.pending_swarm_events.remove(&peer_id) {
                    _ = ch.send(Ok(ConnectionEstablishedInfo {
@@ -583,8 +622,11 @@
                self.establish_relay_circuit(peer_id);
            },
            SwarmEvent::OutgoingConnectionError {
                peer_id, error, ..
            } => {
+               // TODO: remove from the loop
                metrics.count(MetricCounter::OutgoingConnectionErrors).await;
+
+               _ = self.event_sender.send(OutputEvent::OutgoingConnectionError);
+
                if let Some(peer_id) = peer_id {
                    // Notify the connections we're waiting on an error has occurred
                    if let libp2p::swarm::DialError::WrongPeerId { .. } = &error {
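Note: carrying `QueryStats` on the `PutRecordOk`, `PutRecordQuorumFailed` and `PutRecordTimeout` variants lets subscribers derive PUT quality signals without touching the loop. For example (the ratio below is an illustration, not something this patch computes):

    // Fraction of queried peers that answered a PUT successfully;
    // QueryStats is libp2p's per-query bookkeeping.
    fn put_success_ratio(stats: &QueryStats) -> f64 {
        // max(1) guards against division by zero on empty queries
        stats.num_successes() as f64 / stats.num_requests().max(1) as f64
    }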
diff --git a/crawler/src/main.rs b/crawler/src/main.rs
index e6179e272..ad4740e86 100644
--- a/crawler/src/main.rs
+++ b/crawler/src/main.rs
@@ -14,7 +14,7 @@ use color_eyre::{
 };
 use config::{Config, ENTIRE_BLOCK};
 use std::{fs, path::Path, sync::Arc};
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::broadcast;
 use tracing::{info, span, warn, Level};
 
 mod config;
@@ -57,13 +57,13 @@ pub async fn main() -> Result<()> {
     Ok(())
 }
 
-async fn run(config: Config, db: RocksDB, shutdown: Controller<String>) -> Result<()> {
+async fn run(cfg: Config, db: RocksDB, shutdown: Controller<String>) -> Result<()> {
     let version = clap::crate_version!();
     info!("Running Avail Light Client Crawler v{version}");
-    info!("Using configuration: {config:?}");
+    info!("Using configuration: {cfg:?}");
 
-    let (p2p_keypair, p2p_peer_id) = p2p::identity(&config.libp2p, db.clone())?;
-    let partition = config.crawl_block_matrix_partition.unwrap_or(ENTIRE_BLOCK);
+    let (p2p_keypair, p2p_peer_id) = p2p::identity(&cfg.libp2p, db.clone())?;
+    let partition = cfg.crawl_block_matrix_partition.unwrap_or(ENTIRE_BLOCK);
     let partition_size = format!("{}/{}", partition.number, partition.fraction);
 
     let metric_attributes = vec![
@@ -71,69 +71,51 @@
         ("version", version.to_string()),
         ("peerID", p2p_peer_id.to_string()),
         ("partition_size", partition_size),
-        ("network", Network::name(&config.genesis_hash)),
-        ("client_alias", config.client_alias),
+        ("network", Network::name(&cfg.genesis_hash)),
+        ("client_alias", cfg.client_alias),
     ];
 
     let ot_metrics = Arc::new(
         otlp::initialize(
             metric_attributes,
-            &config.origin,
+            &cfg.origin,
             &KademliaMode::Client.into(),
-            config.otel.clone(),
+            cfg.otel.clone(),
         )
         .wrap_err("Unable to initialize OpenTelemetry service")?,
     );
 
-    let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();
-
-    let p2p_event_loop = p2p::EventLoop::new(
-        config.libp2p.clone(),
+    let (p2p_client, p2p_event_loop, _) = p2p::init(
+        cfg.libp2p.clone(),
+        p2p_keypair,
         version,
-        &config.genesis_hash,
-        &p2p_keypair,
+        &cfg.genesis_hash,
         true,
         shutdown.clone(),
         db.inner(),
-    );
-
-    spawn_in_span(
-        shutdown.with_cancel(
-            p2p_event_loop
-                .await
-                .run(ot_metrics.clone(), p2p_event_loop_receiver),
-        ),
-    );
+    )
+    .await?;
 
-    let p2p_client = p2p::Client::new(
-        p2p_event_loop_sender,
-        config.libp2p.dht_parallelization_limit,
-        config.libp2p.kademlia.kad_record_ttl,
-    );
+    spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));
 
     p2p_client
-        .start_listening(config.libp2p.multiaddress())
+        .start_listening(cfg.libp2p.multiaddress())
         .await
         .wrap_err("Listening on TCP not to fail.")?;
-    info!("TCP listener started on port {}", config.libp2p.port);
+    info!("TCP listener started on port {}", cfg.libp2p.port);
 
     let bootstrap_p2p_client = p2p_client.clone();
     spawn_in_span(shutdown.with_cancel(async move {
         info!("Bootstraping the DHT with bootstrap nodes...");
-        let bootstraps = &config.libp2p.bootstraps;
+        let bootstraps = &cfg.libp2p.bootstraps;
         match bootstrap_p2p_client.bootstrap_on_startup(bootstraps).await {
             Ok(()) => info!("Bootstrap done."),
             Err(e) => warn!("Bootstrap error: {e:?}."),
         }
     }));
 
-    let (_, rpc_events, rpc_subscriptions) = rpc::init(
-        db.clone(),
-        &config.genesis_hash,
-        &config.rpc,
-        shutdown.clone(),
-    )
-    .await?;
+    let (_, rpc_events, rpc_subscriptions) =
+        rpc::init(db.clone(), &cfg.genesis_hash, &cfg.rpc, shutdown.clone()).await?;
 
     let first_header_rpc_event_receiver = rpc_events.subscribe();
     let client_rpc_event_receiver = rpc_events.subscribe();
@@ -167,7 +149,7 @@
     let (block_tx, block_rx) = broadcast::channel::<types::BlockVerified>(1 << 7);
 
     spawn_in_span(shutdown.with_cancel(maintenance::run(
-        config.otel.ot_flush_block_interval,
+        cfg.otel.ot_flush_block_interval,
         block_rx,
         shutdown.clone(),
         ot_metrics.clone(),
@@ -176,9 +158,9 @@
     let crawler = spawn_in_span(shutdown.with_cancel(crawl_client::run(
         client_rpc_event_receiver,
         p2p_client.clone(),
-        config.crawl_block_delay,
+        cfg.crawl_block_delay,
         ot_metrics.clone(),
-        config.crawl_block_mode,
+        cfg.crawl_block_mode,
         partition,
         block_tx,
     )));
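For review convenience, the complete wiring after this patch, condensed from the call sites above (`log_p2p_events` refers to the illustrative subscriber sketched in the earlier note, not to code in this patch):

    let (p2p_client, p2p_event_loop, p2p_events) = p2p::init(
        cfg.libp2p.clone(),
        id_keys,
        version,
        &cfg.genesis_hash,
        cfg.is_fat_client(),
        shutdown.clone(),
        #[cfg(feature = "kademlia-rocksdb")]
        db.inner(),
    )
    .await?;

    // The loop runs as before and still updates metrics inline (see the TODOs),
    // but any number of additional consumers can now observe OutputEvent.
    spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));
    spawn_in_span(shutdown.with_cancel(log_p2p_events(p2p_events)));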