Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add events needed for replication #142

Merged
merged 24 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
41a8875
Add `replication` feature
cowlicks Oct 21, 2024
f2572da
Add events related to replication
cowlicks Oct 21, 2024
a2be0f6
Add traits and their impls we need for replication
cowlicks Oct 21, 2024
f5e4002
Use replication feature within Hypercore
cowlicks Oct 21, 2024
fdf5907
Fix TODO's in docs
cowlicks Oct 21, 2024
1ea24d4
use reg-comments in code
cowlicks Oct 22, 2024
237d089
rm hypercore impls
cowlicks Oct 22, 2024
4952d93
rm using tokio::spawn
cowlicks Oct 22, 2024
df35b15
Replace tokio's broadcast with async_broadcast.
cowlicks Oct 22, 2024
6654972
rm unneeded tokio feature checks
cowlicks Oct 22, 2024
f79f0b2
add async-lock use it to rm tokio
cowlicks Oct 22, 2024
2f6dc46
use try_broadcast and set_await_active = false
cowlicks Oct 22, 2024
b6f9fa1
fix issue where channel was closing
cowlicks Oct 22, 2024
d4e756c
Derive PartialEq for AppendOutcome & Info
cowlicks Oct 22, 2024
cdc52b8
docs
cowlicks Oct 22, 2024
25c722a
pub create_hypercore_with_data available for tests
cowlicks Oct 22, 2024
b52b25a
Add tests for SharedCore methods and events
cowlicks Oct 22, 2024
d6dd673
Move shared_core to own module and add as feature
cowlicks Oct 23, 2024
659275d
Add shared-core tests to CI
cowlicks Oct 24, 2024
745165d
create_core_with_data_and_key pub(crate) for tests
cowlicks Oct 23, 2024
7626a17
move test_events to events module
cowlicks Oct 23, 2024
384ea8c
Add From<CoreMethodsError> for ReplMethErr
cowlicks Oct 23, 2024
fea70bb
Add test for replication methods
cowlicks Oct 23, 2024
5725b38
fix doc errors
cowlicks Oct 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features js_interop_tests,tokio
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
cargo test --no-default-features --features js_interop_tests,tokio,sparse
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
cargo test --no-default-features --features js_interop_tests,async-std
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
cargo test --no-default-features --features js_interop_tests,async-std,sparse
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand All @@ -64,9 +66,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features tokio
cargo test --no-default-features --features tokio,shared-core
cargo test --no-default-features --features tokio,sparse
cargo test --no-default-features --features tokio,sparse,cache
cargo test --no-default-features --features async-std
cargo test --no-default-features --features async-std,shared-core
cargo test --no-default-features --features async-std,sparse
cargo test --no-default-features --features async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand All @@ -89,9 +93,11 @@ jobs:
cargo check --all-targets --no-default-features --features async-std,sparse
cargo check --all-targets --no-default-features --features async-std,sparse,cache
cargo test --no-default-features --features js_interop_tests,tokio
cargo test --no-default-features --features js_interop_tests,tokio,shared-core
cargo test --no-default-features --features js_interop_tests,tokio,sparse
cargo test --no-default-features --features js_interop_tests,tokio,sparse,cache
cargo test --no-default-features --features js_interop_tests,async-std
cargo test --no-default-features --features js_interop_tests,async-std,shared-core
cargo test --no-default-features --features js_interop_tests,async-std,sparse
cargo test --no-default-features --features js_interop_tests,async-std,sparse,cache
cargo test --benches --no-default-features --features tokio
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ futures = "0.3"
crc32fast = "1"
intmap = "2"
moka = { version = "0.12", optional = true, features = ["sync"] }
async-broadcast = { version = "0.7.1", optional = true }
async-lock = {version = "3.4.0", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
random-access-disk = { version = "3", default-features = false }
Expand All @@ -59,7 +61,9 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }

[features]
default = ["tokio", "sparse"]
default = ["tokio", "sparse", "replication"]
replication = ["dep:async-broadcast"]
shared-core = ["replication", "dep:async-lock"]
sparse = ["random-access-disk/sparse"]
tokio = ["random-access-disk/tokio"]
async-std = ["random-access-disk/async-std"]
Expand Down
7 changes: 6 additions & 1 deletion src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@ pub(crate) struct NodeByteRange {
pub(crate) length: u64,
}

/// Nodes that are persisted to disk.
/// Nodes of the Merkle Tree that are persisted to disk.
// TODO: replace `hash: Vec<u8>` with `hash: Hash`. This requires patching /
// rewriting the Blake2b crate to support `.from_bytes()` to serialize from
// disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
/// This node's index in the Merkle tree
pub(crate) index: u64,
/// Hash of the data in this node
pub(crate) hash: Vec<u8>,
/// Number of bytes in this [`Node::data`]
pub(crate) length: u64,
/// Index of this nodes parent
pub(crate) parent: u64,
/// Hypercore's data. Can be receieved after the rest of the node, so it's optional.
pub(crate) data: Option<Vec<u8>>,
pub(crate) blank: bool,
}
Expand Down
11 changes: 6 additions & 5 deletions src/common/peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Types needed for passing information with with peers.
//! hypercore-protocol-rs uses these types and wraps them
//! into wire messages.

use crate::Node;

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -20,7 +21,7 @@ pub struct RequestSeek {
}

