From 9ba2f1c97fce782200a9a888988dbaa8186ca2e9 Mon Sep 17 00:00:00 2001 From: Blake Griffith <blake.a.griffith@gmail.com> Date: Mon, 21 Oct 2024 12:38:53 -0400 Subject: [PATCH] Add traits and their impls we need for replication --- src/replication/mod.rs | 231 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 src/replication/mod.rs diff --git a/src/replication/mod.rs b/src/replication/mod.rs new file mode 100644 index 0000000..8d2d751 --- /dev/null +++ b/src/replication/mod.rs @@ -0,0 +1,231 @@ +//! External interface for replication +pub mod events; + +use crate::{ + AppendOutcome, Hypercore, HypercoreError, Info, PartialKeypair, Proof, RequestBlock, + RequestSeek, RequestUpgrade, +}; + +pub use events::Event; + +use tokio::sync::{broadcast::Receiver, Mutex}; + +use std::future::Future; +use std::sync::Arc; +/// Hypercore that can have multiple owners +#[derive(Debug, Clone)] +pub struct SharedCore(pub Arc<Mutex<Hypercore>>); + +impl From<Hypercore> for SharedCore { + fn from(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} +impl SharedCore { + /// Create a shared core from a [`Hypercore`] + pub fn from_hypercore(core: Hypercore) -> Self { + SharedCore(Arc::new(Mutex::new(core))) + } +} + +/// Methods related to just this core's information +pub trait CoreInfo { + /// Get core info (see: [`Hypercore::info`] + fn info(&self) -> impl Future<Output = Info> + Send; + /// Get the key_pair (see: [`Hypercore::key_pair`] + fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send; +} + +impl CoreInfo for SharedCore { + fn info(&self) -> impl Future<Output = Info> + Send { + async move { + let core = &self.0.lock().await; + core.info() + } + } + + fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send { + async move { + let core = &self.0.lock().await; + core.key_pair().clone() + } + } +} + +impl CoreInfo for Hypercore { + fn info(&self) -> impl Future<Output = Info> + Send { + async move { self.info() } + } + + fn key_pair(&self) -> impl Future<Output = PartialKeypair> + Send { + async move { self.key_pair().clone() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum ReplicationMethodsError { + /// Error from hypercore + #[error("Got a hypercore error: [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Methods needed for replication +pub trait ReplicationMethods: CoreInfo + Send { + /// ref Core::verify_and_apply_proof + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future<Output = Result<bool, ReplicationMethodsError>> + Send; + /// ref Core::missing_nodes + fn missing_nodes( + &self, + index: u64, + ) -> impl Future<Output = Result<u64, ReplicationMethodsError>> + Send; + /// ref Core::create_proof + fn create_proof( + &self, + block: Option<RequestBlock>, + hash: Option<RequestBlock>, + seek: Option<RequestSeek>, + upgrade: Option<RequestUpgrade>, + ) -> impl Future<Output = Result<Option<Proof>, ReplicationMethodsError>> + Send; + /// subscribe to core events + fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>>; +} + +impl ReplicationMethods for SharedCore { + fn verify_and_apply_proof( + &self, + proof: &Proof, + ) -> impl Future<Output = Result<bool, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.verify_and_apply_proof(proof).await?) + } + } + + fn missing_nodes( + &self, + index: u64, + ) -> impl Future<Output = Result<u64, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.missing_nodes(index).await?) + } + } + + fn create_proof( + &self, + block: Option<RequestBlock>, + hash: Option<RequestBlock>, + seek: Option<RequestSeek>, + upgrade: Option<RequestUpgrade>, + ) -> impl Future<Output = Result<Option<Proof>, ReplicationMethodsError>> { + async move { + let mut core = self.0.lock().await; + Ok(core.create_proof(block, hash, seek, upgrade).await?) + } + } + + fn event_subscribe(&self) -> impl Future<Output = Receiver<Event>> { + async move { self.0.lock().await.event_subscribe() } + } +} + +/// Error for ReplicationMethods trait +#[derive(thiserror::Error, Debug)] +pub enum CoreMethodsError { + /// Error from hypercore + #[error("Got a hypercore error [{0}]")] + HypercoreError(#[from] HypercoreError), +} + +/// Trait for things that consume [`crate::Hypercore`] can instead use this trait +/// so they can use all Hypercore-like things such as [`SharedCore`]. +pub trait CoreMethods: CoreInfo { + /// Check if the core has the block at the given index locally + fn has(&self, index: u64) -> impl Future<Output = bool> + Send; + + /// get a block + fn get( + &self, + index: u64, + ) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send; + + /// Append data to the core + fn append( + &self, + data: &[u8], + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send; + + /// Append a batch of data to the core + fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send; +} + +impl CoreMethods for SharedCore { + fn has(&self, index: u64) -> impl Future<Output = bool> + Send { + async move { + let core = self.0.lock().await; + core.has(index) + } + } + fn get( + &self, + index: u64, + ) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.get(index).await?) + } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append(data).await?) + } + } + + fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send { + async move { + let mut core = self.0.lock().await; + Ok(core.append_batch(batch).await?) + } + } +} + +impl CoreMethods for Hypercore { + fn has(&self, index: u64) -> impl Future<Output = bool> + Send { + async move { self.has(index) } + } + fn get( + &self, + index: u64, + ) -> impl Future<Output = Result<Option<Vec<u8>>, CoreMethodsError>> + Send { + async move { Ok(self.get(index).await?) } + } + + fn append( + &self, + data: &[u8], + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send { + async move { Ok(self.append(data).await?) } + } + + fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]> + Send>( + &self, + batch: B, + ) -> impl Future<Output = Result<AppendOutcome, CoreMethodsError>> + Send { + async move { Ok(self.append_batch(batch).await?) } + } +}