Skip to content

Commit

Permalink
Merge pull request #142 from cowlicks/replication
Browse files Browse the repository at this point in the history
Add events needed for replication
  • Loading branch information
ttiurani authored Oct 25, 2024
2 parents 7f70249 + 5725b38 commit c146362
Show file tree
Hide file tree
Showing 11 changed files with 577 additions and 23 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features js_interop_tests,tokio
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
cargo test --no-default-features --features js_interop_tests,tokio,sparse
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
cargo test --no-default-features --features js_interop_tests,async-std
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
cargo test --no-default-features --features js_interop_tests,async-std,sparse
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand All @@ -64,9 +66,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features tokio
cargo test --no-default-features --features tokio,shared-core
cargo test --no-default-features --features tokio,sparse
cargo test --no-default-features --features tokio,sparse,cache
cargo test --no-default-features --features async-std
cargo test --no-default-features --features async-std,shared-core
cargo test --no-default-features --features async-std,sparse
cargo test --no-default-features --features async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand All @@ -89,9 +93,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features js_interop_tests,tokio
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
cargo test --no-default-features --features js_interop_tests,tokio,sparse
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
cargo test --no-default-features --features js_interop_tests,async-std
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
cargo test --no-default-features --features js_interop_tests,async-std,sparse
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ futures = "0.3"
crc32fast = "1"
intmap = "2"
moka = { version = "0.12", optional = true, features = ["sync"] }
async-broadcast = { version = "0.7.1", optional = true }
async-lock = {version = "3.4.0", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
random-access-disk = { version = "3", default-features = false }
Expand All @@ -59,7 +61,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }

[features]
default = ["tokio", "sparse"]
default = ["tokio", "sparse", "replication"]
replication = ["dep:async-broadcast"]
shared-core = ["replication", "dep:async-lock"]
sparse = ["random-access-disk/sparse"]
tokio = ["random-access-disk/tokio"]
async-std = ["random-access-disk/async-std"]
Expand Down
7 changes: 6 additions & 1 deletion src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@ pub(crate) struct NodeByteRange {
pub(crate) length: u64,
}

/// Nodes that are persisted to disk.
/// Nodes of the Merkle Tree that are persisted to disk.
// TODO: replace `hash: Vec<u8>` with `hash: Hash`. This requires patching /
// rewriting the Blake2b crate to support `.from_bytes()` to serialize from
// disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
/// This node's index in the Merkle tree
pub(crate) index: u64,
/// Hash of the data in this node
pub(crate) hash: Vec<u8>,
/// Number of bytes in this [`Node::data`]
pub(crate) length: u64,
/// Index of this nodes parent
pub(crate) parent: u64,
/// Hypercore's data. Can be receieved after the rest of the node, so it's optional.
pub(crate) data: Option<Vec<u8>>,
pub(crate) blank: bool,
}
Expand Down
11 changes: 6 additions & 5 deletions src/common/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Types needed for passing information with with peers.
//! hypercore-protocol-rs uses these types and wraps them
//! into wire messages.
use crate::Node;

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -20,7 +21,7 @@ pub struct RequestSeek {
}

