diff --git a/bootstrap/CHANGELOG.md b/bootstrap/CHANGELOG.md index 825c32f0d..555bdccb3 100644 --- a/bootstrap/CHANGELOG.md +++ b/bootstrap/CHANGELOG.md @@ -1 +1,5 @@ # Changelog + +## [0.2.0] + +- Added new configuration parameter `bootstraps` which explicitly adds a list of known listen address of provided bootstraps to the routing table diff --git a/bootstrap/src/main.rs b/bootstrap/src/main.rs index b632e99ef..4e6540586 100644 --- a/bootstrap/src/main.rs +++ b/bootstrap/src/main.rs @@ -131,6 +131,12 @@ async fn run() -> Result<()> { info!("Started listening for TCP traffic on port: {:?}.", cfg.port); info!("Bootstrap node starting ..."); + // add bootstrap nodes, if provided + if !cfg.bootstraps.is_empty() { + network_client + .add_bootstrap_nodes(cfg.bootstraps.iter().map(Into::into).collect()) + .await?; + } network_client.bootstrap().await?; info!("Bootstrap done."); loop_handle.await?; diff --git a/bootstrap/src/p2p/client.rs b/bootstrap/src/p2p/client.rs index 8a2775461..93858dd68 100644 --- a/bootstrap/src/p2p/client.rs +++ b/bootstrap/src/p2p/client.rs @@ -41,6 +41,32 @@ impl Client { .context("Sender not to be dropped.")? } + pub async fn dial_peer(&self, peer_id: PeerId, peer_address: Multiaddr) -> Result<()> { + let (response_sender, response_receiver) = oneshot::channel(); + self.command_sender + .send(Command::DialPeer { + peer_id, + peer_address, + response_sender, + }) + .await + .context("Command receiver should not be dropped.")?; + response_receiver + .await + .context("Sender not to be dropped.")? + } + + pub async fn add_bootstrap_nodes(&self, nodes: Vec<(PeerId, Multiaddr)>) -> Result<()> { + for (peer, addr) in nodes { + self.dial_peer(peer, addr.clone()) + .await + .context("Dialing Bootstrap peer failed.")?; + self.add_address(peer, addr.clone()).await?; + } + + Ok(()) + } + pub async fn bootstrap(&self) -> Result<()> { // bootstrapping is impossible on an empty DHT table // at least one node is required to be known, so check @@ -103,6 +129,11 @@ pub enum Command { addr: Multiaddr, response_sender: oneshot::Sender>, }, + DialPeer { + peer_id: PeerId, + peer_address: Multiaddr, + response_sender: oneshot::Sender>, + }, AddAddress { peer_id: PeerId, multiaddr: Multiaddr, diff --git a/bootstrap/src/p2p/event_loop.rs b/bootstrap/src/p2p/event_loop.rs index d2af15719..b4ed470ef 100644 --- a/bootstrap/src/p2p/event_loop.rs +++ b/bootstrap/src/p2p/event_loop.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use libp2p::{ autonat::{self, InboundProbeEvent, OutboundProbeEvent}, futures::StreamExt, @@ -8,7 +8,11 @@ use libp2p::{ swarm::SwarmEvent, Multiaddr, PeerId, Swarm, }; -use std::{collections::HashMap, str::FromStr, time::Duration}; +use std::{ + collections::{hash_map, HashMap}, + str::FromStr, + time::Duration, +}; use tokio::{ sync::{mpsc, oneshot}, time::{interval_at, Instant, Interval}, @@ -24,6 +28,7 @@ enum QueryChannel { } enum SwarmChannel { + Dial(oneshot::Sender>), ConnectionEstablished(oneshot::Sender<(PeerId, Multiaddr)>), } @@ -161,7 +166,7 @@ impl EventLoop { } if protocols.contains(&self.swarm.behaviour_mut().kademlia.protocol_names()[0]) { - debug!("Adding peer {peer_id} to routing table."); + trace!("Adding peer {peer_id} to routing table."); for addr in listen_addrs { self.swarm .behaviour_mut() @@ -221,6 +226,10 @@ impl EventLoop { error, } => { trace!("Outgoing connection error. Connection id: {connection_id}. Peer: {peer_id}. Error: {error}."); + + if let Some(SwarmChannel::Dial(ch)) = self.pending_swarm_events.remove(&peer_id) { + _ = ch.send(Err(anyhow!(error))); + } }, SwarmEvent::ConnectionEstablished { endpoint, peer_id, .. @@ -228,15 +237,20 @@ impl EventLoop { // while waiting for a first successful connection, // we're interested in a case where we are dialing back if endpoint.is_dialer() { - // check if there is a command waiting for a response - let local_peer_id = self.swarm.local_peer_id(); - if let Some(SwarmChannel::ConnectionEstablished(ch)) = - self.pending_swarm_events.remove(local_peer_id) - { - // signal back that we have successfully established a connection, - // give us back PeerId and Multiaddress - let addr = endpoint.get_remote_address().to_owned(); - _ = ch.send((peer_id, addr)); + if let Some(event) = self.pending_swarm_events.remove(&peer_id) { + match event { + // check if there is a command waiting for a response for established 1st connection + SwarmChannel::ConnectionEstablished(ch) => { + // signal back that we have successfully established a connection, + // give us back PeerId and Multiaddress + let addr = endpoint.get_remote_address().to_owned(); + _ = ch.send((peer_id, addr)); + }, + SwarmChannel::Dial(ch) => { + // signal back that dial was a success + _ = ch.send(Ok(())); + }, + } } } }, @@ -262,6 +276,24 @@ impl EventLoop { Err(err) => response_sender.send(Err(err.into())), } }, + Command::DialPeer { + peer_id, + peer_address, + response_sender, + } => { + if let hash_map::Entry::Vacant(e) = self.pending_swarm_events.entry(peer_id) { + match self.swarm.dial(peer_address.with(Protocol::P2p(peer_id))) { + Ok(()) => { + e.insert(SwarmChannel::Dial(response_sender)); + }, + Err(e) => { + let _ = response_sender.send(Err(anyhow!(e))); + }, + } + } else { + todo!("Already dialing peer."); + } + }, Command::AddAddress { peer_id, multiaddr, diff --git a/bootstrap/src/types.rs b/bootstrap/src/types.rs index f324b3a98..56131d9d1 100644 --- a/bootstrap/src/types.rs +++ b/bootstrap/src/types.rs @@ -1,5 +1,5 @@ -use anyhow::Context; -use libp2p::StreamProtocol; +use anyhow::{anyhow, Context, Error}; +use libp2p::{Multiaddr, PeerId, StreamProtocol}; use semver::Version; use serde::{Deserialize, Serialize}; use std::{ @@ -24,6 +24,42 @@ pub enum SecretKey { Key { key: String }, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(try_from = "String")] +pub struct CompactMultiaddress((PeerId, Multiaddr)); + +impl TryFrom for CompactMultiaddress { + type Error = Error; + + fn try_from(value: String) -> std::result::Result { + let Some((_, peer_id)) = value.rsplit_once('/') else { + return Err(anyhow!("Invalid multiaddress string")); + }; + let peer_id = PeerId::from_str(peer_id)?; + let multiaddr = Multiaddr::from_str(&value)?; + Ok(CompactMultiaddress((peer_id, multiaddr))) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde( + untagged, + expecting = "Valid multiaddress/peer_id string or a tuple (peer_id, multiaddress) expected" +)] +pub enum MultiaddrConfig { + Compact(CompactMultiaddress), + PeerIdAndMultiaddr((PeerId, Multiaddr)), +} + +impl From<&MultiaddrConfig> for (PeerId, Multiaddr) { + fn from(value: &MultiaddrConfig) -> Self { + match value { + MultiaddrConfig::Compact(CompactMultiaddress(value)) => value.clone(), + MultiaddrConfig::PeerIdAndMultiaddr(value) => value.clone(), + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(default)] pub struct RuntimeConfig { @@ -52,6 +88,8 @@ pub struct RuntimeConfig { pub autonat_only_global_ips: bool, /// Sets the timeout for a single Kademlia query. (default: 60s). pub kad_query_timeout: u32, + /// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty). + pub bootstraps: Vec, /// Defines a period of time in which periodic bootstraps will be repeated. (default: 300s) pub bootstrap_period: u64, /// OpenTelemetry Collector endpoint (default: http://127.0.0.1:4317) @@ -151,6 +189,7 @@ impl Default for RuntimeConfig { autonat_only_global_ips: true, connection_idle_timeout: 30, kad_query_timeout: 60, + bootstraps: vec![], bootstrap_period: 300, ot_collector_endpoint: "http://127.0.0.1:4317".to_string(), metrics_network_dump_interval: 15,