diff --git a/.gitignore b/.gitignore
index fcc761c4d..f33af1734 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,5 @@
+.*
 /target
 **/*.rs.bk
-.env
-.DS_Store
 config.yaml
 avail_light_store
diff --git a/Cargo.lock b/Cargo.lock
index 79d3e3980..99d59bb48 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -647,6 +647,7 @@ dependencies = [
  "tracing",
  "tracing-subscriber 0.3.17",
  "url",
+ "uuid",
  "void",
  "warp",
 ]
@@ -7887,11 +7888,24 @@
 checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"

 [[package]]
 name = "uuid"
-version = "1.3.1"
+version = "1.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb"
+checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81"
 dependencies = [
  "getrandom 0.2.9",
+ "rand 0.8.5",
+ "uuid-macro-internal",
+]
+
+[[package]]
+name = "uuid-macro-internal"
+version = "1.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7da8500be15217da76379f13cfb1a9e351ccc2b0959c7bc8ea64ac4302ba4de4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.15",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 8027e00b8..ab42d6f11 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ base64 = "0.21.0"
 mockall = "0.11.3"
 async-trait = "0.1.66"
 hex-literal = "0.4.0"
+uuid = { version = "1.3.4", features = ["v4", "fast-rng", "macro-diagnostics"] }
 pcap = "1.1.0"

 [features]
@@ -85,4 +86,3 @@ panic = "abort"

 [profile.release]
 panic = "abort"
-
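The `uuid` crate above is added with the `v4` feature because the event loop changes below tag every Kademlia PUT batch with a random batch ID. A minimal sketch of the one call this enables, assuming only the features listed in the Cargo.toml hunk:

```rust
// Minimal illustration of the uuid usage introduced by this PR
// (assumes the `v4` feature enabled in Cargo.toml above).
use uuid::Uuid;

fn main() {
    // Each Kademlia PUT batch gets tagged with a random v4 UUID.
    let batch_id = Uuid::new_v4();
    println!("batch id: {batch_id}");
}
```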
diff --git a/README.md b/README.md
index 64d599825..b975f8498 100644
--- a/README.md
+++ b/README.md
@@ -81,29 +81,31 @@ http_server_port = "7000"
 # If set to seed, keypair will be generated from that seed.
 # If set to key, a valid ed25519 private key must be provided, else the client will fail
 # If `secret_key` is not set, random seed will be used.
-secret_key = { key = "1498b5467a63dffa2dc9d9e069caf075d16fc33fdd4c3b01bfadae6433767d93" }
-# Libp2p service port range (port, range) (default: 37000).
-libp2p_port = "37000"
-# Configures LibP2P TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
-libp2p_tcp_port_reuse = false
-# Configures LibP2P AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
-libp2p_autonat_only_global_ips = false
-# Libp2p AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
-libp2p_autonat_throttle = 1
+secret_key = { seed = "1" }
+# P2P service port (default: 37000).
+port = 37000
+# Configures TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
+tcp_port_reuse = false
+# Configures AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
+autonat_only_global_ips = false
+# AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
+autonat_throttle = 2
 # Interval in which the NAT status should be re-tried if it is currently unknown or max confidence was not reached yet. (default: 10 sec)
-libp2p_autonat_retry_interval = 10
+autonat_retry_interval = 10
 # Interval in which the NAT should be tested again if max confidence was reached in a status. (default: 30 sec)
-libp2p_autonat_refresh_interval = 30
-# Libp2p AutoNat on init delay before starting the fist probe. (default: 5 sec)
-libp2p_autonat_boot_delay = 5
-# Sets libp2p application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
-libp2p_identify_protocol = "/avail_kad/id/1.0.0"
-# Sets libp2p agent version that is sent to peers. (default: "avail-light-client/rust-client")
-libp2p_identify_agent = "avail-light-client/rust-client"
-# Vector of Relay nodes, which are used for hole punching (default: empty)
-relays = [["12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN", "/ip4/127.0.0.1/tcp/37000"]]
-# Vector of IPFS bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
-bootstraps = [["12D3KooWMm1c4pzeLPGkkCJMAgFbsfQ8xmVDusg272icWsaNHWzN", "/ip4/127.0.0.1/tcp/37000"]]
+autonat_refresh_interval = 30
+# AutoNat on init delay before starting the first probe. (default: 5 sec)
+autonat_boot_delay = 10
+# Sets application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
+identify_protocol = "/avail_kad/id/1.0.0"
+# Sets agent version that is sent to peers. (default: "avail-light-client/rust-client")
+identify_agent = "avail-light-client/rust-client"
+# Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
+bootstraps = [["12D3KooWE2xXc6C2JzeaCaEg7jvZLogWyjLsB5dA3iw5o3KcF9ds", "/ip4/13.51.79.255/udp/39000/quic-v1"]]
+# Vector of Relay nodes, which are used for hole punching
+relays = [["12D3KooWBETtE42fN7DZ5QsGgi7qfrN3jeYdXmBPL4peVTDmgG9b", "/ip4/13.49.44.246/udp/39111/quic-v1"]]
+# Defines a period of time in which periodic bootstraps will be repeated. (default: 300 sec)
+bootstrap_period = 300
 # WebSocket endpoint of a full node for subscribing to the latest header, etc (default: ws://127.0.0.1:9944).
 full_node_ws = ["ws://127.0.0.1:9944"]
 # ID of application used to start application client. If app_id is not set, or set to 0, application client is not started (default: 0).
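The README block above uses TOML syntax for the renamed, flat configuration keys. A trimmed-down sketch of how such keys could be deserialized (the struct here is a hypothetical stand-in for `RuntimeConfig` in src/types.rs, and parsing via the `toml` crate is an assumption for illustration):

```rust
use serde::Deserialize;

// Hypothetical, trimmed-down stand-in for RuntimeConfig,
// covering only a few of the renamed keys from the README above.
#[derive(Debug, Deserialize)]
struct Config {
    port: u16,
    autonat_retry_interval: u64,
    bootstrap_period: u64,
}

fn main() -> Result<(), toml::de::Error> {
    let cfg: Config = toml::from_str(
        r#"
        port = 37000
        autonat_retry_interval = 10
        bootstrap_period = 300
        "#,
    )?;
    println!("{cfg:?}");
    Ok(())
}
```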
diff --git a/src/main.rs b/src/main.rs
index a90cfb1e1..ce7794984 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,7 +14,6 @@ use clap::Parser;
 use consts::STATE_CF;
 use libp2p::{metrics::Metrics as LibP2PMetrics, multiaddr::Protocol, Multiaddr, PeerId};
 use prometheus_client::registry::Registry;
-use rand::{thread_rng, Rng};
 use rocksdb::{ColumnFamilyDescriptor, Options, DB};
 use tokio::sync::mpsc::{channel, Sender};
 use tracing::{error, info, metadata::ParseLevelError, trace, warn, Level};
@@ -191,13 +190,8 @@ async fn run(error_sender: Sender<anyhow::Error>) -> Result<()> {
 	tokio::spawn(network_event_loop.run());

 	// Start listening on provided port
-	let port = if cfg.libp2p_port.1 > 0 {
-		let port: u16 = thread_rng().gen_range(cfg.libp2p_port.0..=cfg.libp2p_port.1);
-		info!("Using random port: {port}");
-		port
-	} else {
-		cfg.libp2p_port.0
-	};
+	let port = cfg.port;
+	info!("Using port: {port}");

 	// always listen on UDP to prioritize QUIC
 	network_client
diff --git a/src/network/client.rs b/src/network/client.rs
index 9befc52c0..970a9c603 100644
--- a/src/network/client.rs
+++ b/src/network/client.rs
@@ -27,7 +27,7 @@ pub struct Client {
 	dht_parallelization_limit: usize,
 	/// Cell time to live in DHT (in seconds)
 	ttl: u64,
-	/// Number of records to be put in DHT simultaneuosly
+	/// Number of records to be put in DHT simultaneously
 	put_batch_size: usize,
 }

@@ -144,29 +144,29 @@ impl Client {
 	}

 	async fn put_kad_record_batch(&self, records: Vec<Record>, quorum: Quorum) -> NumSuccPut {
-		let mut num_success: usize = 0;
-		for records in records.chunks(self.put_batch_size).map(Into::into) {
-			let (sender, receiver) = oneshot::channel();
-			if self
-				.sender
-				.send(Command::PutKadRecordBatch {
-					records,
+		let (tx, mut rx) = mpsc::channel::<NumSuccPut>(100);
+		let commands =
+			records
+				.chunks(self.put_batch_size)
+				.map(|records| Command::PutKadRecordBatch {
+					records: records.into(),
 					quorum,
-					sender,
-				})
-				.await
-				.context("Command receiver should not be dropped.")
-				.is_err()
-			{
-				return NumSuccPut(num_success);
+					sender: tx.clone(),
+				});
+
+		for cmd in commands {
+			if self.sender.send(cmd).await.is_err() {
+				return NumSuccPut(0);
 			}
+		}

-			num_success +=
-				if let Ok(NumSuccPut(num)) = receiver.await.context("Sender not to be dropped.") {
-					num
-				} else {
-					num_success
-				};
+		// drop tx manually,
+		// ensure that only senders in spawned threads are still in use
+		drop(tx);
+
+		let mut num_success: usize = 0;
+		while let Some(NumSuccPut(num)) = rx.recv().await {
+			num_success += num;
 		}
 		NumSuccPut(num_success)
 	}
@@ -371,7 +371,7 @@ pub enum Command {
 	PutKadRecordBatch {
 		records: Arc<[Record]>,
 		quorum: Quorum,
-		sender: oneshot::Sender<NumSuccPut>,
+		sender: mpsc::Sender<NumSuccPut>,
 	},
 	ReduceKademliaMapSize,
 	NetworkObservabilityDump,
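The rewritten `put_kad_record_batch` swaps the per-chunk `oneshot` for a single bounded `mpsc` channel: every chunked command carries a clone of the sender, the client drops its own handle, then drains the receiver until all clones are gone. A self-contained sketch of that fan-out/aggregate pattern (tokio only; the spawned tasks stand in for the event loop):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<usize>(100);

    // Fan out: each "batch command" gets its own clone of the sender.
    for chunk in 0..4usize {
        let tx = tx.clone();
        tokio::spawn(async move {
            // Stand-in for the event loop reporting per-chunk successes.
            let _ = tx.send(chunk).await;
        });
    }

    // Drop the original handle so `recv` returns None once all
    // cloned senders are finished, mirroring `drop(tx)` above.
    drop(tx);

    let mut num_success = 0;
    while let Some(n) = rx.recv().await {
        num_success += n;
    }
    assert_eq!(num_success, 0 + 1 + 2 + 3);
}
```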
diff --git a/src/network/event_loop.rs b/src/network/event_loop.rs
index 06e5264d2..cfdbd65c5 100644
--- a/src/network/event_loop.rs
+++ b/src/network/event_loop.rs
@@ -1,11 +1,14 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, time::Duration};

 use anyhow::Result;
 use async_std::stream::StreamExt;
 use itertools::Either;
 use rand::seq::SliceRandom;
 use std::str;
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+	sync::{mpsc, oneshot},
+	time::{interval_at, Instant, Interval},
+};
 use void::Void;

 use super::{
@@ -42,6 +45,7 @@ use libp2p::{
 	},
 	Multiaddr, PeerId, Swarm,
 };
+use uuid::Uuid;

 use crate::telemetry::metrics::{MetricEvent, Metrics as AvailMetrics};
 use tracing::{debug, error, info, trace};

@@ -49,17 +53,58 @@
 #[derive(Debug)]
 enum QueryChannel {
 	GetRecord(oneshot::Sender<Result<PeerRecord>>),
-	PutRecordBatch(oneshot::Sender<NumSuccPut>),
+	PutRecordBatch(mpsc::Sender<NumSuccPut>),
 	Bootstrap(oneshot::Sender<Result<()>>),
 }

-struct RelayReservation {
+struct RelayState {
 	id: PeerId,
 	address: Multiaddr,
-	is_reserved: bool,
+	is_circuit_established: bool,
+	nodes: Vec<(PeerId, Multiaddr)>,
+}
+
+impl RelayState {
+	fn reset(&mut self) {
+		self.id = PeerId::random();
+		self.address = Multiaddr::empty();
+		self.is_circuit_established = false;
+	}
+
+	fn select_random(&mut self) {
+		// choose relay by random
+		if let Some(relay) = self.nodes.choose(&mut rand::thread_rng()) {
+			let (id, addr) = relay.clone();
+			// appoint this relay as our chosen one
+			self.id = id;
+			self.address = addr;
+		}
+	}
 }

-enum QueryState {
+// BootstrapState keeps track of all things bootstrap related
+struct BootstrapState {
+	// referring to the initial bootstrap process,
+	// one that runs when the Light Client node starts up
+	is_startup_done: bool,
+	timer: Interval,
+}
+
+#[derive(Debug)]
+// Query Details struct gives additional meaning and data to ongoing KAD queries
+struct QueryDetails {
+	// Batch ID is only present when the query is part of a batch operation
+	batch_id: Option<Uuid>,
+	// Status field signifies in what state the query currently is
+	status: QueryStatus,
+	// Result sender channel is always present, for all queries
+	res_sender: QueryChannel,
+}
+
+#[derive(Debug)]
+// Query Status enum represents current state of the ongoing KAD query
+enum QueryStatus {
 	Pending,
 	Succeeded,
 	Failed(anyhow::Error),
@@ -69,14 +114,12 @@
 pub struct EventLoop {
 	swarm: Swarm<Behaviour>,
 	command_receiver: mpsc::Receiver<Command>,
 	output_senders: Vec<mpsc::Sender<Event>>,
-	pending_kad_queries: HashMap<QueryId, QueryChannel>,
+	pending_kad_queries: HashMap<QueryId, QueryDetails>,
 	pending_kad_routing: HashMap<PeerId, oneshot::Sender<Result<()>>>,
-	pending_kad_query_batch: HashMap<QueryId, QueryState>,
-	pending_batch_complete: Option<QueryChannel>,
 	metrics: Metrics,
 	avail_metrics: AvailMetrics,
-	relay_nodes: Vec<(PeerId, Multiaddr)>,
-	relay_reservation: RelayReservation,
+	relay: RelayState,
+	bootstrap: BootstrapState,
 	kad_remove_local_record: bool,
 }

@@ -105,6 +148,7 @@ impl EventLoop {
 		metrics: Metrics,
 		avail_metrics: AvailMetrics,
 		relay_nodes: Vec<(PeerId, Multiaddr)>,
+		periodic_bootstrap_interval: Duration,
 		kad_remove_local_record: bool,
 	) -> Self {
 		Self {
@@ -113,15 +157,20 @@
 			output_senders: Vec::new(),
 			pending_kad_queries: Default::default(),
 			pending_kad_routing: Default::default(),
-			pending_kad_query_batch: Default::default(),
-			pending_batch_complete: None,
 			metrics,
 			avail_metrics,
-			relay_nodes,
-			relay_reservation: RelayReservation {
+			relay: RelayState {
 				id: PeerId::random(),
 				address: Multiaddr::empty(),
-				is_reserved: Default::default(),
+				is_circuit_established: false,
+				nodes: relay_nodes,
+			},
+			bootstrap: BootstrapState {
+				is_startup_done: false,
+				timer: interval_at(
+					Instant::now() + periodic_bootstrap_interval,
+					periodic_bootstrap_interval,
+				),
 			},
 			kad_remove_local_record,
 		}
 	}

@@ -131,7 +180,12 @@
 		loop {
 			tokio::select! {
 				event = self.swarm.next() => self.handle_event(event.expect("Swarm stream should be infinite")).await,
-				Some(command) = self.command_receiver.recv() => self.handle_command(command).await,
+				command = self.command_receiver.recv() => match command {
+					Some(c) => self.handle_command(c),
+					// Command channel closed, thus shutting down the network event loop.
+					None => return,
+				},
+				_ = self.bootstrap.timer.tick() => self.handle_periodic_bootstraps(),
 			}
 		}
 	}
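`BootstrapState` folds a periodic timer into the same `select!` loop that serves swarm events and commands; `interval_at(Instant::now() + period, period)` delays the first tick by one full period so the startup bootstrap is not immediately repeated. A runnable sketch of just the timer wiring (the handler body is a stand-in):

```rust
use std::time::Duration;
use tokio::time::{interval_at, Instant};

#[tokio::main]
async fn main() {
    let period = Duration::from_millis(100); // stands in for bootstrap_period
    // First tick fires one full period from now, then repeats.
    let mut timer = interval_at(Instant::now() + period, period);

    for _ in 0..3 {
        tokio::select! {
            _ = timer.tick() => {
                // stand-in for self.handle_periodic_bootstraps()
                println!("periodic bootstrap tick");
            }
        }
    }
}
```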
@@ -188,15 +242,19 @@
 			KademliaEvent::OutboundQueryProgressed { id, result, .. } => match result {
 				QueryResult::GetRecord(result) => match result {
 					Ok(GetRecordOk::FoundRecord(record)) => {
-						if let Some(QueryChannel::GetRecord(ch)) =
-							self.pending_kad_queries.remove(&id)
+						if let Some(QueryDetails {
+							res_sender: QueryChannel::GetRecord(ch),
+							..
+						}) = self.pending_kad_queries.remove(&id)
 						{
 							_ = ch.send(Ok(record));
 						}
 					},
 					Err(err) => {
-						if let Some(QueryChannel::GetRecord(ch)) =
-							self.pending_kad_queries.remove(&id)
+						if let Some(QueryDetails {
+							res_sender: QueryChannel::GetRecord(ch),
+							..
+						}) = self.pending_kad_queries.remove(&id)
 						{
 							_ = ch.send(Err(err.into()));
 						}
@@ -204,41 +262,87 @@
 					_ => (),
 				},
 				QueryResult::PutRecord(result) => {
-					if let Some(v) = self.pending_kad_query_batch.get_mut(&id) {
-						if let Ok(put_record_ok) = result.as_ref() {
-							// Remove local records for fat clients (memory optimization)
-							if self.kad_remove_local_record {
-								self.swarm
-									.behaviour_mut()
-									.kademlia
-									.remove_record(&put_record_ok.key);
-							}
-						};
-
-						// TODO: Handle or log errors
-						*v = match result {
-							Ok(_) => QueryState::Succeeded,
-							Err(error) => QueryState::Failed(error.into()),
-						};
-
-						let has_pending = self
-							.pending_kad_query_batch
+					// update this record's ongoing status field accordingly, if there's an entry
+					self.pending_kad_queries.entry(id).and_modify(|qd| {
+						qd.status = match result {
+							Ok(record) => {
+								// check if removal of local records is set for fat clients memory optimization
+								if self.kad_remove_local_record {
+									self.swarm
+										.behaviour_mut()
+										.kademlia
+										.remove_record(&record.key);
+								}
+								QueryStatus::Succeeded
+							},
+							Err(err) => QueryStatus::Failed(err.into()),
+						}
+					});
+
+					// gather finished queries that should be removed from pending
+					let ids_to_remove = if let Some(QueryDetails {
+						batch_id: Some(uuid),
+						res_sender: QueryChannel::PutRecordBatch(ch),
+						..
+					}) = self.pending_kad_queries.get(&id)
+					{
+						// filter queries from the current batch
+						let batch_queries = self
+							.pending_kad_queries
 							.iter()
-							.any(|(_, qs)| matches!(qs, QueryState::Pending));
+							.filter(|(_, qd)| qd.batch_id == Some(*uuid))
+							.collect::<Vec<_>>();

-						if !has_pending {
-							if let Some(QueryChannel::PutRecordBatch(ch)) =
-								self.pending_batch_complete.take()
-							{
-								let count_success = self
-									.pending_kad_query_batch
+						// make sure there are no Pending queries left
+						let has_no_pending = !batch_queries
+							.iter()
+							.any(|(_, qd)| matches!(qd.status, QueryStatus::Pending));
+
+						// if there are no Pending queries left
+						// collect ones that are considered to be done
+						let finished_queries = has_no_pending
+							.then(|| {
+								batch_queries
+									.into_iter()
+									.filter(|(_, qd)| {
+										!matches!(qd.status, QueryStatus::Pending)
+									})
+									.collect::<Vec<_>>()
+							})
+							// count successful ones and gather what ids to remove
+							.and_then(|queries| {
+								let success_count = queries
 									.iter()
-									.filter(|(_, qs)| matches!(qs, QueryState::Succeeded))
+									.filter(|(_, qd)| {
+										matches!(qd.status, QueryStatus::Succeeded)
+									})
 									.count();

-								_ = ch.send(NumSuccPut(count_success));
-							}
+								let ids_to_remove = queries
+									.into_iter()
+									.map(|(&id, _)| id)
+									.collect::<Vec<_>>();
+
+								Some((success_count, ids_to_remove))
+							});
+
+						// construct result to return out of closure
+						if let Some((success_count, ids_to_remove)) = finished_queries {
+							// send result back through channel
+							_ = ch.send(NumSuccPut(success_count)).await;
+							Some(ids_to_remove)
+						} else {
+							None
 						}
+					} else {
+						None
+					};
+
+					// check to see if we have something to remove from pending map
+					if let Some(ids) = ids_to_remove {
+						ids.iter().for_each(|id| {
+							self.pending_kad_queries.remove(id);
+						});
 					}
 				},
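The `PutRecord` arm above treats a batch as finished only when no query sharing its `Uuid` is still `Pending`, and only then counts the successes. A condensed sketch of that completion check over a plain map (the types are simplified stand-ins for `QueryDetails`):

```rust
use std::collections::HashMap;
use uuid::Uuid;

#[derive(PartialEq)]
enum Status { Pending, Succeeded, Failed }

// Returns None while the batch is incomplete, Some(success_count) once done.
fn batch_result(pending: &HashMap<u64, (Uuid, Status)>, batch: Uuid) -> Option<usize> {
    // Filter queries belonging to this batch.
    let queries: Vec<_> = pending.values().filter(|(id, _)| *id == batch).collect();
    // Not done yet if any query from this batch is still pending.
    if queries.iter().any(|(_, s)| *s == Status::Pending) {
        return None;
    }
    // Batch is done: report how many of its queries succeeded.
    Some(queries.iter().filter(|(_, s)| *s == Status::Succeeded).count())
}

fn main() {
    let batch = Uuid::new_v4();
    let mut pending = HashMap::new();
    pending.insert(1u64, (batch, Status::Succeeded));
    pending.insert(2u64, (batch, Status::Failed));
    assert_eq!(batch_result(&pending, batch), Some(1));
}
```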
@@ -248,17 +352,23 @@
 				QueryResult::Bootstrap(result) => match result {
 					Ok(BootstrapOk {
 						peer,
 						num_remaining,
 					}) => {
 						trace!("BootstrapOK event. PeerID: {peer:?}. Num remaining: {num_remaining:?}.");
 						if num_remaining == 0 {
-							if let Some(QueryChannel::Bootstrap(ch)) =
-								self.pending_kad_queries.remove(&id)
+							if let Some(QueryDetails {
+								res_sender: QueryChannel::Bootstrap(ch),
+								..
+							}) = self.pending_kad_queries.remove(&id)
 							{
 								_ = ch.send(Ok(()));
+								// we can say that the startup bootstrap is done here
+								self.bootstrap.is_startup_done = true;
 							}
 						}
 					},
 					Err(err) => {
 						trace!("Bootstrap error event. Error: {err:?}.");
-						if let Some(QueryChannel::Bootstrap(ch)) =
-							self.pending_kad_queries.remove(&id)
+						if let Some(QueryDetails {
+							res_sender: QueryChannel::Bootstrap(ch),
+							..
+						}) = self.pending_kad_queries.remove(&id)
 						{
 							_ = ch.send(Err(err.into()));
 						}
Error: {e:#?}"); - }, - } - } + self.establish_relay_circuit(peer_id); // only interested in addresses with actual Multiaddresses // ones that contains the 'p2p' tag - let addrs = listen_addrs + listen_addrs .into_iter() .filter(|a| a.to_string().contains(Protocol::P2p(peer_id.into()).tag())) - .collect::>(); - - for addr in addrs { - self.swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, addr.clone()); - - // if address contains relay circuit tag, - // dial that address for immediate Direct Connection Upgrade - if *self.swarm.local_peer_id() != peer_id - && addr.to_string().contains(Protocol::P2pCircuit.tag()) - { - _ = self.swarm.dial( - DialOpts::peer_id(peer_id) - .condition(PeerCondition::Disconnected) - .addresses(vec![addr.with(Protocol::P2pCircuit)]) - .build(), - ); - } - } + .for_each(|a| { + self.swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, a.clone()); + + // if address contains relay circuit tag, + // dial that address for immediate Direct Connection Upgrade + if *self.swarm.local_peer_id() != peer_id + && a.to_string().contains(Protocol::P2pCircuit.tag()) + { + _ = self.swarm.dial( + DialOpts::peer_id(peer_id) + .condition(PeerCondition::Disconnected) + .addresses(vec![a.with(Protocol::P2pCircuit)]) + .build(), + ); + } + }); }, IdentifyEvent::Sent { peer_id } => { debug!("Identity Sent event to: {peer_id:?}"); @@ -377,12 +465,8 @@ impl EventLoop { // if so, create reservation request with relay if new == NatStatus::Private || old == NatStatus::Private { info!("Autonat says we're still private."); - // choose relay by random - if let Some(relay) = self.relay_nodes.choose(&mut rand::thread_rng()) { - let (relay_id, relay_addr) = relay.clone(); - // appoint this relay as our chosen one - self.relay_reservation(relay_id, relay_addr); - } + // select a relay, try to dial it + self.select_and_dial_relay(); }; }, }, @@ -426,7 +510,7 @@ impl EventLoop { num_established, cause, } => { - trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num establ: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address()); + trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num established: {num_established:?}. 
Cause: {cause:?}", endpoint.get_remote_address()); if let Some(cause) = cause { match cause { @@ -467,9 +551,9 @@ impl EventLoop { if let Some(peer_id) = peer_id { trace!("Error produced by peer with PeerId: {peer_id:?}"); // if the peer giving us problems is the chosen relay - // just remove it by reseting the reservatin state slot - if self.relay_reservation.id == peer_id { - self.reset_relay_reservation(); + // just remove it by resetting the reservation state slot + if self.relay.id == peer_id { + self.relay.reset(); } } }, @@ -480,7 +564,7 @@ impl EventLoop { } } - async fn handle_command(&mut self, command: Command) { + fn handle_command(&mut self, command: Command) { match command { Command::StartListening { addr, sender } => { _ = match self.swarm.listen_on(addr) { @@ -511,33 +595,52 @@ impl EventLoop { .bootstrap() .expect("DHT not to be empty"); - self.pending_kad_queries - .insert(query_id, QueryChannel::Bootstrap(sender)); + self.pending_kad_queries.insert( + query_id, + QueryDetails { + batch_id: None, + status: QueryStatus::Pending, + res_sender: QueryChannel::Bootstrap(sender), + }, + ); }, Command::GetKadRecord { key, sender } => { let query_id = self.swarm.behaviour_mut().kademlia.get_record(key); - self.pending_kad_queries - .insert(query_id, QueryChannel::GetRecord(sender)); + self.pending_kad_queries.insert( + query_id, + QueryDetails { + batch_id: None, + status: QueryStatus::Pending, + res_sender: QueryChannel::GetRecord(sender), + }, + ); }, Command::PutKadRecordBatch { records, quorum, sender, } => { - let mut ids: HashMap = Default::default(); - + // create unique batch ID + let batch_id = Uuid::new_v4(); for record in records.as_ref() { let query_id = self .swarm .behaviour_mut() .kademlia .put_record(record.to_owned(), quorum) - .expect("Unable to perform batch Kademlia PUT operation."); - ids.insert(query_id, QueryState::Pending); + .expect("Should be able to perform Kademlia PUT operation while batching"); + + // insert batch queries into pending KAD queries + self.pending_kad_queries.insert( + query_id, + QueryDetails { + batch_id: Some(batch_id), + status: QueryStatus::Pending, + res_sender: QueryChannel::PutRecordBatch(sender.clone()), + }, + ); } - self.pending_kad_query_batch = ids; - self.pending_batch_complete = Some(QueryChannel::PutRecordBatch(sender)); }, Command::ReduceKademliaMapSize => { self.swarm @@ -554,7 +657,7 @@ impl EventLoop { } fn dump_hash_map_block_stats(&mut self) { - let mut occurence_map = HashMap::new(); + let mut occurrence_map = HashMap::new(); for record in self .swarm @@ -571,10 +674,10 @@ impl EventLoop { .split_once(':') .expect("unable to split the key string"); - let count = occurence_map.entry(block_num.to_string()).or_insert(0); + let count = occurrence_map.entry(block_num.to_string()).or_insert(0); *count += 1; } - let mut sorted: Vec<(&String, &i32)> = occurence_map.iter().collect(); + let mut sorted: Vec<(&String, &i32)> = occurrence_map.iter().collect(); sorted.sort_by(|a, b| a.0.cmp(b.0)); for (block_number, cell_count) in sorted { trace!( @@ -612,19 +715,63 @@ impl EventLoop { )); } - fn reset_relay_reservation(&mut self) { - self.relay_reservation = RelayReservation { - id: PeerId::random(), - address: Multiaddr::empty(), - is_reserved: false, - }; + fn handle_periodic_bootstraps(&mut self) { + // commence with periodic bootstraps, + // only when the initial startup bootstrap is done + if self.bootstrap.is_startup_done { + _ = self.swarm.behaviour_mut().kademlia.bootstrap(); + } } - fn relay_reservation(&mut self, 
-	fn relay_reservation(&mut self, id: PeerId, address: Multiaddr) {
-		self.relay_reservation = RelayReservation {
-			id,
-			address,
-			is_reserved: false,
-		};
+	fn establish_relay_circuit(&mut self, peer_id: PeerId) {
+		// before we try and create a circuit with the relay
+		// we have to exchange observed addresses
+		// in this case we're waiting on relay to tell us our own
+		if peer_id == self.relay.id && !self.relay.is_circuit_established {
+			match self.swarm.listen_on(
+				self.relay
+					.address
+					.clone()
+					.with(Protocol::P2p(peer_id.into()))
+					.with(Protocol::P2pCircuit),
+			) {
+				Ok(_) => {
+					info!("Relay circuit established with relay: {peer_id:?}");
+					self.relay.is_circuit_established = true;
+				},
+				Err(e) => {
+					// failed to establish a circuit, reset to try another relay
+					self.relay.reset();
+					error!("Local node failed to listen on relay address. Error: {e:#?}");
+				},
+			}
+		}
+	}
+
+	fn select_and_dial_relay(&mut self) {
+		// select a random relay from the list of known ones
+		self.relay.select_random();
+
+		// dial selected relay,
+		// so we don't wait on swarm to do it eventually
+		match self.swarm.dial(
+			DialOpts::peer_id(self.relay.id)
+				.condition(PeerCondition::NotDialing)
+				.addresses(vec![self.relay.address.clone()])
+				.build(),
+		) {
+			Ok(_) => {
+				info!("Dialing Relay: {id:?} succeeded.", id = self.relay.id);
+			},
+			Err(e) => {
+				// got an error while dialing,
+				// better select a new relay and try again
+				self.relay.reset();
+				error!(
+					"Dialing Relay: {id:?}, produced an error: {e:?}",
+					id = self.relay.id
+				);
+			},
+		}
 	}
 }
diff --git a/src/network/mod.rs b/src/network/mod.rs
index 7c291d66b..cfc0dec30 100644
--- a/src/network/mod.rs
+++ b/src/network/mod.rs
@@ -185,6 +185,7 @@ pub fn init(
 			metrics,
 			avail_metrics,
 			cfg.relays,
+			cfg.bootstrap_interval,
 			kad_remove_local_record,
 		),
 	))
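The types.rs changes below carry the new `bootstrap_period` (seconds) into the `Duration` that mod.rs above hands to the event loop. A reduced sketch of that `From` conversion, with both structs trimmed to just the new field:

```rust
use std::time::Duration;

// Trimmed stand-ins for the real RuntimeConfig / LibP2PConfig.
struct RuntimeConfig { bootstrap_period: u64 }
struct LibP2PConfig { bootstrap_interval: Duration }

impl From<&RuntimeConfig> for LibP2PConfig {
    fn from(val: &RuntimeConfig) -> Self {
        // Seconds in the flat config become a typed Duration.
        Self { bootstrap_interval: Duration::from_secs(val.bootstrap_period) }
    }
}

fn main() {
    let cfg = LibP2PConfig::from(&RuntimeConfig { bootstrap_period: 300 });
    assert_eq!(cfg.bootstrap_interval, Duration::from_secs(300));
}
```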
diff --git a/src/types.rs b/src/types.rs
index db2011bb0..f1624c913 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -198,29 +198,30 @@ pub struct RuntimeConfig {
 	/// If set to key, a valid ed25519 private key must be provided, else the client will fail
 	/// If `secret_key` is not set, random seed will be used.
 	pub secret_key: Option<SecretKey>,
-	/// Libp2p service port range (port, range) (default: 37000).
-	#[serde(with = "port_range_format")]
-	pub libp2p_port: (u16, u16),
-	/// Configures LibP2P TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
-	pub libp2p_tcp_port_reuse: bool,
-	/// Configures LibP2P AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
-	pub libp2p_autonat_only_global_ips: bool,
-	/// Libp2p AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
-	pub libp2p_autonat_throttle: u64,
+	/// P2P service port (default: 37000).
+	pub port: u16,
+	/// Configures TCP port reuse for local sockets, which implies reuse of listening ports for outgoing connections to enhance NAT traversal capabilities (default: false)
+	pub tcp_port_reuse: bool,
+	/// Configures AutoNAT behaviour to reject probes as a server for clients that are observed at a non-global ip address (default: false)
+	pub autonat_only_global_ips: bool,
+	/// AutoNat throttle period for re-using a peer as server for a dial-request. (default: 1 sec)
+	pub autonat_throttle: u64,
 	/// Interval in which the NAT status should be re-tried if it is currently unknown or max confidence was not reached yet. (default: 10 sec)
-	pub libp2p_autonat_retry_interval: u64,
+	pub autonat_retry_interval: u64,
 	/// Interval in which the NAT should be tested again if max confidence was reached in a status. (default: 30 sec)
-	pub libp2p_autonat_refresh_interval: u64,
-	/// Libp2p AutoNat on init delay before starting the fist probe. (default: 5 sec)
-	pub libp2p_autonat_boot_delay: u64,
-	/// Sets libp2p application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
-	pub libp2p_identify_protocol: String,
-	/// Sets libp2p agent version that is sent to peers. (default: "avail-light-client/rust-client")
-	pub libp2p_identify_agent: String,
+	pub autonat_refresh_interval: u64,
+	/// AutoNat on init delay before starting the first probe. (default: 5 sec)
+	pub autonat_boot_delay: u64,
+	/// Sets application-specific version of the protocol family used by the peer. (default: "/avail_kad/id/1.0.0")
+	pub identify_protocol: String,
+	/// Sets agent version that is sent to peers. (default: "avail-light-client/rust-client")
+	pub identify_agent: String,
 	/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
 	pub bootstraps: Vec<(String, Multiaddr)>,
 	/// Vector of Relay nodes, which are used for hole punching
 	pub relays: Vec<(String, Multiaddr)>,
+	/// Defines a period of time in which periodic bootstraps will be repeated. (default: 300 sec)
+	pub bootstrap_period: u64,
 	/// WebSocket endpoint of full node for subscribing to latest header, etc (default: [ws://127.0.0.1:9944]).
 	pub full_node_ws: Vec<String>,
 	/// ID of application used to start application client. If app_id is not set, or set to 0, application client is not started (default: 0).
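The `secret_key` docs above describe a one-of value: either `{ seed = "..." }` or `{ key = "..." }`. One way such a shape can be modeled with serde, shown purely as an illustration since the actual `SecretKey` definition is not part of this diff:

```rust
use serde::Deserialize;

// Hypothetical model of the two documented `secret_key` shapes.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SecretKey {
    Seed { seed: String },
    Key { key: String },
}

fn main() -> Result<(), toml::de::Error> {
    #[derive(Debug, Deserialize)]
    struct Wrapper { secret_key: SecretKey }

    // Matches the README example `secret_key = { seed = "1" }`.
    let w: Wrapper = toml::from_str(r#"secret_key = { seed = "1" }"#)?;
    println!("{w:?}");
    Ok(())
}
```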
@@ -345,12 +346,13 @@ impl From<&RuntimeConfig> for LightClientConfig {

 pub struct LibP2PConfig {
 	pub secret_key: Option<SecretKey>,
-	pub port: (u16, u16),
+	pub port: u16,
 	pub identify: IdentifyConfig,
 	pub autonat: AutoNATConfig,
 	pub kademlia: KademliaConfig,
 	pub is_relay: bool,
 	pub relays: Vec<(PeerId, Multiaddr)>,
+	pub bootstrap_interval: Duration,
 }

 impl From<&RuntimeConfig> for LibP2PConfig {
@@ -364,12 +366,13 @@

 		Self {
 			secret_key: val.secret_key.clone(),
-			port: val.libp2p_port,
+			port: val.port,
 			identify: val.into(),
 			autonat: val.into(),
 			kademlia: val.into(),
 			is_relay: val.relays.is_empty(),
 			relays: relay_nodes,
+			bootstrap_interval: Duration::from_secs(val.bootstrap_period),
 		}
 	}
 }
@@ -423,11 +426,11 @@ pub struct AutoNATConfig {
 impl From<&RuntimeConfig> for AutoNATConfig {
 	fn from(val: &RuntimeConfig) -> Self {
 		Self {
-			retry_interval: Duration::from_secs(val.libp2p_autonat_retry_interval),
-			refresh_interval: Duration::from_secs(val.libp2p_autonat_refresh_interval),
-			boot_delay: Duration::from_secs(val.libp2p_autonat_boot_delay),
-			throttle_server_period: Duration::from_secs(val.libp2p_autonat_throttle),
-			only_global_ips: val.libp2p_autonat_only_global_ips,
+			retry_interval: Duration::from_secs(val.autonat_retry_interval),
+			refresh_interval: Duration::from_secs(val.autonat_refresh_interval),
+			boot_delay: Duration::from_secs(val.autonat_boot_delay),
+			throttle_server_period: Duration::from_secs(val.autonat_throttle),
+			only_global_ips: val.autonat_only_global_ips,
 		}
 	}
 }
@@ -440,8 +443,8 @@ pub struct IdentifyConfig {
 impl From<&RuntimeConfig> for IdentifyConfig {
 	fn from(val: &RuntimeConfig) -> Self {
 		Self {
-			agent_version: val.libp2p_identify_agent.clone(),
-			protocol_version: val.libp2p_identify_protocol.clone(),
+			agent_version: val.identify_agent.clone(),
+			protocol_version: val.identify_protocol.clone(),
 		}
 	}
 }
@@ -488,21 +491,22 @@ impl Default for RuntimeConfig {
 		RuntimeConfig {
 			http_server_host: "127.0.0.1".to_owned(),
 			http_server_port: (7000, 0),
-			libp2p_port: (37000, 0),
+			port: 37000,
 			secret_key: None,
-			libp2p_tcp_port_reuse: false,
-			libp2p_autonat_only_global_ips: false,
-			libp2p_autonat_refresh_interval: 30,
-			libp2p_autonat_retry_interval: 10,
-			libp2p_autonat_throttle: 1,
-			libp2p_autonat_boot_delay: 5,
-			libp2p_identify_protocol: "/avail_kad/id/1.0.0".to_string(),
-			libp2p_identify_agent: "avail-light-client/rust-client".to_string(),
+			tcp_port_reuse: false,
+			autonat_only_global_ips: false,
+			autonat_refresh_interval: 30,
+			autonat_retry_interval: 10,
+			autonat_throttle: 1,
+			autonat_boot_delay: 5,
+			identify_protocol: "/avail_kad/id/1.0.0".to_string(),
+			identify_agent: "avail-light-client/rust-client".to_string(),
+			bootstraps: Vec::new(),
+			relays: Vec::new(),
+			bootstrap_period: 300,
 			full_node_ws: vec!["ws://127.0.0.1:9944".to_owned()],
 			app_id: None,
 			confidence: 92.0,
-			bootstraps: Vec::new(),
-			relays: Vec::new(),
 			avail_path: "avail_path".to_owned(),
 			log_level: "INFO".to_owned(),
 			log_format_json: false,