#[derive(Debug, Clone, PartialEq)]
/// Request of a DataUpgrade from peer
/// Request for a DataUpgrade from peer
pub struct RequestUpgrade {
/// Hypercore start index
pub start: u64,
Expand Down Expand Up @@ -79,7 +80,7 @@ pub struct DataBlock {
pub index: u64,
/// Data block value in bytes
pub value: Vec<u8>,
/// TODO: document
/// Nodes of the merkle tree
pub nodes: Vec<Node>,
}

Expand All @@ -104,11 +105,11 @@ pub struct DataSeek {
#[derive(Debug, Clone, PartialEq)]
/// TODO: Document
pub struct DataUpgrade {
/// TODO: Document
/// Starting block of this upgrade response
pub start: u64,
/// TODO: Document
/// Number of blocks in this upgrade response
pub length: u64,
/// TODO: Document
/// The nodes of the merkle tree
pub nodes: Vec<Node>,
/// TODO: Document
pub additional_nodes: Vec<Node>,
Expand Down
62 changes: 54 additions & 8 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ pub struct Hypercore {
pub(crate) bitfield: Bitfield,
skip_flush_count: u8, // autoFlush in Javascript
header: Header,
#[cfg(feature = "replication")]
events: crate::replication::events::Events,
}

/// Response from append, matches that of the Javascript result
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct AppendOutcome {
/// Length of the hypercore after append
pub length: u64,
Expand All @@ -60,7 +62,7 @@ pub struct AppendOutcome {
}

/// Info about the hypercore
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Info {
/// Length of the hypercore
pub length: u64,
Expand Down Expand Up @@ -247,6 +249,8 @@ impl Hypercore {
bitfield,
header,
skip_flush_count: 0,
#[cfg(feature = "replication")]
events: crate::replication::events::Events::new(),
})
}

Expand Down Expand Up @@ -321,6 +325,14 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
let _ = self.events.send(crate::replication::events::DataUpgrade {});
let _ = self
.events
.send(crate::replication::events::Have::from(&bitfield_update));
}
}

// Return the new value
Expand All @@ -330,10 +342,27 @@ impl Hypercore {
})
}

#[cfg(feature = "replication")]
/// Subscribe to core events relevant to replication
pub fn event_subscribe(&self) -> async_broadcast::Receiver<crate::replication::events::Event> {
self.events.channel.new_receiver()
}

/// Check if core has the block at the given `index` locally
#[instrument(ret, skip(self))]
pub fn has(&self, index: u64) -> bool {
self.bitfield.get(index)
}

/// Read value at given index, if any.
#[instrument(err, skip(self))]
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
if !self.bitfield.get(index) {
#[cfg(feature = "replication")]
// if not in this core, emit Event::Get(index)
{
self.events.send_on_get(index);
}
return Ok(None);
}

Expand Down Expand Up @@ -522,12 +551,12 @@ impl Hypercore {
self.storage.flush_infos(&outcome.infos_to_flush).await?;
self.header = outcome.header;

if let Some(bitfield_update) = bitfield_update {
if let Some(bitfield_update) = &bitfield_update {
// Write to bitfield
self.bitfield.update(&bitfield_update);
self.bitfield.update(bitfield_update);

// Contiguous length is known only now
update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update);
update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update);
}

// Commit changeset to in-memory tree
Expand All @@ -537,6 +566,21 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
if proof.upgrade.is_some() {
// Notify replicator if we receieved an upgrade
let _ = self.events.send(crate::replication::events::DataUpgrade {});
}

// Notify replicator if we receieved a bitfield update
if let Some(ref bitfield) = bitfield_update {
let _ = self
.events
.send(crate::replication::events::Have::from(bitfield));
}
}
Ok(true)
}

Expand Down Expand Up @@ -725,7 +769,7 @@ fn update_contiguous_length(
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;

#[async_std::test]
Expand Down Expand Up @@ -1091,7 +1135,9 @@ mod tests {
Ok(())
}

async fn create_hypercore_with_data(length: u64) -> Result<Hypercore, HypercoreError> {
pub(crate) async fn create_hypercore_with_data(
length: u64,
) -> Result<Hypercore, HypercoreError> {
let signing_key = generate_signing_key();
create_hypercore_with_data_and_key_pair(
length,
Expand All @@ -1103,7 +1149,7 @@ mod tests {
.await
}

async fn create_hypercore_with_data_and_key_pair(
pub(crate) async fn create_hypercore_with_data_and_key_pair(
length: u64,
key_pair: PartialKeypair,
) -> Result<Hypercore, HypercoreError> {
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![forbid(unsafe_code, bad_style, future_incompatible)]
#![forbid(unsafe_code, future_incompatible)]
#![forbid(rust_2018_idioms, rust_2018_compatibility)]
#![forbid(missing_debug_implementations)]
#![forbid(missing_docs)]
Expand Down Expand Up @@ -74,6 +74,8 @@
pub mod encoding;
pub mod prelude;
#[cfg(feature = "replication")]
pub mod replication;

mod bitfield;
mod builder;
Expand Down
Loading

0 comments on commit c146362

Please sign in to comment.