Skip to content

Commit

Permalink
Merge pull request #2960 from tediou5/feat/remove-unique-record-binary-heap
Browse files Browse the repository at this point in the history

feat: remove unique record binary heap
  • Loading branch information
nazar-pc authored Sep 3, 2024
2 parents 098abac + 9bedf25 commit 90728e1
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 439 deletions.
252 changes: 131 additions & 121 deletions crates/subspace-farmer/src/farmer_cache.rs

Large diffs are not rendered by default.

76 changes: 62 additions & 14 deletions crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::farmer_cache::{CacheBackend, FarmerCacheOffset};
use std::collections::hash_map::Values;
use std::collections::{HashMap, VecDeque};
use std::collections::btree_map::Values;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt;
use std::hash::Hash;
use subspace_core_primitives::PieceIndex;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KeyWithDistance;
use tracing::{debug, trace};

#[derive(Debug, Clone)]
pub(super) struct PieceCachesState<CacheIndex> {
stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
backends: Vec<CacheBackend>,
}
Expand All @@ -21,7 +23,7 @@ where
CacheIndex: TryFrom<usize>,
{
pub(super) fn new(
stored_pieces: HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
backends: Vec<CacheBackend>,
) -> Self {
Expand Down Expand Up @@ -69,39 +71,39 @@ where

pub(super) fn get_stored_piece(
&self,
key: &RecordKey,
key: &KeyWithDistance,
) -> Option<&FarmerCacheOffset<CacheIndex>> {
self.stored_pieces.get(key)
}

pub(super) fn contains_stored_piece(&self, key: &RecordKey) -> bool {
pub(super) fn contains_stored_piece(&self, key: &KeyWithDistance) -> bool {
self.stored_pieces.contains_key(key)
}

pub(super) fn push_stored_piece(
&mut self,
key: RecordKey,
key: KeyWithDistance,
cache_offset: FarmerCacheOffset<CacheIndex>,
) -> Option<FarmerCacheOffset<CacheIndex>> {
self.stored_pieces.insert(key, cache_offset)
}

pub(super) fn stored_pieces_offests(
pub(super) fn stored_pieces_offsets(
&self,
) -> Values<'_, RecordKey, FarmerCacheOffset<CacheIndex>> {
) -> Values<'_, KeyWithDistance, FarmerCacheOffset<CacheIndex>> {
self.stored_pieces.values()
}

pub(super) fn remove_stored_piece(
&mut self,
key: &RecordKey,
key: &KeyWithDistance,
) -> Option<FarmerCacheOffset<CacheIndex>> {
self.stored_pieces.remove(key)
}

pub(super) fn free_unneeded_stored_pieces(
&mut self,
piece_indices_to_store: &mut HashMap<RecordKey, PieceIndex>,
piece_indices_to_store: &mut HashMap<KeyWithDistance, PieceIndex>,
) {
self.stored_pieces
.extract_if(|key, _offset| piece_indices_to_store.remove(key).is_none())
Expand All @@ -128,7 +130,7 @@ where
pub(super) fn reuse(
self,
) -> (
HashMap<RecordKey, FarmerCacheOffset<CacheIndex>>,
BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
VecDeque<FarmerCacheOffset<CacheIndex>>,
) {
let Self {
Expand All @@ -141,12 +143,58 @@ where
dangling_free_offsets.clear();
(stored_pieces, dangling_free_offsets)
}

/// Decides whether the given key should displace an existing cache entry.
///
/// Returns the evicted `(key, offset)` pair when the key qualifies for
/// inclusion and no free capacity remains anywhere; returns `None` when the
/// key is not wanted or when a free slot can be used instead of evicting.
pub(super) fn should_replace(
    &mut self,
    key: &KeyWithDistance,
) -> Option<(KeyWithDistance, FarmerCacheOffset<CacheIndex>)> {
    if self.should_include_key_internal(key) && !self.has_free_capacity() {
        // No free slot anywhere: evict the last entry in key order (the
        // farthest one under `KeyWithDistance`'s distance-based ordering)
        // to make room for the closer candidate.
        self.stored_pieces.pop_last()
    } else {
        // Either the key is already stored / too distant to be wanted, or a
        // free slot exists and nothing needs to be displaced.
        None
    }
}

/// Returns `true` if a piece with the given index is worth caching for
/// `peer_id`: it is not already stored and either free capacity exists or
/// it sorts before the farthest currently stored key.
pub(super) fn should_include_key(&self, peer_id: PeerId, key: PieceIndex) -> bool {
    let candidate = KeyWithDistance::new(peer_id, key.to_multihash());
    self.should_include_key_internal(&candidate)
}

/// Returns `true` when at least one slot is available, either as a
/// previously freed "dangling" offset or as unused space in a backend.
fn has_free_capacity(&self) -> bool {
    !self.dangling_free_offsets.is_empty()
        || self.backends.iter().any(|backend| backend.free_size() > 0)
}

/// Core inclusion check: a key is wanted when it is not already stored and
/// either free capacity exists or the key sorts strictly before (i.e. is
/// closer than, under the distance-based ordering) the farthest stored key.
fn should_include_key_internal(&self, key: &KeyWithDistance) -> bool {
    if self.stored_pieces.contains_key(key) {
        return false;
    }

    if self.has_free_capacity() {
        return true;
    }

    // Cache is full: only accept keys sorting before the current last
    // (farthest) entry; an empty map with no capacity accepts nothing.
    self.stored_pieces
        .last_key_value()
        .map_or(false, |(farthest_key, _)| key < farthest_key)
}
}

impl<CacheIndex> Default for PieceCachesState<CacheIndex> {
fn default() -> Self {
Self {
stored_pieces: HashMap::default(),
stored_pieces: BTreeMap::default(),
dangling_free_offsets: VecDeque::default(),
backends: Vec::default(),
}
Expand Down
21 changes: 20 additions & 1 deletion crates/subspace-farmer/src/farmer_cache/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::disk_piece_cache::DiskPieceCache;
use crate::farmer_cache::FarmerCache;
use crate::farmer_cache::{decode_piece_index_from_record_key, FarmerCache};
use crate::node_client::{Error, NodeClient};
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
Expand All @@ -17,6 +17,7 @@ use subspace_core_primitives::{
};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
use subspace_networking::libp2p::identity;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
Expand Down Expand Up @@ -436,3 +437,21 @@ async fn basic() {
farmer_cache_worker_exited.await.unwrap();
}
}

#[test]
fn decode_piece_index_from_record_key_test() {
    // Round-trip: piece index -> multihash record key -> decoded piece
    // index, for both boundary values and a representative mid-range one.
    for raw_index in [0_u64, 123456, u64::MAX] {
        let piece_index = PieceIndex::from(raw_index);
        let record_key = RecordKey::from(piece_index.to_multihash());
        assert_eq!(decode_piece_index_from_record_key(&record_key), piece_index);
    }
}
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
duration_constructors,
exact_size_is_empty,
fmt_helpers_for_derive,
hash_extract_if,
btree_extract_if,
impl_trait_in_assoc_type,
int_roundings,
iter_collect_into,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ pub use protocols::request_response::handlers::segment_header::{
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
pub use shared::PeerDiscovered;
pub use utils::key_with_distance::KeyWithDistance;
pub use utils::multihash::Multihash;
pub use utils::unique_record_binary_heap::{KeyWrapper, UniqueRecordBinaryHeap};
pub use utils::PeerAddress;
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Miscellaneous utilities for networking.

pub(crate) mod key_with_distance;
pub mod multihash;
pub mod piece_provider;
pub(crate) mod rate_limiter;
pub(crate) mod unique_record_binary_heap;

use event_listener_primitives::Bag;
use futures::future::{Fuse, FusedFuture, FutureExt};
Expand Down
63 changes: 63 additions & 0 deletions crates/subspace-networking/src/utils/key_with_distance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use libp2p::kad::KBucketDistance;
pub use libp2p::kad::RecordKey;
pub use libp2p::PeerId;
use std::cmp::Ordering;
use std::hash::Hash;

type KademliaBucketKey<T> = libp2p::kad::KBucketKey<T>;

/// A [`RecordKey`] paired with its precomputed Kademlia XOR distance to a
/// given peer, so keys can be compared and ordered by that distance.
///
/// NOTE(review): equality/hashing use only the key while ordering uses the
/// distance — see the trait impls below; confirm containers never rely on
/// both notions agreeing for the same set of entries.
#[derive(Debug, Clone, Eq)]
pub struct KeyWithDistance {
    /// The wrapped record key.
    key: RecordKey,
    /// XOR distance between `key` and the peer supplied at construction.
    distance: KBucketDistance,
}

impl KeyWithDistance {
    /// Creates a new [`KeyWithDistance`] from the given `PeerId` and any key
    /// type convertible into a [`RecordKey`].
    ///
    /// The distance is measured between the Kademlia bucket keys derived
    /// from the peer id and from the record key.
    pub fn new<K>(peer_id: PeerId, key: K) -> Self
    where
        RecordKey: From<K>,
    {
        Self::new_with_record_key(peer_id, key.into())
    }

    /// Creates a new [`KeyWithDistance`] from the given `PeerId` and an
    /// already-constructed [`RecordKey`], computing the distance once.
    pub fn new_with_record_key(peer_id: PeerId, key: RecordKey) -> Self {
        let distance =
            KademliaBucketKey::new(key.as_ref()).distance(&KademliaBucketKey::from(peer_id));
        Self { key, distance }
    }

    /// Returns a reference to the underlying record key.
    pub fn record_key(&self) -> &RecordKey {
        &self.key
    }
}

impl PartialEq for KeyWithDistance {
    // Equality considers only the record key, not the distance.
    //
    // NOTE(review): `Ord` below compares by distance instead, so equality
    // and ordering are not mutually consistent in general (the same key
    // paired with different peers yields different distances) — confirm no
    // container mixes the two notions for the same set of entries.
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl PartialOrd for KeyWithDistance {
    // Delegates to the total order defined by `Ord` (distance-based), as
    // required for `PartialOrd`/`Ord` consistency.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for KeyWithDistance {
    /// Orders by Kademlia distance first; ties between distinct keys at the
    /// same distance are broken by comparing the raw key bytes.
    ///
    /// Without the tiebreak, two different keys at an equal distance would
    /// compare as `Equal`, so ordered containers such as `BTreeMap` would
    /// treat them as the same key and silently overwrite one entry with the
    /// other. The tiebreak keeps the order total over distinct keys while
    /// preserving the primary distance-based ordering unchanged.
    fn cmp(&self, other: &Self) -> Ordering {
        self.distance
            .cmp(&other.distance)
            .then_with(|| self.key.as_ref().cmp(other.key.as_ref()))
    }
}

impl Hash for KeyWithDistance {
    // Hashes only the record key, matching the key-based `PartialEq` so the
    // `Hash`/`Eq` contract (equal values hash identically) holds.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}
Loading

0 comments on commit 90728e1

Please sign in to comment.