From 3c9fc2c1a1e2efc5112751d63e4bbf82212c8e1c Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Tue, 25 Jul 2023 11:39:00 +0700 Subject: [PATCH 1/8] Introduce Kademlia bootstrap operation. --- .../src/bin/subspace-farmer/commands/farm.rs | 16 +++ .../bin/subspace-farmer/commands/farm/dsn.rs | 3 +- .../src/bin/subspace-bootstrap-node/main.rs | 24 ++++- crates/subspace-networking/src/create.rs | 5 + crates/subspace-networking/src/node.rs | 72 ++++++++++++- crates/subspace-networking/src/node_runner.rs | 102 ++++++++++++++++-- crates/subspace-networking/src/shared.rs | 5 + crates/subspace-service/src/dsn.rs | 3 +- crates/subspace-service/src/lib.rs | 18 +++- 9 files changed, 235 insertions(+), 13 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index e569d47ccc..a9d0dad825 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -5,6 +5,7 @@ use crate::commands::shared::print_disk_farm_info; use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal}; use crate::{DiskFarm, FarmingArgs}; use anyhow::{anyhow, Context, Result}; +use futures::future::pending; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; @@ -351,7 +352,22 @@ where )?; let mut networking_fut = Box::pin(networking_fut).fuse(); + let bootstrap_fut = Box::pin({ + let node = node.clone(); + + async move { + if let Err(err) = node.bootstrap().await { + warn!(?err, "DSN bootstrap failed."); + } + + pending::<()>().await; + } + }); + futures::select!( + // Network bootstrapping future + _ = bootstrap_fut.fuse() => {}, + // Signal future _ = signal.fuse() => {}, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index 1ee4b49610..e5217d9378 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -64,7 +64,7 @@ pub(super) fn configure_dsn( let networking_parameters_registry = { let known_addresses_db_path = base_path.join("known_addresses_db"); - NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes) + NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes.clone()) .map(|manager| manager.boxed())? }; @@ -303,6 +303,7 @@ pub(super) fn configure_dsn( special_connected_peers_handler: Arc::new(PeerInfo::is_farmer), // other (non-farmer) connections general_connected_peers_handler: Arc::new(|peer_info| !PeerInfo::is_farmer(peer_info)), + bootstrap_addresses: bootstrap_nodes, ..default_config }; diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 8dd88d42df..4a4a53f427 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -6,6 +6,8 @@ use anyhow::anyhow; use bytesize::ByteSize; use clap::{Parser, ValueHint}; use either::Either; +use futures::future::pending; +use futures::FutureExt; use libp2p::identity::ed25519::Keypair; use libp2p::{identity, Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; @@ -18,7 +20,7 @@ use subspace_networking::{ peer_id, BootstrappedNetworkingParameters, Config, NetworkingParametersManager, ParityDbProviderStorage, PeerInfoProvider, VoidProviderStorage, }; -use tracing::{debug, info, Level}; +use tracing::{debug, info, warn, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -206,7 +208,25 @@ async fn main() -> anyhow::Result<()> { .detach(); info!("Subspace Bootstrap Node started"); - node_runner.run().await; + let bootstrap_fut = Box::pin({ + let node = node.clone(); + + async move { + if let Err(err) = node.bootstrap().await { + warn!(?err, "DSN bootstrap failed."); + } + + pending::<()>().await; + } + }); + + futures::select!( + // Network bootstrapping future + _ = bootstrap_fut.fuse() => {}, + + // Networking runner + _ = node_runner.run().fuse() => {}, + ); } Command::GenerateKeypair { json } => { let output = KeypairOutput::new(Keypair::generate()); diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index 0826038ebd..9f7fe3d4dc 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -220,6 +220,8 @@ pub struct Config { pub general_target_connections: u32, /// Defines target total (in and out) connection number that should be maintained for special peers. pub special_target_connections: u32, + /// Addresses to bootstrap Kademlia network + pub bootstrap_addresses: Vec, } impl fmt::Debug for Config { @@ -330,6 +332,7 @@ where special_connected_peers_handler: Arc::new(|_| false), general_target_connections: SWARM_TARGET_CONNECTION_NUMBER, special_target_connections: SWARM_TARGET_CONNECTION_NUMBER, + bootstrap_addresses: Vec::new(), } } } @@ -392,6 +395,7 @@ where special_connected_peers_handler: special_connection_decision_handler, general_target_connections, special_target_connections, + bootstrap_addresses, } = config; let local_peer_id = peer_id(&keypair); @@ -498,6 +502,7 @@ where protocol_version, general_connection_decision_handler, special_connection_decision_handler, + bootstrap_addresses, }); Ok((node, node_runner)) diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 0f40919949..6ed656b861 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use event_listener_primitives::HandlerId; use futures::channel::mpsc::SendError; use futures::channel::{mpsc, oneshot}; -use futures::{SinkExt, Stream}; +use futures::{SinkExt, Stream, StreamExt}; use libp2p::core::multihash::Multihash; use libp2p::gossipsub::{Sha256Topic, SubscriptionError}; use libp2p::kad::record::Key; @@ -19,7 +19,9 @@ use std::task::{Context, Poll}; use std::time::Duration; use thiserror::Error; use tokio::time::sleep; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; + +const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1); /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic. #[derive(Debug)] @@ -275,6 +277,26 @@ impl From for ConnectedPeersError { } } +#[derive(Debug, Error)] +pub enum BootstrapError { + /// Failed to send command to the node runner + #[error("Failed to send command to the node runner: {0}")] + SendCommand(#[from] SendError), + /// Node runner was dropped + #[error("Node runner was dropped")] + NodeRunnerDropped, + /// Failed to bootstrap a peer. + #[error("Failed to bootstrap a peer.")] + Bootstrap, +} + +impl From for BootstrapError { + #[inline] + fn from(oneshot::Canceled: oneshot::Canceled) -> Self { + Self::NodeRunnerDropped + } +} + /// Implementation of a network node on Subspace Network. #[derive(Debug, Clone)] #[must_use = "Node doesn't do anything if dropped"] @@ -297,6 +319,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetValueError> { + self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -320,6 +343,7 @@ impl Node { key: Multihash, value: Vec, ) -> Result, PutValueError> { + self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -340,6 +364,7 @@ impl Node { /// Subcribe to some topic on the DSN. pub async fn subscribe(&self, topic: Sha256Topic) -> Result { + self.wait_for_bootstrap().await; let permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); @@ -368,6 +393,7 @@ impl Node { /// Subcribe a messgo to some topic on the DSN. pub async fn publish(&self, topic: Sha256Topic, message: Vec) -> Result<(), PublishError> { + self.wait_for_bootstrap().await; let _permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); @@ -393,6 +419,7 @@ impl Node { where Request: GenericRequest, { + self.wait_for_bootstrap().await; let _permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); let command = Command::GenericRequest { @@ -414,6 +441,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetClosestPeersError> { + self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; trace!(?key, "Starting 'GetClosestPeers' request."); @@ -515,6 +543,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetProvidersError> { + self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -598,6 +627,31 @@ impl Node { .map_err(|_| ConnectedPeersError::ConnectedPeers) } + /// Bootstraps Kademlia network + pub async fn bootstrap(&self) -> Result<(), BootstrapError> { + let (result_sender, mut result_receiver) = mpsc::unbounded(); + + debug!("Starting 'bootstrap' request."); + + self.shared + .command_sender + .clone() + .send(Command::Bootstrap { result_sender }) + .await?; + + for step in 0.. { + let result = result_receiver.next().await; + + if result.is_some() { + debug!(%step, "Kademlia bootstrapping..."); + } else { + break; + } + } + + Ok(()) + } + /// Callback is called when we receive new [`crate::peer_info::PeerInfo`] pub fn on_peer_info(&self, callback: HandlerFn) -> HandlerId { self.shared.handlers.new_peer_info.add(callback) @@ -611,5 +665,19 @@ impl Node { /// Callback is called when a peer is connected. pub fn on_connected_peer(&self, callback: HandlerFn) -> HandlerId { self.shared.handlers.connected_peer.add(callback) + } + + pub(crate) async fn wait_for_bootstrap(&self) { + loop { + let was_bootstrapped = self.shared.bootstrap_finished.lock().to_owned(); + + if was_bootstrapped { + return; + } else { + trace!("Waiting for bootstrap..."); + + sleep(BOOTSTRAP_CHECK_DELAY).await; + } + } } } diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 725b9a525b..4bc4406a2b 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -13,19 +13,21 @@ use crate::create::{ use crate::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess}; use crate::request_responses::{Event as RequestResponseEvent, IfDisconnected}; use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared}; -use crate::utils::{is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit}; +use crate::utils::{ + convert_multiaddresses, is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit, +}; use bytes::Bytes; use futures::channel::mpsc; use futures::future::Fuse; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use libp2p::core::ConnectedPoint; use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash}; use libp2p::identify::Event as IdentifyEvent; use libp2p::kad::store::RecordStore; use libp2p::kad::{ - GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, - GetRecordOk, InboundRequest, Kademlia, KademliaEvent, PeerRecord, ProgressStep, ProviderRecord, - PutRecordOk, QueryId, QueryResult, Quorum, Record, + BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersError, GetProvidersOk, + GetRecordError, GetRecordOk, InboundRequest, Kademlia, KademliaEvent, PeerRecord, ProgressStep, + ProviderRecord, PutRecordOk, QueryId, QueryResult, Quorum, Record, }; use libp2p::metrics::{Metrics, Recorder}; use libp2p::swarm::{DialError, SwarmEvent}; @@ -43,7 +45,7 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::time::Sleep; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, info, trace, warn}; // Defines a batch size for peer addresses from Kademlia buckets. const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20; @@ -78,6 +80,9 @@ enum QueryResultSender { // Just holding onto permit while data structure is not dropped _permit: ResizableSemaphorePermit, }, + Bootstrap { + sender: mpsc::UnboundedSender<()>, + }, } /// Runner for the Node. @@ -121,6 +126,8 @@ where special_connection_decision_handler: ConnectedPeersHandler, /// Randomness generator used for choosing Kademlia addresses. rng: StdRng, + /// Addresses to bootstrap Kademlia network + bootstrap_addresses: Vec, } // Helper struct for NodeRunner configuration (clippy requirement). @@ -140,6 +147,7 @@ where pub(crate) protocol_version: String, pub(crate) general_connection_decision_handler: ConnectedPeersHandler, pub(crate) special_connection_decision_handler: ConnectedPeersHandler, + pub(crate) bootstrap_addresses: Vec, } impl NodeRunner @@ -160,6 +168,7 @@ where protocol_version, general_connection_decision_handler, special_connection_decision_handler, + bootstrap_addresses, }: NodeRunnerConfig, ) -> Self { Self { @@ -184,6 +193,7 @@ where general_connection_decision_handler, special_connection_decision_handler, rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed + bootstrap_addresses, } } @@ -774,10 +784,65 @@ where self.query_id_receivers.remove(&id); } } + KademliaEvent::OutboundQueryProgressed { + step: ProgressStep { last, .. }, + id, + result: QueryResult::Bootstrap(result), + .. + } => { + let mut cancelled = false; + if let Some(QueryResultSender::Bootstrap { sender }) = + self.query_id_receivers.get_mut(&id) + { + match result { + Ok(BootstrapOk { + peer, + num_remaining, + }) => { + trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded"); + + cancelled = Self::unbounded_send_and_cancel_on_error( + &mut self.swarm.behaviour_mut().kademlia, + sender, + (), + "Bootstrap", + &id, + ) || cancelled; + } + Err(error) => { + debug!(?error, "Bootstrap query failed.",); + + self.set_bootstrap_finished(false); + } + } + } + + if last || cancelled { + // There will be no more progress + self.query_id_receivers.remove(&id); + + if last { + self.set_bootstrap_finished(true); + } + + if cancelled { + self.set_bootstrap_finished(false); + } + } + } _ => {} } } + fn set_bootstrap_finished(&mut self, success: bool) { + if let Some(shared) = self.shared_weak.upgrade() { + let mut bootstrap_finished = shared.bootstrap_finished.lock(); + *bootstrap_finished = true; + + info!(%success, "Bootstrap finished.",); + } + } + // Returns `true` if query was cancelled fn unbounded_send_and_cancel_on_error( kademlia: &mut Kademlia>, @@ -1137,6 +1202,31 @@ where let _ = result_sender.send(connected_peers); } + Command::Bootstrap { mut result_sender } => { + let kademlia = &mut self.swarm.behaviour_mut().kademlia; + + for (peer_id, address) in convert_multiaddresses(self.bootstrap_addresses.clone()) { + kademlia.add_address(&peer_id, address); + } + + match kademlia.bootstrap() { + Ok(query_id) => { + self.query_id_receivers.insert( + query_id, + QueryResultSender::Bootstrap { + sender: result_sender, + }, + ); + } + Err(err) => { + debug!(?err, "Bootstrap error."); + + let _ = result_sender.close().await; + + self.set_bootstrap_finished(false); + } + } + } } } diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index c53a0cde11..489e7cec0e 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -86,6 +86,9 @@ pub(crate) enum Command { ConnectedPeers { result_sender: oneshot::Sender>, }, + Bootstrap { + result_sender: mpsc::UnboundedSender<()>, + }, } pub(crate) type HandlerFn = Arc; @@ -123,6 +126,7 @@ pub(crate) struct Shared { pub(crate) command_sender: mpsc::Sender, pub(crate) kademlia_tasks_semaphore: ResizableSemaphore, pub(crate) regular_tasks_semaphore: ResizableSemaphore, + pub(crate) bootstrap_finished: Mutex, } impl Shared { @@ -141,6 +145,7 @@ impl Shared { command_sender, kademlia_tasks_semaphore, regular_tasks_semaphore, + bootstrap_finished: Mutex::new(false), } } } diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 5deb275652..dc2effd46e 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -118,7 +118,7 @@ where .map(|manager| manager.boxed()) }) .unwrap_or(Ok(BootstrappedNetworkingParameters::new( - dsn_config.bootstrap_nodes, + dsn_config.bootstrap_nodes.clone(), ) .boxed()))? }; @@ -241,6 +241,7 @@ where reserved_peers: dsn_config.reserved_peers, // maintain permanent connections with any peer general_connected_peers_handler: Arc::new(|_| true), + bootstrap_addresses: dsn_config.bootstrap_nodes.clone(), ..default_networking_config }; diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index bfb65b2fe4..29984f72e6 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -87,7 +87,7 @@ use subspace_runtime_primitives::opaque::Block; use subspace_runtime_primitives::{AccountId, Balance, Hash, Index as Nonce}; use subspace_transaction_pool::bundle_validator::BundleValidator; use subspace_transaction_pool::{FullPool, PreValidateTransaction}; -use tracing::{debug, error, info, Instrument}; +use tracing::{debug, error, info, warn, Instrument}; /// Error type for Subspace service. #[derive(thiserror::Error, Debug)] @@ -652,6 +652,22 @@ where ), ); + task_manager.spawn_handle().spawn( + "node-runner", + Some("subspace-networking-bootstrapping"), + Box::pin( + { + let node = node.clone(); + async move { + if let Err(err) = node.bootstrap().await { + warn!(?err, "DSN bootstrap failed."); + } + } + } + .in_current_span(), + ), + ); + (node, dsn_config.bootstrap_nodes, Some(piece_cache)) } }; From e26c6d3934d049bae71a31a7d5cca8188ff140ce Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Tue, 25 Jul 2023 15:46:00 +0700 Subject: [PATCH 2/8] Remove bootstrap-nodes from redialing. --- .../bin/subspace-farmer/commands/farm/dsn.rs | 3 +- .../examples/get-peers-complex.rs | 18 +++---- .../subspace-networking/examples/get-peers.rs | 9 ++-- .../examples/networking.rs | 9 ++-- .../subspace-networking/examples/requests.rs | 11 ++--- .../src/behavior/persistent_parameters.rs | 49 ++++--------------- .../subspace-networking/src/behavior/tests.rs | 9 ++-- .../src/bin/subspace-bootstrap-node/main.rs | 17 +++---- crates/subspace-networking/src/create.rs | 4 +- crates/subspace-networking/src/lib.rs | 4 +- crates/subspace-networking/src/node.rs | 2 +- crates/subspace-networking/src/node_runner.rs | 4 +- .../src/import_blocks_from_dsn.rs | 8 ++- crates/subspace-service/src/dsn.rs | 20 +++----- 14 files changed, 59 insertions(+), 108 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index e5217d9378..973a6956e2 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -64,8 +64,7 @@ pub(super) fn configure_dsn( let networking_parameters_registry = { let known_addresses_db_path = base_path.join("known_addresses_db"); - NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes.clone()) - .map(|manager| manager.boxed())? + NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())? }; let weak_readers_and_pieces = Arc::downgrade(readers_and_pieces); diff --git a/crates/subspace-networking/examples/get-peers-complex.rs b/crates/subspace-networking/examples/get-peers-complex.rs index ae63ddebb9..e7affffeae 100644 --- a/crates/subspace-networking/examples/get-peers-complex.rs +++ b/crates/subspace-networking/examples/get-peers-complex.rs @@ -7,7 +7,7 @@ use libp2p::PeerId; use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; -use subspace_networking::{BootstrappedNetworkingParameters, Config, NetworkingParametersManager}; +use subspace_networking::{Config, NetworkingParametersManager, StubNetworkingParametersManager}; #[tokio::main] async fn main() { @@ -23,12 +23,10 @@ async fn main() { let mut nodes = Vec::with_capacity(TOTAL_NODE_COUNT); for i in 0..TOTAL_NODE_COUNT { let config = Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new( - bootstrap_nodes.clone(), - ) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, + bootstrap_addresses: bootstrap_nodes.clone(), ..Config::default() }; let keypair = config.keypair.clone().try_into_ed25519().unwrap(); @@ -83,12 +81,10 @@ async fn main() { let config = Config { listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, - networking_parameters_registry: NetworkingParametersManager::new( - db_path.as_ref(), - bootstrap_nodes, - ) - .unwrap() - .boxed(), + networking_parameters_registry: NetworkingParametersManager::new(db_path.as_ref()) + .unwrap() + .boxed(), + bootstrap_addresses: bootstrap_nodes, ..Config::default() }; diff --git a/crates/subspace-networking/examples/get-peers.rs b/crates/subspace-networking/examples/get-peers.rs index 953cfcc2b5..2f736a86e2 100644 --- a/crates/subspace-networking/examples/get-peers.rs +++ b/crates/subspace-networking/examples/get-peers.rs @@ -6,7 +6,7 @@ use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::{crypto, PieceIndexHash, U256}; -use subspace_networking::{BootstrappedNetworkingParameters, Config}; +use subspace_networking::{Config, StubNetworkingParametersManager}; #[tokio::main] async fn main() { @@ -42,13 +42,12 @@ async fn main() { let node_1_addr = node_1_address_receiver.await.unwrap(); drop(on_new_listener_handler); + let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![ - node_1_addr.with(Protocol::P2p(node_1.id().into())) - ]) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, + bootstrap_addresses, ..Config::default() }; diff --git a/crates/subspace-networking/examples/networking.rs b/crates/subspace-networking/examples/networking.rs index 98b66edc4c..0f23f5619e 100644 --- a/crates/subspace-networking/examples/networking.rs +++ b/crates/subspace-networking/examples/networking.rs @@ -7,7 +7,7 @@ use libp2p::multiaddr::Protocol; use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; -use subspace_networking::{BootstrappedNetworkingParameters, Config}; +use subspace_networking::{Config, StubNetworkingParametersManager}; const TOPIC: &str = "Foo"; @@ -47,13 +47,12 @@ async fn main() { let mut subscription = node_1.subscribe(Sha256Topic::new(TOPIC)).await.unwrap(); + let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![ - node_1_addr.with(Protocol::P2p(node_1.id().into())) - ]) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, + bootstrap_addresses, ..Config::default() }; diff --git a/crates/subspace-networking/examples/requests.rs b/crates/subspace-networking/examples/requests.rs index 5e44f454ca..4d5b4edba7 100644 --- a/crates/subspace-networking/examples/requests.rs +++ b/crates/subspace-networking/examples/requests.rs @@ -7,8 +7,8 @@ use prometheus_client::registry::Registry; use std::sync::Arc; use std::time::Duration; use subspace_networking::{ - start_prometheus_metrics_server, BootstrappedNetworkingParameters, Config, GenericRequest, - GenericRequestHandler, + start_prometheus_metrics_server, Config, GenericRequest, GenericRequestHandler, + StubNetworkingParametersManager, }; use tokio::time::sleep; use tracing::error; @@ -86,16 +86,15 @@ async fn main() { let node_1_addr = node_1_address_receiver.await.unwrap(); drop(on_new_listener_handler); + let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![ - node_1_addr.with(Protocol::P2p(node_1.id().into())) - ]) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, request_response_protocols: vec![GenericRequestHandler::::create( |_, _| async { None }, )], + bootstrap_addresses, ..Config::default() }; diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs index b4de534fb9..1387fad5b6 100644 --- a/crates/subspace-networking/src/behavior/persistent_parameters.rs +++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs @@ -1,4 +1,4 @@ -use crate::utils::{convert_multiaddresses, CollectionBatcher, PeerAddress}; +use crate::utils::{CollectionBatcher, PeerAddress}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::future::Fuse; @@ -70,32 +70,19 @@ impl Clone for Box { } } -/// Networking manager implementation with bootstrapped addresses. All other operations muted. +/// Networking manager implementation with NOOP implementation. #[derive(Clone, Default)] -pub struct BootstrappedNetworkingParameters { - bootstrap_addresses: Vec, -} - -impl BootstrappedNetworkingParameters { - /// Creates a new instance of `BootstrappedNetworkingParameters`. - pub fn new(bootstrap_addresses: Vec) -> Self { - Self { - bootstrap_addresses, - } - } +pub struct StubNetworkingParametersManager; - fn bootstrap_addresses(&self) -> Vec { - convert_multiaddresses(self.bootstrap_addresses.clone()) - } - - /// Returns an instance of `BootstrappedNetworkingParameters` as the `Box` reference. +impl StubNetworkingParametersManager { + /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference. pub fn boxed(self) -> Box { Box::new(self) } } #[async_trait] -impl NetworkingParametersRegistry for BootstrappedNetworkingParameters { +impl NetworkingParametersRegistry for StubNetworkingParametersManager { async fn add_known_peer(&mut self, _: PeerId, _: Vec) {} async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec) {} @@ -103,7 +90,7 @@ impl NetworkingParametersRegistry for BootstrappedNetworkingParameters { async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {} async fn next_known_addresses_batch(&mut self) -> Vec { - self.bootstrap_addresses() + Vec::new() } async fn run(&mut self) { @@ -141,8 +128,6 @@ pub struct NetworkingParametersManager { column_id: u8, // Key to persistent parameters object_id: &'static [u8], - // Bootstrap addresses provided on creation - bootstrap_addresses: Vec, // Provides batching capabilities for the address collection (it stores the last batch index) collection_batcher: CollectionBatcher, } @@ -150,10 +135,7 @@ pub struct NetworkingParametersManager { impl NetworkingParametersManager { /// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter. /// On object creation it starts a job for networking parameters cache handling. - pub fn new( - path: &Path, - bootstrap_addresses: Vec, - ) -> Result { + pub fn new(path: &Path) -> Result { let mut options = Options::with_columns(path, 1); // We don't use stats options.stats = false; @@ -184,7 +166,6 @@ impl NetworkingParametersManager { object_id, known_peers: cache, networking_parameters_save_delay: Self::default_delay(), - bootstrap_addresses, collection_batcher: CollectionBatcher::new( NonZeroUsize::new(PEERS_ADDRESSES_BATCH_SIZE) .expect("Manual non-zero initialization failed."), @@ -202,12 +183,6 @@ impl NetworkingParametersManager { .collect() } - // Returns boostrap addresses from networking parameters initialization. - // It removes p2p-protocol suffix. - fn bootstrap_addresses(&self) -> Vec { - convert_multiaddresses(self.bootstrap_addresses.clone()) - } - // Helps create a copy of the internal LruCache fn clone_known_peers(&self) -> LruCache> { let mut known_peers = LruCache::new(self.known_peers.cap()); @@ -304,12 +279,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { async fn next_known_addresses_batch(&mut self) -> Vec { // We take cached known addresses and combine them with manually provided bootstrap addresses. - let combined_addresses = self - .known_addresses() - .await - .into_iter() - .chain(self.bootstrap_addresses()) - .collect::>(); + let combined_addresses = self.known_addresses().await.into_iter().collect::>(); trace!( "Peer addresses batch requested. Total list size: {}", @@ -359,7 +329,6 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { db: self.db.clone(), column_id: self.column_id, object_id: self.object_id, - bootstrap_addresses: self.bootstrap_addresses.clone(), collection_batcher: self.collection_batcher.clone(), } .boxed() diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index 6fc48e5e34..f7743dd823 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -1,6 +1,6 @@ use super::persistent_parameters::remove_known_peer_addresses_internal; use crate::behavior::provider_storage::{instant_to_micros, micros_to_instant}; -use crate::{BootstrappedNetworkingParameters, Config, GenericRequest, GenericRequestHandler}; +use crate::{Config, GenericRequest, GenericRequestHandler, StubNetworkingParametersManager}; use futures::channel::oneshot; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; @@ -159,16 +159,15 @@ async fn test_async_handler_works_with_pending_internal_future() { let node_1_addr = node_1_address_receiver.await.unwrap(); drop(on_new_listener_handler); + let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![ - node_1_addr.with(Protocol::P2p(node_1.id().into())) - ]) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, request_response_protocols: vec![GenericRequestHandler::::create( |_, _| async { None }, )], + bootstrap_addresses, ..Config::default() }; diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 4a4a53f427..5e25762672 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -17,8 +17,8 @@ use std::path::PathBuf; use std::sync::Arc; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::{ - peer_id, BootstrappedNetworkingParameters, Config, NetworkingParametersManager, - ParityDbProviderStorage, PeerInfoProvider, VoidProviderStorage, + peer_id, Config, NetworkingParametersManager, ParityDbProviderStorage, PeerInfoProvider, + StubNetworkingParametersManager, VoidProviderStorage, }; use tracing::{debug, info, warn, Level}; use tracing_subscriber::fmt::Subscriber; @@ -161,15 +161,10 @@ async fn main() -> anyhow::Result<()> { .map(|path| { let known_addresses_db = path.join("known_addresses_db"); - NetworkingParametersManager::new( - &known_addresses_db, - bootstrap_nodes.clone(), - ) - .map(|manager| manager.boxed()) + NetworkingParametersManager::new(&known_addresses_db) + .map(|manager| manager.boxed()) }) - .unwrap_or(Ok( - BootstrappedNetworkingParameters::new(bootstrap_nodes).boxed() - )) + .unwrap_or(Ok(StubNetworkingParametersManager.boxed())) .map_err(|err| anyhow!(err))? }; @@ -185,6 +180,8 @@ async fn main() -> anyhow::Result<()> { // we don't maintain permanent connections with any peer general_connected_peers_handler: Arc::new(|_| false), special_connected_peers_handler: Arc::new(|_| false), + bootstrap_addresses: bootstrap_nodes, + ..Config::new( protocol_version.to_string(), keypair, diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index 9f7fe3d4dc..9bfc8f27f1 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -2,7 +2,7 @@ pub(crate) mod temporary_bans; mod transport; use crate::behavior::persistent_parameters::{ - BootstrappedNetworkingParameters, NetworkingParametersRegistry, + NetworkingParametersRegistry, StubNetworkingParametersManager, }; use crate::behavior::provider_storage::MemoryProviderStorage; use crate::behavior::{provider_storage, Behavior, BehaviorConfig}; @@ -313,7 +313,7 @@ where provider_storage, allow_non_global_addresses_in_dht: false, initial_random_query_interval: Duration::from_secs(1), - networking_parameters_registry: BootstrappedNetworkingParameters::default().boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), request_response_protocols: Vec::new(), yamux_config, reserved_peers: Vec::new(), diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 8e8f002748..a6e339ba67 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -31,8 +31,8 @@ mod shared; pub mod utils; pub use crate::behavior::persistent_parameters::{ - BootstrappedNetworkingParameters, NetworkParametersPersistenceError, - NetworkingParametersManager, ParityDbError, + NetworkParametersPersistenceError, NetworkingParametersManager, ParityDbError, + StubNetworkingParametersManager, }; pub use crate::node::{ GetClosestPeersError, Node, SendRequestError, SubscribeError, TopicSubscription, diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 6ed656b861..1637d2558a 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -665,7 +665,7 @@ impl Node { /// Callback is called when a peer is connected. pub fn on_connected_peer(&self, callback: HandlerFn) -> HandlerId { self.shared.handlers.connected_peer.add(callback) - } + } pub(crate) async fn wait_for_bootstrap(&self) { loop { diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 4bc4406a2b..1432b962f0 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -45,7 +45,7 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::time::Sleep; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, trace, warn}; // Defines a batch size for peer addresses from Kademlia buckets. const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20; @@ -839,7 +839,7 @@ where let mut bootstrap_finished = shared.bootstrap_finished.lock(); *bootstrap_finished = true; - info!(%success, "Bootstrap finished.",); + debug!(%success, "Bootstrap finished.",); } } diff --git a/crates/subspace-node/src/import_blocks_from_dsn.rs b/crates/subspace-node/src/import_blocks_from_dsn.rs index a23f2d8ceb..223347b363 100644 --- a/crates/subspace-node/src/import_blocks_from_dsn.rs +++ b/crates/subspace-node/src/import_blocks_from_dsn.rs @@ -22,7 +22,7 @@ use sp_core::traits::SpawnEssentialNamed; use sp_runtime::traits::Block as BlockT; use std::sync::Arc; use subspace_networking::libp2p::Multiaddr; -use subspace_networking::{BootstrappedNetworkingParameters, Config, PieceByHashRequestHandler}; +use subspace_networking::{Config, PieceByHashRequestHandler, StubNetworkingParametersManager}; use subspace_service::dsn::import_blocks::initial_block_import_from_dsn; /// The `import-blocks-from-network` command used to import blocks from Subspace Network DSN. @@ -61,14 +61,12 @@ impl ImportBlocksFromDsnCmd { IQ: sc_service::ImportQueue + 'static, { let (node, mut node_runner) = subspace_networking::create(Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new( - self.bootstrap_node.clone(), - ) - .boxed(), + networking_parameters_registry: StubNetworkingParametersManager.boxed(), allow_non_global_addresses_in_dht: true, request_response_protocols: vec![PieceByHashRequestHandler::create( move |_, _| async { None }, )], + bootstrap_addresses: self.bootstrap_node.clone(), ..Config::default() }) .map_err(|error| sc_service::Error::Other(error.to_string()))?; diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index dc2effd46e..8ecc1c1dc8 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -15,12 +15,12 @@ use subspace_core_primitives::{SegmentHeader, SegmentIndex}; use subspace_networking::libp2p::kad::ProviderRecord; use subspace_networking::libp2p::{identity, Multiaddr}; use subspace_networking::{ - peer_id, BootstrappedNetworkingParameters, CreationError, MemoryProviderStorage, - NetworkParametersPersistenceError, NetworkingParametersManager, Node, NodeRunner, - ParityDbError, ParityDbProviderStorage, PeerInfoProvider, PieceAnnouncementRequestHandler, - PieceAnnouncementResponse, PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage, + peer_id, CreationError, MemoryProviderStorage, NetworkParametersPersistenceError, + NetworkingParametersManager, Node, NodeRunner, ParityDbError, ParityDbProviderStorage, + PeerInfoProvider, PieceAnnouncementRequestHandler, PieceAnnouncementResponse, + PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, - KADEMLIA_PROVIDER_TTL_IN_SECS, + StubNetworkingParametersManager, KADEMLIA_PROVIDER_TTL_IN_SECS, }; use thiserror::Error; use tracing::{debug, error, trace}; @@ -114,13 +114,9 @@ where .map(|path| { let db_path = path.join("known_addresses_db"); - NetworkingParametersManager::new(&db_path, dsn_config.bootstrap_nodes.clone()) - .map(|manager| manager.boxed()) + NetworkingParametersManager::new(&db_path).map(|manager| manager.boxed()) }) - .unwrap_or(Ok(BootstrappedNetworkingParameters::new( - dsn_config.bootstrap_nodes.clone(), - ) - .boxed()))? + .unwrap_or(Ok(StubNetworkingParametersManager.boxed()))? }; let provider_storage = @@ -241,7 +237,7 @@ where reserved_peers: dsn_config.reserved_peers, // maintain permanent connections with any peer general_connected_peers_handler: Arc::new(|_| true), - bootstrap_addresses: dsn_config.bootstrap_nodes.clone(), + bootstrap_addresses: dsn_config.bootstrap_nodes, ..default_networking_config }; From 8f53b2150e98e2ecb02a6734b6e36c0052474d92 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 26 Jul 2023 13:47:24 +0700 Subject: [PATCH 3/8] networking: Refactor bootstrapping. --- .../examples/get-peers-complex.rs | 3 +-- crates/subspace-networking/examples/get-peers.rs | 3 +-- crates/subspace-networking/examples/networking.rs | 3 +-- crates/subspace-networking/examples/requests.rs | 2 -- crates/subspace-networking/src/behavior/tests.rs | 15 +++++++++++++++ crates/subspace-networking/src/node.rs | 3 ++- crates/subspace-networking/src/node_runner.rs | 3 +-- crates/subspace-networking/src/shared.rs | 6 +++--- 8 files changed, 24 insertions(+), 14 deletions(-) diff --git a/crates/subspace-networking/examples/get-peers-complex.rs b/crates/subspace-networking/examples/get-peers-complex.rs index e7affffeae..bf23e4e12b 100644 --- a/crates/subspace-networking/examples/get-peers-complex.rs +++ b/crates/subspace-networking/examples/get-peers-complex.rs @@ -7,7 +7,7 @@ use libp2p::PeerId; use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; -use subspace_networking::{Config, NetworkingParametersManager, StubNetworkingParametersManager}; +use subspace_networking::{Config, NetworkingParametersManager}; #[tokio::main] async fn main() { @@ -23,7 +23,6 @@ async fn main() { let mut nodes = Vec::with_capacity(TOTAL_NODE_COUNT); for i in 0..TOTAL_NODE_COUNT { let config = Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, bootstrap_addresses: bootstrap_nodes.clone(), diff --git a/crates/subspace-networking/examples/get-peers.rs b/crates/subspace-networking/examples/get-peers.rs index 2f736a86e2..554aa00f08 100644 --- a/crates/subspace-networking/examples/get-peers.rs +++ b/crates/subspace-networking/examples/get-peers.rs @@ -6,7 +6,7 @@ use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::{crypto, PieceIndexHash, U256}; -use subspace_networking::{Config, StubNetworkingParametersManager}; +use subspace_networking::Config; #[tokio::main] async fn main() { @@ -44,7 +44,6 @@ async fn main() { let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, bootstrap_addresses, diff --git a/crates/subspace-networking/examples/networking.rs b/crates/subspace-networking/examples/networking.rs index 0f23f5619e..0ee46f4ab0 100644 --- a/crates/subspace-networking/examples/networking.rs +++ b/crates/subspace-networking/examples/networking.rs @@ -7,7 +7,7 @@ use libp2p::multiaddr::Protocol; use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; -use subspace_networking::{Config, StubNetworkingParametersManager}; +use subspace_networking::Config; const TOPIC: &str = "Foo"; @@ -49,7 +49,6 @@ async fn main() { let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, bootstrap_addresses, diff --git a/crates/subspace-networking/examples/requests.rs b/crates/subspace-networking/examples/requests.rs index 4d5b4edba7..12145a7748 100644 --- a/crates/subspace-networking/examples/requests.rs +++ b/crates/subspace-networking/examples/requests.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use std::time::Duration; use subspace_networking::{ start_prometheus_metrics_server, Config, GenericRequest, GenericRequestHandler, - StubNetworkingParametersManager, }; use tokio::time::sleep; use tracing::error; @@ -88,7 +87,6 @@ async fn main() { let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, request_response_protocols: vec![GenericRequestHandler::::create( diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index f7743dd823..d8fcb230a9 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -2,6 +2,7 @@ use super::persistent_parameters::remove_known_peer_addresses_internal; use crate::behavior::provider_storage::{instant_to_micros, micros_to_instant}; use crate::{Config, GenericRequest, GenericRequestHandler, StubNetworkingParametersManager}; use futures::channel::oneshot; +use futures::future::pending; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; use lru::LruCache; @@ -173,6 +174,20 @@ async fn test_async_handler_works_with_pending_internal_future() { let (node_2, mut node_runner_2) = crate::create(config_2).unwrap(); + let bootstrap_fut = Box::pin({ + let node = node_2.clone(); + + async move { + let _ = node.bootstrap().await; + + pending::<()>().await; + } + }); + + tokio::spawn(async move { + bootstrap_fut.await; + }); + tokio::spawn(async move { node_runner_2.run().await; }); diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 1637d2558a..436be5e2a7 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -14,6 +14,7 @@ use libp2p::kad::PeerRecord; use libp2p::{Multiaddr, PeerId}; use parity_scale_codec::Decode; use std::pin::Pin; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -669,7 +670,7 @@ impl Node { pub(crate) async fn wait_for_bootstrap(&self) { loop { - let was_bootstrapped = self.shared.bootstrap_finished.lock().to_owned(); + let was_bootstrapped = self.shared.bootstrap_finished.load(Ordering::SeqCst); if was_bootstrapped { return; diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 1432b962f0..0bbd253716 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -836,8 +836,7 @@ where fn set_bootstrap_finished(&mut self, success: bool) { if let Some(shared) = self.shared_weak.upgrade() { - let mut bootstrap_finished = shared.bootstrap_finished.lock(); - *bootstrap_finished = true; + shared.bootstrap_finished.store(true, Ordering::SeqCst); debug!(%success, "Bootstrap finished.",); } diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 489e7cec0e..9e0cf53c17 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -13,7 +13,7 @@ use libp2p::kad::record::Key; use libp2p::kad::PeerRecord; use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; #[derive(Debug)] @@ -126,7 +126,7 @@ pub(crate) struct Shared { pub(crate) command_sender: mpsc::Sender, pub(crate) kademlia_tasks_semaphore: ResizableSemaphore, pub(crate) regular_tasks_semaphore: ResizableSemaphore, - pub(crate) bootstrap_finished: Mutex, + pub(crate) bootstrap_finished: AtomicBool, } impl Shared { @@ -145,7 +145,7 @@ impl Shared { command_sender, kademlia_tasks_semaphore, regular_tasks_semaphore, - bootstrap_finished: Mutex::new(false), + bootstrap_finished: AtomicBool::new(false), } } } From 5b298074f1de382cff67339ba7ff6ddcaee5f9f8 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 26 Jul 2023 14:37:25 +0700 Subject: [PATCH 4/8] networking: Refactor networking parameters manager. --- .../src/bin/subspace-farmer/commands/farm/dsn.rs | 2 +- crates/subspace-networking/examples/get-peers-complex.rs | 8 +++++--- .../src/behavior/persistent_parameters.rs | 2 +- crates/subspace-networking/src/behavior/tests.rs | 3 +-- .../src/bin/subspace-bootstrap-node/main.rs | 4 ++-- crates/subspace-networking/src/create.rs | 9 +++++---- crates/subspace-networking/src/lib.rs | 1 - crates/subspace-node/src/import_blocks_from_dsn.rs | 3 +-- crates/subspace-service/src/dsn.rs | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index 973a6956e2..1cace910b9 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -152,7 +152,7 @@ pub(super) fn configure_dsn( reserved_peers, listen_on, allow_non_global_addresses_in_dht: !disable_private_ips, - networking_parameters_registry, + networking_parameters_registry: Some(networking_parameters_registry), request_response_protocols: vec![ PieceAnnouncementRequestHandler::create({ move |peer_id, req| { diff --git a/crates/subspace-networking/examples/get-peers-complex.rs b/crates/subspace-networking/examples/get-peers-complex.rs index bf23e4e12b..cfd2fe85f7 100644 --- a/crates/subspace-networking/examples/get-peers-complex.rs +++ b/crates/subspace-networking/examples/get-peers-complex.rs @@ -80,9 +80,11 @@ async fn main() { let config = Config { listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, - networking_parameters_registry: NetworkingParametersManager::new(db_path.as_ref()) - .unwrap() - .boxed(), + networking_parameters_registry: Some( + NetworkingParametersManager::new(db_path.as_ref()) + .unwrap() + .boxed(), + ), bootstrap_addresses: bootstrap_nodes, ..Config::default() }; diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs index 1387fad5b6..0c7f9b3904 100644 --- a/crates/subspace-networking/src/behavior/persistent_parameters.rs +++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs @@ -72,7 +72,7 @@ impl Clone for Box { /// Networking manager implementation with NOOP implementation. #[derive(Clone, Default)] -pub struct StubNetworkingParametersManager; +pub(crate) struct StubNetworkingParametersManager; impl StubNetworkingParametersManager { /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference. diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index d8fcb230a9..07a3f93f4e 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -1,6 +1,6 @@ use super::persistent_parameters::remove_known_peer_addresses_internal; use crate::behavior::provider_storage::{instant_to_micros, micros_to_instant}; -use crate::{Config, GenericRequest, GenericRequestHandler, StubNetworkingParametersManager}; +use crate::{Config, GenericRequest, GenericRequestHandler}; use futures::channel::oneshot; use futures::future::pending; use libp2p::multiaddr::Protocol; @@ -162,7 +162,6 @@ async fn test_async_handler_works_with_pending_internal_future() { let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))]; let config_2 = Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], allow_non_global_addresses_in_dht: true, request_response_protocols: vec![GenericRequestHandler::::create( diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 5e25762672..351d713adb 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::{ peer_id, Config, NetworkingParametersManager, ParityDbProviderStorage, PeerInfoProvider, - StubNetworkingParametersManager, VoidProviderStorage, + VoidProviderStorage, }; use tracing::{debug, info, warn, Level}; use tracing_subscriber::fmt::Subscriber; @@ -164,7 +164,7 @@ async fn main() -> anyhow::Result<()> { NetworkingParametersManager::new(&known_addresses_db) .map(|manager| manager.boxed()) }) - .unwrap_or(Ok(StubNetworkingParametersManager.boxed())) + .transpose() .map_err(|err| anyhow!(err))? }; diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/create.rs index 9bfc8f27f1..6fb506057d 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/create.rs @@ -188,8 +188,8 @@ pub struct Config { pub allow_non_global_addresses_in_dht: bool, /// How frequently should random queries be done using Kademlia DHT to populate routing table. pub initial_random_query_interval: Duration, - /// A reference to the `NetworkingParametersRegistry` implementation. - pub networking_parameters_registry: Box, + /// A reference to the `NetworkingParametersRegistry` implementation (optional). + pub networking_parameters_registry: Option>, /// The configuration for the `RequestResponsesBehaviour` protocol. pub request_response_protocols: Vec>, /// Defines set of peers with a permanent connection (and reconnection if necessary). @@ -313,7 +313,7 @@ where provider_storage, allow_non_global_addresses_in_dht: false, initial_random_query_interval: Duration::from_secs(1), - networking_parameters_registry: StubNetworkingParametersManager.boxed(), + networking_parameters_registry: None, request_response_protocols: Vec::new(), yamux_config, reserved_peers: Vec::new(), @@ -495,7 +495,8 @@ where swarm, shared_weak, next_random_query_interval: initial_random_query_interval, - networking_parameters_registry, + networking_parameters_registry: networking_parameters_registry + .unwrap_or(StubNetworkingParametersManager.boxed()), reserved_peers: convert_multiaddresses(reserved_peers).into_iter().collect(), temporary_bans, metrics, diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index a6e339ba67..e3e1846916 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -32,7 +32,6 @@ pub mod utils; pub use crate::behavior::persistent_parameters::{ NetworkParametersPersistenceError, NetworkingParametersManager, ParityDbError, - StubNetworkingParametersManager, }; pub use crate::node::{ GetClosestPeersError, Node, SendRequestError, SubscribeError, TopicSubscription, diff --git a/crates/subspace-node/src/import_blocks_from_dsn.rs b/crates/subspace-node/src/import_blocks_from_dsn.rs index 223347b363..db410f6b75 100644 --- a/crates/subspace-node/src/import_blocks_from_dsn.rs +++ b/crates/subspace-node/src/import_blocks_from_dsn.rs @@ -22,7 +22,7 @@ use sp_core::traits::SpawnEssentialNamed; use sp_runtime::traits::Block as BlockT; use std::sync::Arc; use subspace_networking::libp2p::Multiaddr; -use subspace_networking::{Config, PieceByHashRequestHandler, StubNetworkingParametersManager}; +use subspace_networking::{Config, PieceByHashRequestHandler}; use subspace_service::dsn::import_blocks::initial_block_import_from_dsn; /// The `import-blocks-from-network` command used to import blocks from Subspace Network DSN. @@ -61,7 +61,6 @@ impl ImportBlocksFromDsnCmd { IQ: sc_service::ImportQueue + 'static, { let (node, mut node_runner) = subspace_networking::create(Config { - networking_parameters_registry: StubNetworkingParametersManager.boxed(), allow_non_global_addresses_in_dht: true, request_response_protocols: vec![PieceByHashRequestHandler::create( move |_, _| async { None }, diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 8ecc1c1dc8..5ae7f0d2df 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -20,7 +20,7 @@ use subspace_networking::{ PeerInfoProvider, PieceAnnouncementRequestHandler, PieceAnnouncementResponse, PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, - StubNetworkingParametersManager, KADEMLIA_PROVIDER_TTL_IN_SECS, + KADEMLIA_PROVIDER_TTL_IN_SECS, }; use thiserror::Error; use tracing::{debug, error, trace}; @@ -116,7 +116,7 @@ where NetworkingParametersManager::new(&db_path).map(|manager| manager.boxed()) }) - .unwrap_or(Ok(StubNetworkingParametersManager.boxed()))? + .transpose()? }; let provider_storage = From 64fecdfab97397af9831ac7f710486485db8aa1d Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 26 Jul 2023 15:31:45 +0700 Subject: [PATCH 5/8] networking: Change bootstrapping algorithm. --- .../src/bin/subspace-farmer/commands/farm.rs | 16 --------- .../src/bin/subspace-bootstrap-node/main.rs | 24 ++----------- crates/subspace-networking/src/node_runner.rs | 36 +++++++++++++++++++ crates/subspace-service/src/lib.rs | 18 +--------- 4 files changed, 39 insertions(+), 55 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a9d0dad825..e569d47ccc 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -5,7 +5,6 @@ use crate::commands::shared::print_disk_farm_info; use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal}; use crate::{DiskFarm, FarmingArgs}; use anyhow::{anyhow, Context, Result}; -use futures::future::pending; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use lru::LruCache; @@ -352,22 +351,7 @@ where )?; let mut networking_fut = Box::pin(networking_fut).fuse(); - let bootstrap_fut = Box::pin({ - let node = node.clone(); - - async move { - if let Err(err) = node.bootstrap().await { - warn!(?err, "DSN bootstrap failed."); - } - - pending::<()>().await; - } - }); - futures::select!( - // Network bootstrapping future - _ = bootstrap_fut.fuse() => {}, - // Signal future _ = signal.fuse() => {}, diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index 351d713adb..83f2536778 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -6,8 +6,6 @@ use anyhow::anyhow; use bytesize::ByteSize; use clap::{Parser, ValueHint}; use either::Either; -use futures::future::pending; -use futures::FutureExt; use libp2p::identity::ed25519::Keypair; use libp2p::{identity, Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; @@ -20,7 +18,7 @@ use subspace_networking::{ peer_id, Config, NetworkingParametersManager, ParityDbProviderStorage, PeerInfoProvider, VoidProviderStorage, }; -use tracing::{debug, info, warn, Level}; +use tracing::{debug, info, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -205,25 +203,7 @@ async fn main() -> anyhow::Result<()> { .detach(); info!("Subspace Bootstrap Node started"); - let bootstrap_fut = Box::pin({ - let node = node.clone(); - - async move { - if let Err(err) = node.bootstrap().await { - warn!(?err, "DSN bootstrap failed."); - } - - pending::<()>().await; - } - }); - - futures::select!( - // Network bootstrapping future - _ = bootstrap_fut.fuse() => {}, - - // Networking runner - _ = node_runner.run().fuse() => {}, - ); + node_runner.run().await; } Command::GenerateKeypair { json } => { let output = KeypairOutput::new(Keypair::generate()); diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 0bbd253716..5ddf9ff1bc 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -199,6 +199,8 @@ where /// Drives the main networking future forward. pub async fn run(&mut self) { + self.bootstrap().await; + loop { futures::select! { _ = &mut self.random_query_timeout => { @@ -237,6 +239,40 @@ where } } + /// Bootstraps Kademlia network + pub async fn bootstrap(&mut self) { + let (result_sender, mut result_receiver) = mpsc::unbounded(); + + debug!("Bootstrap started."); + + self.handle_command(Command::Bootstrap { result_sender }) + .await; + + let mut bootstrap_step = 0; + loop { + futures::select! { + swarm_event = self.swarm.next() => { + if let Some(swarm_event) = swarm_event { + self.register_event_metrics(&swarm_event); + self.handle_swarm_event(swarm_event).await; + } else { + break; + } + }, + result = result_receiver.next() => { + if result.is_some() { + debug!(%bootstrap_step, "Kademlia bootstrapping..."); + bootstrap_step += 1; + } else { + break; + } + } + } + } + + debug!("Bootstrap finished."); + } + /// Handles periodical tasks. async fn handle_periodical_tasks(&mut self) { // Log current connections. diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 29984f72e6..bfb65b2fe4 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -87,7 +87,7 @@ use subspace_runtime_primitives::opaque::Block; use subspace_runtime_primitives::{AccountId, Balance, Hash, Index as Nonce}; use subspace_transaction_pool::bundle_validator::BundleValidator; use subspace_transaction_pool::{FullPool, PreValidateTransaction}; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{debug, error, info, Instrument}; /// Error type for Subspace service. #[derive(thiserror::Error, Debug)] @@ -652,22 +652,6 @@ where ), ); - task_manager.spawn_handle().spawn( - "node-runner", - Some("subspace-networking-bootstrapping"), - Box::pin( - { - let node = node.clone(); - async move { - if let Err(err) = node.bootstrap().await { - warn!(?err, "DSN bootstrap failed."); - } - } - } - .in_current_span(), - ), - ); - (node, dsn_config.bootstrap_nodes, Some(piece_cache)) } }; From b7de0519796a17ee40840669efbc4e27bf49b173 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 26 Jul 2023 17:31:12 +0700 Subject: [PATCH 6/8] networking: Ensure a single bootstrapping for node_runner.run(). --- crates/subspace-networking/src/node_runner.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 5ddf9ff1bc..7a14f75718 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -128,6 +128,8 @@ where rng: StdRng, /// Addresses to bootstrap Kademlia network bootstrap_addresses: Vec, + /// Ensures a single bootstrap on run() invocation. + was_bootstrapped: bool, } // Helper struct for NodeRunner configuration (clippy requirement). @@ -194,12 +196,15 @@ where special_connection_decision_handler, rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed bootstrap_addresses, + was_bootstrapped: false, } } /// Drives the main networking future forward. pub async fn run(&mut self) { - self.bootstrap().await; + if !self.was_bootstrapped { + self.bootstrap().await; + } loop { futures::select! { @@ -240,7 +245,9 @@ where } /// Bootstraps Kademlia network - pub async fn bootstrap(&mut self) { + async fn bootstrap(&mut self) { + self.was_bootstrapped = true; + let (result_sender, mut result_receiver) = mpsc::unbounded(); debug!("Bootstrap started."); From 76e464a868d781cd65abac608248eeb54eeb97ac Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 26 Jul 2023 18:11:31 +0300 Subject: [PATCH 7/8] Simplify bootstrapping process --- Cargo.lock | 10 ++ crates/subspace-networking/Cargo.toml | 1 + .../src/behavior/persistent_parameters.rs | 6 +- crates/subspace-networking/src/node.rs | 24 ----- crates/subspace-networking/src/node_runner.rs | 95 ++++++++++--------- crates/subspace-networking/src/shared.rs | 4 +- 6 files changed, 67 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7dfa1390c..5d32e35b5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,6 +612,15 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-oneshot" version = "0.5.0" @@ -10669,6 +10678,7 @@ version = "0.1.0" dependencies = [ "actix-web", "anyhow", + "async-mutex", "async-trait", "backoff", "bytes", diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index db9bf3d802..a9af82c7d0 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -16,6 +16,7 @@ include = [ ] [dependencies] +async-mutex = "1.4.0" actix-web = "4.3.1" anyhow = "1.0.71" async-trait = "0.1.68" diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs index 0c7f9b3904..635d39c843 100644 --- a/crates/subspace-networking/src/behavior/persistent_parameters.rs +++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs @@ -47,7 +47,7 @@ pub trait NetworkingParametersRegistry: Send + Sync { async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec); /// Unregisters associated addresses for peer ID. - async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId); + fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId); /// Returns a batch of the combined collection of known addresses from networking parameters DB /// and boostrap addresses from networking parameters initialization. @@ -87,7 +87,7 @@ impl NetworkingParametersRegistry for StubNetworkingParametersManager { async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec) {} - async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {} + fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {} async fn next_known_addresses_batch(&mut self) -> Vec { Vec::new() @@ -269,7 +269,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager { self.cache_need_saving = true; } - async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) { + fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) { trace!(%peer_id, "Remove all peer addresses from the networking parameters registry"); self.known_peers.pop(&peer_id); diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 436be5e2a7..c52e3d2b15 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -14,7 +14,6 @@ use libp2p::kad::PeerRecord; use libp2p::{Multiaddr, PeerId}; use parity_scale_codec::Decode; use std::pin::Pin; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -22,8 +21,6 @@ use thiserror::Error; use tokio::time::sleep; use tracing::{debug, error, trace}; -const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1); - /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic. #[derive(Debug)] #[pin_project::pin_project(PinnedDrop)] @@ -320,7 +317,6 @@ impl Node { &self, key: Multihash, ) -> Result, GetValueError> { - self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -344,7 +340,6 @@ impl Node { key: Multihash, value: Vec, ) -> Result, PutValueError> { - self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -365,7 +360,6 @@ impl Node { /// Subcribe to some topic on the DSN. pub async fn subscribe(&self, topic: Sha256Topic) -> Result { - self.wait_for_bootstrap().await; let permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); @@ -394,7 +388,6 @@ impl Node { /// Subcribe a messgo to some topic on the DSN. pub async fn publish(&self, topic: Sha256Topic, message: Vec) -> Result<(), PublishError> { - self.wait_for_bootstrap().await; let _permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); @@ -420,7 +413,6 @@ impl Node { where Request: GenericRequest, { - self.wait_for_bootstrap().await; let _permit = self.shared.regular_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = oneshot::channel(); let command = Command::GenericRequest { @@ -442,7 +434,6 @@ impl Node { &self, key: Multihash, ) -> Result, GetClosestPeersError> { - self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; trace!(?key, "Starting 'GetClosestPeers' request."); @@ -544,7 +535,6 @@ impl Node { &self, key: Multihash, ) -> Result, GetProvidersError> { - self.wait_for_bootstrap().await; let permit = self.shared.kademlia_tasks_semaphore.acquire().await; let (result_sender, result_receiver) = mpsc::unbounded(); @@ -667,18 +657,4 @@ impl Node { pub fn on_connected_peer(&self, callback: HandlerFn) -> HandlerId { self.shared.handlers.connected_peer.add(callback) } - - pub(crate) async fn wait_for_bootstrap(&self) { - loop { - let was_bootstrapped = self.shared.bootstrap_finished.load(Ordering::SeqCst); - - if was_bootstrapped { - return; - } else { - trace!("Waiting for bootstrap..."); - - sleep(BOOTSTRAP_CHECK_DELAY).await; - } - } - } } diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 7a14f75718..490466cc80 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -16,10 +16,11 @@ use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared}; use crate::utils::{ convert_multiaddresses, is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit, }; +use async_mutex::Mutex as AsyncMutex; use bytes::Bytes; use futures::channel::mpsc; use futures::future::Fuse; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use libp2p::core::ConnectedPoint; use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash}; use libp2p::identify::Event as IdentifyEvent; @@ -85,6 +86,14 @@ enum QueryResultSender { }, } +#[derive(Debug, Default)] +enum BootstrapCommandState { + #[default] + NotStarted, + InProgress(mpsc::UnboundedReceiver<()>), + Finished, +} + /// Runner for the Node. #[must_use = "Node does not function properly unless its runner is driven forward"] pub struct NodeRunner @@ -129,7 +138,7 @@ where /// Addresses to bootstrap Kademlia network bootstrap_addresses: Vec, /// Ensures a single bootstrap on run() invocation. - was_bootstrapped: bool, + bootstrap_command_state: Arc>, } // Helper struct for NodeRunner configuration (clippy requirement). @@ -196,15 +205,13 @@ where special_connection_decision_handler, rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed bootstrap_addresses, - was_bootstrapped: false, + bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())), } } /// Drives the main networking future forward. pub async fn run(&mut self) { - if !self.was_bootstrapped { - self.bootstrap().await; - } + self.bootstrap().await; loop { futures::select! { @@ -226,7 +233,7 @@ where }, command = self.command_receiver.next() => { if let Some(command) = command { - self.handle_command(command).await; + self.handle_command(command); } else { break; } @@ -246,14 +253,38 @@ where /// Bootstraps Kademlia network async fn bootstrap(&mut self) { - self.was_bootstrapped = true; + let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state); + let mut bootstrap_command_state = bootstrap_command_state.lock().await; + let bootstrap_command_receiver = match &mut *bootstrap_command_state { + BootstrapCommandState::NotStarted => { + error!("Bootstrap started."); - let (result_sender, mut result_receiver) = mpsc::unbounded(); + let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded(); - debug!("Bootstrap started."); + self.handle_command(Command::Bootstrap { + result_sender: bootstrap_command_sender, + }); + + *bootstrap_command_state = + BootstrapCommandState::InProgress(bootstrap_command_receiver); + match &mut *bootstrap_command_state { + BootstrapCommandState::InProgress(bootstrap_command_receiver) => { + bootstrap_command_receiver + } + _ => { + unreachable!("Was just set to that exact value"); + } + } + } + BootstrapCommandState::InProgress(bootstrap_command_receiver) => { + bootstrap_command_receiver + } + BootstrapCommandState::Finished => { + return; + } + }; - self.handle_command(Command::Bootstrap { result_sender }) - .await; + debug!("Bootstrap started."); let mut bootstrap_step = 0; loop { @@ -266,7 +297,7 @@ where break; } }, - result = result_receiver.next() => { + result = bootstrap_command_receiver.next() => { if result.is_some() { debug!(%bootstrap_step, "Kademlia bootstrapping..."); bootstrap_step += 1; @@ -278,6 +309,7 @@ where } debug!("Bootstrap finished."); + *bootstrap_command_state = BootstrapCommandState::Finished; } /// Handles periodical tasks. @@ -559,7 +591,7 @@ where "Peer has different protocol version. Peer was banned.", ); - self.ban_peer(peer_id).await; + self.ban_peer(peer_id); } if info.listen_addrs.len() > 30 { @@ -853,9 +885,7 @@ where ) || cancelled; } Err(error) => { - debug!(?error, "Bootstrap query failed.",); - - self.set_bootstrap_finished(false); + debug!(?error, "Bootstrap query failed."); } } } @@ -863,28 +893,12 @@ where if last || cancelled { // There will be no more progress self.query_id_receivers.remove(&id); - - if last { - self.set_bootstrap_finished(true); - } - - if cancelled { - self.set_bootstrap_finished(false); - } } } _ => {} } } - fn set_bootstrap_finished(&mut self, success: bool) { - if let Some(shared) = self.shared_weak.upgrade() { - shared.bootstrap_finished.store(true, Ordering::SeqCst); - - debug!(%success, "Bootstrap finished.",); - } - } - // Returns `true` if query was cancelled fn unbounded_send_and_cancel_on_error( kademlia: &mut Kademlia>, @@ -982,7 +996,7 @@ where .add_peers_to_dial(&peers); } - async fn handle_command(&mut self, command: Command) { + fn handle_command(&mut self, command: Command) { match command { Command::GetValue { key, @@ -1234,7 +1248,7 @@ where ); } Command::BanPeer { peer_id } => { - self.ban_peer(peer_id).await; + self.ban_peer(peer_id); } Command::Dial { address } => { let _ = self.swarm.dial(address); @@ -1244,7 +1258,7 @@ where let _ = result_sender.send(connected_peers); } - Command::Bootstrap { mut result_sender } => { + Command::Bootstrap { result_sender } => { let kademlia = &mut self.swarm.behaviour_mut().kademlia; for (peer_id, address) in convert_multiaddresses(self.bootstrap_addresses.clone()) { @@ -1262,17 +1276,13 @@ where } Err(err) => { debug!(?err, "Bootstrap error."); - - let _ = result_sender.close().await; - - self.set_bootstrap_finished(false); } } } } } - async fn ban_peer(&mut self, peer_id: PeerId) { + fn ban_peer(&mut self, peer_id: PeerId) { // Remove temporary ban if there is any before creating a permanent one self.temporary_bans.lock().remove(&peer_id); @@ -1281,8 +1291,7 @@ where self.swarm.behaviour_mut().block_list.block_peer(peer_id); self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id); self.networking_parameters_registry - .remove_all_known_peer_addresses(peer_id) - .await; + .remove_all_known_peer_addresses(peer_id); } fn register_event_metrics(&mut self, swarm_event: &SwarmEvent) { diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 9e0cf53c17..5cf6e10bad 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -13,7 +13,7 @@ use libp2p::kad::record::Key; use libp2p::kad::PeerRecord; use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; #[derive(Debug)] @@ -126,7 +126,6 @@ pub(crate) struct Shared { pub(crate) command_sender: mpsc::Sender, pub(crate) kademlia_tasks_semaphore: ResizableSemaphore, pub(crate) regular_tasks_semaphore: ResizableSemaphore, - pub(crate) bootstrap_finished: AtomicBool, } impl Shared { @@ -145,7 +144,6 @@ impl Shared { command_sender, kademlia_tasks_semaphore, regular_tasks_semaphore, - bootstrap_finished: AtomicBool::new(false), } } } From 589017cf05699ce516b8f249b1787a25f607c6e7 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 26 Jul 2023 23:45:47 +0700 Subject: [PATCH 8/8] networking: Refactor node_runner. --- crates/subspace-networking/src/node_runner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 490466cc80..5eb5d442d7 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -253,11 +253,11 @@ where /// Bootstraps Kademlia network async fn bootstrap(&mut self) { - let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state); + let bootstrap_command_state = self.bootstrap_command_state.clone(); let mut bootstrap_command_state = bootstrap_command_state.lock().await; let bootstrap_command_receiver = match &mut *bootstrap_command_state { BootstrapCommandState::NotStarted => { - error!("Bootstrap started."); + debug!("Bootstrap started."); let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();