Skip to content

Commit

Permalink
Move providers to separate module.
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Apr 23, 2024
1 parent aceff25 commit 596f623
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 91 deletions.
2 changes: 2 additions & 0 deletions src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use tracing::info;
pub mod analyzer;
mod client;
mod event_loop;
mod kad_mem_providers;
mod kad_mem_store;

use crate::types::{LibP2PConfig, SecretKey};
pub use client::Client;
pub use event_loop::EventLoop;
pub use kad_mem_providers::ProvidersConfig;
pub use kad_mem_store::MemoryStoreConfig;

use self::{client::BlockStat, kad_mem_store::MemoryStore};
Expand Down
129 changes: 129 additions & 0 deletions src/network/p2p/kad_mem_providers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use libp2p::identity::PeerId;
use libp2p::kad::store::{Error, Result};
use libp2p::kad::{KBucketKey, ProviderRecord, RecordKey, K_VALUE};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::collections::{hash_map, hash_set, HashMap, HashSet};
use std::iter;

#[derive(Clone, Debug)]
pub struct ProvidersConfig {
/// The maximum number of providers stored for a key.
///
/// This should match up with the chosen replication factor.
pub max_providers_per_key: usize,
/// The maximum number of provider records for which the
/// local node is the provider.
pub max_provided_keys: usize,
}

impl Default for ProvidersConfig {
// Default values kept in line with libp2p
fn default() -> Self {
Self {
max_provided_keys: 1024,
max_providers_per_key: K_VALUE.get(),
}
}
}

pub struct Providers {
/// Providers configuration
config: ProvidersConfig,
/// The stored provider records.
providers: HashMap<RecordKey, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
/// The set of all provider records for the node identified by `local_key`.
/// Must be kept in sync with `providers`.
provided: HashSet<ProviderRecord>,
}

pub type ProviderIter<'a> = iter::Map<
hash_set::Iter<'a, ProviderRecord>,
fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
>;

impl Providers {
pub fn with_config(config: ProvidersConfig) -> Self {
Providers {
config,
providers: Default::default(),
provided: Default::default(),
}
}

pub fn add_provider(
&mut self,
local_key: KBucketKey<PeerId>,
record: ProviderRecord,
) -> Result<()> {
let num_keys = self.providers.len();

// Obtain the entry
let providers = match self.providers.entry(record.key.clone()) {
e @ hash_map::Entry::Occupied(_) => e,
e @ hash_map::Entry::Vacant(_) => {
if self.config.max_provided_keys == num_keys {
return Err(Error::MaxProvidedKeys);
}
e
},
}
.or_insert_with(Default::default);

if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
// In-place update of an existing provider record.
providers.as_mut()[i] = record;
} else {
// It is a new provider record for that key.
let key = KBucketKey::new(record.key.clone());
let provider = KBucketKey::from(record.provider);
if let Some(i) = providers.iter().position(|p| {
let pk = KBucketKey::from(p.provider);
provider.distance(&key) < pk.distance(&key)
}) {
// Insert the new provider.
if local_key.preimage() == &record.provider {
self.provided.insert(record.clone());
}
providers.insert(i, record);
// Remove the excess provider, if any.
if providers.len() > self.config.max_providers_per_key {
if let Some(p) = providers.pop() {
self.provided.remove(&p);
}
}
} else if providers.len() < self.config.max_providers_per_key {
// The distance of the new provider to the key is larger than
// the distance of any existing provider, but there is still room.
if local_key.preimage() == &record.provider {
self.provided.insert(record.clone());
}
providers.push(record);
}
}
Ok(())
}

pub fn providers(&self, key: &RecordKey) -> Vec<ProviderRecord> {
self.providers
.get(key)
.map_or_else(Vec::new, |ps| ps.clone().into_vec())
}

pub fn provided(&self) -> ProviderIter<'_> {
self.provided.iter().map(Cow::Borrowed)
}

pub fn remove_provider(&mut self, key: &RecordKey, provider: &PeerId) {
if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
let providers = e.get_mut();
if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
let p = providers.remove(i);
self.provided.remove(&p);
}
if providers.is_empty() {
e.remove();
}
}
}
}
102 changes: 15 additions & 87 deletions src/network/p2p/kad_mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use super::kad_mem_providers::{ProviderIter, Providers, ProvidersConfig};
use libp2p::identity::PeerId;
use libp2p::kad::store::{Error, RecordStore, Result};
use libp2p::kad::{KBucketKey, ProviderRecord, Record, RecordKey, K_VALUE};
use smallvec::SmallVec;
use libp2p::kad::{KBucketKey, ProviderRecord, Record, RecordKey};
use std::borrow::Cow;
use std::collections::{hash_map, hash_set, HashMap, HashSet};
use std::collections::{hash_map, HashMap};
use std::iter;
use tracing::trace;

Expand All @@ -36,11 +36,7 @@ pub struct MemoryStore {
/// The stored (regular) records.
records: HashMap<RecordKey, Record>,
/// The stored provider records.
providers: HashMap<RecordKey, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
/// The set of all provider records for the node identified by `local_key`.
///
/// Must be kept in sync with `providers`.
provided: HashSet<ProviderRecord>,
providers: Providers,
}

/// Configuration for a `MemoryStore`.
Expand All @@ -50,13 +46,7 @@ pub struct MemoryStoreConfig {
pub max_records: usize,
/// The maximum size of record values, in bytes.
pub max_value_bytes: usize,
/// The maximum number of providers stored for a key.
///
/// This should match up with the chosen replication factor.
pub max_providers_per_key: usize,
/// The maximum number of provider records for which the
/// local node is the provider.
pub max_provided_keys: usize,
pub providers: ProvidersConfig,
}

