diff --git a/src/network/event_loop.rs b/src/network/event_loop.rs index 06e5264d2..eae3d617b 100644 --- a/src/network/event_loop.rs +++ b/src/network/event_loop.rs @@ -1,11 +1,14 @@ -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use anyhow::Result; use async_std::stream::StreamExt; use itertools::Either; use rand::seq::SliceRandom; use std::str; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + time::{interval_at, Instant, Interval}, +}; use void::Void; use super::{ @@ -53,10 +56,43 @@ enum QueryChannel { Bootstrap(oneshot::Sender>), } -struct RelayReservation { +// RelayState keeps track of all things relay related +struct RelayState { + // id of the selected Relay that needs to be connected id: PeerId, + // Multiaddress of the selected Relay that needs to be connected address: Multiaddr, - is_reserved: bool, + // boolean value that signals if have established a circuit with the selected Relay + is_circuit_established: bool, + // list of available Relay nodes + nodes: Vec<(PeerId, Multiaddr)>, +} + +impl RelayState { + fn reset(&mut self) { + self.id = PeerId::random(); + self.address = Multiaddr::empty(); + self.is_circuit_established = false; + } + + fn select_random(&mut self) { + // choose relay by random + if let Some(relay) = self.nodes.choose(&mut rand::thread_rng()) { + let (id, addr) = relay.clone(); + // appoint this relay as our chosen one + self.id = id; + self.address = addr; + } + } +} + +// BootstrapState keeps track of all things bootstrap related +struct BootstrapState { + // referring to the initial bootstrap process, + // one that runs when the Light Client node starts up + is_startup_done: bool, + // timer that is responsible for firing periodic bootstraps + timer: Interval, } enum QueryState { @@ -75,8 +111,8 @@ pub struct EventLoop { pending_batch_complete: Option, metrics: Metrics, avail_metrics: AvailMetrics, - relay_nodes: Vec<(PeerId, Multiaddr)>, - relay_reservation: 
RelayReservation, + relay: RelayState, + bootstrap: BootstrapState, kad_remove_local_record: bool, } @@ -105,6 +141,7 @@ impl EventLoop { metrics: Metrics, avail_metrics: AvailMetrics, relay_nodes: Vec<(PeerId, Multiaddr)>, + bootstrap_interval: Duration, kad_remove_local_record: bool, ) -> Self { Self { @@ -117,11 +154,15 @@ impl EventLoop { pending_batch_complete: None, metrics, avail_metrics, - relay_nodes, - relay_reservation: RelayReservation { + relay: RelayState { id: PeerId::random(), address: Multiaddr::empty(), - is_reserved: Default::default(), + is_circuit_established: false, + nodes: relay_nodes, + }, + bootstrap: BootstrapState { + is_startup_done: false, + timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval), }, kad_remove_local_record, } @@ -131,7 +172,12 @@ impl EventLoop { loop { tokio::select! { event = self.swarm.next() => self.handle_event(event.expect("Swarm stream should be infinite")).await, - Some(command) = self.command_receiver.recv() => self.handle_command(command).await, + command = self.command_receiver.recv() => match command { + Some(c) => self.handle_command(c), + // Command channel closed, thus shutting down the network event loop. + None => return, + }, + _ = self.bootstrap.timer.tick() => self.handle_periodic_bootstraps(), } } } @@ -252,6 +298,8 @@ impl EventLoop { self.pending_kad_queries.remove(&id) { _ = ch.send(Ok(())); + // we can say that the startup bootstrap is done here + self.bootstrap.is_startup_done = true; } } }, @@ -269,7 +317,7 @@ impl EventLoop { } }, SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => { - // record Indetify Behaviour events + // record Identify Behaviour events self.metrics.record(&event); match event { @@ -278,54 +326,32 @@ impl EventLoop { info: Info { listen_addrs, .. 
}, } => { debug!("Identity Received from: {peer_id:?} on listen address: {listen_addrs:?}"); - // before we try and do a reservation with the relay - // we have to exchange observed addresses - // in this case relay needs to tell us our own - if peer_id == self.relay_reservation.id - && !self.relay_reservation.is_reserved - { - match self.swarm.listen_on( - self.relay_reservation - .address - .clone() - .with(Protocol::P2p(peer_id.into())) - .with(Protocol::P2pCircuit), - ) { - Ok(_) => { - self.relay_reservation.is_reserved = true; - }, - Err(e) => { - error!("Local node failed to listen on relay address. Error: {e:#?}"); - }, - } - } + self.establish_relay_circuit(peer_id); // only interested in addresses with actual Multiaddresses // ones that contains the 'p2p' tag - let addrs = listen_addrs + listen_addrs .into_iter() .filter(|a| a.to_string().contains(Protocol::P2p(peer_id.into()).tag())) - .collect::>(); - - for addr in addrs { - self.swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, addr.clone()); - - // if address contains relay circuit tag, - // dial that address for immediate Direct Connection Upgrade - if *self.swarm.local_peer_id() != peer_id - && addr.to_string().contains(Protocol::P2pCircuit.tag()) - { - _ = self.swarm.dial( - DialOpts::peer_id(peer_id) - .condition(PeerCondition::Disconnected) - .addresses(vec![addr.with(Protocol::P2pCircuit)]) - .build(), - ); - } - } + .for_each(|a| { + self.swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, a.clone()); + + // if address contains relay circuit tag, + // dial that address for immediate Direct Connection Upgrade + if *self.swarm.local_peer_id() != peer_id + && a.to_string().contains(Protocol::P2pCircuit.tag()) + { + _ = self.swarm.dial( + DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .addresses(vec![a.with(Protocol::P2pCircuit)]) + .build(), + ); + } + }); }, IdentifyEvent::Sent { peer_id } => { debug!("Identity Sent event to: {peer_id:?}"); @@ -377,12 
+403,8 @@ impl EventLoop { // if so, create reservation request with relay if new == NatStatus::Private || old == NatStatus::Private { info!("Autonat says we're still private."); - // choose relay by random - if let Some(relay) = self.relay_nodes.choose(&mut rand::thread_rng()) { - let (relay_id, relay_addr) = relay.clone(); - // appoint this relay as our chosen one - self.relay_reservation(relay_id, relay_addr); - } + // select a relay, try to dial it + self.select_and_dial_relay(); }; }, }, @@ -426,7 +448,7 @@ impl EventLoop { num_established, cause, } => { - trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num establ: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address()); + trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num established: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address()); if let Some(cause) = cause { match cause { @@ -467,9 +489,9 @@ impl EventLoop { if let Some(peer_id) = peer_id { trace!("Error produced by peer with PeerId: {peer_id:?}"); // if the peer giving us problems is the chosen relay - // just remove it by reseting the reservatin state slot - if self.relay_reservation.id == peer_id { - self.reset_relay_reservation(); + // just remove it by resetting the reservation state slot + if self.relay.id == peer_id { + self.relay.reset(); } } }, @@ -480,7 +502,7 @@ impl EventLoop { } } - async fn handle_command(&mut self, command: Command) { + fn handle_command(&mut self, command: Command) { match command { Command::StartListening { addr, sender } => { _ = match self.swarm.listen_on(addr) { @@ -554,7 +576,7 @@ impl EventLoop { } fn dump_hash_map_block_stats(&mut self) { - let mut occurence_map = HashMap::new(); + let mut occurrence_map = HashMap::new(); for record in self .swarm @@ -571,10 +593,10 @@ impl EventLoop { .split_once(':') .expect("unable to split the key string"); - let count = occurence_map.entry(block_num.to_string()).or_insert(0); + let count = 
occurrence_map.entry(block_num.to_string()).or_insert(0); *count += 1; } - let mut sorted: Vec<(&String, &i32)> = occurence_map.iter().collect(); + let mut sorted: Vec<(&String, &i32)> = occurrence_map.iter().collect(); sorted.sort_by(|a, b| a.0.cmp(b.0)); for (block_number, cell_count) in sorted { trace!( @@ -612,19 +634,63 @@ impl EventLoop { )); } - fn reset_relay_reservation(&mut self) { - self.relay_reservation = RelayReservation { - id: PeerId::random(), - address: Multiaddr::empty(), - is_reserved: false, - }; + fn handle_periodic_bootstraps(&mut self) { + // commence with periodic bootstraps, + // only when the initial startup bootstrap is done + if self.bootstrap.is_startup_done { + _ = self.swarm.behaviour_mut().kademlia.bootstrap(); + } } - fn relay_reservation(&mut self, id: PeerId, address: Multiaddr) { - self.relay_reservation = RelayReservation { - id, - address, - is_reserved: false, - }; + fn establish_relay_circuit(&mut self, peer_id: PeerId) { + // before we try and create a circuit with the relay + // we have to exchange observed addresses + // in this case we're waiting on relay to tell us our own + if peer_id == self.relay.id && !self.relay.is_circuit_established { + match self.swarm.listen_on( + self.relay + .address + .clone() + .with(Protocol::P2p(peer_id.into())) + .with(Protocol::P2pCircuit), + ) { + Ok(_) => { + info!("Relay circuit established with relay: {peer_id:?}"); + self.relay.is_circuit_established = true; + }, + Err(e) => { + // failed to establish a circuit, reset to try another relay + self.relay.reset(); + error!("Local node failed to listen on relay address. 
Error: {e:#?}"); + }, + } + } + } + + fn select_and_dial_relay(&mut self) { + // select a random relay from the list of known ones + self.relay.select_random(); + + // dial selected relay, + // so we don't wait on swarm to do it eventually + match self.swarm.dial( + DialOpts::peer_id(self.relay.id) + .condition(PeerCondition::NotDialing) + .addresses(vec![self.relay.address.clone()]) + .build(), + ) { + Ok(_) => { + info!("Dialing Relay: {id:?} succeeded.", id = self.relay.id); + }, + Err(e) => { + // log the failure first: reset() overwrites the relay id with a random one, + // then clear the slot so a new relay gets selected on the next attempt + error!( + "Dialing Relay: {id:?}, produced an error: {e:?}", + id = self.relay.id + ); + self.relay.reset(); + }, + } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 7c291d66b..cfc0dec30 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -185,6 +185,7 @@ pub fn init( metrics, avail_metrics, cfg.relays, + cfg.bootstrap_interval, kad_remove_local_record, ), )) diff --git a/src/types.rs b/src/types.rs index 2fd83a4ff..2ffcf9c61 100644 --- a/src/types.rs +++ b/src/types.rs @@ -215,6 +215,8 @@ pub struct RuntimeConfig { pub identify_agent: String, /// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty). pub bootstraps: Vec<(String, Multiaddr)>, + /// Defines a period of time in which periodic bootstraps will be repeated. (default: 300 sec) + pub bootstrap_period: u64, /// Vector of Relay nodes, which are used for hole punching pub relays: Vec<(String, Multiaddr)>, /// WebSocket endpoint of full node for subscribing to latest header, etc (default: [ws://127.0.0.1:9944]). 
@@ -347,6 +349,7 @@ pub struct LibP2PConfig { pub kademlia: KademliaConfig, pub is_relay: bool, pub relays: Vec<(PeerId, Multiaddr)>, + pub bootstrap_interval: Duration, } impl From<&RuntimeConfig> for LibP2PConfig { @@ -366,6 +369,7 @@ impl From<&RuntimeConfig> for LibP2PConfig { kademlia: val.into(), is_relay: val.relays.is_empty(), relays: relay_nodes, + bootstrap_interval: Duration::from_secs(val.bootstrap_period), } } } @@ -495,6 +499,7 @@ impl Default for RuntimeConfig { identify_protocol: "/avail_kad/id/1.0.0".to_string(), identify_agent: "avail-light-client/rust-client".to_string(), bootstraps: Vec::new(), + bootstrap_period: 300, relays: Vec::new(), full_node_ws: vec!["ws://127.0.0.1:9944".to_owned()], app_id: None,