#[derive(Debug, Clone, PartialEq)]
/// Request of a DataUpgrade from peer
/// Request for a DataUpgrade from peer
pub struct RequestUpgrade {
/// Hypercore start index
pub start: u64,
Expand Down Expand Up @@ -79,7 +80,7 @@ pub struct DataBlock {
pub index: u64,
/// Data block value in bytes
pub value: Vec<u8>,
/// TODO: document
/// Nodes of the merkle tree
pub nodes: Vec<Node>,
}

Expand All @@ -104,11 +105,11 @@ pub struct DataSeek {
#[derive(Debug, Clone, PartialEq)]
/// TODO: Document
pub struct DataUpgrade {
/// TODO: Document
/// Starting block of this upgrade response
pub start: u64,
/// TODO: Document
/// Number of blocks in this upgrade response
pub length: u64,
/// TODO: Document
/// The nodes of the merkle tree
pub nodes: Vec<Node>,
/// TODO: Document
pub additional_nodes: Vec<Node>,
Expand Down
62 changes: 54 additions & 8 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ pub struct Hypercore {
pub(crate) bitfield: Bitfield,
skip_flush_count: u8, // autoFlush in Javascript
header: Header,
#[cfg(feature = "replication")]
events: crate::replication::events::Events,
}

/// Response from append, matches that of the Javascript result
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct AppendOutcome {
/// Length of the hypercore after append
pub length: u64,
Expand All @@ -60,7 +62,7 @@ pub struct AppendOutcome {
}

/// Info about the hypercore
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Info {
/// Length of the hypercore
pub length: u64,
Expand Down Expand Up @@ -247,6 +249,8 @@ impl Hypercore {
bitfield,
header,
skip_flush_count: 0,
#[cfg(feature = "replication")]
events: crate::replication::events::Events::new(),
})
}

Expand Down Expand Up @@ -321,6 +325,14 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
let _ = self.events.send(crate::replication::events::DataUpgrade {});
let _ = self
.events
.send(crate::replication::events::Have::from(&bitfield_update));
}
}

// Return the new value
Expand All @@ -330,10 +342,27 @@ impl Hypercore {
})
}

#[cfg(feature = "replication")]
/// Subscribe to core events relevant to replication
pub fn event_subscribe(&self) -> async_broadcast::Receiver<crate::replication::events::Event> {
self.events.channel.new_receiver()
}

/// Check if core has the block at the given `index` locally
#[instrument(ret, skip(self))]
pub fn has(&self, index: u64) -> bool {
self.bitfield.get(index)
}

/// Read value at given index, if any.
#[instrument(err, skip(self))]
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
if !self.bitfield.get(index) {
#[cfg(feature = "replication")]
// if not in this core, emit Event::Get(index)
{
self.events.send_on_get(index);
}
return Ok(None);
}

Expand Down Expand Up @@ -522,12 +551,12 @@ impl Hypercore {
self.storage.flush_infos(&outcome.infos_to_flush).await?;
self.header = outcome.header;

if let Some(bitfield_update) = bitfield_update {
if let Some(bitfield_update) = &bitfield_update {
// Write to bitfield
self.bitfield.update(&bitfield_update);
self.bitfield.update(bitfield_update);

// Contiguous length is known only now
update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update);
update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update);
}

// Commit changeset to in-memory tree
Expand All @@ -537,6 +566,21 @@ impl Hypercore {
if self.should_flush_bitfield_and_tree_and_oplog() {
self.flush_bitfield_and_tree_and_oplog(false).await?;
}

#[cfg(feature = "replication")]
{
if proof.upgrade.is_some() {
// Notify replicator if we receieved an upgrade
let _ = self.events.send(crate::replication::events::DataUpgrade {});
}

// Notify replicator if we receieved a bitfield update
if let Some(ref bitfield) = bitfield_update {
let _ = self
.events
.send(crate::replication::events::Have::from(bitfield));
}
}
Ok(true)
}

Expand Down Expand Up @@ -725,7 +769,7 @@ fn update_contiguous_length(
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;

#[async_std::test]
Expand Down Expand Up @@ -1091,7 +1135,9 @@ mod tests {
Ok(())
}

async fn create_hypercore_with_data(length: u64) -> Result<Hypercore, HypercoreError> {
pub(crate) async fn create_hypercore_with_data(
length: u64,
) -> Result<Hypercore, HypercoreError> {
let signing_key = generate_signing_key();
create_hypercore_with_data_and_key_pair(
length,
Expand All @@ -1103,7 +1149,7 @@ mod tests {
.await
}

async fn create_hypercore_with_data_and_key_pair(
pub(crate) async fn create_hypercore_with_data_and_key_pair(
length: u64,
key_pair: PartialKeypair,
) -> Result<Hypercore, HypercoreError> {
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![forbid(unsafe_code, bad_style, future_incompatible)]
#![forbid(unsafe_code, future_incompatible)]
#![forbid(rust_2018_idioms, rust_2018_compatibility)]
#![forbid(missing_debug_implementations)]
#![forbid(missing_docs)]
Expand Down Expand Up @@ -74,6 +74,8 @@

pub mod encoding;
pub mod prelude;
#[cfg(feature = "replication")]
pub mod replication;

mod bitfield;
mod builder;
Expand Down
Loading
Loading