From 41a8875069253d9a69eab2e667dd590ddbe3e541 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:52 -0400 Subject: [PATCH 01/24] Add `replication` feature --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9e2f571..723e19e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } +tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true} [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -60,8 +61,9 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["tokio", "sparse"] +replication = ["tokio"] sparse = ["random-access-disk/sparse"] -tokio = ["random-access-disk/tokio"] +tokio = ["dep:tokio", "random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore From f2572da65ba4a8ced796767a7e0dd939146ade50 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:53 -0400 Subject: [PATCH 02/24] Add events related to replication --- src/replication/events.rs | 106 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 src/replication/events.rs diff --git a/src/replication/events.rs b/src/replication/events.rs new file mode 100644 index 0000000..dc7bd31 --- /dev/null +++ b/src/replication/events.rs @@ -0,0 +1,106 @@ +//! events related to replication +use crate::{common::BitfieldUpdate, HypercoreError}; +use tokio::sync::broadcast; + +static MAX_EVENT_QUEUE_CAPACITY: usize = 32; + +/// Event emeitted by [`Events::send_on_get`] +#[derive(Debug, Clone)] +/// Emitted when [`Hypercore::get`] is called when the block is missing. +pub struct Get { + /// Index of the requested block + pub index: u64, + /// When the block is gotten this emits an event + pub get_result: broadcast::Sender<()>, +} + +/// Emitted when +#[derive(Debug, Clone)] +pub struct DataUpgrade {} + +/// Emitted when core gets new blocks +#[derive(Debug, Clone)] +pub struct Have { + /// Starting index of the blocks we have + pub start: u64, + /// The number of blocks + pub length: u64, + /// TODO + pub drop: bool, +} + +impl From<&BitfieldUpdate> for Have { + fn from( + BitfieldUpdate { + start, + length, + drop, + }: &BitfieldUpdate, + ) -> Self { + Have { + start: *start, + length: *length, + drop: *drop, + } + } +} + +#[derive(Debug, Clone)] +/// Core events relevant to replication +pub enum Event { + /// Emmited when core.get(i) happens for a missing block + Get(Get), + /// Emmitted when data.upgrade applied + DataUpgrade(DataUpgrade), + /// Emmitted when core gets new blocks + Have(Have), +} + +/// Derive From for Enum where enum variant and msg have the same name +macro_rules! impl_from_for_enum_variant { + ($enum_name:ident, $variant_and_msg_name:ident) => { + impl From<$variant_and_msg_name> for $enum_name { + fn from(value: $variant_and_msg_name) -> Self { + $enum_name::$variant_and_msg_name(value) + } + } + }; +} + +impl_from_for_enum_variant!(Event, Get); +impl_from_for_enum_variant!(Event, DataUpgrade); +impl_from_for_enum_variant!(Event, Have); + +#[derive(Debug)] +#[cfg(feature = "tokio")] +pub(crate) struct Events { + /// Channel for core events + pub(crate) channel: broadcast::Sender, +} + +#[cfg(feature = "tokio")] +impl Events { + pub(crate) fn new() -> Self { + Self { + channel: broadcast::channel(MAX_EVENT_QUEUE_CAPACITY).0, + } + } + + /// The internal channel errors on send when no replicators are subscribed, + /// For now we don't consider that an error, but just in case, we return a Result in case + /// we want to change this or add another fail path later. + pub(crate) fn send>(&self, evt: T) -> Result<(), HypercoreError> { + let _errs_when_no_replicators_subscribed = self.channel.send(evt.into()); + Ok(()) + } + + /// Send a [`Get`] messages and return the channel associated with it. + pub(crate) fn send_on_get(&self, index: u64) -> broadcast::Receiver<()> { + let (tx, rx) = broadcast::channel(1); + let _ = self.send(Get { + index, + get_result: tx, + }); + rx + } +} From a2be0f6d472e84afcc9c1f94e0934aaea8473ec6 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:53 -0400 Subject: [PATCH 03/24] Add traits and their impls we need for replication --- src/replication/mod.rs | 231 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 src/replication/mod.rs diff --git a/src/replication/mod.rs b/src/replication/mod.rs new file mode 100644 index 0000000..8d2d751 --- /dev/null +++ b/src/replication/mod.rs @@ -0,0 +1,231 @@ +//! External interface for replication +pub mod events; + +use crate::{ + AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, + RequestSeek, RequestUpgrade, +}; + +pub use events::Event; + +use tokio::sync::{broadcast::Receiver, Mutex}; + +use std::future::Future; +use std::sync::Arc; +/// Hypercore that can have multiple owners +#[derive(Debug, Clone)] +pub struct SharedCore(pub Arc>); + +impl From for SharedCore { + fn from(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} +impl SharedCore { + /// Create a shared core from a [`Hypercore`] + pub fn from_hypercore(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} + +/// Methods related to just this core's information +pub trait CoreInfo { + /// Get core info (see: [`Hypercore::info`] + fn info(&self) -> impl Future + Send; + /// Get the key_pair (see: [`Hypercore::key_pair`] + fn key_pair(&self) -> impl Future + Send; +} + +impl CoreInfo for SharedCore { + fn info(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.info() + } + } + + fn key_pair(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.key_pair().clone() + } + } +} + +impl CoreInfo for Hypercore { + fn info(&self) -> impl Future + Send { + async move { self.info() } + } + + fn key_pair(&self) -> impl Future + Send { + async move { self.key_pair().clone() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum ReplicationMethodsError { + /// Error from hypercore + #[error("Got a hypercore error: [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Methods needed for replication +pub trait ReplicationMethods: CoreInfo + Send { + /// ref Core::verify_and_apply_proof + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future> + Send; + /// ref Core::missing_nodes + fn missing_nodes( + &self, + index: u64, + ) -> impl Future> + Send; + /// ref Core::create_proof + fn create_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> impl Future, ReplicationMethodsError>> + Send; + /// subscribe to core events + fn event_subscribe(&self) -> impl Future>; +} + +impl ReplicationMethods for SharedCore { + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.verify_and_apply_proof(proof).await?) + } + } + + fn missing_nodes( + &self, + index: u64, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.missing_nodes(index).await?) + } + } + + fn create_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> impl Future, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.create_proof(block, hash, seek, upgrade).await?) + } + } + + fn event_subscribe(&self) -> impl Future> { + async move { self.0.lock().await.event_subscribe() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum CoreMethodsError { + /// Error from hypercore + #[error("Got a hypercore error [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Trait for things that consume [`crate::Hypercore`] can instead use this trait +/// so they can use all Hypercore-like things such as [`SharedCore`]. +pub trait CoreMethods: CoreInfo { + /// Check if the core has the block at the given index locally + fn has(&self, index: u64) -> impl Future + Send; + + /// get a block + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send; + + /// Append data to the core + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send; + + /// Append a batch of data to the core + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send; +} + +impl CoreMethods for SharedCore { + fn has(&self, index: u64) -> impl Future + Send { + async move { + let core = self.0.lock().await; + core.has(index) + } + } + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.get(index).await?) + } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append(data).await?) + } + } + + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append_batch(batch).await?) + } + } +} + +impl CoreMethods for Hypercore { + fn has(&self, index: u64) -> impl Future + Send { + async move { self.has(index) } + } + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send { + async move { Ok(self.get(index).await?) } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send { + async move { Ok(self.append(data).await?) } + } + + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send { + async move { Ok(self.append_batch(batch).await?) } + } +} From f5e4002845411f86b0aefc20219b0d7f87f66d91 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:52 -0400 Subject: [PATCH 04/24] Use replication feature within Hypercore --- src/core.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 4 +++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/src/core.rs b/src/core.rs index 886ff98..e966a04 100644 --- a/src/core.rs +++ b/src/core.rs @@ -48,10 +48,12 @@ pub struct Hypercore { pub(crate) bitfield: Bitfield, skip_flush_count: u8, // autoFlush in Javascript header: Header, + #[cfg(feature = "replication")] + events: crate::replication::events::Events, } /// Response from append, matches that of the Javascript result -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AppendOutcome { /// Length of the hypercore after append pub length: u64, @@ -247,6 +249,8 @@ impl Hypercore { bitfield, header, skip_flush_count: 0, + #[cfg(feature = "replication")] + events: crate::replication::events::Events::new(), }) } @@ -321,6 +325,14 @@ impl Hypercore { if self.should_flush_bitfield_and_tree_and_oplog() { self.flush_bitfield_and_tree_and_oplog(false).await?; } + + #[cfg(feature = "replication")] + { + let _ = self.events.send(crate::replication::events::DataUpgrade {}); + let _ = self + .events + .send(crate::replication::events::Have::from(&bitfield_update)); + } } // Return the new value @@ -330,10 +342,32 @@ impl Hypercore { }) } + #[cfg(feature = "replication")] + /// Subscribe to core events relevant to replication + pub fn event_subscribe( + &self, + ) -> tokio::sync::broadcast::Receiver { + self.events.channel.subscribe() + } + + /// Check if core has the block at the given `index` locally + #[instrument(ret, skip(self))] + pub fn has(&self, index: u64) -> bool { + self.bitfield.get(index) + } + /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { + #[cfg(feature = "replication")] + // if not in this core, try to get over network + { + let mut rx = self.events.send_on_get(index); + tokio::spawn(async move { + let _err_when_no_peers = rx.recv().await; + }); + } return Ok(None); } @@ -522,12 +556,12 @@ impl Hypercore { self.storage.flush_infos(&outcome.infos_to_flush).await?; self.header = outcome.header; - if let Some(bitfield_update) = bitfield_update { + if let Some(bitfield_update) = &bitfield_update { // Write to bitfield - self.bitfield.update(&bitfield_update); + self.bitfield.update(bitfield_update); // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); + update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update); } // Commit changeset to in-memory tree @@ -537,6 +571,21 @@ impl Hypercore { if self.should_flush_bitfield_and_tree_and_oplog() { self.flush_bitfield_and_tree_and_oplog(false).await?; } + + #[cfg(feature = "replication")] + { + if proof.upgrade.is_some() { + /// Notify replicator if we receieved an upgrade + let _ = self.events.send(crate::replication::events::DataUpgrade {}); + } + + /// Notify replicator if we receieved a bitfield update + if let Some(ref bitfield) = bitfield_update { + let _ = self + .events + .send(crate::replication::events::Have::from(bitfield)); + } + } Ok(true) } diff --git a/src/lib.rs b/src/lib.rs index 24d8627..eae3b21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![forbid(unsafe_code, bad_style, future_incompatible)] +#![forbid(unsafe_code, future_incompatible)] #![forbid(rust_2018_idioms, rust_2018_compatibility)] #![forbid(missing_debug_implementations)] #![forbid(missing_docs)] @@ -74,6 +74,8 @@ pub mod encoding; pub mod prelude; +#[cfg(feature = "replication")] +pub mod replication; mod bitfield; mod builder; From fdf5907e5802e62f70c2ed08e1ae680cb0f5de13 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:52 -0400 Subject: [PATCH 05/24] Fix TODO's in docs Also fix some small lints --- src/common/node.rs | 7 ++++++- src/common/peer.rs | 11 ++++++----- src/storage/mod.rs | 6 +----- src/tree/merkle_tree_changeset.rs | 4 ++-- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/common/node.rs b/src/common/node.rs index 7e339d3..1c78144 100644 --- a/src/common/node.rs +++ b/src/common/node.rs @@ -14,16 +14,21 @@ pub(crate) struct NodeByteRange { pub(crate) length: u64, } -/// Nodes that are persisted to disk. +/// Nodes of the Merkle Tree that are persisted to disk. // TODO: replace `hash: Vec` with `hash: Hash`. This requires patching / // rewriting the Blake2b crate to support `.from_bytes()` to serialize from // disk. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { + /// This node's index in the Merkle tree pub(crate) index: u64, + /// Hash of the data in this node pub(crate) hash: Vec, + /// Number of bytes in this [`Node::data`] pub(crate) length: u64, + /// Index of this nodes parent pub(crate) parent: u64, + /// Hypercore's data. Can be receieved after the rest of the node, so it's optional. pub(crate) data: Option>, pub(crate) blank: bool, } diff --git a/src/common/peer.rs b/src/common/peer.rs index c71b981..b420317 100644 --- a/src/common/peer.rs +++ b/src/common/peer.rs @@ -1,6 +1,7 @@ //! Types needed for passing information with with peers. //! hypercore-protocol-rs uses these types and wraps them //! into wire messages. + use crate::Node; #[derive(Debug, Clone, PartialEq)] @@ -20,7 +21,7 @@ pub struct RequestSeek { } #[derive(Debug, Clone, PartialEq)] -/// Request of a DataUpgrade from peer +/// Request for a DataUpgrade from peer pub struct RequestUpgrade { /// Hypercore start index pub start: u64, @@ -79,7 +80,7 @@ pub struct DataBlock { pub index: u64, /// Data block value in bytes pub value: Vec, - /// TODO: document + /// Nodes of the merkle tree pub nodes: Vec, } @@ -104,11 +105,11 @@ pub struct DataSeek { #[derive(Debug, Clone, PartialEq)] /// TODO: Document pub struct DataUpgrade { - /// TODO: Document + /// Starting block of this upgrade response pub start: u64, - /// TODO: Document + /// Number of blocks in this upgrade response pub length: u64, - /// TODO: Document + /// The nodes of the merkle tree pub nodes: Vec, /// TODO: Document pub additional_nodes: Vec, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 333da2b..ad4b68a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -147,11 +147,7 @@ impl Storage { instruction.index, &buf, )), - Err(RandomAccessError::OutOfBounds { - offset: _, - end: _, - length, - }) => { + Err(RandomAccessError::OutOfBounds { length, .. }) => { if instruction.allow_miss { Ok(StoreInfo::new_content_miss( instruction.store.clone(), diff --git a/src/tree/merkle_tree_changeset.rs b/src/tree/merkle_tree_changeset.rs index be28873..9305302 100644 --- a/src/tree/merkle_tree_changeset.rs +++ b/src/tree/merkle_tree_changeset.rs @@ -10,8 +10,8 @@ use crate::{ /// first create the changes to this changeset, get out information from this to put to the oplog, /// and the commit the changeset to the tree. /// -/// This is called "MerkleTreeBatch" in Javascript, see: -/// https://github.com/hypercore-protocol/hypercore/blob/master/lib/merkle-tree.js +/// This is called "MerkleTreeBatch" in Javascript, source +/// [here](https://github.com/holepunchto/hypercore/blob/88a1a2f1ebe6e33102688225516c4e882873f710/lib/merkle-tree.js#L44). #[derive(Debug)] pub(crate) struct MerkleTreeChangeset { pub(crate) length: u64, From 1ea24d4f9b707955ef16110dd8710001bbdf73dd Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 12:25:19 -0400 Subject: [PATCH 06/24] use reg-comments in code --- src/core.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core.rs b/src/core.rs index e966a04..9923772 100644 --- a/src/core.rs +++ b/src/core.rs @@ -575,11 +575,11 @@ impl Hypercore { #[cfg(feature = "replication")] { if proof.upgrade.is_some() { - /// Notify replicator if we receieved an upgrade + // Notify replicator if we receieved an upgrade let _ = self.events.send(crate::replication::events::DataUpgrade {}); } - /// Notify replicator if we receieved a bitfield update + // Notify replicator if we receieved a bitfield update if let Some(ref bitfield) = bitfield_update { let _ = self .events From 237d0891da5f570cfa9f2d6f6453f7593565c088 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 15:25:19 -0400 Subject: [PATCH 07/24] rm hypercore impls --- src/replication/mod.rs | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 8d2d751..3776a63 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -52,16 +52,6 @@ impl CoreInfo for SharedCore { } } -impl CoreInfo for Hypercore { - fn info(&self) -> impl Future + Send { - async move { self.info() } - } - - fn key_pair(&self) -> impl Future + Send { - async move { self.key_pair().clone() } - } -} - /// Error for ReplicationMethods trait #[derive(thiserror::Error, Debug)] pub enum ReplicationMethodsError { @@ -203,29 +193,3 @@ impl CoreMethods for SharedCore { } } } - -impl CoreMethods for Hypercore { - fn has(&self, index: u64) -> impl Future + Send { - async move { self.has(index) } - } - fn get( - &self, - index: u64, - ) -> impl Future>, CoreMethodsError>> + Send { - async move { Ok(self.get(index).await?) } - } - - fn append( - &self, - data: &[u8], - ) -> impl Future> + Send { - async move { Ok(self.append(data).await?) } - } - - fn append_batch, B: AsRef<[A]> + Send>( - &self, - batch: B, - ) -> impl Future> + Send { - async move { Ok(self.append_batch(batch).await?) } - } -} From 4952d93d6569f1150fe8c8f3b2083e357cfc6eae Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 14:58:25 -0400 Subject: [PATCH 08/24] rm using tokio::spawn --- src/core.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/core.rs b/src/core.rs index 9923772..249c42f 100644 --- a/src/core.rs +++ b/src/core.rs @@ -363,10 +363,7 @@ impl Hypercore { #[cfg(feature = "replication")] // if not in this core, try to get over network { - let mut rx = self.events.send_on_get(index); - tokio::spawn(async move { - let _err_when_no_peers = rx.recv().await; - }); + self.events.send_on_get(index); } return Ok(None); } From df35b1583e6a2261c24b16609409e8765adf0215 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 12:26:38 -0400 Subject: [PATCH 09/24] Replace tokio's broadcast with async_broadcast. To make replication feature runtime agnostic --- Cargo.toml | 3 ++- src/core.rs | 6 ++---- src/replication/events.rs | 14 +++++++------- src/replication/mod.rs | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 723e19e..95ca96e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true} +async-broadcast = { version = "0.7.1", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -61,7 +62,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["tokio", "sparse"] -replication = ["tokio"] +replication = ["tokio", "dep:async-broadcast"] sparse = ["random-access-disk/sparse"] tokio = ["dep:tokio", "random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] diff --git a/src/core.rs b/src/core.rs index 249c42f..d848552 100644 --- a/src/core.rs +++ b/src/core.rs @@ -344,10 +344,8 @@ impl Hypercore { #[cfg(feature = "replication")] /// Subscribe to core events relevant to replication - pub fn event_subscribe( - &self, - ) -> tokio::sync::broadcast::Receiver { - self.events.channel.subscribe() + pub fn event_subscribe(&self) -> async_broadcast::Receiver { + self.events.channel.new_receiver() } /// Check if core has the block at the given `index` locally diff --git a/src/replication/events.rs b/src/replication/events.rs index dc7bd31..9dbcc77 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -1,6 +1,6 @@ //! events related to replication use crate::{common::BitfieldUpdate, HypercoreError}; -use tokio::sync::broadcast; +use async_broadcast::{broadcast, Receiver, Sender}; static MAX_EVENT_QUEUE_CAPACITY: usize = 32; @@ -11,7 +11,7 @@ pub struct Get { /// Index of the requested block pub index: u64, /// When the block is gotten this emits an event - pub get_result: broadcast::Sender<()>, + pub get_result: Sender<()>, } /// Emitted when @@ -75,14 +75,14 @@ impl_from_for_enum_variant!(Event, Have); #[cfg(feature = "tokio")] pub(crate) struct Events { /// Channel for core events - pub(crate) channel: broadcast::Sender, + pub(crate) channel: Sender, } #[cfg(feature = "tokio")] impl Events { pub(crate) fn new() -> Self { Self { - channel: broadcast::channel(MAX_EVENT_QUEUE_CAPACITY).0, + channel: broadcast(MAX_EVENT_QUEUE_CAPACITY).0, } } @@ -90,13 +90,13 @@ impl Events { /// For now we don't consider that an error, but just in case, we return a Result in case /// we want to change this or add another fail path later. pub(crate) fn send>(&self, evt: T) -> Result<(), HypercoreError> { - let _errs_when_no_replicators_subscribed = self.channel.send(evt.into()); + let _errs_when_no_replicators_subscribed = self.channel.broadcast(evt.into()); Ok(()) } /// Send a [`Get`] messages and return the channel associated with it. - pub(crate) fn send_on_get(&self, index: u64) -> broadcast::Receiver<()> { - let (tx, rx) = broadcast::channel(1); + pub(crate) fn send_on_get(&self, index: u64) -> Receiver<()> { + let (tx, rx) = broadcast(1); let _ = self.send(Get { index, get_result: tx, diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 3776a63..8c22f49 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -8,7 +8,7 @@ use crate::{ pub use events::Event; -use tokio::sync::{broadcast::Receiver, Mutex}; +use async_broadcast::Receiver; use std::future::Future; use std::sync::Arc; From 6654972719b850dbd6ca870cdb342c584689a6a0 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 13:38:45 -0400 Subject: [PATCH 10/24] rm unneeded tokio feature checks --- src/replication/events.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/replication/events.rs b/src/replication/events.rs index 9dbcc77..c0541d0 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -72,13 +72,11 @@ impl_from_for_enum_variant!(Event, DataUpgrade); impl_from_for_enum_variant!(Event, Have); #[derive(Debug)] -#[cfg(feature = "tokio")] pub(crate) struct Events { /// Channel for core events pub(crate) channel: Sender, } -#[cfg(feature = "tokio")] impl Events { pub(crate) fn new() -> Self { Self { From f79f0b2920e96d2881c7d4329c6f7ab634e5e31f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 15:40:02 -0400 Subject: [PATCH 11/24] add async-lock use it to rm tokio --- Cargo.toml | 6 +++--- src/replication/mod.rs | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 95ca96e..b629d16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,8 @@ futures = "0.3" crc32fast = "1" intmap = "2" moka = { version = "0.12", optional = true, features = ["sync"] } -tokio = { version = "1.27.0", features = ["rt", "sync"], optional = true} async-broadcast = { version = "0.7.1", optional = true } +async-lock = {version = "3.4.0", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] random-access-disk = { version = "3", default-features = false } @@ -62,9 +62,9 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] default = ["tokio", "sparse"] -replication = ["tokio", "dep:async-broadcast"] +replication = ["dep:async-broadcast", "dep:async-lock"] sparse = ["random-access-disk/sparse"] -tokio = ["dep:tokio", "random-access-disk/tokio"] +tokio = ["random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] cache = ["moka"] # Used only in interoperability tests under tests/js-interop which use the javascript version of hypercore diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 8c22f49..8c29ea0 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -9,6 +9,7 @@ use crate::{ pub use events::Event; use async_broadcast::Receiver; +use async_lock::Mutex; use std::future::Future; use std::sync::Arc; From 2f6dc46ec9e07ce230293b3c56331044b4aa8c38 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 16:16:01 -0400 Subject: [PATCH 12/24] use try_broadcast and set_await_active = false --- src/replication/events.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/replication/events.rs b/src/replication/events.rs index c0541d0..967ec89 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -79,22 +79,23 @@ pub(crate) struct Events { impl Events { pub(crate) fn new() -> Self { - Self { - channel: broadcast(MAX_EVENT_QUEUE_CAPACITY).0, - } + let mut channel = broadcast(MAX_EVENT_QUEUE_CAPACITY).0; + channel.set_await_active(false); + Self { channel } } /// The internal channel errors on send when no replicators are subscribed, /// For now we don't consider that an error, but just in case, we return a Result in case /// we want to change this or add another fail path later. pub(crate) fn send>(&self, evt: T) -> Result<(), HypercoreError> { - let _errs_when_no_replicators_subscribed = self.channel.broadcast(evt.into()); + let _errs_when_no_replicators_subscribed = self.channel.try_broadcast(evt.into()); Ok(()) } /// Send a [`Get`] messages and return the channel associated with it. pub(crate) fn send_on_get(&self, index: u64) -> Receiver<()> { - let (tx, rx) = broadcast(1); + let (mut tx, rx) = broadcast(1); + tx.set_await_active(false); let _ = self.send(Get { index, get_result: tx, From b6f9fa10860acf859bcd4710e4ff14ad78029acb Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 16:35:20 -0400 Subject: [PATCH 13/24] fix issue where channel was closing --- src/replication/events.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/replication/events.rs b/src/replication/events.rs index 967ec89..4d695ae 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -1,6 +1,6 @@ //! events related to replication use crate::{common::BitfieldUpdate, HypercoreError}; -use async_broadcast::{broadcast, Receiver, Sender}; +use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender}; static MAX_EVENT_QUEUE_CAPACITY: usize = 32; @@ -75,13 +75,19 @@ impl_from_for_enum_variant!(Event, Have); pub(crate) struct Events { /// Channel for core events pub(crate) channel: Sender, + /// Kept around so `Events::channel` stays open. + _receiver: InactiveReceiver, } impl Events { pub(crate) fn new() -> Self { - let mut channel = broadcast(MAX_EVENT_QUEUE_CAPACITY).0; + let (mut channel, receiver) = broadcast(MAX_EVENT_QUEUE_CAPACITY); channel.set_await_active(false); - Self { channel } + let mut _receiver = receiver.deactivate(); + // Message sending is best effort. Is msg queue fills up, remove old messages to make place + // for new ones. + _receiver.set_overflow(true); + Self { channel, _receiver } } /// The internal channel errors on send when no replicators are subscribed, From d4e756cd2f7090e0383cbaa1fa2c29049ece4f3e Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 17:53:16 -0400 Subject: [PATCH 14/24] Derive PartialEq for AppendOutcome & Info This was needed for tests. This is cheap and I imagin end users would want this too. --- src/core.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core.rs b/src/core.rs index d848552..6424812 100644 --- a/src/core.rs +++ b/src/core.rs @@ -53,7 +53,7 @@ pub struct Hypercore { } /// Response from append, matches that of the Javascript result -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct AppendOutcome { /// Length of the hypercore after append pub length: u64, @@ -62,7 +62,7 @@ pub struct AppendOutcome { } /// Info about the hypercore -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct Info { /// Length of the hypercore pub length: u64, From cdc52b834bc4bd1d7920812139e1fadce3991729 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 16:53:10 -0400 Subject: [PATCH 15/24] docs --- src/core.rs | 4 ++-- src/replication/events.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core.rs b/src/core.rs index 6424812..2e18547 100644 --- a/src/core.rs +++ b/src/core.rs @@ -53,7 +53,7 @@ pub struct Hypercore { } /// Response from append, matches that of the Javascript result -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, PartialEq)] pub struct AppendOutcome { /// Length of the hypercore after append pub length: u64, @@ -359,7 +359,7 @@ impl Hypercore { pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { #[cfg(feature = "replication")] - // if not in this core, try to get over network + // if not in this core, emit Event::Get(index) { self.events.send_on_get(index); } diff --git a/src/replication/events.rs b/src/replication/events.rs index 4d695ae..5eebb4d 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -98,7 +98,8 @@ impl Events { Ok(()) } - /// Send a [`Get`] messages and return the channel associated with it. + /// Send a [`Get`] messages and return [`Receiver`] that will receive a message when block is + /// gotten. pub(crate) fn send_on_get(&self, index: u64) -> Receiver<()> { let (mut tx, rx) = broadcast(1); tx.set_await_active(false); From 25c722a4111bc6201f67fd3ce43c3a43741e8f1f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 17:54:26 -0400 Subject: [PATCH 16/24] pub create_hypercore_with_data available for tests --- src/core.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core.rs b/src/core.rs index 2e18547..d1bc148 100644 --- a/src/core.rs +++ b/src/core.rs @@ -769,7 +769,7 @@ fn update_contiguous_length( } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; #[async_std::test] @@ -1135,7 +1135,9 @@ mod tests { Ok(()) } - async fn create_hypercore_with_data(length: u64) -> Result { + pub(crate) async fn create_hypercore_with_data( + length: u64, + ) -> Result { let signing_key = generate_signing_key(); create_hypercore_with_data_and_key_pair( length, From b52b25aa18359cc69054b55f80f99f018cc8d064 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 22 Oct 2024 17:55:05 -0400 Subject: [PATCH 17/24] Add tests for SharedCore methods and events --- src/replication/mod.rs | 112 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 8c29ea0..a32e3c3 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -194,3 +194,115 @@ impl CoreMethods for SharedCore { } } } + +#[cfg(test)] +mod tests { + use events::{Get, Have}; + + use super::*; + + #[async_std::test] + async fn shared_core_methods() -> Result<(), CoreMethodsError> { + let core = crate::core::tests::create_hypercore_with_data(0).await?; + let core = SharedCore::from(core); + + let info = core.info().await; + assert_eq!( + info, + crate::core::Info { + length: 0, + byte_length: 0, + contiguous_length: 0, + fork: 0, + writeable: true, + } + ); + + // key_pair is random, nothing to test here + let _kp = core.key_pair().await; + + assert_eq!(core.has(0).await, false); + assert_eq!(core.get(0).await?, None); + let res = core.append(b"foo").await?; + assert_eq!( + res, + AppendOutcome { + length: 1, + byte_length: 3 + } + ); + assert_eq!(core.has(0).await, true); + assert_eq!(core.get(0).await?, Some(b"foo".into())); + let res = core.append_batch([b"hello", b"world"]).await?; + assert_eq!( + res, + AppendOutcome { + length: 3, + byte_length: 13 + } + ); + assert_eq!(core.has(2).await, true); + assert_eq!(core.get(2).await?, Some(b"world".into())); + Ok(()) + } + + #[async_std::test] + async fn test_events() -> Result<(), CoreMethodsError> { + let core = crate::core::tests::create_hypercore_with_data(0).await?; + let core = SharedCore::from(core); + + // Check that appending data emits a DataUpgrade and Have event + + let mut rx = core.event_subscribe().await; + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 2 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + core.append(b"foo").await?; + let (res, mut rx) = handle.await; + assert!(matches!(res[0], Event::DataUpgrade(_))); + assert!(matches!( + res[1], + Event::Have(Have { + start: 0, + length: 1, + drop: false + }) + )); + // no messages in queue + assert!(rx.is_empty()); + + // Check that Hypercore::get for missing data emits a Get event + + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 1 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + assert_eq!(core.get(1).await?, None); + let (res, rx) = handle.await; + assert!(matches!( + res[0], + Event::Get(Get { + index: 1, + get_result: _ + }) + )); + // no messages in queue + assert!(rx.is_empty()); + Ok(()) + } +} From d6dd673e514c847fae0e57c22d8b0e81b3525d2c Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 12:08:56 -0400 Subject: [PATCH 18/24] Move shared_core to own module and add as feature --- Cargo.toml | 5 +- src/replication/mod.rs | 232 +-------------------------------- src/replication/shared_core.rs | 232 +++++++++++++++++++++++++++++++++ 3 files changed, 242 insertions(+), 227 deletions(-) create mode 100644 src/replication/shared_core.rs diff --git a/Cargo.toml b/Cargo.toml index b629d16..1ad0d43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,8 +61,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"] tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } [features] -default = ["tokio", "sparse"] -replication = ["dep:async-broadcast", "dep:async-lock"] +default = ["tokio", "sparse", "replication"] +replication = ["dep:async-broadcast"] +shared-core = ["replication", "dep:async-lock"] sparse = ["random-access-disk/sparse"] tokio = ["random-access-disk/tokio"] async-std = ["random-access-disk/async-std"] diff --git a/src/replication/mod.rs b/src/replication/mod.rs index a32e3c3..5caf9ae 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -1,33 +1,20 @@ //! External interface for replication pub mod events; +#[cfg(feature = "shared-core")] +pub mod shared_core; + +#[cfg(feature = "shared-core")] +pub use shared_core::SharedCore; use crate::{ - AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, - RequestSeek, RequestUpgrade, + AppendOutcome, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, RequestSeek, + RequestUpgrade, }; pub use events::Event; use async_broadcast::Receiver; -use async_lock::Mutex; - use std::future::Future; -use std::sync::Arc; -/// Hypercore that can have multiple owners -#[derive(Debug, Clone)] -pub struct SharedCore(pub Arc>); - -impl From for SharedCore { - fn from(core: Hypercore) -> Self { - SharedCore(Arc::new(Mutex::new(core))) - } -} -impl SharedCore { - /// Create a shared core from a [`Hypercore`] - pub fn from_hypercore(core: Hypercore) -> Self { - SharedCore(Arc::new(Mutex::new(core))) - } -} /// Methods related to just this core's information pub trait CoreInfo { @@ -37,22 +24,6 @@ pub trait CoreInfo { fn key_pair(&self) -> impl Future + Send; } -impl CoreInfo for SharedCore { - fn info(&self) -> impl Future + Send { - async move { - let core = &self.0.lock().await; - core.info() - } - } - - fn key_pair(&self) -> impl Future + Send { - async move { - let core = &self.0.lock().await; - core.key_pair().clone() - } - } -} - /// Error for ReplicationMethods trait #[derive(thiserror::Error, Debug)] pub enum ReplicationMethodsError { @@ -85,45 +56,6 @@ pub trait ReplicationMethods: CoreInfo + Send { fn event_subscribe(&self) -> impl Future>; } -impl ReplicationMethods for SharedCore { - fn verify_and_apply_proof( - &self, - proof: &Proof, - ) -> impl Future> { - async move { - let mut core = self.0.lock().await; - Ok(core.verify_and_apply_proof(proof).await?) - } - } - - fn missing_nodes( - &self, - index: u64, - ) -> impl Future> { - async move { - let mut core = self.0.lock().await; - Ok(core.missing_nodes(index).await?) - } - } - - fn create_proof( - &self, - block: Option, - hash: Option, - seek: Option, - upgrade: Option, - ) -> impl Future, ReplicationMethodsError>> { - async move { - let mut core = self.0.lock().await; - Ok(core.create_proof(block, hash, seek, upgrade).await?) - } - } - - fn event_subscribe(&self) -> impl Future> { - async move { self.0.lock().await.event_subscribe() } - } -} - /// Error for ReplicationMethods trait #[derive(thiserror::Error, Debug)] pub enum CoreMethodsError { @@ -156,153 +88,3 @@ pub trait CoreMethods: CoreInfo { batch: B, ) -> impl Future> + Send; } - -impl CoreMethods for SharedCore { - fn has(&self, index: u64) -> impl Future + Send { - async move { - let core = self.0.lock().await; - core.has(index) - } - } - fn get( - &self, - index: u64, - ) -> impl Future>, CoreMethodsError>> + Send { - async move { - let mut core = self.0.lock().await; - Ok(core.get(index).await?) - } - } - - fn append( - &self, - data: &[u8], - ) -> impl Future> + Send { - async move { - let mut core = self.0.lock().await; - Ok(core.append(data).await?) - } - } - - fn append_batch, B: AsRef<[A]> + Send>( - &self, - batch: B, - ) -> impl Future> + Send { - async move { - let mut core = self.0.lock().await; - Ok(core.append_batch(batch).await?) - } - } -} - -#[cfg(test)] -mod tests { - use events::{Get, Have}; - - use super::*; - - #[async_std::test] - async fn shared_core_methods() -> Result<(), CoreMethodsError> { - let core = crate::core::tests::create_hypercore_with_data(0).await?; - let core = SharedCore::from(core); - - let info = core.info().await; - assert_eq!( - info, - crate::core::Info { - length: 0, - byte_length: 0, - contiguous_length: 0, - fork: 0, - writeable: true, - } - ); - - // key_pair is random, nothing to test here - let _kp = core.key_pair().await; - - assert_eq!(core.has(0).await, false); - assert_eq!(core.get(0).await?, None); - let res = core.append(b"foo").await?; - assert_eq!( - res, - AppendOutcome { - length: 1, - byte_length: 3 - } - ); - assert_eq!(core.has(0).await, true); - assert_eq!(core.get(0).await?, Some(b"foo".into())); - let res = core.append_batch([b"hello", b"world"]).await?; - assert_eq!( - res, - AppendOutcome { - length: 3, - byte_length: 13 - } - ); - assert_eq!(core.has(2).await, true); - assert_eq!(core.get(2).await?, Some(b"world".into())); - Ok(()) - } - - #[async_std::test] - async fn test_events() -> Result<(), CoreMethodsError> { - let core = crate::core::tests::create_hypercore_with_data(0).await?; - let core = SharedCore::from(core); - - // Check that appending data emits a DataUpgrade and Have event - - let mut rx = core.event_subscribe().await; - let handle = async_std::task::spawn(async move { - let mut out = vec![]; - loop { - if out.len() == 2 { - return (out, rx); - } - if let Ok(evt) = rx.recv().await { - out.push(evt); - } - } - }); - core.append(b"foo").await?; - let (res, mut rx) = handle.await; - assert!(matches!(res[0], Event::DataUpgrade(_))); - assert!(matches!( - res[1], - Event::Have(Have { - start: 0, - length: 1, - drop: false - }) - )); - // no messages in queue - assert!(rx.is_empty()); - - // Check that Hypercore::get for missing data emits a Get event - - let handle = async_std::task::spawn(async move { - let mut out = vec![]; - loop { - if out.len() == 1 { - return (out, rx); - } - if let Ok(evt) = rx.recv().await { - out.push(evt); - } - } - }); - assert_eq!(core.get(1).await?, None); - let (res, rx) = handle.await; - assert!(matches!( - res[0], - Event::Get(Get { - index: 1, - get_result: _ - }) - )); - // no messages in queue - assert!(rx.is_empty()); - Ok(()) - } -} diff --git a/src/replication/shared_core.rs b/src/replication/shared_core.rs new file mode 100644 index 0000000..5bc4534 --- /dev/null +++ b/src/replication/shared_core.rs @@ -0,0 +1,232 @@ +//! Implementation of a Hypercore that can have multiple owners. Along with implementations of all +//! the hypercore traits. +use crate::{ + AppendOutcome, Hypercore, Info, PartialKeypair, Proof, RequestBlock, RequestSeek, + RequestUpgrade, +}; +use async_broadcast::Receiver; +use async_lock::Mutex; +use std::{future::Future, sync::Arc}; + +use super::{ + CoreInfo, CoreMethods, CoreMethodsError, Event, ReplicationMethods, ReplicationMethodsError, +}; + +/// Hypercore that can have multiple owners +#[derive(Debug, Clone)] +pub struct SharedCore(pub Arc>); + +impl From for SharedCore { + fn from(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} +impl SharedCore { + /// Create a shared core from a [`Hypercore`] + pub fn from_hypercore(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} + +impl CoreInfo for SharedCore { + fn info(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.info() + } + } + + fn key_pair(&self) -> impl Future + Send { + async move { + let core = &self.0.lock().await; + core.key_pair().clone() + } + } +} + +impl ReplicationMethods for SharedCore { + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.verify_and_apply_proof(proof).await?) + } + } + + fn missing_nodes( + &self, + index: u64, + ) -> impl Future> { + async move { + let mut core = self.0.lock().await; + Ok(core.missing_nodes(index).await?) + } + } + + fn create_proof( + &self, + block: Option, + hash: Option, + seek: Option, + upgrade: Option, + ) -> impl Future, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.create_proof(block, hash, seek, upgrade).await?) + } + } + + fn event_subscribe(&self) -> impl Future> { + async move { self.0.lock().await.event_subscribe() } + } +} + +impl CoreMethods for SharedCore { + fn has(&self, index: u64) -> impl Future + Send { + async move { + let core = self.0.lock().await; + core.has(index) + } + } + fn get( + &self, + index: u64, + ) -> impl Future>, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.get(index).await?) + } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append(data).await?) + } + } + + fn append_batch, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append_batch(batch).await?) + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::replication::events::{Get, Have}; + #[async_std::test] + async fn shared_core_methods() -> Result<(), CoreMethodsError> { + let core = crate::core::tests::create_hypercore_with_data(0).await?; + let core = SharedCore::from(core); + + let info = core.info().await; + assert_eq!( + info, + crate::core::Info { + length: 0, + byte_length: 0, + contiguous_length: 0, + fork: 0, + writeable: true, + } + ); + + // key_pair is random, nothing to test here + let _kp = core.key_pair().await; + + assert_eq!(core.has(0).await, false); + assert_eq!(core.get(0).await?, None); + let res = core.append(b"foo").await?; + assert_eq!( + res, + AppendOutcome { + length: 1, + byte_length: 3 + } + ); + assert_eq!(core.has(0).await, true); + assert_eq!(core.get(0).await?, Some(b"foo".into())); + let res = core.append_batch([b"hello", b"world"]).await?; + assert_eq!( + res, + AppendOutcome { + length: 3, + byte_length: 13 + } + ); + assert_eq!(core.has(2).await, true); + assert_eq!(core.get(2).await?, Some(b"world".into())); + Ok(()) + } + + #[async_std::test] + async fn test_events() -> Result<(), CoreMethodsError> { + let mut core = crate::core::tests::create_hypercore_with_data(0).await?; + + // Check that appending data emits a DataUpgrade and Have event + + let mut rx = core.event_subscribe(); + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 2 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + core.append(b"foo").await?; + let (res, mut rx) = handle.await; + assert!(matches!(res[0], Event::DataUpgrade(_))); + assert!(matches!( + res[1], + Event::Have(Have { + start: 0, + length: 1, + drop: false + }) + )); + // no messages in queue + assert!(rx.is_empty()); + + // Check that Hypercore::get for missing data emits a Get event + + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 1 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + assert_eq!(core.get(1).await?, None); + let (res, rx) = handle.await; + assert!(matches!( + res[0], + Event::Get(Get { + index: 1, + get_result: _ + }) + )); + // no messages in queue + assert!(rx.is_empty()); + Ok(()) + } +} From 659275db3924cb77eab71ad29542e836cbc62756 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Thu, 24 Oct 2024 16:33:30 -0400 Subject: [PATCH 19/24] Add shared-core tests to CI --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a51cf66..d0fac45 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,9 +39,11 @@ jobs: cargo check --all-targets --no-default-features --features async-std,sparse cargo check --all-targets --no-default-features --features async-std,sparse,cache cargo test --no-default-features --features js_interop_tests,tokio + cargo test --no-default-features --features js_interop_tests,tokio,shared-core cargo test --no-default-features --features js_interop_tests,tokio,sparse cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache cargo test --no-default-features --features js_interop_tests,async-std + cargo test --no-default-features --features js_interop_tests,async-std,shared-core cargo test --no-default-features --features js_interop_tests,async-std,sparse cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache cargo test --benches --no-default-features --features tokio @@ -64,9 +66,11 @@ jobs: cargo check --all-targets --no-default-features --features async-std,sparse cargo check --all-targets --no-default-features --features async-std,sparse,cache cargo test --no-default-features --features tokio + cargo test --no-default-features --features tokio,shared-core cargo test --no-default-features --features tokio,sparse cargo test --no-default-features --features tokio,sparse,cache cargo test --no-default-features --features async-std + cargo test --no-default-features --features async-std,shared-core cargo test --no-default-features --features async-std,sparse cargo test --no-default-features --features async-std,sparse,cache cargo test --benches --no-default-features --features tokio @@ -89,9 +93,11 @@ jobs: cargo check --all-targets --no-default-features --features async-std,sparse cargo check --all-targets --no-default-features --features async-std,sparse,cache cargo test --no-default-features --features js_interop_tests,tokio + cargo test --no-default-features --features js_interop_tests,tokio,shared-core cargo test --no-default-features --features js_interop_tests,tokio,sparse cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache cargo test --no-default-features --features js_interop_tests,async-std + cargo test --no-default-features --features js_interop_tests,async-std,shared-core cargo test --no-default-features --features js_interop_tests,async-std,sparse cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache cargo test --benches --no-default-features --features tokio From 745165d57b58537cc766c8124783db7a713c99b1 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 13:11:10 -0400 Subject: [PATCH 20/24] create_core_with_data_and_key pub(crate) for tests --- src/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core.rs b/src/core.rs index d1bc148..cf82049 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1149,7 +1149,7 @@ pub(crate) mod tests { .await } - async fn create_hypercore_with_data_and_key_pair( + pub(crate) async fn create_hypercore_with_data_and_key_pair( length: u64, key_pair: PartialKeypair, ) -> Result { From 7626a178b027c81aad51a0987490e3d4f1cc2ef3 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 13:11:50 -0400 Subject: [PATCH 21/24] move test_events to events module --- src/replication/events.rs | 65 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/replication/events.rs b/src/replication/events.rs index 5eebb4d..baf16e3 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -110,3 +110,68 @@ impl Events { rx } } + +#[cfg(test)] +mod test { + use super::*; + use crate::replication::CoreMethodsError; + + #[async_std::test] + async fn test_events() -> Result<(), CoreMethodsError> { + let mut core = crate::core::tests::create_hypercore_with_data(0).await?; + + // Check that appending data emits a DataUpgrade and Have event + + let mut rx = core.event_subscribe(); + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 2 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + core.append(b"foo").await?; + let (res, mut rx) = handle.await; + assert!(matches!(res[0], Event::DataUpgrade(_))); + assert!(matches!( + res[1], + Event::Have(Have { + start: 0, + length: 1, + drop: false + }) + )); + // no messages in queue + assert!(rx.is_empty()); + + // Check that Hypercore::get for missing data emits a Get event + + let handle = async_std::task::spawn(async move { + let mut out = vec![]; + loop { + if out.len() == 1 { + return (out, rx); + } + if let Ok(evt) = rx.recv().await { + out.push(evt); + } + } + }); + assert_eq!(core.get(1).await?, None); + let (res, rx) = handle.await; + assert!(matches!( + res[0], + Event::Get(Get { + index: 1, + get_result: _ + }) + )); + // no messages in queue + assert!(rx.is_empty()); + Ok(()) + } +} From 384ea8cd569163dd310b6c930c0e5f1defa04183 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 13:12:08 -0400 Subject: [PATCH 22/24] Add From for ReplMethErr --- src/replication/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 5caf9ae..e48d6c3 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -30,6 +30,9 @@ pub enum ReplicationMethodsError { /// Error from hypercore #[error("Got a hypercore error: [{0}]")] HypercoreError(#[from] HypercoreError), + /// Error from CoreMethods + #[error("Got a CoreMethods error: [{0}]")] + CoreMethodsError(#[from] CoreMethodsError), } /// Methods needed for replication @@ -56,7 +59,7 @@ pub trait ReplicationMethods: CoreInfo + Send { fn event_subscribe(&self) -> impl Future>; } -/// Error for ReplicationMethods trait +/// Error for CoreMethods trait #[derive(thiserror::Error, Debug)] pub enum CoreMethodsError { /// Error from hypercore From fea70bb1aa332d1958ab478c1fd78772dfbbbcbb Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 13:12:35 -0400 Subject: [PATCH 23/24] Add test for replication methods --- src/replication/shared_core.rs | 104 +++++++++++++++------------------ 1 file changed, 48 insertions(+), 56 deletions(-) diff --git a/src/replication/shared_core.rs b/src/replication/shared_core.rs index 5bc4534..f30de47 100644 --- a/src/replication/shared_core.rs +++ b/src/replication/shared_core.rs @@ -125,12 +125,14 @@ impl CoreMethods for SharedCore { mod tests { use super::*; - use crate::replication::events::{Get, Have}; + + use crate::core::tests::{create_hypercore_with_data, create_hypercore_with_data_and_key_pair}; #[async_std::test] async fn shared_core_methods() -> Result<(), CoreMethodsError> { let core = crate::core::tests::create_hypercore_with_data(0).await?; let core = SharedCore::from(core); + // check CoreInfo let info = core.info().await; assert_eq!( info, @@ -146,6 +148,7 @@ mod tests { // key_pair is random, nothing to test here let _kp = core.key_pair().await; + // check CoreMethods assert_eq!(core.has(0).await, false); assert_eq!(core.get(0).await?, None); let res = core.append(b"foo").await?; @@ -172,61 +175,50 @@ mod tests { } #[async_std::test] - async fn test_events() -> Result<(), CoreMethodsError> { - let mut core = crate::core::tests::create_hypercore_with_data(0).await?; - - // Check that appending data emits a DataUpgrade and Have event - - let mut rx = core.event_subscribe(); - let handle = async_std::task::spawn(async move { - let mut out = vec![]; - loop { - if out.len() == 2 { - return (out, rx); - } - if let Ok(evt) = rx.recv().await { - out.push(evt); - } - } - }); - core.append(b"foo").await?; - let (res, mut rx) = handle.await; - assert!(matches!(res[0], Event::DataUpgrade(_))); - assert!(matches!( - res[1], - Event::Have(Have { - start: 0, - length: 1, - drop: false - }) - )); - // no messages in queue - assert!(rx.is_empty()); - - // Check that Hypercore::get for missing data emits a Get event - - let handle = async_std::task::spawn(async move { - let mut out = vec![]; - loop { - if out.len() == 1 { - return (out, rx); - } - if let Ok(evt) = rx.recv().await { - out.push(evt); - } - } - }); - assert_eq!(core.get(1).await?, None); - let (res, rx) = handle.await; - assert!(matches!( - res[0], - Event::Get(Get { - index: 1, - get_result: _ - }) - )); - // no messages in queue - assert!(rx.is_empty()); + async fn shared_core_replication_methods() -> Result<(), ReplicationMethodsError> { + let main = create_hypercore_with_data(10).await?; + let clone = create_hypercore_with_data_and_key_pair( + 0, + PartialKeypair { + public: main.key_pair.public, + secret: None, + }, + ) + .await?; + + let main = SharedCore::from(main); + let clone = SharedCore::from(clone); + + let index = 6; + let nodes = clone.missing_nodes(index).await?; + let proof = main + .create_proof( + None, + Some(RequestBlock { index, nodes }), + None, + Some(RequestUpgrade { + start: 0, + length: 10, + }), + ) + .await? + .unwrap(); + assert!(clone.verify_and_apply_proof(&proof).await?); + let main_info = main.info().await; + let clone_info = clone.info().await; + assert_eq!(main_info.byte_length, clone_info.byte_length); + assert_eq!(main_info.length, clone_info.length); + assert!(main.get(6).await?.is_some()); + assert!(clone.get(6).await?.is_none()); + + // Fetch data for index 6 and verify it is found + let index = 6; + let nodes = clone.missing_nodes(index).await?; + let proof = main + .create_proof(Some(RequestBlock { index, nodes }), None, None, None) + .await? + .unwrap(); + assert!(clone.verify_and_apply_proof(&proof).await?); Ok(()) } } From 5725b3812ab53d353342df7b8fccb691def8bf5a Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 23 Oct 2024 13:18:40 -0400 Subject: [PATCH 24/24] fix doc errors --- src/replication/events.rs | 4 ++-- src/replication/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/replication/events.rs b/src/replication/events.rs index baf16e3..b9c07df 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -4,9 +4,9 @@ use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender}; static MAX_EVENT_QUEUE_CAPACITY: usize = 32; -/// Event emeitted by [`Events::send_on_get`] +/// Event emitted by [`crate::Hypercore::event_subscribe`] #[derive(Debug, Clone)] -/// Emitted when [`Hypercore::get`] is called when the block is missing. +/// Emitted when [`crate::Hypercore::get`] is called when the block is missing. pub struct Get { /// Index of the requested block pub index: u64, diff --git a/src/replication/mod.rs b/src/replication/mod.rs index e48d6c3..166cb30 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -18,9 +18,9 @@ use std::future::Future; /// Methods related to just this core's information pub trait CoreInfo { - /// Get core info (see: [`Hypercore::info`] + /// Get core info (see: [`crate::Hypercore::info`] fn info(&self) -> impl Future + Send; - /// Get the key_pair (see: [`Hypercore::key_pair`] + /// Get the key_pair (see: [`crate::Hypercore::key_pair`] fn key_pair(&self) -> impl Future + Send; } @@ -68,7 +68,7 @@ pub enum CoreMethodsError { } /// Trait for things that consume [`crate::Hypercore`] can instead use this trait -/// so they can use all Hypercore-like things such as [`SharedCore`]. +/// so they can use all Hypercore-like things such as `SharedCore`. pub trait CoreMethods: CoreInfo { /// Check if the core has the block at the given index locally fn has(&self, index: u64) -> impl Future + Send;