Commit

tmp
Ma233 committed Jan 12, 2024
1 parent c4715b6 commit 0179abc
Showing 14 changed files with 236 additions and 309 deletions.
32 changes: 17 additions & 15 deletions crates/core/src/dht/chord.rs
@@ -1,5 +1,6 @@
//! Chord algorithm implement.
#![warn(missing_docs)]
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
@@ -25,10 +26,9 @@ use crate::dht::SuccessorReader;
use crate::dht::SuccessorWriter;
use crate::error::Error;
use crate::error::Result;
use crate::storage::KvStorageInterface;
use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::storage::PersistenceStorageRemove;

/// PeerRing is used to help a node interact with other nodes.
/// All nodes in rings network form a clockwise ring in the order of Did.
@@ -48,9 +48,9 @@ pub struct PeerRing {
/// The did of previous node on the ring.
pub predecessor: Arc<Mutex<Option<Did>>>,
/// Local storage for [ChordStorage].
pub storage: Arc<PersistenceStorage>,
pub storage: Arc<dyn KvStorageInterface<VirtualNode> + Send + Sync>,
/// Local cache for [ChordStorage].
pub cache: Arc<MemStorage<Did, VirtualNode>>,
pub cache: Arc<dyn KvStorageInterface<VirtualNode> + Send + Sync>,
}

/// Type alias is just for making the code easy to read.
@@ -191,7 +191,7 @@ impl PeerRing {
// for Eth address, it's 160
finger: Arc::new(Mutex::new(FingerTable::new(did, 160))),
storage: Arc::new(storage),
cache: Arc::new(MemStorage::<Did, VirtualNode>::new()),
cache: Arc::new(MemStorage::new()),
did,
}
}
@@ -386,7 +386,7 @@ impl<const REDUNDANT: u16> ChordStorage<PeerRingAction, REDUNDANT> for PeerRing
for vid in vid.rotate_affine(REDUNDANT) {
let maybe_act = match self.find_successor(vid) {
// Resource should be stored in current node.
Ok(PeerRingAction::Some(succ)) => match self.storage.get(&vid).await {
Ok(PeerRingAction::Some(succ)) => match self.storage.get(&vid.to_string()).await {
Ok(Some(v)) => Ok(PeerRingAction::SomeVNode(v)),
Ok(None) => {
tracing::debug!(
@@ -437,13 +437,13 @@ impl<const REDUNDANT: u16> ChordStorage<PeerRingAction, REDUNDANT> for PeerRing
let maybe_act = match self.find_successor(vid) {
// `vnode` should be on current node.
Ok(PeerRingAction::Some(_)) => {
let this = if let Ok(Some(this)) = self.storage.get(&vid).await {
let this = if let Ok(Some(this)) = self.storage.get(&vid.to_string()).await {
Ok(this)
} else {
op.clone().gen_default_vnode()
}?;
let vnode = this.operate(op.clone())?;
self.storage.put(&vid, &vnode).await?;
self.storage.put(&vid.to_string(), &vnode).await?;
Ok(PeerRingAction::None)
}
// `vnode` should be on other nodes.
@@ -472,11 +472,13 @@ impl ChordStorageSync<PeerRingAction> for PeerRing {
/// and sync them to the new successor.
async fn sync_vnode_with_successor(&self, new_successor: Did) -> Result<PeerRingAction> {
let mut data = Vec::<VirtualNode>::new();
let all_items: Vec<(Did, VirtualNode)> = self.storage.get_all().await?;
let all_items: Vec<(String, VirtualNode)> = self.storage.get_all().await?;

// Pop out all items that are not between current node and `new_successor`.
for (vid, vnode) in all_items.iter() {
if self.bias(*vid) > self.bias(new_successor) && self.storage.remove(vid).await.is_ok()
for (vid_str, vnode) in all_items.iter() {
let vid = Did::from_str(&vid_str)?;
if self.bias(vid) > self.bias(new_successor)
&& self.storage.remove(&vid_str).await.is_ok()
{
data.push(vnode.clone());
}
@@ -497,13 +499,13 @@ impl ChordStorageSync<PeerRingAction> for PeerRing {
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ChordStorageCache<PeerRingAction> for PeerRing {
/// Cache fetched `vnode` locally.
fn local_cache_set(&self, vnode: VirtualNode) {
self.cache.set(&vnode.did.clone(), vnode);
async fn local_cache_put(&self, vnode: VirtualNode) -> Result<()> {
self.cache.put(&vnode.did.to_string(), &vnode).await
}

/// Get vnode from local cache.
fn local_cache_get(&self, vid: Did) -> Option<VirtualNode> {
self.cache.get(&vid)
async fn local_cache_get(&self, vid: Did) -> Result<Option<VirtualNode>> {
self.cache.get(&vid.to_string()).await
}
}

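The hunks above replace the concrete PersistenceStorage and MemStorage fields with Arc<dyn KvStorageInterface<VirtualNode> + Send + Sync> trait objects, and switch storage keys from Did to String (written with vid.to_string(), read back with Did::from_str). For orientation, the calls made through self.storage and self.cache imply a key-value trait roughly shaped like the sketch below. This is a reconstruction from the hunks in this commit, not the crate's actual definition: the Result alias, the bounds on V, and the return type of count are assumptions.

    // Sketch only: shape inferred from the get/put/get_all/remove/count calls in this diff.
    use async_trait::async_trait;

    // Stand-in for the crate's crate::error::Result; assumed for this sketch.
    pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

    #[async_trait]
    pub trait KvStorageInterface<V>
    where
        V: Send + Sync + 'static,
    {
        /// Fetch a value by its string key (keys are Did values rendered via to_string).
        async fn get(&self, key: &str) -> Result<Option<V>>;
        /// Insert or overwrite a value.
        async fn put(&self, key: &str, value: &V) -> Result<()>;
        /// Dump all (key, value) pairs, e.g. for syncing vnodes to a new successor.
        async fn get_all(&self) -> Result<Vec<(String, V)>>;
        /// Remove a value by key.
        async fn remove(&self, key: &str) -> Result<()>;
        /// Number of stored entries (return type assumed; the tests only compare it with 0).
        async fn count(&self) -> Result<u32>;
    }
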
4 changes: 2 additions & 2 deletions crates/core/src/dht/types.rs
@@ -86,9 +86,9 @@ pub trait ChordStorageSync<Action>: Chord<Action> {
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ChordStorageCache<Action>: Chord<Action> {
/// Cache fetched resource locally.
fn local_cache_set(&self, vnode: VirtualNode);
async fn local_cache_put(&self, vnode: VirtualNode) -> Result<()>;
/// Get local cache.
fn local_cache_get(&self, vid: Did) -> Option<VirtualNode>;
async fn local_cache_get(&self, vid: Did) -> Result<Option<VirtualNode>>;
}

/// Chord online correction that inspired by Pamela Zave's work.
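Because the two cache methods are now async and fallible, call sites change from plain calls into awaited calls that propagate a Result. Below is a crate-internal sketch of the migration; the helper function and the exact import paths are assumptions, while the method names and signatures mirror the hunks above and the handler changes further down.

    // Hypothetical helper, not part of this commit; import paths are assumed.
    use crate::dht::vnode::VirtualNode;
    use crate::dht::{ChordStorageCache, Did, PeerRing};
    use crate::error::Result;

    async fn cache_roundtrip(dht: &PeerRing, vnode: VirtualNode) -> Result<Option<VirtualNode>> {
        let vid: Did = vnode.did.clone();
        // was: dht.local_cache_set(vnode);
        dht.local_cache_put(vnode).await?;
        // was: dht.local_cache_get(vid), which returned Option<VirtualNode> directly
        dht.local_cache_get(vid).await
    }
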
24 changes: 8 additions & 16 deletions crates/core/src/inspect.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;

use rings_transport::core::transport::ConnectionInterface;
use serde::Deserialize;
use serde::Serialize;

use crate::dht::vnode::VirtualNode;
use crate::dht::Did;
use crate::dht::PeerRing;
use crate::dht::SuccessorReader;
use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::storage::KvStorageInterface;
use crate::swarm::Swarm;

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -54,8 +53,8 @@ impl SwarmInspect {
.collect()
};
let persistence_storage =
StorageInspect::inspect_persistence_storage(&swarm.dht().storage).await;
let cache_storage = StorageInspect::inspect_mem_storage(&swarm.dht().cache);
StorageInspect::inspect_kv_storage(swarm.dht().storage.clone()).await;
let cache_storage = StorageInspect::inspect_kv_storage(swarm.dht().cache.clone()).await;

Self {
connections,
@@ -105,7 +104,7 @@ impl DHTInspect {
}

impl StorageInspect {
pub async fn inspect_persistence_storage(storage: &PersistenceStorage) -> Self {
pub async fn inspect_kv_storage(
storage: Arc<dyn KvStorageInterface<VirtualNode> + Send + Sync>,
) -> Self {
Self {
items: storage
.get_all()
@@ -115,15 +116,6 @@ impl StorageInspect {
.collect(),
}
}
pub fn inspect_mem_storage(storage: &MemStorage<Did, VirtualNode>) -> Self {
Self {
items: storage
.items()
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
}
}
}

pub fn compress_iter<T>(iter: impl Iterator<Item = T>) -> Vec<(T, u64, u64)>
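With dht.storage and dht.cache now sharing one trait-object type, a single inspector serves both, and storage backends become swappable behind the interface. Purely as an illustration, here is a toy in-memory implementor of the trait shape (and Result alias) sketched after the chord.rs hunks above; the crate's own MemStorage and PersistenceStorage implementations will differ in detail.

    // Toy backend: a Mutex-guarded HashMap keyed by String, matching the assumed trait shape.
    use std::collections::HashMap;
    use std::sync::Mutex;

    use async_trait::async_trait;

    pub struct ToyKvStorage<V> {
        table: Mutex<HashMap<String, V>>,
    }

    #[async_trait]
    impl<V> KvStorageInterface<V> for ToyKvStorage<V>
    where
        V: Clone + Send + Sync + 'static,
    {
        async fn get(&self, key: &str) -> Result<Option<V>> {
            Ok(self.table.lock().unwrap().get(key).cloned())
        }

        async fn put(&self, key: &str, value: &V) -> Result<()> {
            self.table.lock().unwrap().insert(key.to_string(), value.clone());
            Ok(())
        }

        async fn get_all(&self) -> Result<Vec<(String, V)>> {
            let table = self.table.lock().unwrap();
            Ok(table.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
        }

        async fn remove(&self, key: &str) -> Result<()> {
            self.table.lock().unwrap().remove(key);
            Ok(())
        }

        async fn count(&self) -> Result<u32> {
            Ok(self.table.lock().unwrap().len() as u32)
        }
    }

    // A value like Arc::new(ToyKvStorage { table: Mutex::new(HashMap::new()) }) then
    // coerces to Arc<dyn KvStorageInterface<VirtualNode> + Send + Sync>, the field type
    // PeerRing uses above (assuming VirtualNode: Clone + Send + Sync).
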
15 changes: 7 additions & 8 deletions crates/core/src/message/handlers/storage.rs
@@ -56,7 +56,7 @@ async fn handle_storage_fetch_act(swarm: &Swarm, act: PeerRingAction) -> Result<
match act {
PeerRingAction::None => (),
PeerRingAction::SomeVNode(v) => {
swarm.dht.local_cache_set(v);
swarm.dht.local_cache_put(v).await?;
}
PeerRingAction::RemoteAction(next, dht_act) => {
if let PeerRingRemoteAction::FindVNode(vid) = dht_act {
@@ -158,7 +158,7 @@ pub(super) async fn handle_storage_operate_act(
impl ChordStorageInterfaceCacheChecker for Swarm {
/// Check local cache
async fn storage_check_cache(&self, vid: Did) -> Option<VirtualNode> {
self.dht.local_cache_get(vid)
self.dht.local_cache_get(vid).await.ok().flatten()
}
}

@@ -229,7 +229,7 @@ impl HandleMsg<FoundVNode> for MessageHandler {
return Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]);
}
for data in msg.data.iter().cloned() {
self.dht.local_cache_set(data);
self.dht.local_cache_put(data).await?;
}
Ok(vec![])
}
@@ -277,7 +277,6 @@ mod test {
use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection;
use crate::message::Encoder;
use crate::prelude::vnode::VNodeType;
use crate::storage::PersistenceStorageOperation;
use crate::tests::default::prepare_node;

#[tokio::test]
@@ -301,8 +300,8 @@ mod test {
(node2, node1)
};

assert!(node1.dht().cache.is_empty());
assert!(node2.dht().cache.is_empty());
assert_eq!(node1.dht().cache.count().await.unwrap(), 0);
assert_eq!(node2.dht().cache.count().await.unwrap(), 0);
assert!(node1.storage_check_cache(vid).await.is_none());
assert!(node2.storage_check_cache(vid).await.is_none());

@@ -375,8 +374,8 @@ mod test {
(node2, node1)
};

assert!(node1.dht().cache.is_empty());
assert!(node2.dht().cache.is_empty());
assert_eq!(node1.dht().cache.count().await.unwrap(), 0);
assert_eq!(node2.dht().cache.count().await.unwrap(), 0);
assert!(node1.storage_check_cache(vid).await.is_none());
assert!(node2.storage_check_cache(vid).await.is_none());

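One detail worth noting in the handler changes above: because local_cache_get is now fallible, storage_check_cache folds a cache read error into a miss with .ok().flatten(), so its callers still receive a plain Option. A minimal standalone illustration of that fold (the function name here is illustrative, not from the crate):

    fn collapse<T, E>(read: Result<Option<T>, E>) -> Option<T> {
        // Ok(Some(v)) -> Some(v); both Ok(None) and Err(_) -> None.
        read.ok().flatten()
    }

    fn main() {
        assert_eq!(collapse::<u8, ()>(Ok(Some(7))), Some(7));
        assert_eq!(collapse::<u8, ()>(Ok(None)), None);
        assert_eq!(collapse::<u8, ()>(Err(())), None);
    }
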
2 changes: 1 addition & 1 deletion crates/core/src/prelude.rs
@@ -14,5 +14,5 @@ pub use crate::message::ChordStorageInterface;
pub use crate::message::ChordStorageInterfaceCacheChecker;
pub use crate::message::MessageRelay;
pub use crate::message::SubringInterface;
pub use crate::storage::KvStorageInterface;
pub use crate::storage::PersistenceStorage;
pub use crate::storage::PersistenceStorageReadAndWrite;