Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodic Bootstraps #247

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 146 additions & 80 deletions src/network/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use anyhow::Result;
use async_std::stream::StreamExt;
use itertools::Either;
use rand::seq::SliceRandom;
use std::str;
use tokio::sync::{mpsc, oneshot};
use tokio::{
sync::{mpsc, oneshot},
time::{interval_at, Instant, Interval},
};
use void::Void;

use super::{
Expand Down Expand Up @@ -53,10 +56,43 @@ enum QueryChannel {
Bootstrap(oneshot::Sender<Result<()>>),
}

struct RelayReservation {
// RelayState keeps track of all things relay related
struct RelayState {
// id of the selected Relay that needs to be connected
id: PeerId,
// Multiaddress of the selected Relay that needs to be connected
address: Multiaddr,
is_reserved: bool,
// boolean value that signals if have established a circuit with the selected Relay
is_circuit_established: bool,
// list of available Relay nodes
nodes: Vec<(PeerId, Multiaddr)>,
}

impl RelayState {
fn reset(&mut self) {
self.id = PeerId::random();
self.address = Multiaddr::empty();
self.is_circuit_established = false;
}

fn select_random(&mut self) {
// choose relay by random
if let Some(relay) = self.nodes.choose(&mut rand::thread_rng()) {
let (id, addr) = relay.clone();
// appoint this relay as our chosen one
self.id = id;
self.address = addr;
}
}
}

// BootstrapState keeps track of all things bootstrap related
struct BootstrapState {
// referring to the initial bootstrap process,
// one that runs when the Light Client node starts up
is_startup_done: bool,
// timer that is responsible for firing periodic bootstraps
timer: Interval,
}

enum QueryState {
Expand All @@ -75,8 +111,8 @@ pub struct EventLoop {
pending_batch_complete: Option<QueryChannel>,
metrics: Metrics,
avail_metrics: AvailMetrics,
relay_nodes: Vec<(PeerId, Multiaddr)>,
relay_reservation: RelayReservation,
relay: RelayState,
bootstrap: BootstrapState,
kad_remove_local_record: bool,
}

Expand Down Expand Up @@ -105,6 +141,7 @@ impl EventLoop {
metrics: Metrics,
avail_metrics: AvailMetrics,
relay_nodes: Vec<(PeerId, Multiaddr)>,
bootstrap_interval: Duration,
kad_remove_local_record: bool,
) -> Self {
Self {
Expand All @@ -117,11 +154,15 @@ impl EventLoop {
pending_batch_complete: None,
metrics,
avail_metrics,
relay_nodes,
relay_reservation: RelayReservation {
relay: RelayState {
id: PeerId::random(),
address: Multiaddr::empty(),
is_reserved: Default::default(),
is_circuit_established: false,
nodes: relay_nodes,
},
bootstrap: BootstrapState {
is_startup_done: false,
timer: interval_at(Instant::now() + bootstrap_interval, bootstrap_interval),
},
kad_remove_local_record,
}
Expand All @@ -131,7 +172,12 @@ impl EventLoop {
loop {
tokio::select! {
event = self.swarm.next() => self.handle_event(event.expect("Swarm stream should be infinite")).await,
Some(command) = self.command_receiver.recv() => self.handle_command(command).await,
command = self.command_receiver.recv() => match command {
Some(c) => self.handle_command(c),
// Command channel closed, thus shutting down the network event loop.
None => return,
},
_ = self.bootstrap.timer.tick() => self.handle_periodic_bootstraps(),
}
}
}
Expand Down Expand Up @@ -252,6 +298,8 @@ impl EventLoop {
self.pending_kad_queries.remove(&id)
{
_ = ch.send(Ok(()));
// we can say that the startup bootstrap is done here
self.bootstrap.is_startup_done = true;
}
}
},
Expand All @@ -269,7 +317,7 @@ impl EventLoop {
}
},
SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => {
// record Indetify Behaviour events
// record Identify Behaviour events
self.metrics.record(&event);

match event {
Expand All @@ -278,54 +326,32 @@ impl EventLoop {
info: Info { listen_addrs, .. },
} => {
debug!("Identity Received from: {peer_id:?} on listen address: {listen_addrs:?}");
// before we try and do a reservation with the relay
// we have to exchange observed addresses
// in this case relay needs to tell us our own
if peer_id == self.relay_reservation.id
&& !self.relay_reservation.is_reserved
{
match self.swarm.listen_on(
self.relay_reservation
.address
.clone()
.with(Protocol::P2p(peer_id.into()))
.with(Protocol::P2pCircuit),
) {
Ok(_) => {
self.relay_reservation.is_reserved = true;
},
Err(e) => {
error!("Local node failed to listen on relay address. Error: {e:#?}");
},
}
}
self.establish_relay_circuit(peer_id);

// only interested in addresses with actual Multiaddresses
// ones that contains the 'p2p' tag
let addrs = listen_addrs
listen_addrs
.into_iter()
.filter(|a| a.to_string().contains(Protocol::P2p(peer_id.into()).tag()))
.collect::<Vec<Multiaddr>>();

for addr in addrs {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr.clone());

// if address contains relay circuit tag,
// dial that address for immediate Direct Connection Upgrade
if *self.swarm.local_peer_id() != peer_id
&& addr.to_string().contains(Protocol::P2pCircuit.tag())
{
_ = self.swarm.dial(
DialOpts::peer_id(peer_id)
.condition(PeerCondition::Disconnected)
.addresses(vec![addr.with(Protocol::P2pCircuit)])
.build(),
);
}
}
.for_each(|a| {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, a.clone());

// if address contains relay circuit tag,
// dial that address for immediate Direct Connection Upgrade
if *self.swarm.local_peer_id() != peer_id
&& a.to_string().contains(Protocol::P2pCircuit.tag())
{
_ = self.swarm.dial(
DialOpts::peer_id(peer_id)
.condition(PeerCondition::Disconnected)
.addresses(vec![a.with(Protocol::P2pCircuit)])
.build(),
);
}
});
},
IdentifyEvent::Sent { peer_id } => {
debug!("Identity Sent event to: {peer_id:?}");
Expand Down Expand Up @@ -377,12 +403,8 @@ impl EventLoop {
// if so, create reservation request with relay
if new == NatStatus::Private || old == NatStatus::Private {
info!("Autonat says we're still private.");
// choose relay by random
if let Some(relay) = self.relay_nodes.choose(&mut rand::thread_rng()) {
let (relay_id, relay_addr) = relay.clone();
// appoint this relay as our chosen one
self.relay_reservation(relay_id, relay_addr);
}
// select a relay, try to dial it
self.select_and_dial_relay();
};
},
},
Expand Down Expand Up @@ -426,7 +448,7 @@ impl EventLoop {
num_established,
cause,
} => {
trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num establ: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address());
trace!("Connection closed. PeerID: {peer_id:?}. Address: {:?}. Num established: {num_established:?}. Cause: {cause:?}", endpoint.get_remote_address());

if let Some(cause) = cause {
match cause {
Expand Down Expand Up @@ -467,9 +489,9 @@ impl EventLoop {
if let Some(peer_id) = peer_id {
trace!("Error produced by peer with PeerId: {peer_id:?}");
// if the peer giving us problems is the chosen relay
// just remove it by reseting the reservatin state slot
if self.relay_reservation.id == peer_id {
self.reset_relay_reservation();
// just remove it by resetting the reservation state slot
if self.relay.id == peer_id {
self.relay.reset();
}
}
},
Expand All @@ -480,7 +502,7 @@ impl EventLoop {
}
}

async fn handle_command(&mut self, command: Command) {
fn handle_command(&mut self, command: Command) {
match command {
Command::StartListening { addr, sender } => {
_ = match self.swarm.listen_on(addr) {
Expand Down Expand Up @@ -554,7 +576,7 @@ impl EventLoop {
}

fn dump_hash_map_block_stats(&mut self) {
let mut occurence_map = HashMap::new();
let mut occurrence_map = HashMap::new();

for record in self
.swarm
Expand All @@ -571,10 +593,10 @@ impl EventLoop {
.split_once(':')
.expect("unable to split the key string");

let count = occurence_map.entry(block_num.to_string()).or_insert(0);
let count = occurrence_map.entry(block_num.to_string()).or_insert(0);
*count += 1;
}
let mut sorted: Vec<(&String, &i32)> = occurence_map.iter().collect();
let mut sorted: Vec<(&String, &i32)> = occurrence_map.iter().collect();
sorted.sort_by(|a, b| a.0.cmp(b.0));
for (block_number, cell_count) in sorted {
trace!(
Expand Down Expand Up @@ -612,19 +634,63 @@ impl EventLoop {
));
}

fn reset_relay_reservation(&mut self) {
self.relay_reservation = RelayReservation {
id: PeerId::random(),
address: Multiaddr::empty(),
is_reserved: false,
};
fn handle_periodic_bootstraps(&mut self) {
// commence with periodic bootstraps,
// only when the initial startup bootstrap is done
if self.bootstrap.is_startup_done {
_ = self.swarm.behaviour_mut().kademlia.bootstrap();
}
}

fn relay_reservation(&mut self, id: PeerId, address: Multiaddr) {
self.relay_reservation = RelayReservation {
id,
address,
is_reserved: false,
};
fn establish_relay_circuit(&mut self, peer_id: PeerId) {
// before we try and create a circuit with the relay
// we have to exchange observed addresses
// in this case we're waiting on relay to tell us our own
if peer_id == self.relay.id && !self.relay.is_circuit_established {
match self.swarm.listen_on(
self.relay
.address
.clone()
.with(Protocol::P2p(peer_id.into()))
.with(Protocol::P2pCircuit),
) {
Ok(_) => {
info!("Relay circuit established with relay: {peer_id:?}");
self.relay.is_circuit_established = true;
},
Err(e) => {
// failed to establish a circuit, reset to try another relay
self.relay.reset();
error!("Local node failed to listen on relay address. Error: {e:#?}");
},
}
}
}

fn select_and_dial_relay(&mut self) {
// select a random relay from the list of known ones
self.relay.select_random();

// dial selected relay,
// so we don't wait on swarm to do it eventually
match self.swarm.dial(
DialOpts::peer_id(self.relay.id)
.condition(PeerCondition::NotDialing)
.addresses(vec![self.relay.address.clone()])
.build(),
) {
Ok(_) => {
info!("Dialing Relay: {id:?} succeeded.", id = self.relay.id);
},
Err(e) => {
// got an error while dialing,
// better select a new relay and try again
self.relay.reset();
error!(
"Dialing Relay: {id:?}, produced an error: {e:?}",
id = self.relay.id
);
},
}
}
}
1 change: 1 addition & 0 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
kad::{Kademlia, KademliaCaching, KademliaConfig},
mdns::{tokio::Behaviour as Mdns, Config as MdnsConfig},
metrics::Metrics,
noise::{Keypair, NoiseConfig, X25519Spec},

Check warning on line 23 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated type alias `libp2p::libp2p_noise::Keypair`: This type will be made private in the future. Use `libp2p_noise::Config::new` instead to use the noise protocol.

Check warning on line 23 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated struct `libp2p::libp2p_noise::NoiseConfig`: Use `libp2p_noise::Config` instead. All other handshake patterns are deprecated and will be removed.

Check warning on line 23 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated type alias `libp2p::libp2p_noise::X25519Spec`: This type will be made private in the future. Use `libp2p_noise::Config::new` instead to use the noise protocol.
ping::{Behaviour as Ping, Config as PingConfig},
quic::{tokio::Transport as TokioQuic, Config as QuicConfig},
relay::{self, client::Behaviour as RelayClient},
Expand Down Expand Up @@ -94,11 +94,11 @@
let transport = {
let quic_transport = TokioQuic::new(QuicConfig::new(&id_keys));
// upgrade relay transport to be used with swarm
let noise_keys = Keypair::<X25519Spec>::new().into_authentic(&id_keys)?;

Check warning on line 97 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated type alias `libp2p::libp2p_noise::Keypair`: This type will be made private in the future. Use `libp2p_noise::Config::new` instead to use the noise protocol.

Check warning on line 97 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated type alias `libp2p::libp2p_noise::X25519Spec`: This type will be made private in the future. Use `libp2p_noise::Config::new` instead to use the noise protocol.
let upgraded_relay_transport = relay_client_transport
.upgrade(Version::V1Lazy)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())

Check warning on line 100 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated struct `libp2p::libp2p_noise::NoiseConfig`: Use `libp2p_noise::Config` instead. All other handshake patterns are deprecated and will be removed.
.multiplex(libp2p::yamux::YamuxConfig::default());

Check warning on line 101 in src/network/mod.rs

View workflow job for this annotation

GitHub Actions / build_and_test

use of deprecated type alias `libp2p::libp2p_yamux::YamuxConfig`: Import the `yamux` module and refer to this type as `yamux::Config` instead.
// relay transport only handles listening and dialing on relayed [`Multiaddr`]
// and depends on other transport to do the actual transmission of data, we have to combine the two
let transport =
Expand Down Expand Up @@ -185,6 +185,7 @@
metrics,
avail_metrics,
cfg.relays,
cfg.bootstrap_interval,
kad_remove_local_record,
),
))
Expand Down
5 changes: 5 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ pub struct RuntimeConfig {
pub identify_agent: String,
/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
pub bootstraps: Vec<(String, Multiaddr)>,
/// Defines a period of time in which periodic bootstraps will be repeated. (default: 300 sec)
pub bootstrap_period: u64,
/// Vector of Relay nodes, which are used for hole punching
pub relays: Vec<(String, Multiaddr)>,
/// WebSocket endpoint of full node for subscribing to latest header, etc (default: [ws://127.0.0.1:9944]).
Expand Down Expand Up @@ -347,6 +349,7 @@ pub struct LibP2PConfig {
pub kademlia: KademliaConfig,
pub is_relay: bool,
pub relays: Vec<(PeerId, Multiaddr)>,
pub bootstrap_interval: Duration,
}

impl From<&RuntimeConfig> for LibP2PConfig {
Expand All @@ -366,6 +369,7 @@ impl From<&RuntimeConfig> for LibP2PConfig {
kademlia: val.into(),
is_relay: val.relays.is_empty(),
relays: relay_nodes,
bootstrap_interval: Duration::from_secs(val.bootstrap_period),
}
}
}
Expand Down Expand Up @@ -495,6 +499,7 @@ impl Default for RuntimeConfig {
identify_protocol: "/avail_kad/id/1.0.0".to_string(),
identify_agent: "avail-light-client/rust-client".to_string(),
bootstraps: Vec::new(),
bootstrap_period: 300,
relays: Vec::new(),
full_node_ws: vec!["ws://127.0.0.1:9944".to_owned()],
app_id: None,
Expand Down
Loading