impl Default for MemoryStoreConfig {
Expand All @@ -65,8 +55,7 @@ impl Default for MemoryStoreConfig {
Self {
max_records: 1024,
max_value_bytes: 65 * 1024,
max_provided_keys: 1024,
max_providers_per_key: K_VALUE.get(),
providers: Default::default(),
}
}
}
Expand All @@ -82,10 +71,9 @@ impl MemoryStore {
pub fn with_config(local_id: PeerId, config: MemoryStoreConfig) -> Self {
MemoryStore {
local_key: KBucketKey::from(local_id),
config,
records: HashMap::default(),
provided: HashSet::default(),
providers: HashMap::default(),
providers: Providers::with_config(config.providers.clone()),
config,
}
}

Expand Down Expand Up @@ -114,10 +102,7 @@ impl RecordStore for MemoryStore {
type RecordsIter<'a> =
iter::Map<hash_map::Values<'a, RecordKey, Record>, fn(&'a Record) -> Cow<'a, Record>>;

type ProvidedIter<'a> = iter::Map<
hash_set::Iter<'a, ProviderRecord>,
fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>,
>;
type ProvidedIter<'a> = ProviderIter<'a>;

fn get(&self, k: &RecordKey) -> Option<Cow<'_, Record>> {
self.records.get(k).map(Cow::Borrowed)
Expand Down Expand Up @@ -154,76 +139,19 @@ impl RecordStore for MemoryStore {
}

fn add_provider(&mut self, record: ProviderRecord) -> Result<()> {
let num_keys = self.providers.len();

// Obtain the entry
let providers = match self.providers.entry(record.key.clone()) {
e @ hash_map::Entry::Occupied(_) => e,
e @ hash_map::Entry::Vacant(_) => {
if self.config.max_provided_keys == num_keys {
return Err(Error::MaxProvidedKeys);
}
e
},
}
.or_insert_with(Default::default);

if let Some(i) = providers.iter().position(|p| p.provider == record.provider) {
// In-place update of an existing provider record.
providers.as_mut()[i] = record;
} else {
// It is a new provider record for that key.
let local_key = self.local_key.clone();
let key = KBucketKey::new(record.key.clone());
let provider = KBucketKey::from(record.provider);
if let Some(i) = providers.iter().position(|p| {
let pk = KBucketKey::from(p.provider);
provider.distance(&key) < pk.distance(&key)
}) {
// Insert the new provider.
if local_key.preimage() == &record.provider {
self.provided.insert(record.clone());
}
providers.insert(i, record);
// Remove the excess provider, if any.
if providers.len() > self.config.max_providers_per_key {
if let Some(p) = providers.pop() {
self.provided.remove(&p);
}
}
} else if providers.len() < self.config.max_providers_per_key {
// The distance of the new provider to the key is larger than
// the distance of any existing provider, but there is still room.
if local_key.preimage() == &record.provider {
self.provided.insert(record.clone());
}
providers.push(record);
}
}
Ok(())
self.providers.add_provider(self.local_key.clone(), record)
}

fn providers(&self, key: &RecordKey) -> Vec<ProviderRecord> {
self.providers
.get(key)
.map_or_else(Vec::new, |ps| ps.clone().into_vec())
self.providers.providers(key)
}

fn provided(&self) -> Self::ProvidedIter<'_> {
self.provided.iter().map(Cow::Borrowed)
self.providers.provided()
}

fn remove_provider(&mut self, key: &RecordKey, provider: &PeerId) {
if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
let providers = e.get_mut();
if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
let p = providers.remove(i);
self.provided.remove(&p);
}
if providers.is_empty() {
e.remove();
}
}
self.providers.remove_provider(key, provider)
}
}

Expand Down Expand Up @@ -336,7 +264,7 @@ mod tests {
}

records.sort_by_key(distance);
records.truncate(store.config.max_providers_per_key);
records.truncate(store.config.providers.max_providers_per_key);
assert!(records == store.providers(&key).to_vec())
}
}
Expand Down Expand Up @@ -372,7 +300,7 @@ mod tests {
#[test]
fn max_provided_keys() {
let mut store = MemoryStore::new(PeerId::random());
for _ in 0..store.config.max_provided_keys {
for _ in 0..store.config.providers.max_provided_keys {
let key = random_multihash();
let prv = PeerId::random();
let rec = ProviderRecord::new(key, prv, Vec::new());
Expand Down
9 changes: 5 additions & 4 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Shared light client structs and enums.

use crate::network::p2p::MemoryStoreConfig;
use crate::network::p2p::{MemoryStoreConfig, ProvidersConfig};
use crate::network::rpc::{Event, Node as RpcNode};
use crate::utils::{extract_app_lookup, extract_kate};
use avail_core::DataLookup;
Expand Down Expand Up @@ -606,8 +605,10 @@ impl From<&LibP2PConfig> for MemoryStoreConfig {
MemoryStoreConfig {
max_records: cfg.kademlia.max_kad_record_number, // ~2hrs
max_value_bytes: cfg.kademlia.max_kad_record_size + 1,
max_providers_per_key: usize::from(cfg.kademlia.record_replication_factor), // Needs to match the replication factor, per libp2p docs
max_provided_keys: cfg.kademlia.max_kad_provided_keys,
providers: ProvidersConfig {
max_providers_per_key: usize::from(cfg.kademlia.record_replication_factor), // Needs to match the replication factor, per libp2p docs
max_provided_keys: cfg.kademlia.max_kad_provided_keys,
},
}
}
}
Expand Down

0 comments on commit 596f623

Please sign in to comment.