From 5bbfb57467423954ee10ddfbae0c081061ff4e23 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 16 Dec 2024 23:40:00 +0100 Subject: [PATCH] add discovery --- anchor/network/src/behaviour.rs | 1 - anchor/network/src/discovery.rs | 392 ++++++++++++++++++++++++++++---- anchor/network/src/network.rs | 67 +++--- 3 files changed, 388 insertions(+), 72 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 5e89ed6..baa2dd1 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,4 @@ use crate::discovery::Discovery; -use discv5::Discv5; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identify, ping}; diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 9959d6b..998583f 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -1,69 +1,377 @@ +use std::collections::HashMap; +use std::future::Future; +use std::net::Ipv4Addr; +use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Instant; -use crate::Config; -use discv5::enr::CombinedKey; -use discv5::libp2p_identity::PeerId; -use discv5::multiaddr::Multiaddr; use discv5::{Discv5, Enr}; -use libp2p::core::transport::PortUse; +use discv5::enr::{CombinedKey, NodeId}; +use discv5::libp2p_identity::{Keypair, PeerId}; +use discv5::multiaddr::Multiaddr; +use futures::{StreamExt, TryFutureExt}; +use futures::FutureExt; +use futures::stream::FuturesUnordered; use libp2p::core::Endpoint; +use libp2p::core::transport::PortUse; +use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; use libp2p::swarm::dummy::ConnectionHandler; -use libp2p::swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, - THandlerOutEvent, ToSwarm, -}; +use lighthouse_network::{CombinedKeyExt, Subnet}; +use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; +use tokio::sync::mpsc; + +use crate::Config; + +/// The number of closest peers to search for when doing a regular peer search. +/// +/// We could reduce this constant to speed up queries however at the cost of security. It will +/// make it easier to peers to eclipse this node. Kademlia suggests a value of 16. +pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; + +#[derive(Debug, Clone, PartialEq)] +struct SubnetQuery { + subnet: Subnet, + min_ttl: Option, + retries: usize, +} + +#[derive(Debug, Clone, PartialEq)] +enum QueryType { + /// We are searching for subnet peers. + Subnet(Vec), + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +/// The result of a query. +struct QueryResult { + query_type: QueryType, + result: Result, discv5::QueryError>, +} + +// Awaiting the event stream future +enum EventStream { + /// Awaiting an event stream to be generated. This is required due to the poll nature of + /// `Discovery` + Awaiting( + Pin, discv5::Error>> + Send>>, + ), + /// The future has completed. + Present(mpsc::Receiver), + // The future has failed or discv5 has been disabled. There are no events from discv5. + InActive, +} pub struct Discovery { - pub discv5: Discv5, + /// The handle for the underlying discv5 Server. + /// + /// This is behind a Reference counter to allow for futures to be spawned and polled with a + /// static lifetime. + discv5: Discv5, + + /// Indicates if we are actively searching for peers. We only allow a single FindPeers query at + /// a time, regardless of the query concurrency. + find_peer_active: bool, + + /// Active discovery queries. + active_queries: FuturesUnordered + Send>>>, + + /// The discv5 event stream. + event_stream: EventStream, + + /// Indicates if the discovery service has been started. When the service is disabled, this is + /// always false. + pub started: bool, } -impl NetworkBehaviour for Discovery { - type ConnectionHandler = ConnectionHandler; - type ToSwarm = (); +impl Discovery { + pub async fn new( + local_keypair: Keypair, + network_config: &Config, + ) -> Result { - fn handle_established_inbound_connection( - &mut self, - _connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, ConnectionDenied> { - todo!() - } + let _enr_dir = match network_config.network_dir.to_str() { + Some(path) => String::from(path), + None => String::from(""), + }; - fn handle_established_outbound_connection( - &mut self, - _connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: Endpoint, - port_use: PortUse, - ) -> Result, ConnectionDenied> { - todo!() + // TODO info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), + // "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6(), + // "quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6() + // ); + + let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + + // discv5 configuration + let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair)?; + + let enr = build_enr(&enr_key, network_config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, discv5_config) + .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in network_config.boot_nodes_enr.clone() { + // TODO if bootnode_enr.node_id() == local_node_id { + // // If we are a boot node, ignore adding it to the routing table + // continue; + // } + // TODO debug!( + // log, + // "Adding node to routing table"; + // "node_id" => %bootnode_enr.node_id(), + // "peer_id" => %bootnode_enr.peer_id(), + // "ip" => ?bootnode_enr.ip4(), + // "udp" => ?bootnode_enr.udp4(), + // "tcp" => ?bootnode_enr.tcp4(), + // "quic" => ?bootnode_enr.quic4() + // ); + + //let repr = bootnode_enr.to_string(); + let _ = discv5.add_enr(bootnode_enr).map_err(|_e| { + // TODO error!( + // log, + // "Could not add peer to the local routing table"; + // "addr" => repr, + // "error" => e.to_string(), + // ) + }); + } + + // Start the discv5 service and obtain an event stream + let event_stream = if !network_config.disable_discovery { + discv5.start().map_err(|e| e.to_string()).await?; + // TODO debug!(log, "Discovery service started"); + EventStream::Awaiting(Box::pin(discv5.event_stream())) + } else { + EventStream::InActive + }; + + if !network_config.boot_nodes_multiaddr.is_empty() { + // TODO info!(log, "Contacting Multiaddr boot-nodes for their ENR"); + } + + // get futures for requesting the Enrs associated to these multiaddr and wait for their + // completion + let mut fut_coll = network_config + .boot_nodes_multiaddr + .iter() + .map(|addr| addr.to_string()) + // request the ENR for this multiaddr and keep the original for logging + .map(|addr| { + futures::future::join( + discv5.request_enr(addr.clone()), + futures::future::ready(addr), + ) + }) + .collect::>(); + + while let Some((result, _original_addr)) = fut_coll.next().await { + match result { + Ok(enr) => { + // TODO debug!( + // log, + // "Adding node to routing table"; + // "node_id" => %enr.node_id(), + // "peer_id" => %enr.peer_id(), + // "ip" => ?enr.ip4(), + // "udp" => ?enr.udp4(), + // "tcp" => ?enr.tcp4(), + // "quic" => ?enr.quic4() + // ); + let _ = discv5.add_enr(enr).map_err(|_e| { + // TODO error!( + // log, + // "Could not add peer to the local routing table"; + // "addr" => original_addr.to_string(), + // "error" => e.to_string(), + // ) + }); + } + Err(_e) => { + // TODO error!(log, "Error getting mapping to ENR"; "multiaddr" => original_addr.to_string(), "error" => e.to_string()) + } + } + } + + // TODO let update_ports = UpdatePorts { + // tcp4: config.enr_tcp4_port.is_none(), + // tcp6: config.enr_tcp6_port.is_none(), + // quic4: config.enr_quic4_port.is_none(), + // quic6: config.enr_quic6_port.is_none(), + // }; + + Ok(Self { + // cached_enrs: LruCache::new(ENR_CACHE_CAPACITY), + // network_globals, + find_peer_active: false, + // queued_queries: VecDeque::with_capacity(10), + active_queries: FuturesUnordered::new(), + discv5, + event_stream, + started: !network_config.disable_discovery, + // update_ports, + // log, + // enr_dir, + // spec: Arc::new(spec.clone()), + }) } - fn on_swarm_event(&mut self, event: FromSwarm) {} + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. + /// The `target_peers` parameter informs discovery to end the query once the target is found. + /// The maximum this can be is 16. + pub fn discover_peers(&mut self, target_peers: usize) { + // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. + if !self.started || self.find_peer_active { + return; + } + // Immediately start a FindNode query + let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers); + // TODO debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers ); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, target_peers, |_| true); + } - fn on_connection_handler_event( + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query( &mut self, - _peer_id: PeerId, - _connection_id: ConnectionId, - _event: THandlerOutEvent, + query: QueryType, + target_peers: usize, + _additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { - todo!() + // let enr_fork_id = match self.local_enr().eth2() { + // Ok(v) => v, + // Err(e) => { + // // TODO crit!(self.log, "Local ENR has no fork id"; "error" => e); + // return; + // } + // }; + // predicate for finding nodes with a matching fork and valid tcp port + let eth2_fork_predicate = move |enr: &Enr| { + // `next_fork_epoch` and `next_fork_version` can be different so that + // we can connect to peers who aren't compatible with an upcoming fork. + // `fork_digest` **must** be same. + + // enr.eth2().map(|e| e.fork_digest) == Ok(enr_fork_id.fork_digest) + // && + enr.tcp4().is_some() || enr.tcp6().is_some() + }; + + // General predicate + let predicate: Box bool + Send> = + //Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + Box::new(move |enr: &Enr| eth2_fork_predicate(enr)); + + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) + .map(|v| QueryResult { + query_type: query, + result: v, + }); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); } - fn poll( + /// Process the completed QueryResult returned from discv5. + fn process_completed_queries( &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - todo!() + query: QueryResult, + ) -> Option>> { + match query.query_type { + QueryType::FindPeers => { + self.find_peer_active = false; + match query.result { + Ok(r) if r.is_empty() => { + //debug!(self.log, "Discovery query yielded no results."); + } + Ok(r) => { + // debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + let results = r + .into_iter() + .map(|enr| { + // cache the found ENR's + //self.cached_enrs.put(enr.peer_id(), enr.clone()); + (enr, None) + }) + .collect(); + return Some(results); + } + Err(_e) => { + //warn!(self.log, "Discovery query failed"; "error" => %e); + } + } + } + _ => {} + } + None + } + + /// Drives the queries returning any results from completed queries. + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { + while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { + let result = self.process_completed_queries(query_result); + if result.is_some() { + return result; + } + } + None + } + +} + +impl NetworkBehaviour for Discovery { + // Discovery is not a real NetworkBehaviour... + type ConnectionHandler = ConnectionHandler; + type ToSwarm = DiscoveredPeers; + + fn handle_established_inbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _local_addr: &Multiaddr, _remote_addr: &Multiaddr) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn handle_established_outbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _addr: &Multiaddr, _role_override: Endpoint, _port_use: PortUse) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn on_swarm_event(&mut self, _event: FromSwarm) { + + } + + fn on_connection_handler_event(&mut self, _peer_id: PeerId, _connection_id: ConnectionId, _event: THandlerOutEvent) { + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + if !self.started { + return Poll::Pending; + } + + // Process the query queue + //self.process_queue(); + + // Drive the queries and return any results from completed queries + if let Some(peers) = self.poll_queries(cx) { + // return the result to the peer manager + return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers })); + } + Poll::Pending } } /// Builds a anchor ENR given a `network::Config`. -pub fn build_enr(enr_key: &CombinedKey, config: &Config) -> Result { +pub fn build_enr( + enr_key: &CombinedKey, + config: &Config, +) -> Result { let mut builder = discv5::enr::Enr::builder(); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 2b039ff..04350be 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,29 +1,27 @@ -use crate::behaviour::AnchorBehaviour; -use crate::behaviour::AnchorBehaviourEvent; -use crate::discovery::{build_enr, Discovery}; -use crate::keypair_utils::load_private_key; -use crate::transport::build_transport; -use crate::Config; -use discv5::enr::Enr; -use discv5::Discv5; +use std::num::{NonZeroU8, NonZeroUsize}; +use std::pin::Pin; +use std::time::Duration; + use futures::StreamExt; +use libp2p::{futures, gossipsub, identify, PeerId, ping, Swarm, SwarmBuilder}; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::gossipsub::{MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; use libp2p::swarm::SwarmEvent; -use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; -use lighthouse_network::discovery::CombinedKey; +use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; -use lighthouse_network::CombinedKeyExt; -use std::net::Ipv4Addr; -use std::num::{NonZeroU8, NonZeroUsize}; -use std::pin::Pin; -use std::time::Duration; use task_executor::TaskExecutor; use tracing::{info, log}; +use crate::behaviour::AnchorBehaviour; +use crate::behaviour::AnchorBehaviourEvent; +use crate::Config; +use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}; +use crate::keypair_utils::load_private_key; +use crate::transport::build_transport; + pub struct Network { swarm: Swarm, peer_id: PeerId, @@ -35,7 +33,7 @@ impl Network { pub async fn try_new(config: &Config, executor: TaskExecutor) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); let transport = build_transport(local_keypair.clone(), !config.disable_quic_support); - let behaviour = build_anchor_behaviour(local_keypair.clone(), config); + let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await; let peer_id = local_keypair.public().to_peer_id(); let mut network = Network { @@ -91,12 +89,21 @@ impl Network { AnchorBehaviourEvent::Gossipsub(_ge) => { // TODO handle gossipsub events }, + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + //self.peer_manager_mut().peers_discovered(peers); + log::debug!("Discovered peers: {:?}", peers); + } // TODO handle other behaviour events _ => { log::debug!("Unhandled behaviour event: {:?}", behaviour_event); } }, // TODO handle other swarm events + SwarmEvent::NewListenAddr { .. } => {}, _ => { log::debug!("Unhandled swarm event: {:?}", swarm_message); } @@ -108,7 +115,7 @@ impl Network { } } -fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { +async fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); @@ -149,21 +156,22 @@ fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> An gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config) .unwrap(); - let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); - - // discv5 configuration - let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); - - // convert the keypair into an ENR key - let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair).unwrap(); - let enr = build_enr(&enr_key, network_config).unwrap(); - let discv5 = Discv5::new(enr, enr_key, discv5_config).unwrap(); + let discovery = { + // Build and start the discovery sub-behaviour + let mut discovery = Discovery::new( + local_keypair.clone(), + &network_config, + ).await.unwrap(); + // start searching for peers + discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); + discovery + }; AnchorBehaviour { identify, ping: ping::Behaviour::default(), gossipsub, - discovery: Discovery { discv5 }, + discovery, } } @@ -222,10 +230,11 @@ fn build_swarm( #[cfg(test)] mod test { - use crate::network::Network; - use crate::Config; use task_executor::TaskExecutor; + use crate::Config; + use crate::network::Network; + #[tokio::test] async fn create_network() { let handle = tokio::runtime::Handle::current();