From 5902243c5c45c9aaef913275808a2f1b260cfa84 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 21 Oct 2024 12:38:52 -0400 Subject: [PATCH] Use replication feature within Hypercore --- src/core.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 4 +++- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/src/core.rs b/src/core.rs index 886ff98..e966a04 100644 --- a/src/core.rs +++ b/src/core.rs @@ -48,10 +48,12 @@ pub struct Hypercore { pub(crate) bitfield: Bitfield, skip_flush_count: u8, // autoFlush in Javascript header: Header, + #[cfg(feature = "replication")] + events: crate::replication::events::Events, } /// Response from append, matches that of the Javascript result -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AppendOutcome { /// Length of the hypercore after append pub length: u64, @@ -247,6 +249,8 @@ impl Hypercore { bitfield, header, skip_flush_count: 0, + #[cfg(feature = "replication")] + events: crate::replication::events::Events::new(), }) } @@ -321,6 +325,14 @@ impl Hypercore { if self.should_flush_bitfield_and_tree_and_oplog() { self.flush_bitfield_and_tree_and_oplog(false).await?; } + + #[cfg(feature = "replication")] + { + let _ = self.events.send(crate::replication::events::DataUpgrade {}); + let _ = self + .events + .send(crate::replication::events::Have::from(&bitfield_update)); + } } // Return the new value @@ -330,10 +342,32 @@ impl Hypercore { }) } + #[cfg(feature = "replication")] + /// Subscribe to core events relevant to replication + pub fn event_subscribe( + &self, + ) -> tokio::sync::broadcast::Receiver { + self.events.channel.subscribe() + } + + /// Check if core has the block at the given `index` locally + #[instrument(ret, skip(self))] + pub fn has(&self, index: u64) -> bool { + self.bitfield.get(index) + } + /// Read value at given index, if any. #[instrument(err, skip(self))] pub async fn get(&mut self, index: u64) -> Result>, HypercoreError> { if !self.bitfield.get(index) { + #[cfg(feature = "replication")] + // if not in this core, try to get over network + { + let mut rx = self.events.send_on_get(index); + tokio::spawn(async move { + let _err_when_no_peers = rx.recv().await; + }); + } return Ok(None); } @@ -522,12 +556,12 @@ impl Hypercore { self.storage.flush_infos(&outcome.infos_to_flush).await?; self.header = outcome.header; - if let Some(bitfield_update) = bitfield_update { + if let Some(bitfield_update) = &bitfield_update { // Write to bitfield - self.bitfield.update(&bitfield_update); + self.bitfield.update(bitfield_update); // Contiguous length is known only now - update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update); + update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update); } // Commit changeset to in-memory tree @@ -537,6 +571,21 @@ impl Hypercore { if self.should_flush_bitfield_and_tree_and_oplog() { self.flush_bitfield_and_tree_and_oplog(false).await?; } + + #[cfg(feature = "replication")] + { + if proof.upgrade.is_some() { + /// Notify replicator if we receieved an upgrade + let _ = self.events.send(crate::replication::events::DataUpgrade {}); + } + + /// Notify replicator if we receieved a bitfield update + if let Some(ref bitfield) = bitfield_update { + let _ = self + .events + .send(crate::replication::events::Have::from(bitfield)); + } + } Ok(true) } diff --git a/src/lib.rs b/src/lib.rs index 24d8627..eae3b21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![forbid(unsafe_code, bad_style, future_incompatible)] +#![forbid(unsafe_code, future_incompatible)] #![forbid(rust_2018_idioms, rust_2018_compatibility)] #![forbid(missing_debug_implementations)] #![forbid(missing_docs)] @@ -74,6 +74,8 @@ pub mod encoding; pub mod prelude; +#[cfg(feature = "replication")] +pub mod replication; mod bitfield; mod builder;