From 596f62312e909ff50a4c98f37ac7d1e39c8da5a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?=
Date: Tue, 23 Apr 2024 16:39:57 +0200
Subject: [PATCH] Move providers to separate module.

---
 src/network/p2p.rs                   |   2 +
 src/network/p2p/kad_mem_providers.rs | 129 +++++++++++++++++++++++++++
 src/network/p2p/kad_mem_store.rs     | 102 ++++-----------------
 src/types.rs                         |   9 +-
 4 files changed, 151 insertions(+), 91 deletions(-)
 create mode 100644 src/network/p2p/kad_mem_providers.rs

diff --git a/src/network/p2p.rs b/src/network/p2p.rs
index 6fff46aab..07e4801af 100644
--- a/src/network/p2p.rs
+++ b/src/network/p2p.rs
@@ -19,11 +19,13 @@ use tracing::info;
 
 pub mod analyzer;
 mod client;
 mod event_loop;
+mod kad_mem_providers;
 mod kad_mem_store;
 
 use crate::types::{LibP2PConfig, SecretKey};
 pub use client::Client;
 pub use event_loop::EventLoop;
+pub use kad_mem_providers::ProvidersConfig;
 pub use kad_mem_store::MemoryStoreConfig;
 use self::{client::BlockStat, kad_mem_store::MemoryStore};
diff --git a/src/network/p2p/kad_mem_providers.rs b/src/network/p2p/kad_mem_providers.rs
new file mode 100644
index 000000000..62cdf9162
--- /dev/null
+++ b/src/network/p2p/kad_mem_providers.rs
@@ -0,0 +1,129 @@
+use libp2p::identity::PeerId;
+use libp2p::kad::store::{Error, Result};
+use libp2p::kad::{KBucketKey, ProviderRecord, RecordKey, K_VALUE};
+use smallvec::SmallVec;
+use std::borrow::Cow;
+use std::collections::{hash_map, hash_set, HashMap, HashSet};
+use std::iter;
+
+#[derive(Clone, Debug)]
+pub struct ProvidersConfig {
+    /// The maximum number of providers stored for a key.
+    ///
+    /// This should match up with the chosen replication factor.
+    pub max_providers_per_key: usize,
+    /// The maximum number of provider records for which the
+    /// local node is the provider.
+    pub max_provided_keys: usize,
+}
+
+impl Default for ProvidersConfig {
+    // Default values kept in line with libp2p
+    fn default() -> Self {
+        Self {
+            max_provided_keys: 1024,
+            max_providers_per_key: K_VALUE.get(),
+        }
+    }
+}
+
+pub struct Providers {
+    /// Providers configuration
+    config: ProvidersConfig,
+    /// The stored provider records.
+    providers: HashMap<RecordKey, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
+    /// The set of all provider records for the node identified by `local_key`.
+    /// Must be kept in sync with `providers`.
+    provided: HashSet<ProviderRecord>,
+}
+
+pub type ProviderIter<'a> = iter::Map<
+    hash_set::Iter<'a, ProviderRecord>,
+    fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
+>;
+
+impl Providers {
+    pub fn with_config(config: ProvidersConfig) -> Self {
+        Providers {
+            config,
+            providers: Default::default(),
+            provided: Default::default(),
+        }
+    }
+
+    pub fn add_provider(
+        &mut self,
+        local_key: KBucketKey<PeerId>,
+        record: ProviderRecord,
+    ) -> Result<()> {
+        let num_keys = self.providers.len();
+
+        // Obtain the entry
+        let providers = match self.providers.entry(record.key.clone()) {
+            e @ hash_map::Entry::Occupied(_) => e,
+            e @ hash_map::Entry::Vacant(_) => {
+                if self.config.max_provided_keys == num_keys {
+                    return Err(Error::MaxProvidedKeys);
+                }
+                e
+            },
+        }
+        .or_insert_with(Default::default);
+
+        if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
+            // In-place update of an existing provider record.
+            providers.as_mut()[i] = record;
+        } else {
+            // It is a new provider record for that key.
+            let key = KBucketKey::new(record.key.clone());
+            let provider = KBucketKey::from(record.provider);
+            if let Some(i) = providers.iter().position(|p| {
+                let pk = KBucketKey::from(p.provider);
+                provider.distance(&key) < pk.distance(&key)
+            }) {
+                // Insert the new provider.
+                if local_key.preimage() == &record.provider {
+                    self.provided.insert(record.clone());
+                }
+                providers.insert(i, record);
+                // Remove the excess provider, if any.
+                if providers.len() > self.config.max_providers_per_key {
+                    if let Some(p) = providers.pop() {
+                        self.provided.remove(&p);
+                    }
+                }
+            } else if providers.len() < self.config.max_providers_per_key {
+                // The distance of the new provider to the key is larger than
+                // the distance of any existing provider, but there is still room.
+                if local_key.preimage() == &record.provider {
+                    self.provided.insert(record.clone());
+                }
+                providers.push(record);
+            }
+        }
+        Ok(())
+    }
+
+    pub fn providers(&self, key: &RecordKey) -> Vec<ProviderRecord> {
+        self.providers
+            .get(key)
+            .map_or_else(Vec::new, |ps| ps.clone().into_vec())
+    }
+
+    pub fn provided(&self) -> ProviderIter<'_> {
+        self.provided.iter().map(Cow::Borrowed)
+    }
+
+    pub fn remove_provider(&mut self, key: &RecordKey, provider: &PeerId) {
+        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
+            let providers = e.get_mut();
+            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
+                let p = providers.remove(i);
+                self.provided.remove(&p);
+            }
+            if providers.is_empty() {
+                e.remove();
+            }
+        }
+    }
+}
diff --git a/src/network/p2p/kad_mem_store.rs b/src/network/p2p/kad_mem_store.rs
index 6c8cd3b6a..d9174b8dd 100644
--- a/src/network/p2p/kad_mem_store.rs
+++ b/src/network/p2p/kad_mem_store.rs
@@ -18,12 +18,12 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
+use super::kad_mem_providers::{ProviderIter, Providers, ProvidersConfig};
 use libp2p::identity::PeerId;
 use libp2p::kad::store::{Error, RecordStore, Result};
-use libp2p::kad::{KBucketKey, ProviderRecord, Record, RecordKey, K_VALUE};
-use smallvec::SmallVec;
+use libp2p::kad::{KBucketKey, ProviderRecord, Record, RecordKey};
 use std::borrow::Cow;
-use std::collections::{hash_map, hash_set, HashMap, HashSet};
+use std::collections::{hash_map, HashMap};
 use std::iter;
 use tracing::trace;
 
@@ -36,11 +36,7 @@ pub struct MemoryStore {
     /// The stored (regular) records.
     records: HashMap<RecordKey, Record>,
     /// The stored provider records.
-    providers: HashMap<RecordKey, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
-    /// The set of all provider records for the node identified by `local_key`.
-    ///
-    /// Must be kept in sync with `providers`.
-    provided: HashSet<ProviderRecord>,
+    providers: Providers,
 }
 
 /// Configuration for a `MemoryStore`.
@@ -50,13 +46,7 @@ pub struct MemoryStoreConfig {
     pub max_records: usize,
     /// The maximum size of record values, in bytes.
     pub max_value_bytes: usize,
-    /// The maximum number of providers stored for a key.
-    ///
-    /// This should match up with the chosen replication factor.
-    pub max_providers_per_key: usize,
-    /// The maximum number of provider records for which the
-    /// local node is the provider.
-    pub max_provided_keys: usize,
+    pub providers: ProvidersConfig,
 }
 
 impl Default for MemoryStoreConfig {
@@ -65,8 +55,7 @@
         Self {
             max_records: 1024,
             max_value_bytes: 65 * 1024,
-            max_provided_keys: 1024,
-            max_providers_per_key: K_VALUE.get(),
+            providers: Default::default(),
         }
     }
 }
@@ -82,10 +71,9 @@
     pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
         MemoryStore {
             local_key: KBucketKey::from(local_id),
-            config,
             records: HashMap::default(),
-            provided: HashSet::default(),
-            providers: HashMap::default(),
+            providers: Providers::with_config(config.providers.clone()),
+            config,
         }
     }
 
@@ -114,10 +102,7 @@
     type RecordsIter<'a> = iter::Map<hash_map::Values<'a, RecordKey, Record>, fn(&'a Record) -> Cow<'a, Record>>;
 
-    type ProvidedIter<'a> = iter::Map<
-        hash_set::Iter<'a, ProviderRecord>,
-        fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
-    >;
+    type ProvidedIter<'a> = ProviderIter<'a>;
 
     fn get(&self, k: &RecordKey) -> Option<Cow<'_, Record>> {
         self.records.get(k).map(Cow::Borrowed)
     }
 
@@ -154,76 +139,19 @@
     fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
-        let num_keys = self.providers.len();
-
-        // Obtain the entry
-        let providers = match self.providers.entry(record.key.clone()) {
-            e @ hash_map::Entry::Occupied(_) => e,
-            e @ hash_map::Entry::Vacant(_) => {
-                if self.config.max_provided_keys == num_keys {
-                    return Err(Error::MaxProvidedKeys);
-                }
-                e
-            },
-        }
-        .or_insert_with(Default::default);
-
-        if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
-            // In-place update of an existing provider record.
-            providers.as_mut()[i] = record;
-        } else {
-            // It is a new provider record for that key.
-            let local_key = self.local_key.clone();
-            let key = KBucketKey::new(record.key.clone());
-            let provider = KBucketKey::from(record.provider);
-            if let Some(i) = providers.iter().position(|p| {
-                let pk = KBucketKey::from(p.provider);
-                provider.distance(&key) < pk.distance(&key)
-            }) {
-                // Insert the new provider.
-                if local_key.preimage() == &record.provider {
-                    self.provided.insert(record.clone());
-                }
-                providers.insert(i, record);
-                // Remove the excess provider, if any.
-                if providers.len() > self.config.max_providers_per_key {
-                    if let Some(p) = providers.pop() {
-                        self.provided.remove(&p);
-                    }
-                }
-            } else if providers.len() < self.config.max_providers_per_key {
-                // The distance of the new provider to the key is larger than
-                // the distance of any existing provider, but there is still room.
-                if local_key.preimage() == &record.provider {
-                    self.provided.insert(record.clone());
-                }
-                providers.push(record);
-            }
-        }
-        Ok(())
+        self.providers.add_provider(self.local_key.clone(), record)
     }
 
     fn providers(&self, key: &RecordKey) -> Vec<ProviderRecord> {
-        self.providers
-            .get(key)
-            .map_or_else(Vec::new, |ps| ps.clone().into_vec())
+        self.providers.providers(key)
     }
 
     fn provided(&self) -> Self::ProvidedIter<'_> {
-        self.provided.iter().map(Cow::Borrowed)
+        self.providers.provided()
     }
 
     fn remove_provider(&mut self, key: &RecordKey, provider: &PeerId) {
-        if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
-            let providers = e.get_mut();
-            if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
-                let p = providers.remove(i);
-                self.provided.remove(&p);
-            }
-            if providers.is_empty() {
-                e.remove();
-            }
-        }
+        self.providers.remove_provider(key, provider)
     }
 }
@@ -336,7 +264,7 @@
         }
 
         records.sort_by_key(distance);
-        records.truncate(store.config.max_providers_per_key);
+        records.truncate(store.config.providers.max_providers_per_key);
         assert!(records == store.providers(&key).to_vec())
     }
 }
@@ -372,7 +300,7 @@
     #[test]
     fn max_provided_keys() {
         let mut store = MemoryStore::new(PeerId::random());
-        for _ in 0..store.config.max_provided_keys {
+        for _ in 0..store.config.providers.max_provided_keys {
            let key = random_multihash();
            let prv = PeerId::random();
            let rec = ProviderRecord::new(key, prv, Vec::new());
diff --git a/src/types.rs b/src/types.rs
index 36f721977..bc3567361 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -1,6 +1,5 @@
 //! Shared light client structs and enums.
-
-use crate::network::p2p::MemoryStoreConfig;
+use crate::network::p2p::{MemoryStoreConfig, ProvidersConfig};
 use crate::network::rpc::{Event, Node as RpcNode};
 use crate::utils::{extract_app_lookup, extract_kate};
 use avail_core::DataLookup;
@@ -606,8 +605,10 @@ impl From<&LibP2PConfig> for MemoryStoreConfig {
         MemoryStoreConfig {
             max_records: cfg.kademlia.max_kad_record_number, // ~2hrs
             max_value_bytes: cfg.kademlia.max_kad_record_size + 1,
-            max_providers_per_key: usize::from(cfg.kademlia.record_replication_factor), // Needs to match the replication factor, per libp2p docs
-            max_provided_keys: cfg.kademlia.max_kad_provided_keys,
+            providers: ProvidersConfig {
+                max_providers_per_key: usize::from(cfg.kademlia.record_replication_factor), // Needs to match the replication factor, per libp2p docs
+                max_provided_keys: cfg.kademlia.max_kad_provided_keys,
+            },
         }
     }
 }
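
Reviewer note (not part of the patch): the sketch below shows how the extracted `Providers` API is meant to be driven. It is a hypothetical unit test that would live inside `kad_mem_providers.rs` (so `use super::*` resolves to the items added above); it relies only on libp2p calls already exercised by the existing tests (`PeerId::random()`, `ProviderRecord::new(key, provider, addresses)`, `KBucketKey::from(peer_id)`), and the key bytes and config values are made up for illustration.

```rust
#[cfg(test)]
mod providers_usage_sketch {
    use super::*;

    #[test]
    fn add_then_remove_provider() {
        let local_id = PeerId::random();
        let local_key = KBucketKey::from(local_id);

        // Small, arbitrary limits keep the example compact; real callers get
        // these from ProvidersConfig::default() or from LibP2PConfig.
        let mut providers = Providers::with_config(ProvidersConfig {
            max_providers_per_key: 1,
            max_provided_keys: 8,
        });

        // A record announcing the local node as provider for an arbitrary key.
        let key = RecordKey::new(b"block-42");
        let record = ProviderRecord::new(key.clone(), local_id, Vec::new());

        providers
            .add_provider(local_key.clone(), record.clone())
            .expect("still below max_provided_keys");

        // The record is retrievable by key...
        assert_eq!(providers.providers(&key), vec![record]);
        // ...and, because the local node is the provider, also tracked in `provided`.
        assert_eq!(providers.provided().count(), 1);

        providers.remove_provider(&key, &local_id);
        assert!(providers.providers(&key).is_empty());
        assert_eq!(providers.provided().count(), 0);
    }
}
```

Design-wise, `add_provider` keeps each per-key list ordered by XOR distance to the key, so when the list exceeds `max_providers_per_key` it is always the farthest provider that gets popped and, if applicable, removed from `provided`.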