diff --git a/Cargo.lock b/Cargo.lock index 9f2407bb95..2afa8ab125 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1769,20 +1769,6 @@ dependencies = [ "cipher 0.4.4", ] -[[package]] -name = "cuckoofilter" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18" -dependencies = [ - "byteorder", - "fnv", - "rand 0.7.3", - "serde", - "serde_bytes", - "serde_derive", -] - [[package]] name = "curve25519-dalek" version = "2.1.3" @@ -2236,6 +2222,7 @@ dependencies = [ "domain-test-primitives", "domain-test-service", "evm-domain-test-runtime", + "frame-system", "futures", "futures-timer", "num-traits", @@ -9866,15 +9853,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_bytes" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416bda436f9aab92e02c8e10d49a15ddd339cea90b6e340fe51ed97abb548294" -dependencies = [ - "serde", -] - [[package]] name = "serde_derive" version = "1.0.175" @@ -11156,7 +11134,6 @@ dependencies = [ "blake3", "bytesize", "clap", - "cuckoofilter", "derive_more", "event-listener-primitives", "fdlimit", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 160ef85122..228813cd89 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -21,7 +21,6 @@ blake2 = "0.10.6" blake3 = { version = "1.4.1", default-features = false } bytesize = "1.2.0" clap = { version = "4.2.1", features = ["color", "derive"] } -cuckoofilter = { version = "0.5.0", features = ["serde_support"] } derive_more = "0.99.17" event-listener-primitives = "2.0.1" fdlimit = "0.2" diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index fdebc107ec..64b2419281 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -13,14 +13,12 @@ use std::fs; use std::num::NonZeroUsize; use std::sync::Arc; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; -use subspace_core_primitives::{Piece, Record, SectorIndex}; +use subspace_core_primitives::{Record, SectorIndex}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer::piece_cache::PieceCache; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; -use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo; -use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces; use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; @@ -97,15 +95,6 @@ where .await .map_err(|error| anyhow::anyhow!(error))?; - let cuckoo_filter_capacity = disk_farms - .iter() - .map(|df| df.allocated_plotting_space as usize) - .sum::() - / Piece::SIZE - + 1usize; - let archival_storage_pieces = ArchivalStoragePieces::new(cuckoo_filter_capacity); - let archival_storage_info = ArchivalStorageInfo::default(); - let first_farm_directory = disk_farms .first() .expect("Disk farm collection is not be empty as checked above; qed") @@ -130,8 +119,6 @@ where dsn, Arc::downgrade(&readers_and_pieces), node_client.clone(), - archival_storage_pieces.clone(), - archival_storage_info.clone(), piece_cache.clone(), )? }; @@ -159,7 +146,6 @@ where piece_provider, piece_cache.clone(), node_client.clone(), - archival_storage_info, Arc::clone(&readers_and_pieces), )); @@ -258,10 +244,7 @@ where // Collect already plotted pieces { let mut readers_and_pieces = readers_and_pieces.lock(); - let readers_and_pieces = readers_and_pieces.insert(ReadersAndPieces::new( - piece_readers, - archival_storage_pieces, - )); + let readers_and_pieces = readers_and_pieces.insert(ReadersAndPieces::new(piece_readers)); single_disk_farms.iter().enumerate().try_for_each( |(disk_farm_index, single_disk_farm)| { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index f14d86e57a..8d284f247d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -4,8 +4,6 @@ use std::collections::HashSet; use std::path::PathBuf; use std::sync::{Arc, Weak}; use subspace_farmer::piece_cache::PieceCache; -use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo; -use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::{NodeClient, NodeRpcClient}; use subspace_networking::libp2p::identity::Keypair; @@ -14,7 +12,7 @@ use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::strip_peer_id; use subspace_networking::{ - create, Config, NetworkingParametersManager, Node, NodeRunner, PeerInfo, PeerInfoProvider, + construct, Config, NetworkingParametersManager, Node, NodeRunner, PeerInfo, PeerInfoProvider, PieceByIndexRequest, PieceByIndexRequestHandler, PieceByIndexResponse, SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, }; @@ -45,8 +43,6 @@ pub(super) fn configure_dsn( }: DsnArgs, weak_readers_and_pieces: Weak>>, node_client: NodeRpcClient, - archival_storage_pieces: ArchivalStoragePieces, - archival_storage_info: ArchivalStorageInfo, piece_cache: PieceCache, ) -> Result<(Node, NodeRunner), anyhow::Error> { let networking_parameters_registry = NetworkingParametersManager::new( @@ -62,9 +58,7 @@ pub(super) fn configure_dsn( protocol_prefix, keypair, piece_cache.clone(), - Some(PeerInfoProvider::new_farmer(Box::new( - archival_storage_pieces, - ))), + Some(PeerInfoProvider::new_farmer()), ); let config = Config { reserved_peers, @@ -192,7 +186,7 @@ pub(super) fn configure_dsn( ..default_config }; - create(config) + construct(config) .map(|(node, node_runner)| { node.on_new_listener(Arc::new({ let node = node.clone(); @@ -206,33 +200,6 @@ pub(super) fn configure_dsn( })) .detach(); - node.on_peer_info(Arc::new({ - let archival_storage_info = archival_storage_info.clone(); - - move |new_peer_info| { - let peer_id = new_peer_info.peer_id; - let peer_info = &new_peer_info.peer_info; - - if let PeerInfo::Farmer { cuckoo_filter } = peer_info { - archival_storage_info.update_cuckoo_filter(peer_id, cuckoo_filter.clone()); - - debug!(%peer_id, ?peer_info, "Peer info cached",); - } - } - })) - .detach(); - - node.on_disconnected_peer(Arc::new({ - let archival_storage_info = archival_storage_info.clone(); - - move |peer_id| { - if archival_storage_info.remove_peer_filter(peer_id) { - debug!(%peer_id, "Peer filter removed.",); - } - } - })) - .detach(); - // Consider returning HandlerId instead of each `detach()` calls for other usages. (node, node_runner) }) diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index cbf4d5b07c..afe3c20722 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -1,5 +1,3 @@ -pub mod archival_storage_info; -pub mod archival_storage_pieces; pub mod farmer_piece_getter; pub mod piece_validator; pub mod readers_and_pieces; diff --git a/crates/subspace-farmer/src/utils/archival_storage_info.rs b/crates/subspace-farmer/src/utils/archival_storage_info.rs deleted file mode 100644 index 24353129ec..0000000000 --- a/crates/subspace-farmer/src/utils/archival_storage_info.rs +++ /dev/null @@ -1,51 +0,0 @@ -use cuckoofilter::{CuckooFilter, ExportedCuckooFilter}; -use parking_lot::Mutex; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; -use std::fmt; -use std::fmt::Debug; -use std::sync::Arc; -use subspace_core_primitives::PieceIndex; -use subspace_networking::libp2p::PeerId; -use subspace_networking::CuckooFilterDTO; - -#[derive(Clone, Default)] -pub struct ArchivalStorageInfo { - peers: Arc>>>, -} - -impl Debug for ArchivalStorageInfo { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ArchivalStorageInfo") - .field("peers (len)", &self.peers.lock().len()) - .finish() - } -} - -impl ArchivalStorageInfo { - pub fn update_cuckoo_filter(&self, peer_id: PeerId, cuckoo_filter_dto: Arc) { - let exported_filter = ExportedCuckooFilter { - values: cuckoo_filter_dto.values.clone(), - length: cuckoo_filter_dto.length as usize, - }; - - let cuckoo_filter = CuckooFilter::from(exported_filter); - - self.peers.lock().insert(peer_id, cuckoo_filter); - } - - pub fn remove_peer_filter(&self, peer_id: &PeerId) -> bool { - self.peers.lock().remove(peer_id).is_some() - } - - pub fn peers_contain_piece(&self, piece_index: &PieceIndex) -> Vec { - let mut result = Vec::new(); - for (peer_id, cuckoo_filter) in self.peers.lock().iter() { - if cuckoo_filter.contains(piece_index) { - result.push(*peer_id) - } - } - - result - } -} diff --git a/crates/subspace-farmer/src/utils/archival_storage_pieces.rs b/crates/subspace-farmer/src/utils/archival_storage_pieces.rs deleted file mode 100644 index cdaee990b0..0000000000 --- a/crates/subspace-farmer/src/utils/archival_storage_pieces.rs +++ /dev/null @@ -1,87 +0,0 @@ -use cuckoofilter::CuckooFilter; -use event_listener_primitives::{Bag, HandlerId}; -use parking_lot::Mutex; -use std::collections::hash_map::DefaultHasher; -use std::fmt; -use std::fmt::Debug; -use std::sync::Arc; -use subspace_core_primitives::PieceIndex; -use subspace_networking::{ - CuckooFilterDTO, CuckooFilterProvider, Notification, NotificationHandler, -}; -use tracing::warn; - -type NotificationEventHandler = Bag; - -// TODO: Consider renaming this type. -#[derive(Clone)] -pub struct ArchivalStoragePieces { - cuckoo_filter: Arc>>, - listeners: NotificationEventHandler, -} - -impl Debug for ArchivalStoragePieces { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ArchivalStoragePieces") - .field("cuckoo_filter (len)", &self.cuckoo_filter.lock().len()) - .finish() - } -} - -impl ArchivalStoragePieces { - pub fn new(capacity: usize) -> Self { - Self { - cuckoo_filter: Arc::new(Mutex::new(CuckooFilter::with_capacity(capacity))), - listeners: Bag::default(), - } - } - - pub fn add_pieces(&self, piece_indexes: &[PieceIndex]) { - let mut cuckoo_filter = self.cuckoo_filter.lock(); - let mut last_error = None; - - for piece_index in piece_indexes { - if let Err(err) = cuckoo_filter.add(piece_index) { - last_error.replace(err); - } - } - drop(cuckoo_filter); - - self.listeners.call_simple(&Notification); - - if let Some(err) = last_error { - warn!( - ?err, - "Cuckoo-filter returned an error during piece insertion." - ); - } - } - - pub fn delete_pieces(&self, piece_indexes: &[PieceIndex]) { - let mut cuckoo_filter = self.cuckoo_filter.lock(); - - for piece_index in piece_indexes { - cuckoo_filter.delete(piece_index); - } - drop(cuckoo_filter); - - self.listeners.call_simple(&Notification); - } -} - -impl CuckooFilterProvider for ArchivalStoragePieces { - fn cuckoo_filter(&self) -> CuckooFilterDTO { - let exported_filter = self.cuckoo_filter.lock().export(); - - CuckooFilterDTO { - values: exported_filter.values, - length: exported_filter.length as u64, - } - } - - fn on_notification(&self, handler: NotificationHandler) -> Option { - let handler_id = self.listeners.add(handler); - - Some(handler_id) - } -} diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index f68b4bde33..2d9cb118ad 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -1,5 +1,4 @@ use crate::piece_cache::PieceCache; -use crate::utils::archival_storage_info::ArchivalStorageInfo; use crate::utils::readers_and_pieces::ReadersAndPieces; use crate::NodeClient; use async_trait::async_trait; @@ -14,14 +13,13 @@ use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; use subspace_networking::Node; -use tracing::error; +use tracing::{debug, error, trace}; pub struct FarmerPieceGetter { node: Node, piece_provider: PieceProvider, piece_cache: PieceCache, node_client: NC, - archival_storage_info: ArchivalStorageInfo, readers_and_pieces: Arc>>, } @@ -31,7 +29,6 @@ impl FarmerPieceGetter { piece_provider: PieceProvider, piece_cache: PieceCache, node_client: NC, - archival_storage_info: ArchivalStorageInfo, readers_and_pieces: Arc>>, ) -> Self { Self { @@ -39,7 +36,6 @@ impl FarmerPieceGetter { piece_provider, piece_cache, node_client, - archival_storage_info, readers_and_pieces, } } @@ -111,24 +107,30 @@ where // L1 piece acquisition // TODO: consider using retry policy for L1 lookups as well. let connected_peers = HashSet::::from_iter(self.node.connected_peers().await?); + if connected_peers.is_empty() { + debug!(%piece_index, "Cannot acquire piece from L1: no connected peers."); - for peer_id in self - .archival_storage_info - .peers_contain_piece(&piece_index) - .iter() - { - if connected_peers.contains(peer_id) { - let maybe_piece = self - .piece_provider - .get_piece_from_peer(*peer_id, piece_index) - .await; - - if maybe_piece.is_some() { - return Ok(maybe_piece); - } + return Ok(None); + } + + for peer_id in connected_peers.iter() { + let maybe_piece = self + .piece_provider + .get_piece_from_peer(*peer_id, piece_index) + .await; + + if maybe_piece.is_some() { + trace!(%piece_index, %peer_id, "L1 lookup succeeded."); + + return Ok(maybe_piece); } } + debug!( + %piece_index, + connected_peers=%connected_peers.len(), + "Cannot acquire piece: all methods yielded empty result." + ); Ok(None) } } diff --git a/crates/subspace-farmer/src/utils/readers_and_pieces.rs b/crates/subspace-farmer/src/utils/readers_and_pieces.rs index 4b65ccfe6f..54f9edf626 100644 --- a/crates/subspace-farmer/src/utils/readers_and_pieces.rs +++ b/crates/subspace-farmer/src/utils/readers_and_pieces.rs @@ -1,5 +1,4 @@ use crate::single_disk_farm::piece_reader::PieceReader; -use crate::utils::archival_storage_pieces::ArchivalStoragePieces; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::future::Future; @@ -14,21 +13,18 @@ struct PieceDetails { piece_offset: PieceOffset, } -/// Wrapper data structure for pieces plotted under multiple plots and corresponding piece readers, -/// it also maintains filter in given [`ArchivalStoragePieces`]. +/// Wrapper data structure for pieces plotted under multiple plots and corresponding piece readers. #[derive(Debug)] pub struct ReadersAndPieces { readers: Vec, pieces: HashMap>, - archival_storage_pieces: ArchivalStoragePieces, } impl ReadersAndPieces { - pub fn new(readers: Vec, archival_storage_pieces: ArchivalStoragePieces) -> Self { + pub fn new(readers: Vec) -> Self { Self { readers, pieces: HashMap::new(), - archival_storage_pieces, } } @@ -74,8 +70,6 @@ impl ReadersAndPieces { } pub fn add_sector(&mut self, disk_farm_index: u8, plotted_sector: &PlottedSector) { - let mut new_piece_indices = Vec::new(); - for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { @@ -91,19 +85,12 @@ impl ReadersAndPieces { } Entry::Vacant(entry) => { entry.insert(vec![piece_details]); - new_piece_indices.push(piece_index); } } } - - if !new_piece_indices.is_empty() { - self.archival_storage_pieces.add_pieces(&new_piece_indices); - } } pub fn delete_sector(&mut self, disk_farm_index: u8, plotted_sector: &PlottedSector) { - let mut deleted_piece_indices = Vec::new(); - for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { @@ -129,15 +116,9 @@ impl ReadersAndPieces { // We do not store empty lists if piece_details.is_empty() { entry.remove_entry(); - deleted_piece_indices.push(piece_index); } } } - - if !deleted_piece_indices.is_empty() { - self.archival_storage_pieces - .delete_pieces(&deleted_piece_indices); - } } pub fn piece_indices(&self) -> impl Iterator { diff --git a/crates/subspace-networking/examples/benchmark.rs b/crates/subspace-networking/examples/benchmark.rs index dc96bd0508..827b685c3e 100644 --- a/crates/subspace-networking/examples/benchmark.rs +++ b/crates/subspace-networking/examples/benchmark.rs @@ -162,7 +162,7 @@ pub async fn configure_dsn( enable_autonat: false, ..default_config }; - let (node, mut node_runner_1) = subspace_networking::create(config).unwrap(); + let (node, mut node_runner_1) = subspace_networking::construct(config).unwrap(); let (node_address_sender, node_address_receiver) = oneshot::channel(); let on_new_listener_handler = node.on_new_listener(Arc::new({ diff --git a/crates/subspace-networking/examples/get-peers.rs b/crates/subspace-networking/examples/get-peers.rs index 8371c45847..acf345bfe1 100644 --- a/crates/subspace-networking/examples/get-peers.rs +++ b/crates/subspace-networking/examples/get-peers.rs @@ -15,7 +15,7 @@ async fn main() { allow_non_global_addresses_in_dht: true, ..Config::default() }; - let (node_1, mut node_runner_1) = subspace_networking::create(config_1).unwrap(); + let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap(); println!("Node 1 ID is {}", node_1.id()); @@ -48,7 +48,7 @@ async fn main() { ..Config::default() }; - let (node_2, mut node_runner_2) = subspace_networking::create(config_2).unwrap(); + let (node_2, mut node_runner_2) = subspace_networking::construct(config_2).unwrap(); println!("Node 2 ID is {}", node_2.id()); diff --git a/crates/subspace-networking/examples/metrics.rs b/crates/subspace-networking/examples/metrics.rs index 48e30e10da..a6616edfad 100644 --- a/crates/subspace-networking/examples/metrics.rs +++ b/crates/subspace-networking/examples/metrics.rs @@ -24,12 +24,13 @@ async fn main() { metrics: Some(metrics), ..Config::default() }; - let (node_1, mut node_runner_1) = subspace_networking::create(config_1).unwrap(); + let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap(); // Init prometheus let prometheus_metrics_server_address = "127.0.0.1:63000".parse().unwrap(); - match start_prometheus_metrics_server(prometheus_metrics_server_address, metric_registry) { + match start_prometheus_metrics_server(vec![prometheus_metrics_server_address], metric_registry) + { Err(err) => { error!( ?prometheus_metrics_server_address, @@ -75,7 +76,7 @@ async fn main() { ..Config::default() }; - let (node_2, mut node_runner_2) = subspace_networking::create(config_2).unwrap(); + let (node_2, mut node_runner_2) = subspace_networking::construct(config_2).unwrap(); println!("Node 2 ID is {}", node_2.id()); diff --git a/crates/subspace-networking/examples/networking.rs b/crates/subspace-networking/examples/networking.rs index ccd4b3bd0a..08fc896b41 100644 --- a/crates/subspace-networking/examples/networking.rs +++ b/crates/subspace-networking/examples/networking.rs @@ -20,7 +20,7 @@ async fn main() { allow_non_global_addresses_in_dht: true, ..Config::default() }; - let (node_1, mut node_runner_1) = subspace_networking::create(config_1).unwrap(); + let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap(); println!("Node 1 ID is {}", node_1.id()); @@ -55,7 +55,7 @@ async fn main() { ..Config::default() }; - let (node_2, mut node_runner_2) = subspace_networking::create(config_2).unwrap(); + let (node_2, mut node_runner_2) = subspace_networking::construct(config_2).unwrap(); println!("Node 2 ID is {}", node_2.id()); diff --git a/crates/subspace-networking/examples/requests.rs b/crates/subspace-networking/examples/requests.rs index 4dedce1b11..75efd4b328 100644 --- a/crates/subspace-networking/examples/requests.rs +++ b/crates/subspace-networking/examples/requests.rs @@ -36,7 +36,7 @@ async fn main() { )], ..Config::default() }; - let (node_1, mut node_runner_1) = subspace_networking::create(config_1).unwrap(); + let (node_1, mut node_runner_1) = subspace_networking::construct(config_1).unwrap(); println!("Node 1 ID is {}", node_1.id()); @@ -72,7 +72,7 @@ async fn main() { ..Config::default() }; - let (node_2, mut node_runner_2) = subspace_networking::create(config_2).unwrap(); + let (node_2, mut node_runner_2) = subspace_networking::construct(config_2).unwrap(); println!("Node 2 ID is {}", node_2.id()); diff --git a/crates/subspace-networking/src/behavior.rs b/crates/subspace-networking/src/behavior.rs index d1c7d7fb63..20d87321f3 100644 --- a/crates/subspace-networking/src/behavior.rs +++ b/crates/subspace-networking/src/behavior.rs @@ -2,17 +2,17 @@ pub(crate) mod persistent_parameters; #[cfg(test)] mod tests; -use crate::connected_peers::{ +use crate::protocols::connected_peers::{ Behaviour as ConnectedPeersBehaviour, Config as ConnectedPeersConfig, Event as ConnectedPeersEvent, }; -use crate::peer_info::{ +use crate::protocols::peer_info::{ Behaviour as PeerInfoBehaviour, Config as PeerInfoConfig, Event as PeerInfoEvent, }; -use crate::request_responses::{ - Event as RequestResponseEvent, RequestHandler, RequestResponsesBehaviour, +use crate::protocols::request_response::request_response_factory::{ + Event as RequestResponseEvent, RequestHandler, RequestResponseFactoryBehaviour, }; -use crate::reserved_peers::{ +use crate::protocols::reserved_peers::{ Behaviour as ReservedPeersBehaviour, Config as ReservedPeersConfig, Event as ReservedPeersEvent, }; use crate::PeerInfoProvider; @@ -75,7 +75,7 @@ pub(crate) struct Behavior { pub(crate) kademlia: Kademlia, pub(crate) gossipsub: Toggle, pub(crate) ping: Ping, - pub(crate) request_response: RequestResponsesBehaviour, + pub(crate) request_response: RequestResponseFactoryBehaviour, pub(crate) connection_limits: ConnectionLimitsBehaviour, pub(crate) block_list: BlockListBehaviour, pub(crate) reserved_peers: ReservedPeersBehaviour, @@ -119,9 +119,11 @@ where kademlia, gossipsub, ping: Ping::default(), - request_response: RequestResponsesBehaviour::new(config.request_response_protocols) - //TODO: Convert to an error. - .expect("RequestResponse protocols registration failed."), + request_response: RequestResponseFactoryBehaviour::new( + config.request_response_protocols, + ) + //TODO: Convert to an error. + .expect("RequestResponse protocols registration failed."), connection_limits: ConnectionLimitsBehaviour::new(config.connection_limits), block_list: BlockListBehaviour::default(), reserved_peers: ReservedPeersBehaviour::new(config.reserved_peers), diff --git a/crates/subspace-networking/src/behavior/tests.rs b/crates/subspace-networking/src/behavior/tests.rs index 78c63ebbd9..c3c0b84874 100644 --- a/crates/subspace-networking/src/behavior/tests.rs +++ b/crates/subspace-networking/src/behavior/tests.rs @@ -191,7 +191,7 @@ async fn test_async_handler_works_with_pending_internal_future() { )], ..Config::default() }; - let (node_1, mut node_runner_1) = crate::create(config_1).unwrap(); + let (node_1, mut node_runner_1) = crate::construct(config_1).unwrap(); let (node_1_address_sender, node_1_address_receiver) = oneshot::channel(); let on_new_listener_handler = node_1.on_new_listener(Arc::new({ @@ -225,7 +225,7 @@ async fn test_async_handler_works_with_pending_internal_future() { ..Config::default() }; - let (node_2, mut node_runner_2) = crate::create(config_2).unwrap(); + let (node_2, mut node_runner_2) = crate::construct(config_2).unwrap(); let bootstrap_fut = Box::pin({ let node = node_2.clone(); diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index d09f9a4abe..59f45c67d9 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -3,14 +3,18 @@ #![feature(type_changing_struct_update)] use clap::Parser; +use futures::{select, FutureExt}; use libp2p::identity::ed25519::Keypair; +use libp2p::metrics::Metrics; use libp2p::{identity, Multiaddr, PeerId}; +use prometheus_client::registry::Registry; use serde::{Deserialize, Serialize}; use std::error::Error; use std::fmt::{Display, Formatter}; +use std::net::SocketAddr; use std::sync::Arc; use subspace_networking::libp2p::multiaddr::Protocol; -use subspace_networking::{peer_id, Config}; +use subspace_networking::{peer_id, start_prometheus_metrics_server, Config}; use tracing::{debug, info, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; @@ -55,6 +59,10 @@ enum Command { /// Known external addresses #[arg(long, alias = "external-address")] external_addresses: Vec, + /// Defines endpoints for the prometheus metrics server. It doesn't start without at least + /// one specified endpoint. Format: 127.0.0.1:8080 + #[arg(long, alias = "metrics-endpoint")] + metrics_endpoints: Vec, }, /// Generate a new keypair GenerateKeypair { @@ -117,6 +125,7 @@ async fn main() -> Result<(), Box> { enable_private_ips, protocol_version, external_addresses, + metrics_endpoints, } => { debug!( "Libp2p protocol stack instantiated with version: {} ", @@ -126,6 +135,16 @@ async fn main() -> Result<(), Box> { let decoded_keypair = Keypair::try_from_bytes(hex::decode(keypair)?.as_mut_slice())?; let keypair = identity::Keypair::from(decoded_keypair); + // Metrics + let mut metric_registry = Registry::default(); + let metrics_endpoints_are_specified = !metrics_endpoints.is_empty(); + let metrics = + metrics_endpoints_are_specified.then(|| Metrics::new(&mut metric_registry)); + + let prometheus_task = metrics_endpoints_are_specified + .then(|| start_prometheus_metrics_server(metrics_endpoints, metric_registry)) + .transpose()?; + let config = Config { listen_on, allow_non_global_addresses_in_dht: enable_private_ips, @@ -139,11 +158,12 @@ async fn main() -> Result<(), Box> { special_connected_peers_handler: None, bootstrap_addresses: bootstrap_nodes, external_addresses, + metrics, ..Config::new(protocol_version.to_string(), keypair, (), None) }; let (node, mut node_runner) = - subspace_networking::create(config).expect("Networking stack creation failed."); + subspace_networking::construct(config).expect("Networking stack creation failed."); node.on_new_listener(Arc::new({ let node_id = node.id(); @@ -158,7 +178,14 @@ async fn main() -> Result<(), Box> { .detach(); info!("Subspace Bootstrap Node started"); - node_runner.run().await; + if let Some(prometheus_task) = prometheus_task { + select! { + _ = node_runner.run().fuse() => {}, + _ = prometheus_task.fuse() => {}, + } + } else { + node_runner.run().await + } } Command::GenerateKeypair { json } => { let output = KeypairOutput::new(Keypair::generate()); diff --git a/crates/subspace-networking/src/create.rs b/crates/subspace-networking/src/constructor.rs similarity index 98% rename from crates/subspace-networking/src/create.rs rename to crates/subspace-networking/src/constructor.rs index cdda163043..45e101f9a4 100644 --- a/crates/subspace-networking/src/create.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -5,14 +5,14 @@ use crate::behavior::persistent_parameters::{ NetworkingParametersRegistry, StubNetworkingParametersManager, }; use crate::behavior::{Behavior, BehaviorConfig}; -use crate::connected_peers::Config as ConnectedPeersConfig; -use crate::create::temporary_bans::TemporaryBans; -use crate::create::transport::build_transport; +use crate::constructor::temporary_bans::TemporaryBans; +use crate::constructor::transport::build_transport; use crate::node::Node; use crate::node_runner::{NodeRunner, NodeRunnerConfig}; -use crate::peer_info::PeerInfoProvider; -use crate::request_responses::RequestHandler; -use crate::reserved_peers::Config as ReservedPeersConfig; +use crate::protocols::connected_peers::Config as ConnectedPeersConfig; +use crate::protocols::peer_info::PeerInfoProvider; +use crate::protocols::request_response::request_response_factory::RequestHandler; +use crate::protocols::reserved_peers::Config as ReservedPeersConfig; use crate::shared::Shared; use crate::utils::{strip_peer_id, ResizableSemaphore}; use crate::{PeerInfo, PeerInfoConfig}; @@ -390,7 +390,7 @@ pub fn peer_id(keypair: &identity::Keypair) -> PeerId { } /// Create a new network node and node runner instances. -pub fn create( +pub fn construct( config: Config, ) -> Result<(Node, NodeRunner), CreationError> where diff --git a/crates/subspace-networking/src/create/temporary_bans.rs b/crates/subspace-networking/src/constructor/temporary_bans.rs similarity index 100% rename from crates/subspace-networking/src/create/temporary_bans.rs rename to crates/subspace-networking/src/constructor/temporary_bans.rs diff --git a/crates/subspace-networking/src/create/transport.rs b/crates/subspace-networking/src/constructor/transport.rs similarity index 99% rename from crates/subspace-networking/src/create/transport.rs rename to crates/subspace-networking/src/constructor/transport.rs index c2a881a56d..7468ea76d4 100644 --- a/crates/subspace-networking/src/create/transport.rs +++ b/crates/subspace-networking/src/constructor/transport.rs @@ -1,4 +1,4 @@ -use crate::create::temporary_bans::TemporaryBans; +use crate::constructor::temporary_bans::TemporaryBans; use crate::CreationError; use futures::future::Either; use libp2p::core::multiaddr::{Multiaddr, Protocol}; diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index b9db2519a8..13f984cbed 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -19,14 +19,11 @@ #![warn(missing_docs)] mod behavior; -mod connected_peers; -mod create; +mod constructor; mod node; mod node_runner; -mod peer_info; -mod request_handlers; -mod request_responses; -mod reserved_peers; +mod protocols; + mod shared; pub mod utils; @@ -37,17 +34,18 @@ pub use crate::node::{ GetClosestPeersError, Node, SendRequestError, SubscribeError, TopicSubscription, }; pub use crate::node_runner::NodeRunner; -pub use crate::peer_info::{ - Config as PeerInfoConfig, CuckooFilterDTO, CuckooFilterProvider, Notification, - NotificationHandler, PeerInfo, PeerInfoProvider, +pub use crate::protocols::peer_info::{ + Config as PeerInfoConfig, Notification, NotificationHandler, PeerInfo, PeerInfoProvider, }; -pub use create::{create, peer_id, Config, CreationError, LocalRecordProvider}; +pub use constructor::{construct, peer_id, Config, CreationError, LocalRecordProvider}; pub use libp2p; -pub use request_handlers::generic_request_handler::{GenericRequest, GenericRequestHandler}; -pub use request_handlers::piece_by_index::{ +pub use protocols::request_response::handlers::generic_request_handler::{ + GenericRequest, GenericRequestHandler, +}; +pub use protocols::request_response::handlers::piece_by_index::{ PieceByIndexRequest, PieceByIndexRequestHandler, PieceByIndexResponse, }; -pub use request_handlers::segment_header::{ +pub use protocols::request_response::handlers::segment_header::{ SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse, }; pub use shared::NewPeerInfo; diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index 5a80784b8e..60f9fdaf68 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -1,8 +1,9 @@ -use crate::request_handlers::generic_request_handler::GenericRequest; +use crate::protocols::request_response::handlers::generic_request_handler::GenericRequest; +use crate::protocols::request_response::request_response_factory; +pub use crate::shared::NewPeerInfo; use crate::shared::{Command, CreatedSubscription, Shared}; use crate::utils::multihash::Multihash; use crate::utils::{HandlerFn, ResizableSemaphorePermit}; -use crate::{request_responses, NewPeerInfo}; use bytes::Bytes; use event_listener_primitives::HandlerId; use futures::channel::mpsc::SendError; @@ -189,7 +190,7 @@ pub enum SendRequestError { NodeRunnerDropped, /// Underlying protocol returned an error, impossible to get response. #[error("Underlying protocol returned an error: {0}")] - ProtocolFailure(#[from] request_responses::RequestFailure), + ProtocolFailure(#[from] request_response_factory::RequestFailure), /// Underlying protocol returned an incorrect format, impossible to get response. #[error("Received incorrectly formatted response: {0}")] IncorrectResponseFormat(#[from] parity_scale_codec::Error), @@ -513,7 +514,7 @@ impl Node { Ok(()) } - /// Callback is called when we receive new [`crate::peer_info::PeerInfo`] + /// Callback is called when we receive new [`crate::protocols::peer_info::PeerInfo`] pub fn on_peer_info(&self, callback: HandlerFn) -> HandlerId { self.shared.handlers.new_peer_info.add(callback) } diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index dbbdcd5482..6f0053298b 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -5,15 +5,17 @@ use crate::behavior::persistent_parameters::{ use crate::behavior::{ Behavior, Event, GeneralConnectedPeersInstance, SpecialConnectedPeersInstance, }; -use crate::connected_peers::Event as ConnectedPeersEvent; -use crate::create; -use crate::create::temporary_bans::TemporaryBans; -use crate::create::{ +use crate::constructor; +use crate::constructor::temporary_bans::TemporaryBans; +use crate::constructor::{ ConnectedPeersHandler, LocalOnlyRecordStore, KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER, REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER, }; -use crate::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess}; -use crate::request_responses::{Event as RequestResponseEvent, IfDisconnected}; +use crate::protocols::connected_peers::Event as ConnectedPeersEvent; +use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess}; +use crate::protocols::request_response::request_response_factory::{ + Event as RequestResponseEvent, IfDisconnected, +}; use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared}; use crate::utils::{ is_global_address_or_dns, strip_peer_id, PeerAddress, ResizableSemaphorePermit, @@ -98,7 +100,7 @@ enum BootstrapCommandState { #[must_use = "Node does not function properly unless its runner is driven forward"] pub struct NodeRunner where - LocalRecordProvider: create::LocalRecordProvider + Send + Sync + 'static, + LocalRecordProvider: constructor::LocalRecordProvider + Send + Sync + 'static, { /// Should non-global addresses be added to the DHT? allow_non_global_addresses_in_dht: bool, @@ -154,7 +156,7 @@ where // Helper struct for NodeRunner configuration (clippy requirement). pub(crate) struct NodeRunnerConfig where - LocalRecordProvider: create::LocalRecordProvider + Send + Sync + 'static, + LocalRecordProvider: constructor::LocalRecordProvider + Send + Sync + 'static, { pub(crate) allow_non_global_addresses_in_dht: bool, pub(crate) command_receiver: mpsc::Receiver, @@ -175,7 +177,7 @@ where impl NodeRunner where - LocalRecordProvider: create::LocalRecordProvider + Send + Sync + 'static, + LocalRecordProvider: constructor::LocalRecordProvider + Send + Sync + 'static, { pub(crate) fn new( NodeRunnerConfig { diff --git a/crates/subspace-networking/src/protocols.rs b/crates/subspace-networking/src/protocols.rs new file mode 100644 index 0000000000..ec72d2dd89 --- /dev/null +++ b/crates/subspace-networking/src/protocols.rs @@ -0,0 +1,4 @@ +pub(crate) mod connected_peers; +pub mod peer_info; +pub mod request_response; +pub(crate) mod reserved_peers; diff --git a/crates/subspace-networking/src/connected_peers.rs b/crates/subspace-networking/src/protocols/connected_peers.rs similarity index 100% rename from crates/subspace-networking/src/connected_peers.rs rename to crates/subspace-networking/src/protocols/connected_peers.rs diff --git a/crates/subspace-networking/src/connected_peers/handler.rs b/crates/subspace-networking/src/protocols/connected_peers/handler.rs similarity index 100% rename from crates/subspace-networking/src/connected_peers/handler.rs rename to crates/subspace-networking/src/protocols/connected_peers/handler.rs diff --git a/crates/subspace-networking/src/peer_info.rs b/crates/subspace-networking/src/protocols/peer_info.rs similarity index 71% rename from crates/subspace-networking/src/peer_info.rs rename to crates/subspace-networking/src/protocols/peer_info.rs index 2b415c283d..e3e2039216 100644 --- a/crates/subspace-networking/src/peer_info.rs +++ b/crates/subspace-networking/src/protocols/peer_info.rs @@ -1,8 +1,7 @@ mod handler; mod protocol; -use crate::peer_info::handler::HandlerInEvent; -use event_listener_primitives::HandlerId; +use crate::protocols::peer_info::handler::HandlerInEvent; use handler::Handler; pub use handler::{Config, PeerInfoError, PeerInfoSuccess}; use libp2p::core::{Endpoint, Multiaddr}; @@ -15,9 +14,7 @@ use libp2p::PeerId; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use std::collections::{HashSet, VecDeque}; -use std::fmt; use std::fmt::Debug; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll, Waker}; use tracing::debug; @@ -28,22 +25,16 @@ pub struct Notification; /// Defines a subscription to a peer-info notification. pub type NotificationHandler = Arc; -/// Cuckoo filter data transfer object. -#[derive(Clone, Encode, Decode, Default)] -pub struct CuckooFilterDTO { - /// Exported cuckoo filter values. - pub values: Vec, +//TODO: remove on the next networking breaking change +/// Backward compatibility placeholder for the obsolete CuckooFilterDTO - +/// "Cuckoo filter data transfer object". +#[derive(Clone, Encode, Decode, Default, Debug)] +pub struct PlaceHolder { + /// A placeholder for: "Exported cuckoo filter values" field of the CuckooFilterDTO. + pub field1: Vec, /// Cuckoo filter items. - pub length: u64, -} - -impl fmt::Debug for CuckooFilterDTO { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CuckooFilterDTO") - .field("values", &self.length) - .field("length", &self.values.len()) - .finish() - } + /// A placeholder for: "Cuckoo filter items." field of the CuckooFilterDTO. + pub field2: u64, } #[derive(Clone, Encode, Decode, Default, Debug)] @@ -51,8 +42,8 @@ impl fmt::Debug for CuckooFilterDTO { pub enum PeerInfo { /// DSN farmer. Farmer { - /// Peer info data. - cuckoo_filter: Arc, + /// Backward compatibility placeholder. + placeholder: PlaceHolder, }, /// DSN node. Node, @@ -81,10 +72,6 @@ pub struct Behaviour { requests: Vec, /// Provides up-to-date peer info. peer_info_provider: PeerInfoProvider, - /// Whether the behaviour should notify connected peers. - should_notify_handlers: Arc, - /// We just save the handler ID. - _notify_handler_id: Option, /// Known connected peers. connected_peers: HashSet, /// Future waker. @@ -108,15 +95,7 @@ pub enum PeerInfoProvider { /// Provides peer-info for Client peer type. Client, /// Provides peer-info for Farmer peer type. - Farmer(Box), -} - -/// Provides the current cuckoo-filter data. -pub trait CuckooFilterProvider: Debug + 'static { - /// Returns the current cuckoo filter data. - fn cuckoo_filter(&self) -> CuckooFilterDTO; - /// Subscribe to cuckoo filter updates and invoke provided callback. - fn on_notification(&self, callback: NotificationHandler) -> Option; + Farmer, } impl PeerInfoProvider { @@ -133,8 +112,8 @@ impl PeerInfoProvider { Self::Client } /// Creates a new Farmer peer-info provider. - pub fn new_farmer(provider: Box) -> Self { - Self::Farmer(provider) + pub fn new_farmer() -> Self { + Self::Farmer } /// Returns the peer info data. @@ -143,20 +122,11 @@ impl PeerInfoProvider { PeerInfoProvider::Node => PeerInfo::Node, PeerInfoProvider::BootstrapNode => PeerInfo::BootstrapNode, PeerInfoProvider::Client => PeerInfo::Client, - PeerInfoProvider::Farmer(provider) => PeerInfo::Farmer { - cuckoo_filter: Arc::new(provider.cuckoo_filter()), + PeerInfoProvider::Farmer => PeerInfo::Farmer { + placeholder: Default::default(), }, } } - /// Subscribe to peer info updates and invoke provided callback. - pub fn on_notification(&self, handler: NotificationHandler) -> Option { - match self { - PeerInfoProvider::Node | PeerInfoProvider::BootstrapNode | PeerInfoProvider::Client => { - None - } - PeerInfoProvider::Farmer(provider) => provider.on_notification(handler), - } - } } /// Event generated by the `Peer Info` network behaviour. @@ -172,26 +142,12 @@ impl Behaviour { /// Creates a new `Peer Info` network behaviour with the given configuration. pub fn new(config: Config, peer_info_provider: PeerInfoProvider) -> Self { let waker = Arc::new(Mutex::new(None::)); - let should_notify_handlers = Arc::new(AtomicBool::new(false)); - let _notify_handler_id = peer_info_provider.on_notification({ - let should_notify_handlers = should_notify_handlers.clone(); - let waker = waker.clone(); - - Arc::new(move |_| { - should_notify_handlers.store(true, Ordering::SeqCst); - if let Some(waker) = waker.lock().as_mut() { - waker.wake_by_ref(); - } - }) - }); Self { - _notify_handler_id, config, peer_info_provider, events: VecDeque::new(), requests: Vec::new(), - should_notify_handlers, connected_peers: HashSet::new(), waker, } @@ -246,19 +202,6 @@ impl NetworkBehaviour for Behaviour { cx: &mut Context<'_>, _: &mut impl PollParameters, ) -> Poll>> { - if self.should_notify_handlers.swap(false, Ordering::SeqCst) { - debug!("Notify peer-info handlers."); - - self.requests.clear(); - let peer_info = Arc::new(self.peer_info_provider.peer_info()); - for peer_id in self.connected_peers.iter().cloned() { - self.requests.push(Request { - peer_id, - peer_info: peer_info.clone(), - }); - } - } - if let Some(e) = self.events.pop_back() { let Event { result, peer_id } = &e; diff --git a/crates/subspace-networking/src/peer_info/handler.rs b/crates/subspace-networking/src/protocols/peer_info/handler.rs similarity index 99% rename from crates/subspace-networking/src/peer_info/handler.rs rename to crates/subspace-networking/src/protocols/peer_info/handler.rs index 8bcc819ba3..83174b31f3 100644 --- a/crates/subspace-networking/src/peer_info/handler.rs +++ b/crates/subspace-networking/src/protocols/peer_info/handler.rs @@ -1,4 +1,4 @@ -use crate::peer_info::{protocol, PeerInfo}; +use crate::protocols::peer_info::{protocol, PeerInfo}; use futures::future::BoxFuture; use futures::prelude::*; use libp2p::core::upgrade::ReadyUpgrade; diff --git a/crates/subspace-networking/src/peer_info/protocol.rs b/crates/subspace-networking/src/protocols/peer_info/protocol.rs similarity index 96% rename from crates/subspace-networking/src/peer_info/protocol.rs rename to crates/subspace-networking/src/protocols/peer_info/protocol.rs index 0d1991eab1..f17903f15b 100644 --- a/crates/subspace-networking/src/peer_info/protocol.rs +++ b/crates/subspace-networking/src/protocols/peer_info/protocol.rs @@ -1,6 +1,6 @@ //! This module defines low-level functions for working with inbound and outbound streams. -use crate::peer_info::PeerInfo; +use crate::protocols::peer_info::PeerInfo; use futures::prelude::*; use parity_scale_codec::{Decode, Encode}; use std::io; diff --git a/crates/subspace-networking/src/protocols/request_response.rs b/crates/subspace-networking/src/protocols/request_response.rs new file mode 100644 index 0000000000..3bd6115147 --- /dev/null +++ b/crates/subspace-networking/src/protocols/request_response.rs @@ -0,0 +1,2 @@ +pub(crate) mod handlers; +pub(crate) mod request_response_factory; diff --git a/crates/subspace-networking/src/request_handlers.rs b/crates/subspace-networking/src/protocols/request_response/handlers.rs similarity index 100% rename from crates/subspace-networking/src/request_handlers.rs rename to crates/subspace-networking/src/protocols/request_response/handlers.rs diff --git a/crates/subspace-networking/src/request_handlers/generic_request_handler.rs b/crates/subspace-networking/src/protocols/request_response/handlers/generic_request_handler.rs similarity index 97% rename from crates/subspace-networking/src/request_handlers/generic_request_handler.rs rename to crates/subspace-networking/src/protocols/request_response/handlers/generic_request_handler.rs index 76dd7ff97a..95a3f742fa 100644 --- a/crates/subspace-networking/src/request_handlers/generic_request_handler.rs +++ b/crates/subspace-networking/src/protocols/request_response/handlers/generic_request_handler.rs @@ -14,7 +14,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig, RequestHandler}; +use crate::protocols::request_response::request_response_factory::{ + IncomingRequest, OutgoingResponse, ProtocolConfig, RequestHandler, +}; use async_trait::async_trait; use futures::channel::mpsc; use futures::prelude::*; diff --git a/crates/subspace-networking/src/request_handlers/piece_by_index.rs b/crates/subspace-networking/src/protocols/request_response/handlers/piece_by_index.rs similarity index 91% rename from crates/subspace-networking/src/request_handlers/piece_by_index.rs rename to crates/subspace-networking/src/protocols/request_response/handlers/piece_by_index.rs index 9e6b03eaa9..38f330b92f 100644 --- a/crates/subspace-networking/src/request_handlers/piece_by_index.rs +++ b/crates/subspace-networking/src/protocols/request_response/handlers/piece_by_index.rs @@ -3,7 +3,7 @@ //! Handle (i.e. answer) incoming pieces requests from a remote peer received via //! `RequestResponsesBehaviour` with generic [`GenericRequestHandler`]. -use crate::request_handlers::generic_request_handler::{GenericRequest, GenericRequestHandler}; +use super::generic_request_handler::{GenericRequest, GenericRequestHandler}; use parity_scale_codec::{Decode, Encode}; use subspace_core_primitives::{Piece, PieceIndex}; diff --git a/crates/subspace-networking/src/request_handlers/segment_header.rs b/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs similarity index 93% rename from crates/subspace-networking/src/request_handlers/segment_header.rs rename to crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs index dd46841f4a..1e925ec65c 100644 --- a/crates/subspace-networking/src/request_handlers/segment_header.rs +++ b/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs @@ -3,7 +3,7 @@ //! Handle (i.e. answer) incoming segment headers requests from a remote peer received via //! `RequestResponsesBehaviour` with generic [`GenericRequestHandler`]. -use crate::request_handlers::generic_request_handler::{GenericRequest, GenericRequestHandler}; +use super::generic_request_handler::{GenericRequest, GenericRequestHandler}; use parity_scale_codec::{Decode, Encode}; use subspace_core_primitives::{SegmentHeader, SegmentIndex}; diff --git a/crates/subspace-networking/src/request_responses.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs similarity index 99% rename from crates/subspace-networking/src/request_responses.rs rename to crates/subspace-networking/src/protocols/request_response/request_response_factory.rs index 8053e33f75..9123caa509 100644 --- a/crates/subspace-networking/src/request_responses.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory.rs @@ -187,7 +187,7 @@ pub struct OutgoingResponse { pub sent_feedback: Option>, } -/// Event generated by the [`RequestResponsesBehaviour`]. +/// Event generated by the [`RequestResponseFactoryBehaviour`]. #[derive(Debug)] pub enum Event { /// A remote sent a request and either we have successfully answered it or an error happened. @@ -205,7 +205,7 @@ pub enum Event { result: Result<(), ResponseFailure>, }, - /// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or + /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or /// failed. /// /// This event is generated for statistics purposes. @@ -264,9 +264,10 @@ impl IfDisconnected { } } -/// Implementation of `NetworkBehaviour` that provides support for request-response protocols. +/// Implementation of `NetworkBehaviour` that provides support for multiple request-response +/// protocols. #[allow(clippy::type_complexity)] // to preserve compatibility with copied implementation -pub struct RequestResponsesBehaviour { +pub struct RequestResponseFactoryBehaviour { /// The multiple sub-protocols, by name. /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional /// "response builder" used to build responses for incoming requests. @@ -314,7 +315,7 @@ struct RequestProcessingOutcome { response: OutgoingResponse, } -impl RequestResponsesBehaviour { +impl RequestResponseFactoryBehaviour { /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if /// the same protocol is passed twice. pub fn new( @@ -413,7 +414,7 @@ impl RequestResponsesBehaviour { } } -impl NetworkBehaviour for RequestResponsesBehaviour { +impl NetworkBehaviour for RequestResponseFactoryBehaviour { type ConnectionHandler = MultiHandler< String, as NetworkBehaviour>::ConnectionHandler, diff --git a/crates/subspace-networking/src/request_responses/tests.rs b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs similarity index 98% rename from crates/subspace-networking/src/request_responses/tests.rs rename to crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs index 20486cfa8a..91d4bdde67 100644 --- a/crates/subspace-networking/src/request_responses/tests.rs +++ b/crates/subspace-networking/src/protocols/request_response/request_response_factory/tests.rs @@ -1,6 +1,6 @@ -use crate::request_responses::{ +use crate::protocols::request_response::request_response_factory::{ Event, IfDisconnected, IncomingRequest, OutboundFailure, OutgoingResponse, ProtocolConfig, - RequestFailure, RequestHandler, RequestResponsesBehaviour, + RequestFailure, RequestHandler, RequestResponseFactoryBehaviour, }; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; @@ -37,7 +37,7 @@ impl RequestHandler for MockRunner { fn build_swarm( list: impl Iterator, -) -> (Swarm, Multiaddr) { +) -> (Swarm, Multiaddr) { let keypair = Keypair::generate_ed25519(); let transport = MemoryTransport::new() @@ -50,7 +50,7 @@ fn build_swarm( .into_iter() .map(|config| Box::new(MockRunner(config)) as Box) .collect::>(); - let behaviour = RequestResponsesBehaviour::new(configs).unwrap(); + let behaviour = RequestResponseFactoryBehaviour::new(configs).unwrap(); let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, keypair.public().to_peer_id()) diff --git a/crates/subspace-networking/src/reserved_peers.rs b/crates/subspace-networking/src/protocols/reserved_peers.rs similarity index 100% rename from crates/subspace-networking/src/reserved_peers.rs rename to crates/subspace-networking/src/protocols/reserved_peers.rs diff --git a/crates/subspace-networking/src/reserved_peers/handler.rs b/crates/subspace-networking/src/protocols/reserved_peers/handler.rs similarity index 100% rename from crates/subspace-networking/src/reserved_peers/handler.rs rename to crates/subspace-networking/src/protocols/reserved_peers/handler.rs diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index 186dc68e57..ab67d3166b 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -1,8 +1,8 @@ //! Data structures shared between node and node runner, facilitating exchange and creation of //! queries, subscriptions, various events and shared information. -use crate::peer_info::PeerInfo; -use crate::request_responses::RequestFailure; +use crate::protocols::peer_info::PeerInfo; +use crate::protocols::request_response::request_response_factory::RequestFailure; use crate::utils::multihash::Multihash; use crate::utils::{Handler, ResizableSemaphore, ResizableSemaphorePermit}; use bytes::Bytes; diff --git a/crates/subspace-networking/src/utils/prometheus.rs b/crates/subspace-networking/src/utils/prometheus.rs index 13e6f33503..e485409bd1 100644 --- a/crates/subspace-networking/src/utils/prometheus.rs +++ b/crates/subspace-networking/src/utils/prometheus.rs @@ -24,17 +24,17 @@ async fn metrics(registry: Data) -> Result, registry: Registry, ) -> std::io::Result>> { let shared_registry = Arc::new(Mutex::new(registry)); let data = Data::new(shared_registry); - info!("Starting metrics server on {} ...", address); + info!(?endpoints, "Starting metrics server...",); Ok( HttpServer::new(move || App::new().app_data(data.clone()).service(metrics)) - .bind(address)? + .bind(endpoints.as_slice())? .run(), ) } diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 202fbbeaed..525f13430d 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -180,5 +180,5 @@ where ..default_networking_config }; - subspace_networking::create(networking_config).map_err(Into::into) + subspace_networking::construct(networking_config).map_err(Into::into) } diff --git a/domains/client/domain-operator/Cargo.toml b/domains/client/domain-operator/Cargo.toml index 8b336149fc..5458752d44 100644 --- a/domains/client/domain-operator/Cargo.toml +++ b/domains/client/domain-operator/Cargo.toml @@ -46,6 +46,7 @@ domain-client-message-relayer = { version = "0.1.0", path = "../relayer" } domain-test-service = { version = "0.1.0", path = "../../test/service" } domain-test-primitives = { version = "0.1.0", path = "../../test/primitives" } evm-domain-test-runtime = { version = "0.1.0", path = "../../test/runtime/evm" } +frame-system = { version = "4.0.0-dev", default-features = false, git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } num-traits = "0.2.15" pallet-balances = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } pallet-domains = { version = "0.1.0", path = "../../../crates/pallet-domains" } diff --git a/domains/client/domain-operator/src/aux_schema.rs b/domains/client/domain-operator/src/aux_schema.rs index dd57d8f1f0..c744a866f8 100644 --- a/domains/client/domain-operator/src/aux_schema.rs +++ b/domains/client/domain-operator/src/aux_schema.rs @@ -24,9 +24,12 @@ const BAD_RECEIPT_MISMATCH_INFO: &[u8] = b"bad_receipt_mismatch_info"; /// NOTE: Unbounded but the size is not expected to be large. const BAD_RECEIPT_NUMBERS: &[u8] = b"bad_receipt_numbers"; -/// domain_block_hash => consensus_block_hash +/// domain_block_hash => Vec /// /// Updated only when there is a new domain block produced +/// +/// NOTE: different consensus blocks may derive the exact same domain block, thus one domain block may +/// mapping to multiple consensus block. const CONSENSUS_HASH: &[u8] = b"consensus_block_hash"; /// domain_block_hash => latest_consensus_block_hash @@ -128,6 +131,13 @@ where } } + let consensus_hashes = { + let mut hashes = + consensus_block_hash_for::(backend, domain_hash)?; + hashes.push(consensus_hash); + hashes + }; + backend.insert_aux( &[ ( @@ -137,7 +147,7 @@ where // TODO: prune the stale mappings. ( (CONSENSUS_HASH, domain_hash).encode().as_slice(), - consensus_hash.encode().as_slice(), + consensus_hashes.encode().as_slice(), ), ( block_number_key.as_slice(), @@ -155,25 +165,6 @@ where ) } -/// Load the execution receipt for given domain block hash. -pub(super) fn load_execution_receipt_by_domain_hash( - backend: &Backend, - domain_hash: Block::Hash, -) -> ClientResult>> -where - Backend: AuxStore, - Block: BlockT, - CBlock: BlockT, -{ - match consensus_block_hash_for::(backend, domain_hash)? { - Some(consensus_block_hash) => load_decode( - backend, - execution_receipt_key(consensus_block_hash).as_slice(), - ), - None => Ok(None), - } -} - /// Load the execution receipt for given consensus block hash. pub(super) fn load_execution_receipt( backend: &Backend, @@ -254,13 +245,16 @@ where pub(super) fn consensus_block_hash_for( backend: &Backend, domain_hash: Hash, -) -> ClientResult> +) -> ClientResult> where Backend: AuxStore, Hash: Encode, PHash: Decode, { - load_decode(backend, (CONSENSUS_HASH, domain_hash).encode().as_slice()) + Ok( + load_decode(backend, (CONSENSUS_HASH, domain_hash).encode().as_slice())? + .unwrap_or_default(), + ) } // TODO: Unlock once domain test infra is workable again. diff --git a/domains/client/domain-operator/src/domain_block_processor.rs b/domains/client/domain-operator/src/domain_block_processor.rs index 47c48d47d5..9ddab279a1 100644 --- a/domains/client/domain-operator/src/domain_block_processor.rs +++ b/domains/client/domain-operator/src/domain_block_processor.rs @@ -341,15 +341,12 @@ where *genesis_header.state_root(), ) } else { - crate::aux_schema::load_execution_receipt_by_domain_hash::<_, Block, CBlock>( + crate::load_execution_receipt_by_domain_hash::( &*self.client, + &self.consensus_client, parent_hash, + parent_number, )? - .ok_or_else(|| { - sp_blockchain::Error::Backend(format!( - "Receipt of domain block #{parent_number},{parent_hash} not found" - )) - })? }; let execution_receipt = ExecutionReceipt { diff --git a/domains/client/domain-operator/src/domain_bundle_producer.rs b/domains/client/domain-operator/src/domain_bundle_producer.rs index 02f212dc11..85c1e53e1c 100644 --- a/domains/client/domain-operator/src/domain_bundle_producer.rs +++ b/domains/client/domain-operator/src/domain_bundle_producer.rs @@ -149,12 +149,12 @@ where global_randomness, } = slot_info; - let best_receipt_is_written = crate::aux_schema::consensus_block_hash_for::< + let best_receipt_is_written = !crate::aux_schema::consensus_block_hash_for::< _, _, CBlock::Hash, >(&*self.client, self.client.info().best_hash)? - .is_some(); + .is_empty(); // TODO: remove once the receipt generation can be done before the domain block is // committed to the database, in other words, only when the receipt of block N+1 has diff --git a/domains/client/domain-operator/src/domain_bundle_proposer.rs b/domains/client/domain-operator/src/domain_bundle_proposer.rs index fe3ab649ac..b16284c635 100644 --- a/domains/client/domain-operator/src/domain_bundle_proposer.rs +++ b/domains/client/domain-operator/src/domain_bundle_proposer.rs @@ -188,12 +188,12 @@ where "Collecting receipts at {parent_chain_block_hash:?}" ); - let header_block_receipt_is_written = crate::aux_schema::consensus_block_hash_for::< + let header_block_receipt_is_written = !crate::aux_schema::consensus_block_hash_for::< _, _, CBlock::Hash, >(&*self.client, header_hash)? - .is_some(); + .is_empty(); // TODO: remove once the receipt generation can be done before the domain block is // committed to the database, in other words, only when the receipt of block N+1 has @@ -229,14 +229,11 @@ where )) })?; - crate::aux_schema::load_execution_receipt_by_domain_hash::<_, Block, CBlock>( + crate::load_execution_receipt_by_domain_hash::( &*self.client, + &self.consensus_client, domain_hash, - )? - .ok_or_else(|| { - sp_blockchain::Error::Backend(format!( - "Receipt of domain block #{receipt_number},{domain_hash} not found" - )) - }) + receipt_number, + ) } } diff --git a/domains/client/domain-operator/src/lib.rs b/domains/client/domain-operator/src/lib.rs index 54a26e5af3..f66ec5eae5 100644 --- a/domains/client/domain-operator/src/lib.rs +++ b/domains/client/domain-operator/src/lib.rs @@ -84,7 +84,7 @@ pub use self::utils::{DomainBlockImportNotification, DomainImportNotifications}; use crate::utils::BlockInfo; use futures::channel::mpsc; use futures::Stream; -use sc_client_api::BlockImportNotification; +use sc_client_api::{AuxStore, BlockImportNotification}; use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; @@ -239,3 +239,57 @@ where Ok(leaves.into_iter().rev().take(MAX_ACTIVE_LEAVES).collect()) } + +pub(crate) fn load_execution_receipt_by_domain_hash( + domain_client: &Client, + consensus_client: &Arc, + domain_hash: Block::Hash, + domain_number: NumberFor, +) -> Result, sp_blockchain::Error> +where + Block: BlockT, + CBlock: BlockT, + Client: AuxStore, + CClient: HeaderBackend, +{ + let not_found_error = || { + sp_blockchain::Error::Backend(format!( + "Receipt for domain block {domain_hash}#{domain_number} not found" + )) + }; + + // Get all the consensus blocks that mapped to `domain_hash` + let consensus_block_hashes = crate::aux_schema::consensus_block_hash_for::<_, _, CBlock::Hash>( + domain_client, + domain_hash, + )?; + + // Get the consensus block that is in the current canonical consensus chain + let consensus_block_hash = match consensus_block_hashes.len() { + 0 => return Err(not_found_error()), + 1 => consensus_block_hashes[0], + _ => { + let mut canonical_consensus_hash = None; + for hash in consensus_block_hashes { + // Check if `hash` is in the canonical chain + let block_number = consensus_client.number(hash)?.ok_or_else(not_found_error)?; + let canonical_block_hash = consensus_client + .hash(block_number)? + .ok_or_else(not_found_error)?; + + if canonical_block_hash == hash { + canonical_consensus_hash.replace(hash); + break; + } + } + canonical_consensus_hash.ok_or_else(not_found_error)? + } + }; + + // Get receipt by consensus block hash + crate::aux_schema::load_execution_receipt::<_, Block, CBlock>( + domain_client, + consensus_block_hash, + )? + .ok_or_else(not_found_error) +} diff --git a/domains/client/domain-operator/src/tests.rs b/domains/client/domain-operator/src/tests.rs index 81ea27cbfb..a4bbfce93f 100644 --- a/domains/client/domain-operator/src/tests.rs +++ b/domains/client/domain-operator/src/tests.rs @@ -20,9 +20,10 @@ use sp_domains::fraud_proof::{ExecutionPhase, FraudProof, InvalidStateTransition use sp_domains::transaction::InvalidTransactionCode; use sp_domains::{Bundle, DomainId, DomainsApi}; use sp_runtime::generic::{BlockId, Digest, DigestItem}; -use sp_runtime::traits::{BlakeTwo256, Header as HeaderT}; +use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT}; use sp_runtime::OpaqueExtrinsic; use subspace_fraud_proof::invalid_state_transition_proof::ExecutionProver; +use subspace_runtime_primitives::opaque::Block as CBlock; use subspace_runtime_primitives::Balance; use subspace_test_service::{ produce_block_with, produce_blocks, produce_blocks_until, MockConsensusNode, @@ -1816,3 +1817,96 @@ async fn test_restart_domain_operator() { assert_eq!(ferdie.client.info().best_number, 11); assert_eq!(alice.client.info().best_number, 10); } + +#[substrate_test_utils::test(flavor = "multi_thread")] +async fn test_multiple_consensus_blocks_derive_same_domain_block() { + let directory = TempDir::new().expect("Must be able to create temporary directory"); + + let mut builder = sc_cli::LoggerBuilder::new(""); + builder.with_colors(false); + let _ = builder.init(); + + let tokio_handle = tokio::runtime::Handle::current(); + + // Start Ferdie + let mut ferdie = MockConsensusNode::run( + tokio_handle.clone(), + Ferdie, + BasePath::new(directory.path().join("ferdie")), + ); + + // Produce 1 consensus block to initialize genesis domain + ferdie.produce_block_with_slot(1.into()).await.unwrap(); + + // Run Alice (a evm domain authority node) + let mut alice = domain_test_service::DomainNodeBuilder::new( + tokio_handle.clone(), + Alice, + BasePath::new(directory.path().join("alice")), + ) + .build_evm_node(Role::Authority, GENESIS_DOMAIN_ID, &mut ferdie) + .await; + + produce_blocks!(ferdie, alice, 3).await.unwrap(); + let common_block_hash = ferdie.client.info().best_hash; + let bundle_to_tx = |opaque_bundle| { + subspace_test_runtime::UncheckedExtrinsic::new_unsigned( + pallet_domains::Call::submit_bundle { opaque_bundle }.into(), + ) + .into() + }; + + // Fork A + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + // Include one more extrinsic in fork A such that we can have a different consensus block + let remark_tx = subspace_test_runtime::UncheckedExtrinsic::new_unsigned( + frame_system::Call::remark { remark: vec![0; 8] }.into(), + ) + .into(); + let consensus_block_hash_fork_a = ferdie + .produce_block_with_slot_at( + slot, + common_block_hash, + Some(vec![bundle_to_tx(bundle.clone().unwrap()), remark_tx]), + ) + .await + .unwrap(); + + // Fork B + let consensus_block_hash_fork_b = ferdie + .produce_block_with_slot_at( + slot, + common_block_hash, + Some(vec![bundle_to_tx(bundle.unwrap())]), + ) + .await + .unwrap(); + + // The same domain block mapped to 2 different consensus blocks + let consensus_best_hashes = crate::aux_schema::consensus_block_hash_for::< + _, + _, + ::Hash, + >(&*alice.client, alice.client.info().best_hash) + .unwrap(); + assert_eq!( + consensus_best_hashes, + vec![consensus_block_hash_fork_a, consensus_block_hash_fork_b] + ); + assert_ne!(consensus_block_hash_fork_a, consensus_block_hash_fork_b); + + // Produce one more block at fork A to make it the canonical chain and the operator + // should submit the ER of fork A + let (slot, bundle) = ferdie.produce_slot_and_wait_for_bundle_submission().await; + ferdie + .produce_block_with_slot_at(slot, consensus_block_hash_fork_a, None) + .await + .unwrap(); + assert_eq!( + bundle.unwrap().into_receipt().consensus_block_hash, + consensus_block_hash_fork_a + ); + + // Simply produce more block + produce_blocks!(ferdie, alice, 3).await.unwrap(); +}