diff --git a/packages/ciphernode/Cargo.lock b/packages/ciphernode/Cargo.lock index e94bcb33..f4882352 100644 --- a/packages/ciphernode/Cargo.lock +++ b/packages/ciphernode/Cargo.lock @@ -2211,6 +2211,7 @@ dependencies = [ "enclave-core", "enclave_node", "hex", + "once_cell", "router", "rpassword", "tokio", @@ -2231,6 +2232,7 @@ dependencies = [ "bincode", "bs58", "futures-util", + "lazy_static", "serde", "sha2", ] @@ -2352,10 +2354,13 @@ dependencies = [ "alloy", "alloy-primitives 0.6.4", "anyhow", + "async-trait", "cipher 0.1.0", "data", "enclave-core", + "enclave_node", "futures-util", + "serde", "sortition", "tokio", "tracing", @@ -4445,9 +4450,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "opaque-debug" @@ -5335,6 +5340,7 @@ dependencies = [ "cipher 0.1.0", "data", "enclave-core", + "evm", "fhe 0.1.0", "keyshare", "serde", diff --git a/packages/ciphernode/Cargo.toml b/packages/ciphernode/Cargo.toml index 1fcc7534..2765678e 100644 --- a/packages/ciphernode/Cargo.toml +++ b/packages/ciphernode/Cargo.toml @@ -44,6 +44,7 @@ fhe-util = { git = "https://github.com/gnosisguild/fhe.rs", version = "0.1.0-bet futures = "0.3.30" futures-util = "0.3" hex = "0.4.3" +lazy_static = "1.5.0" num = "0.4.3" rand_chacha = "0.3.1" rand = "0.8.5" diff --git a/packages/ciphernode/README.md b/packages/ciphernode/README.md index cb035f5b..312b7e00 100644 --- a/packages/ciphernode/README.md +++ b/packages/ciphernode/README.md @@ -74,3 +74,36 @@ sequenceDiagram PTA--)PTA: Stop KS--)-KS: Stop ``` + +# Debugging + +You can debug using the `RUST_LOG` environment var to alter what output is produced by the node + + +``` +RUST_LOG=info enclave start +``` + +if you supply a tag as an argument you can filter for that tag + +``` +RUST_LOG="[sortition{id=cn1}]" enclave start --tag cn1 +``` + +This helps filter noise during tests where you might have multiple instances running and you need to see the output of a specific one. + +In order to add tracing to a method or function it is recommended to use the `instrument` macro. + +```rust +impl Sorition { + // ... + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] + pub async fn attach( + bus: &Addr, + store: Repository, + ) -> Result> { + // ... + } +} +``` + diff --git a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs index c65e8e55..fc5bdf60 100644 --- a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs +++ b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs @@ -236,7 +236,7 @@ impl FromSnapshotWithParams for PlaintextAggregator { } impl Checkpoint for PlaintextAggregator { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } diff --git a/packages/ciphernode/aggregator/src/publickey_aggregator.rs b/packages/ciphernode/aggregator/src/publickey_aggregator.rs index 8f19c21f..e5e3b640 100644 --- a/packages/ciphernode/aggregator/src/publickey_aggregator.rs +++ b/packages/ciphernode/aggregator/src/publickey_aggregator.rs @@ -261,7 +261,7 @@ impl FromSnapshotWithParams for PublicKeyAggregator { } impl Checkpoint for PublicKeyAggregator { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } diff --git a/packages/ciphernode/config/src/app_config.rs b/packages/ciphernode/config/src/app_config.rs index fb1c0576..dc02e724 100644 --- a/packages/ciphernode/config/src/app_config.rs +++ b/packages/ciphernode/config/src/app_config.rs @@ -10,11 +10,39 @@ use std::{ path::{Path, PathBuf}, }; +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(untagged)] +pub enum Contract { + Full { + address: String, + deploy_block: Option, + }, + AddressOnly(String), +} + +impl Contract { + pub fn address(&self) -> &String { + use Contract::*; + match self { + Full { address, .. } => address, + AddressOnly(v) => v, + } + } + + pub fn deploy_block(&self) -> Option { + use Contract::*; + match self { + Full { deploy_block, .. } => deploy_block.clone(), + AddressOnly(_) => None, + } + } +} + #[derive(Debug, Deserialize, Serialize)] pub struct ContractAddresses { - pub enclave: String, - pub ciphernode_registry: String, - pub filter_registry: String, + pub enclave: Contract, + pub ciphernode_registry: Contract, + pub filter_registry: Contract, } #[derive(Debug, Deserialize, Serialize)] @@ -297,7 +325,9 @@ chains: rpc_url: "ws://localhost:8545" contracts: enclave: "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" - ciphernode_registry: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" + ciphernode_registry: + address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" + deploy_block: 1764352873645 filter_registry: "0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9" "#, )?; @@ -308,10 +338,18 @@ chains: assert_eq!(chain.name, "hardhat"); assert_eq!(chain.rpc_url, "ws://localhost:8545"); assert_eq!( - chain.contracts.enclave, + chain.contracts.enclave.address(), "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" ); - + assert_eq!( + chain.contracts.ciphernode_registry.address(), + "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" + ); + assert_eq!(chain.contracts.enclave.deploy_block(), None); + assert_eq!( + chain.contracts.ciphernode_registry.deploy_block(), + Some(1764352873645) + ); Ok(()) }); } diff --git a/packages/ciphernode/core/Cargo.toml b/packages/ciphernode/core/Cargo.toml index ede42cc4..b77f4045 100644 --- a/packages/ciphernode/core/Cargo.toml +++ b/packages/ciphernode/core/Cargo.toml @@ -19,3 +19,4 @@ alloy = { workspace = true } alloy-primitives = { workspace = true } alloy-sol-types = { workspace = true } anyhow = { workspace = true } +lazy_static = { workspace = true } diff --git a/packages/ciphernode/core/src/events.rs b/packages/ciphernode/core/src/events.rs index 96093ad3..276cd80c 100644 --- a/packages/ciphernode/core/src/events.rs +++ b/packages/ciphernode/core/src/events.rs @@ -56,7 +56,7 @@ impl TryFrom for U256 { pub struct EventId(pub [u8; 32]); impl EventId { - fn from(value: T) -> Self { + pub fn hash(value: T) -> Self { let mut hasher = Sha256::new(); let mut std_hasher = DefaultHasher::new(); value.hash(&mut std_hasher); @@ -224,7 +224,7 @@ pub trait FromError { impl From for EnclaveEvent { fn from(data: KeyshareCreated) -> Self { EnclaveEvent::KeyshareCreated { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -233,7 +233,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: E3Requested) -> Self { EnclaveEvent::E3Requested { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -242,7 +242,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: PublicKeyAggregated) -> Self { EnclaveEvent::PublicKeyAggregated { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -251,7 +251,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: CiphertextOutputPublished) -> Self { EnclaveEvent::CiphertextOutputPublished { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -260,7 +260,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: DecryptionshareCreated) -> Self { EnclaveEvent::DecryptionshareCreated { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -269,7 +269,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: PlaintextAggregated) -> Self { EnclaveEvent::PlaintextAggregated { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -278,7 +278,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: E3RequestComplete) -> Self { EnclaveEvent::E3RequestComplete { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -287,7 +287,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: CiphernodeSelected) -> Self { EnclaveEvent::CiphernodeSelected { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -296,7 +296,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: CiphernodeAdded) -> Self { EnclaveEvent::CiphernodeAdded { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -305,7 +305,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: CiphernodeRemoved) -> Self { EnclaveEvent::CiphernodeRemoved { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -314,7 +314,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: EnclaveError) -> Self { EnclaveEvent::EnclaveError { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -323,7 +323,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(data: Shutdown) -> Self { EnclaveEvent::Shutdown { - id: EventId::from(data.clone()), + id: EventId::hash(data.clone()), data: data.clone(), } } @@ -332,7 +332,7 @@ impl From for EnclaveEvent { impl From for EnclaveEvent { fn from(value: TestEvent) -> Self { EnclaveEvent::TestEvent { - id: EventId::from(value.clone()), + id: EventId::hash(value.clone()), data: value.clone(), } } @@ -552,6 +552,7 @@ impl Display for Shutdown { #[rtype(result = "()")] pub struct TestEvent { pub msg: String, + pub entropy: u64, } #[cfg(test)] diff --git a/packages/ciphernode/core/src/lib.rs b/packages/ciphernode/core/src/lib.rs index 154d1461..0084e902 100644 --- a/packages/ciphernode/core/src/lib.rs +++ b/packages/ciphernode/core/src/lib.rs @@ -5,7 +5,9 @@ mod eventbus; mod events; mod ordered_set; +mod tag; pub use eventbus::*; pub use events::*; pub use ordered_set::*; +pub use tag::*; diff --git a/packages/ciphernode/core/src/tag.rs b/packages/ciphernode/core/src/tag.rs new file mode 100644 index 00000000..7b67ebba --- /dev/null +++ b/packages/ciphernode/core/src/tag.rs @@ -0,0 +1,21 @@ +//! Tag management for EVM event processing. +//! +//! This module provides thread-safe access to a global string tag that's used to +//! differentiate between different EVM contract instances during event processing. +//! The tag helps track and manage historical and live events for specific contracts. + +use std::sync::OnceLock; + +/// Global tag for contract event tracking with a default value of "_". +/// This tag is initialized once and remains constant throughout the lifecycle +/// of event processing to ensure consistent event tracking across restarts. +static TAG: OnceLock = OnceLock::new(); + +pub fn get_tag() -> String { + TAG.get().cloned().unwrap_or_else(|| String::from("_")) +} + +pub fn set_tag(new_tag: impl Into) -> Result<(), &'static str> { + TAG.set(new_tag.into()) + .map_err(|_| "Tag has already been initialized") +} diff --git a/packages/ciphernode/data/src/data_store.rs b/packages/ciphernode/data/src/data_store.rs index e5abfcbd..e81fc478 100644 --- a/packages/ciphernode/data/src/data_store.rs +++ b/packages/ciphernode/data/src/data_store.rs @@ -37,7 +37,7 @@ impl Get { } /// Generate proxy for the DB -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct DataStore { scope: Vec, get: Recipient, diff --git a/packages/ciphernode/data/src/repository.rs b/packages/ciphernode/data/src/repository.rs index 7cf1e445..b44f2b07 100644 --- a/packages/ciphernode/data/src/repository.rs +++ b/packages/ciphernode/data/src/repository.rs @@ -4,6 +4,7 @@ use anyhow::Result; use crate::DataStore; +#[derive(Debug)] pub struct Repository { store: DataStore, _p: PhantomData, diff --git a/packages/ciphernode/data/src/sled_store.rs b/packages/ciphernode/data/src/sled_store.rs index 41de208f..362cb74a 100644 --- a/packages/ciphernode/data/src/sled_store.rs +++ b/packages/ciphernode/data/src/sled_store.rs @@ -1,13 +1,14 @@ -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use crate::{Get, Insert}; -use actix::{Actor, Addr, Handler}; +use actix::{Actor, ActorContext, Addr, Handler}; use anyhow::{Context, Result}; -use enclave_core::{BusError, EnclaveErrorType, EventBus}; +use enclave_core::{BusError, EnclaveErrorType, EnclaveEvent, EventBus, Subscribe}; use sled::Db; +use tracing::{error, info}; pub struct SledStore { - db: SledDb, + db: Option, bus: Addr, } @@ -16,17 +17,24 @@ impl Actor for SledStore { } impl SledStore { - pub fn new(bus: &Addr, path: &PathBuf) -> Result { + pub fn new(bus: &Addr, path: &PathBuf) -> Result> { + info!("Starting SledStore"); let db = SledDb::new(path)?; - Ok(Self { - db, + + let store = Self { + db: Some(db), bus: bus.clone(), - }) + } + .start(); + + bus.do_send(Subscribe::new("Shutdown", store.clone().into())); + + Ok(store) } pub fn from_db(db: SledDb) -> Result { Ok(Self { - db, + db: Some(db), bus: EventBus::new(false).start(), }) } @@ -36,9 +44,11 @@ impl Handler for SledStore { type Result = (); fn handle(&mut self, event: Insert, _: &mut Self::Context) -> Self::Result { - match self.db.insert(event) { - Err(err) => self.bus.err(EnclaveErrorType::Data, err), - _ => (), + if let Some(ref mut db) = &mut self.db { + match db.insert(event) { + Err(err) => self.bus.err(EnclaveErrorType::Data, err), + _ => (), + } } } } @@ -47,13 +57,28 @@ impl Handler for SledStore { type Result = Option>; fn handle(&mut self, event: Get, _: &mut Self::Context) -> Self::Result { - return match self.db.get(event) { - Ok(v) => v, - Err(err) => { - self.bus.err(EnclaveErrorType::Data, err); - None - } - }; + if let Some(ref mut db) = &mut self.db { + return match db.get(event) { + Ok(v) => v, + Err(err) => { + self.bus.err(EnclaveErrorType::Data, err); + None + } + }; + } else { + error!("Attempt to get data from dropped db"); + None + } + } +} + +impl Handler for SledStore { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { + if let EnclaveEvent::Shutdown { .. } = msg { + let _db = self.db.take(); // db will be dropped + ctx.stop() + } } } diff --git a/packages/ciphernode/data/src/snapshot.rs b/packages/ciphernode/data/src/snapshot.rs index cfd445a2..a0203f45 100644 --- a/packages/ciphernode/data/src/snapshot.rs +++ b/packages/ciphernode/data/src/snapshot.rs @@ -20,7 +20,7 @@ where /// This trait enables the self type to checkpoint its state pub trait Checkpoint: Snapshot { /// Declare the DataStore instance available on the object - fn repository(&self) -> Repository; + fn repository(&self) -> &Repository; /// Write the current snapshot to the `Repository` provided by `repository()` fn checkpoint(&self) { diff --git a/packages/ciphernode/enclave/Cargo.toml b/packages/ciphernode/enclave/Cargo.toml index 2e55621e..f0a93ab7 100644 --- a/packages/ciphernode/enclave/Cargo.toml +++ b/packages/ciphernode/enclave/Cargo.toml @@ -19,6 +19,7 @@ dialoguer = "0.11.0" enclave-core = { path = "../core" } enclave_node = { path = "../enclave_node" } hex = { workspace = true } +once_cell = "1.20.2" router = { path = "../router" } rpassword = "7.3.1" tokio = { workspace = true } diff --git a/packages/ciphernode/enclave/src/commands/aggregator/mod.rs b/packages/ciphernode/enclave/src/commands/aggregator/mod.rs index 58a12474..d52c5f64 100644 --- a/packages/ciphernode/enclave/src/commands/aggregator/mod.rs +++ b/packages/ciphernode/enclave/src/commands/aggregator/mod.rs @@ -3,7 +3,7 @@ use anyhow::*; use clap::Subcommand; use config::AppConfig; -#[derive(Subcommand)] +#[derive(Subcommand, Debug)] pub enum AggregatorCommands { /// Start the application as an aggregator Start { diff --git a/packages/ciphernode/enclave/src/commands/aggregator/start.rs b/packages/ciphernode/enclave/src/commands/aggregator/start.rs index b4b130d9..4ae1e9ce 100644 --- a/packages/ciphernode/enclave/src/commands/aggregator/start.rs +++ b/packages/ciphernode/enclave/src/commands/aggregator/start.rs @@ -1,10 +1,12 @@ use anyhow::*; use config::AppConfig; +use enclave_core::get_tag; use enclave_node::{listen_for_shutdown, setup_aggregator}; -use tracing::info; +use tracing::{info, instrument}; use crate::owo; +#[instrument(name="app", skip_all,fields(id = get_tag()))] pub async fn execute( config: AppConfig, pubkey_write_path: Option<&str>, @@ -12,10 +14,10 @@ pub async fn execute( ) -> Result<()> { owo(); - info!("LAUNCHING AGGREGATOR"); - - let (bus, handle) = setup_aggregator(config, pubkey_write_path, plaintext_write_path).await?; + let (bus, handle, peer_id) = + setup_aggregator(config, pubkey_write_path, plaintext_write_path).await?; + info!("LAUNCHING AGGREGATOR {}", peer_id); tokio::spawn(listen_for_shutdown(bus.into(), handle)); std::future::pending::<()>().await; diff --git a/packages/ciphernode/enclave/src/commands/mod.rs b/packages/ciphernode/enclave/src/commands/mod.rs index aebde76a..b6ddb039 100644 --- a/packages/ciphernode/enclave/src/commands/mod.rs +++ b/packages/ciphernode/enclave/src/commands/mod.rs @@ -8,7 +8,7 @@ use aggregator::AggregatorCommands; use clap::Subcommand; use wallet::WalletCommands; -#[derive(Subcommand)] +#[derive(Subcommand, Debug)] pub enum Commands { /// Start the application Start, diff --git a/packages/ciphernode/enclave/src/commands/password/mod.rs b/packages/ciphernode/enclave/src/commands/password/mod.rs index e9264881..8067bc1e 100644 --- a/packages/ciphernode/enclave/src/commands/password/mod.rs +++ b/packages/ciphernode/enclave/src/commands/password/mod.rs @@ -5,7 +5,7 @@ use anyhow::*; use clap::Subcommand; use config::AppConfig; -#[derive(Subcommand)] +#[derive(Subcommand, Debug)] pub enum PasswordCommands { /// Create a new password Create { diff --git a/packages/ciphernode/enclave/src/commands/start.rs b/packages/ciphernode/enclave/src/commands/start.rs index 14df1c64..3cea046d 100644 --- a/packages/ciphernode/enclave/src/commands/start.rs +++ b/packages/ciphernode/enclave/src/commands/start.rs @@ -1,21 +1,19 @@ +use crate::owo; use anyhow::{anyhow, Result}; use config::AppConfig; +use enclave_core::get_tag; use enclave_node::{listen_for_shutdown, setup_ciphernode}; -use tracing::info; - -use crate::owo; +use tracing::{info, instrument}; +#[instrument(name="app", skip_all,fields(id = get_tag()))] pub async fn execute(config: AppConfig) -> Result<()> { owo(); - - // let address = Address::parse_checksummed(&config.address(), None).context("Invalid address")?; let Some(address) = config.address() else { return Err(anyhow!("You must provide an address")); }; - info!("LAUNCHING CIPHERNODE: ({})", address); - - let (bus, handle) = setup_ciphernode(config, address).await?; + let (bus, handle, peer_id) = setup_ciphernode(config, address).await?; + info!("LAUNCHING CIPHERNODE: ({}/{})", address, peer_id); tokio::spawn(listen_for_shutdown(bus.into(), handle)); diff --git a/packages/ciphernode/enclave/src/commands/wallet/mod.rs b/packages/ciphernode/enclave/src/commands/wallet/mod.rs index b07b1cc6..43130405 100644 --- a/packages/ciphernode/enclave/src/commands/wallet/mod.rs +++ b/packages/ciphernode/enclave/src/commands/wallet/mod.rs @@ -3,7 +3,7 @@ use anyhow::*; use clap::Subcommand; use config::AppConfig; -#[derive(Subcommand)] +#[derive(Subcommand, Debug)] pub enum WalletCommands { /// Set a new Wallet Private Key Set { diff --git a/packages/ciphernode/enclave/src/main.rs b/packages/ciphernode/enclave/src/main.rs index b8ec1978..f687dff5 100644 --- a/packages/ciphernode/enclave/src/main.rs +++ b/packages/ciphernode/enclave/src/main.rs @@ -2,6 +2,9 @@ use anyhow::Result; use clap::Parser; use commands::{aggregator, password, start, wallet, Commands}; use config::load_config; +use enclave_core::{get_tag, set_tag}; +use tracing::instrument; +use tracing_subscriber::EnvFilter; pub mod commands; const OWO: &str = r#" @@ -24,7 +27,7 @@ pub fn owo() { println!("\n\n\n\n"); } -#[derive(Parser)] +#[derive(Parser, Debug)] #[command(name = "enclave")] #[command(about = "A CLI for interacting with Enclave the open-source protocol for Encrypted Execution Environments (E3)", long_about = None)] pub struct Cli { @@ -34,12 +37,15 @@ pub struct Cli { #[command(subcommand)] command: Commands, + + #[arg(short, long, global = true)] + tag: Option, } impl Cli { + #[instrument(skip(self),fields(id = get_tag()))] pub async fn execute(self) -> Result<()> { let config_path = self.config.as_deref(); - let config = load_config(config_path)?; match self.command { @@ -51,19 +57,37 @@ impl Cli { Ok(()) } + + pub fn get_tag(&self) -> String { + if let Some(tag) = self.tag.clone() { + tag + } else { + "default".to_string() + } + } } #[actix::main] pub async fn main() { - tracing_subscriber::fmt::init(); - + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + // .with_env_filter("error") + // .with_env_filter("[app{id=cn1}]=info") + // .with_env_filter("[app{id=cn2}]=info,libp2p_mdns::behaviour=error") + // .with_env_filter("[app{id=cn3}]=info") + // .with_env_filter("[app{id=cn4}]=info") + // .with_env_filter("[app{id=ag}]=info") + .init(); let cli = Cli::parse(); - match cli.execute().await { - Ok(_) => (), - Err(err) => { - eprintln!("{}", err); - std::process::exit(1); - } + // Set the tag for all future traces + if let Err(err) = set_tag(cli.get_tag()) { + eprintln!("{}", err); + } + + // Execute the cli + if let Err(err) = cli.execute().await { + eprintln!("{}", err); + std::process::exit(1); } } diff --git a/packages/ciphernode/enclave_node/src/aggregator.rs b/packages/ciphernode/enclave_node/src/aggregator.rs index 18d47c24..45ee6358 100644 --- a/packages/ciphernode/enclave_node/src/aggregator.rs +++ b/packages/ciphernode/enclave_node/src/aggregator.rs @@ -29,14 +29,14 @@ pub async fn setup_aggregator( config: AppConfig, pubkey_write_path: Option<&str>, plaintext_write_path: Option<&str>, -) -> Result<(Addr, JoinHandle<()>)> { +) -> Result<(Addr, JoinHandle<()>, String)> { let bus = EventBus::new(true).start(); let rng = Arc::new(Mutex::new( ChaCha20Rng::from_rng(OsRng).expect("Failed to create RNG"), )); let store = setup_datastore(&config, &bus)?; let repositories = store.repositories(); - let sortition = Sortition::attach(&bus, repositories.sortition()); + let sortition = Sortition::attach(&bus, repositories.sortition()).await?; let cipher = Arc::new(Cipher::from_config(&config).await?); let signer = get_signer_from_repository(repositories.eth_private_key(), &cipher).await?; @@ -49,16 +49,30 @@ pub async fn setup_aggregator( let read_provider = create_readonly_provider(&ensure_ws_rpc(rpc_url)).await?; let write_provider = create_provider_with_signer(&ensure_http_rpc(rpc_url), &signer).await?; + EnclaveSol::attach( &bus, &read_provider, &write_provider, - &chain.contracts.enclave, + &chain.contracts.enclave.address(), + &repositories.enclave_sol_reader(read_provider.get_chain_id()), + chain.contracts.enclave.deploy_block(), + ) + .await?; + RegistryFilterSol::attach( + &bus, + &write_provider, + &chain.contracts.filter_registry.address(), + ) + .await?; + CiphernodeRegistrySol::attach( + &bus, + &read_provider, + &chain.contracts.ciphernode_registry.address(), + &repositories.ciphernode_registry_reader(read_provider.get_chain_id()), + chain.contracts.ciphernode_registry.deploy_block(), ) .await?; - RegistryFilterSol::attach(&bus, &write_provider, &chain.contracts.filter_registry).await?; - CiphernodeRegistrySol::attach(&bus, &read_provider, &chain.contracts.ciphernode_registry) - .await?; } E3RequestRouter::builder(&bus, store) @@ -68,7 +82,7 @@ pub async fn setup_aggregator( .build() .await?; - let (_, join_handle) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); + let (_, join_handle, peer_id) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); if let Some(path) = pubkey_write_path { PublicKeyWriter::attach(path, bus.clone()); @@ -80,5 +94,5 @@ pub async fn setup_aggregator( SimpleLogger::attach("AGG", bus.clone()); - Ok((bus, join_handle)) + Ok((bus, join_handle, peer_id)) } diff --git a/packages/ciphernode/enclave_node/src/ciphernode.rs b/packages/ciphernode/enclave_node/src/ciphernode.rs index 2840429e..f1fd54cd 100644 --- a/packages/ciphernode/enclave_node/src/ciphernode.rs +++ b/packages/ciphernode/enclave_node/src/ciphernode.rs @@ -3,8 +3,7 @@ use alloy::primitives::Address; use anyhow::Result; use cipher::Cipher; use config::AppConfig; -use data::{DataStore, InMemStore, SledStore}; -use enclave_core::EventBus; +use enclave_core::{get_tag, EventBus}; use evm::{ helpers::{create_readonly_provider, ensure_ws_rpc}, CiphernodeRegistrySol, EnclaveSolReader, @@ -19,13 +18,15 @@ use router::{ use sortition::Sortition; use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; +use tracing::instrument; use crate::setup_datastore; +#[instrument(name="app", skip_all,fields(id = get_tag()))] pub async fn setup_ciphernode( config: AppConfig, address: Address, -) -> Result<(Addr, JoinHandle<()>)> { +) -> Result<(Addr, JoinHandle<()>, String)> { let rng = Arc::new(Mutex::new( rand_chacha::ChaCha20Rng::from_rng(OsRng).expect("Failed to create RNG"), )); @@ -35,7 +36,7 @@ pub async fn setup_ciphernode( let repositories = store.repositories(); - let sortition = Sortition::attach(&bus, repositories.sortition()); + let sortition = Sortition::attach(&bus, repositories.sortition()).await?; CiphernodeSelector::attach(&bus, &sortition, &address.to_string()); for chain in config @@ -46,9 +47,22 @@ pub async fn setup_ciphernode( let rpc_url = &chain.rpc_url; let read_provider = create_readonly_provider(&ensure_ws_rpc(rpc_url)).await?; - EnclaveSolReader::attach(&bus, &read_provider, &chain.contracts.enclave).await?; - CiphernodeRegistrySol::attach(&bus, &read_provider, &chain.contracts.ciphernode_registry) - .await?; + EnclaveSolReader::attach( + &bus, + &read_provider, + &chain.contracts.enclave.address(), + &repositories.enclave_sol_reader(read_provider.get_chain_id()), + chain.contracts.enclave.deploy_block(), + ) + .await?; + CiphernodeRegistrySol::attach( + &bus, + &read_provider, + &chain.contracts.ciphernode_registry.address(), + &repositories.ciphernode_registry_reader(read_provider.get_chain_id()), + chain.contracts.ciphernode_registry.deploy_block(), + ) + .await?; } E3RequestRouter::builder(&bus, store.clone()) @@ -57,10 +71,10 @@ pub async fn setup_ciphernode( .build() .await?; - let (_, join_handle) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); + let (_, join_handle, peer_id) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); let nm = format!("CIPHER({})", &address.to_string()[0..5]); SimpleLogger::attach(&nm, bus.clone()); - Ok((bus, join_handle)) + Ok((bus, join_handle, peer_id)) } diff --git a/packages/ciphernode/enclave_node/src/datastore.rs b/packages/ciphernode/enclave_node/src/datastore.rs index f88ff834..17a2be04 100644 --- a/packages/ciphernode/enclave_node/src/datastore.rs +++ b/packages/ciphernode/enclave_node/src/datastore.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use actix::{Actor, Addr}; use anyhow::Result; use config::AppConfig; @@ -5,11 +7,19 @@ use data::{DataStore, InMemStore, SledStore}; use enclave_core::EventBus; use router::{Repositories, RepositoriesFactory}; +pub fn get_sled_store(bus: &Addr, db_file: &PathBuf) -> Result { + Ok((&SledStore::new(bus, db_file)?).into()) +} + +pub fn get_in_mem_store() -> DataStore { + (&InMemStore::new(true).start()).into() +} + pub fn setup_datastore(config: &AppConfig, bus: &Addr) -> Result { let store: DataStore = if !config.use_in_mem_store() { - (&SledStore::new(&bus, &config.db_file())?.start()).into() + get_sled_store(&bus, &config.db_file())? } else { - (&InMemStore::new(true).start()).into() + get_in_mem_store() }; Ok(store) } diff --git a/packages/ciphernode/evm/Cargo.toml b/packages/ciphernode/evm/Cargo.toml index 7c3a1cbf..fba4aea4 100644 --- a/packages/ciphernode/evm/Cargo.toml +++ b/packages/ciphernode/evm/Cargo.toml @@ -8,6 +8,7 @@ actix = { workspace = true } alloy = { workspace = true } alloy-primitives = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } enclave-core = { path = "../core" } data = { path = "../data" } futures-util = { workspace = true } @@ -15,4 +16,9 @@ sortition = { path = "../sortition" } cipher = { path = "../cipher" } tokio = { workspace = true } tracing = { workspace = true } +serde = { workspace = true } zeroize = { workspace = true } + +[dev-dependencies] +enclave_node = { path = "../enclave_node" } + diff --git a/packages/ciphernode/evm/src/ciphernode_registry_sol.rs b/packages/ciphernode/evm/src/ciphernode_registry_sol.rs index 49193e41..d88e6552 100644 --- a/packages/ciphernode/evm/src/ciphernode_registry_sol.rs +++ b/packages/ciphernode/evm/src/ciphernode_registry_sol.rs @@ -1,19 +1,22 @@ use crate::{ + event_reader::EvmEventReaderState, helpers::{ReadonlyProvider, WithChainId}, EvmEventReader, }; -use actix::Addr; +use actix::{Actor, Addr}; use alloy::{ primitives::{LogData, B256}, sol, sol_types::SolEvent, }; use anyhow::Result; +use data::Repository; use enclave_core::{EnclaveEvent, EventBus}; -use tracing::{error, trace}; +use tracing::{error, info, trace}; sol!( #[sol(rpc)] + #[derive(Debug)] ICiphernodeRegistry, "../../evm/artifacts/contracts/interfaces/ICiphernodeRegistry.sol/ICiphernodeRegistry.json" ); @@ -96,26 +99,49 @@ pub fn extractor(data: &LogData, topic: Option<&B256>, _: u64) -> Option, provider: &WithChainId, contract_address: &str, + repository: &Repository, + start_block: Option, ) -> Result>> { - let addr = EvmEventReader::attach(bus, provider, extractor, contract_address).await?; + let addr = EvmEventReader::attach( + provider, + extractor, + contract_address, + start_block, + &bus.clone().into(), + repository, + ) + .await?; + + info!(address=%contract_address, "EnclaveSolReader is listening to address"); + Ok(addr) } } +/// Wrapper for a reader and a future writer pub struct CiphernodeRegistrySol; impl CiphernodeRegistrySol { pub async fn attach( bus: &Addr, provider: &WithChainId, contract_address: &str, + repository: &Repository, + start_block: Option, ) -> Result<()> { - CiphernodeRegistrySolReader::attach(bus, provider, contract_address).await?; + CiphernodeRegistrySolReader::attach( + bus, + provider, + contract_address, + repository, + start_block, + ) + .await?; + // TODO: Writer if needed Ok(()) } } diff --git a/packages/ciphernode/evm/src/enclave_sol.rs b/packages/ciphernode/evm/src/enclave_sol.rs index bd0c6ef7..ed1f76bc 100644 --- a/packages/ciphernode/evm/src/enclave_sol.rs +++ b/packages/ciphernode/evm/src/enclave_sol.rs @@ -1,10 +1,12 @@ use crate::{ enclave_sol_reader::EnclaveSolReader, enclave_sol_writer::EnclaveSolWriter, + event_reader::EvmEventReaderState, helpers::{ReadonlyProvider, SignerProvider, WithChainId}, }; use actix::Addr; use anyhow::Result; +use data::Repository; use enclave_core::EventBus; pub struct EnclaveSol; @@ -14,8 +16,17 @@ impl EnclaveSol { read_provider: &WithChainId, write_provider: &WithChainId, contract_address: &str, + repository: &Repository, + start_block: Option, ) -> Result<()> { - EnclaveSolReader::attach(bus, read_provider, contract_address).await?; + EnclaveSolReader::attach( + bus, + read_provider, + contract_address, + repository, + start_block, + ) + .await?; EnclaveSolWriter::attach(bus, write_provider, contract_address).await?; Ok(()) } diff --git a/packages/ciphernode/evm/src/enclave_sol_reader.rs b/packages/ciphernode/evm/src/enclave_sol_reader.rs index 2dd8477c..c71c39de 100644 --- a/packages/ciphernode/evm/src/enclave_sol_reader.rs +++ b/packages/ciphernode/evm/src/enclave_sol_reader.rs @@ -1,3 +1,4 @@ +use crate::event_reader::EvmEventReaderState; use crate::helpers::{ReadonlyProvider, WithChainId}; use crate::EvmEventReader; use actix::Addr; @@ -5,8 +6,9 @@ use alloy::primitives::{LogData, B256}; use alloy::transports::BoxTransport; use alloy::{sol, sol_types::SolEvent}; use anyhow::Result; +use data::Repository; use enclave_core::{EnclaveEvent, EventBus}; -use tracing::{error, trace}; +use tracing::{error, info, trace}; sol!( #[sol(rpc)] @@ -86,8 +88,21 @@ impl EnclaveSolReader { bus: &Addr, provider: &WithChainId, contract_address: &str, + repository: &Repository, + start_block: Option, ) -> Result>> { - let addr = EvmEventReader::attach(bus, provider, extractor, contract_address).await?; + let addr = EvmEventReader::attach( + provider, + extractor, + contract_address, + start_block, + &bus.clone(), + repository, + ) + .await?; + + info!(address=%contract_address, "EnclaveSolReader is listening to address"); + Ok(addr) } } diff --git a/packages/ciphernode/evm/src/event_reader.rs b/packages/ciphernode/evm/src/event_reader.rs index 93264c72..4981c4f6 100644 --- a/packages/ciphernode/evm/src/event_reader.rs +++ b/packages/ciphernode/evm/src/event_reader.rs @@ -1,31 +1,89 @@ -use std::marker::PhantomData; - -use crate::helpers::{self, WithChainId}; +use crate::helpers::{ReadonlyProvider, WithChainId}; use actix::prelude::*; use actix::{Addr, Recipient}; +use alloy::eips::BlockNumberOrTag; +use alloy::primitives::Address; use alloy::primitives::{LogData, B256}; use alloy::providers::Provider; +use alloy::rpc::types::Filter; use alloy::transports::{BoxTransport, Transport}; -use alloy::{eips::BlockNumberOrTag, primitives::Address, rpc::types::Filter}; use anyhow::{anyhow, Result}; -use enclave_core::{BusError, EnclaveErrorType, EnclaveEvent, EventBus, Subscribe}; +use async_trait::async_trait; +use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use enclave_core::{ + get_tag, BusError, EnclaveErrorType, EnclaveEvent, EventBus, EventId, Subscribe, +}; +use futures_util::stream::StreamExt; +use std::collections::HashSet; +use tokio::select; use tokio::sync::oneshot; -use tracing::info; +use tracing::{error, info, instrument, trace, warn}; + +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +#[rtype(result = "()")] +pub struct EnclaveEvmEvent { + pub event: EnclaveEvent, + pub block: Option, +} + +impl EnclaveEvmEvent { + pub fn new(event: EnclaveEvent, block: Option) -> Self { + Self { event, block } + } + + pub fn get_id(&self) -> EventId { + EventId::hash(self.clone()) + } +} pub type ExtractorFn = fn(&LogData, Option<&B256>, u64) -> Option; +pub type EventReader = EvmEventReader; + +pub struct EvmEventReaderParams +where + P: Provider + Clone + 'static, + T: Transport + Clone + Unpin, +{ + provider: WithChainId, + extractor: ExtractorFn, + contract_address: Address, + start_block: Option, + bus: Addr, + repository: Repository, +} + +#[derive(Default, serde::Serialize, serde::Deserialize, Clone)] +pub struct EvmEventReaderState { + pub ids: HashSet, + pub last_block: Option, +} + /// Connects to Enclave.sol converting EVM events to EnclaveEvents pub struct EvmEventReader where P: Provider + Clone + 'static, T: Transport + Clone + Unpin, { + /// The alloy provider provider: Option>, + /// The contract address contract_address: Address, - bus: Recipient, + /// The Extractor function to determine which events to extract and convert to EnclaveEvents extractor: ExtractorFn, + /// A shutdown receiver to listen to for shutdown signals sent to the loop this is only used + /// internally. You should send the Shutdown signal to the reader directly or via the EventBus shutdown_rx: Option>, + /// The sender for the shutdown signal this is only used internally shutdown_tx: Option>, + /// The block that processing should start from + start_block: Option, + /// Event bus for error propagation + bus: Addr, + /// The in memory state of the event reader + state: EvmEventReaderState, + /// Repository to save the state of the event reader + repository: Repository, } impl EvmEventReader @@ -33,36 +91,52 @@ where P: Provider + Clone + 'static, T: Transport + Clone + Unpin, { - pub fn new( - bus: &Addr, - provider: &WithChainId, - extractor: ExtractorFn, - contract_address: &Address, - ) -> Result { + pub fn new(params: EvmEventReaderParams) -> Self { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Ok(Self { - contract_address: contract_address.clone(), - provider: Some(provider.clone()), - extractor, - bus: bus.clone().into(), + Self { + contract_address: params.contract_address, + provider: Some(params.provider), + extractor: params.extractor, shutdown_rx: Some(shutdown_rx), shutdown_tx: Some(shutdown_tx), + start_block: params.start_block, + bus: params.bus, + state: EvmEventReaderState::default(), + repository: params.repository, + } + } + + #[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))] + pub async fn load(params: EvmEventReaderParams) -> Result { + Ok(if let Some(snapshot) = params.repository.read().await? { + info!("Loading from snapshot"); + Self::from_snapshot(params, snapshot).await? + } else { + info!("Loading from params"); + Self::new(params) }) } pub async fn attach( - bus: &Addr, provider: &WithChainId, extractor: ExtractorFn, contract_address: &str, + start_block: Option, + bus: &Addr, + repository: &Repository, ) -> Result> { - let addr = - EvmEventReader::new(bus, provider, extractor, &contract_address.parse()?)?.start(); + let params = EvmEventReaderParams { + provider: provider.clone(), + extractor, + contract_address: contract_address.parse()?, + start_block, + bus: bus.clone(), + repository: repository.clone(), + }; + let addr = EvmEventReader::load(params).await?.start(); - bus.send(Subscribe::new("Shutdown", addr.clone().into())) - .await?; + bus.do_send(Subscribe::new("Shutdown", addr.clone().into())); - info!(address=%contract_address, "Evm is listening to address"); Ok(addr) } } @@ -74,27 +148,118 @@ where { type Context = actix::Context; fn started(&mut self, ctx: &mut Self::Context) { + let processor = ctx.address().recipient(); let bus = self.bus.clone(); let Some(provider) = self.provider.take() else { tracing::error!("Could not start event reader as provider has already been used."); return; }; - let filter = Filter::new() - .address(self.contract_address) - .from_block(BlockNumberOrTag::Latest); + let extractor = self.extractor; let Some(shutdown) = self.shutdown_rx.take() else { bus.err(EnclaveErrorType::Evm, anyhow!("shutdown already called")); return; }; + let contract_address = self.contract_address; + let start_block = self.start_block; + let tag = get_tag(); ctx.spawn( - async move { helpers::stream_from_evm(provider, filter, bus, extractor, shutdown).await } - .into_actor(self), + async move { + stream_from_evm( + provider, + &contract_address, + &processor, + extractor, + shutdown, + start_block, + &bus, + &tag, + ) + .await + } + .into_actor(self), ); } } +#[instrument(name = "evm_event_reader", skip_all, fields(id=id))] +async fn stream_from_evm, T: Transport + Clone>( + provider: WithChainId, + contract_address: &Address, + processor: &Recipient, + extractor: fn(&LogData, Option<&B256>, u64) -> Option, + mut shutdown: oneshot::Receiver<()>, + start_block: Option, + bus: &Addr, + id: &str, +) { + let chain_id = provider.get_chain_id(); + let provider = provider.get_provider(); + + let historical_filter = Filter::new() + .address(contract_address.clone()) + .from_block(start_block.unwrap_or(0)); + let current_filter = Filter::new() + .address(*contract_address) + .from_block(BlockNumberOrTag::Latest); + + // Historical events + match provider.clone().get_logs(&historical_filter).await { + Ok(historical_logs) => { + info!("Fetched {} historical events", historical_logs.len()); + for log in historical_logs { + let block_number = log.block_number; + if let Some(event) = extractor(log.data(), log.topic0(), chain_id) { + trace!("Processing historical log"); + processor.do_send(EnclaveEvmEvent::new(event, block_number)); + } + } + } + Err(e) => { + warn!("Failed to fetch historical events: {}", e); + bus.err(EnclaveErrorType::Evm, anyhow!(e)); + return; + } + } + + info!("subscribing to live events"); + match provider.subscribe_logs(¤t_filter).await { + Ok(subscription) => { + let mut stream = subscription.into_stream(); + loop { + select! { + maybe_log = stream.next() => { + match maybe_log { + Some(log) => { + let block_number = log.block_number; + trace!("Received log from EVM"); + let Some(event) = extractor(log.data(), log.topic0(), chain_id) + else { + warn!("Failed to extract log from EVM."); + continue; + }; + info!("Extracted Evm Event: {}", event); + processor.do_send(EnclaveEvmEvent::new(event, block_number)); + + } + None => break, // Stream ended + } + } + _ = &mut shutdown => { + info!("Received shutdown signal, stopping EVM stream"); + break; + } + } + } + } + Err(e) => { + bus.err(EnclaveErrorType::Evm, anyhow!("{}", e)); + } + }; + info!("Exiting stream loop"); +} + impl Handler for EvmEventReader where P: Provider + Clone + 'static, @@ -109,3 +274,79 @@ where } } } + +impl Handler for EvmEventReader +where + P: Provider + Clone + 'static, + T: Transport + Clone + Unpin, +{ + type Result = (); + + #[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))] + fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result { + let event_id = wrapped.get_id(); + info!("Processing event: {}", event_id); + info!("cache length: {}", self.state.ids.len()); + if self.state.ids.contains(&event_id) { + error!( + "Event id {} has already been seen and was not forwarded to the bus", + &event_id + ); + return; + } + let event_type = wrapped.event.event_type(); + + // Forward everything else to the event bus + self.bus.do_send(wrapped.event); + + // Save processed ids + info!("Storing event(EVM) in cache {}({})", event_type, event_id); + self.state.ids.insert(event_id); + self.state.last_block = wrapped.block; + self.checkpoint(); + } +} + +impl Snapshot for EvmEventReader +where + P: Provider + Clone + 'static, + T: Transport + Clone + Unpin, +{ + type Snapshot = EvmEventReaderState; + fn snapshot(&self) -> Self::Snapshot { + self.state.clone() + } +} + +impl Checkpoint for EvmEventReader +where + P: Provider + Clone + 'static, + T: Transport + Clone + Unpin, +{ + fn repository(&self) -> &Repository { + &self.repository + } +} + +#[async_trait] +impl FromSnapshotWithParams for EvmEventReader +where + P: Provider + Clone + 'static, + T: Transport + Clone + Unpin, +{ + type Params = EvmEventReaderParams; + async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + Ok(Self { + contract_address: params.contract_address, + provider: Some(params.provider), + extractor: params.extractor, + shutdown_rx: Some(shutdown_rx), + shutdown_tx: Some(shutdown_tx), + start_block: params.start_block, + bus: params.bus, + state: snapshot, + repository: params.repository, + }) + } +} diff --git a/packages/ciphernode/evm/src/helpers.rs b/packages/ciphernode/evm/src/helpers.rs index ecaf7f73..116f3c2c 100644 --- a/packages/ciphernode/evm/src/helpers.rs +++ b/packages/ciphernode/evm/src/helpers.rs @@ -1,7 +1,5 @@ -use actix::Recipient; use alloy::{ network::{Ethereum, EthereumWallet}, - primitives::{LogData, B256}, providers::{ fillers::{ BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, @@ -9,62 +7,15 @@ use alloy::{ }, Identity, Provider, ProviderBuilder, RootProvider, }, - rpc::types::Filter, signers::local::PrivateKeySigner, transports::{BoxTransport, Transport}, }; -use anyhow::anyhow; use anyhow::{bail, Context, Result}; use cipher::Cipher; use data::Repository; -use enclave_core::{BusError, EnclaveErrorType, EnclaveEvent}; -use futures_util::stream::StreamExt; use std::{env, marker::PhantomData, sync::Arc}; -use tokio::{select, sync::oneshot}; -use tracing::{info, trace}; use zeroize::Zeroizing; -pub async fn stream_from_evm, T: Transport + Clone>( - provider: WithChainId, - filter: Filter, - bus: Recipient, - extractor: fn(&LogData, Option<&B256>, u64) -> Option, - mut shutdown: oneshot::Receiver<()>, -) { - match provider.get_provider().subscribe_logs(&filter).await { - Ok(subscription) => { - let mut stream = subscription.into_stream(); - loop { - select! { - maybe_log = stream.next() => { - match maybe_log { - Some(log) => { - trace!("Received log from EVM"); - let Some(event) = extractor(log.data(), log.topic0(), provider.get_chain_id()) - else { - trace!("Failed to extract log from EVM"); - continue; - }; - info!("Extracted log from evm sending now."); - bus.do_send(event); - } - None => break, // Stream ended - } - } - _ = &mut shutdown => { - info!("Received shutdown signal, stopping EVM stream"); - break; - } - } - } - } - Err(e) => { - bus.err(EnclaveErrorType::Evm, anyhow!("{}", e)); - } - }; - info!("Exiting stream loop"); -} - /// We need to cache the chainId so we can easily use it in a non-async situation /// This wrapper just stores the chain_id with the Provider #[derive(Clone)] diff --git a/packages/ciphernode/evm/src/lib.rs b/packages/ciphernode/evm/src/lib.rs index 5d01008a..e7fb10c9 100644 --- a/packages/ciphernode/evm/src/lib.rs +++ b/packages/ciphernode/evm/src/lib.rs @@ -10,5 +10,5 @@ pub use ciphernode_registry_sol::{CiphernodeRegistrySol, CiphernodeRegistrySolRe pub use enclave_sol::EnclaveSol; pub use enclave_sol_reader::EnclaveSolReader; pub use enclave_sol_writer::EnclaveSolWriter; -pub use event_reader::{EvmEventReader, ExtractorFn}; +pub use event_reader::{EnclaveEvmEvent, EvmEventReader, EvmEventReaderState, ExtractorFn}; pub use registry_filter_sol::{RegistryFilterSol, RegistryFilterSolWriter}; diff --git a/packages/ciphernode/evm/tests/evm_reader.rs b/packages/ciphernode/evm/tests/evm_reader.rs index 96315c25..0fdacf07 100644 --- a/packages/ciphernode/evm/tests/evm_reader.rs +++ b/packages/ciphernode/evm/tests/evm_reader.rs @@ -1,14 +1,17 @@ -use actix::Actor; +use actix::{Actor, Addr}; use alloy::{ node_bindings::Anvil, + primitives::{FixedBytes, LogData}, providers::{ProviderBuilder, WsConnect}, sol, sol_types::SolEvent, }; use anyhow::Result; -use enclave_core::{EnclaveEvent, EventBus, GetHistory, TestEvent}; +use data::Repository; +use enclave_core::{EnclaveEvent, EventBus, GetHistory, Shutdown, TestEvent}; +use enclave_node::get_in_mem_store; use evm::{helpers::WithChainId, EvmEventReader}; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use tokio::time::sleep; sol!( @@ -17,32 +20,47 @@ sol!( "tests/fixtures/emit_logs.json" ); +fn test_event_extractor( + data: &LogData, + topic: Option<&FixedBytes<32>>, + _chain_id: u64, +) -> Option { + match topic { + Some(&EmitLogs::ValueChanged::SIGNATURE_HASH) => { + let Ok(event) = EmitLogs::ValueChanged::decode_log_data(data, true) else { + return None; + }; + Some(EnclaveEvent::from(TestEvent { + msg: event.value, + entropy: event.count.try_into().unwrap(), // This prevents de-duplication in tests + })) + } + _ => None, + } +} + #[actix::test] -async fn test_logs() -> Result<()> { +async fn evm_reader() -> Result<()> { // Create a WS provider // NOTE: Anvil must be available on $PATH let anvil = Anvil::new().block_time(1).try_spawn()?; - let ws = WsConnect::new(anvil.ws_endpoint()); - let provider = ProviderBuilder::new().on_ws(ws).await?; - let arc_provider = WithChainId::new(provider).await?; - let contract = Arc::new(EmitLogs::deploy(arc_provider.get_provider()).await?); + let provider = WithChainId::new( + ProviderBuilder::new() + .on_ws(WsConnect::new(anvil.ws_endpoint())) + .await?, + ) + .await?; + let contract = EmitLogs::deploy(provider.get_provider()).await?; let bus = EventBus::new(true).start(); + let repository = Repository::new(get_in_mem_store()); EvmEventReader::attach( - &bus, - &arc_provider, - |data, topic, _| match topic { - Some(&EmitLogs::ValueChanged::SIGNATURE_HASH) => { - let Ok(event) = EmitLogs::ValueChanged::decode_log_data(data, true) else { - return None; - }; - Some(EnclaveEvent::from(TestEvent { - msg: event.newValue, - })) - } - _ => None, - }, + &provider, + test_event_extractor, &contract.address().to_string(), + None, + &bus, + &repository, ) .await?; @@ -78,3 +96,179 @@ async fn test_logs() -> Result<()> { Ok(()) } + +#[actix::test] +async fn ensure_historical_events() -> Result<()> { + // Create a WS provider + // NOTE: Anvil must be available on $PATH + let anvil = Anvil::new().block_time(1).try_spawn()?; + let provider = WithChainId::new( + ProviderBuilder::new() + .on_ws(WsConnect::new(anvil.ws_endpoint())) + .await?, + ) + .await?; + let contract = EmitLogs::deploy(provider.get_provider()).await?; + let bus = EventBus::new(true).start(); + + let historical_msgs = vec!["these", "are", "historical", "events"]; + let live_events = vec!["these", "events", "are", "live"]; + + let repository = Repository::new(get_in_mem_store()); + for msg in historical_msgs.clone() { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + EvmEventReader::attach( + &provider, + test_event_extractor, + &contract.address().to_string(), + None, + &bus, + &repository, + ) + .await?; + + for msg in live_events.clone() { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + sleep(Duration::from_millis(1)).await; + + let expected: Vec<_> = historical_msgs.into_iter().chain(live_events).collect(); + + let history = bus.send(GetHistory).await?; + assert_eq!(history.len(), 8); + + let msgs: Vec<_> = history + .into_iter() + .filter_map(|evt| match evt { + EnclaveEvent::TestEvent { data, .. } => Some(data.msg), + _ => None, + }) + .collect(); + + assert_eq!(msgs, expected); + + Ok(()) +} + +#[actix::test] +async fn ensure_resume_after_shutdown() -> Result<()> { + // Create a WS provider + // NOTE: Anvil must be available on $PATH + let anvil = Anvil::new().block_time(1).try_spawn()?; + let provider = WithChainId::new( + ProviderBuilder::new() + .on_ws(WsConnect::new(anvil.ws_endpoint())) + .await?, + ) + .await?; + let contract = EmitLogs::deploy(provider.get_provider()).await?; + let bus = EventBus::new(true).start(); + + async fn get_msgs(bus: &Addr) -> Result> { + let history = bus.send(GetHistory).await?; + let msgs: Vec = history + .into_iter() + .filter_map(|evt| match evt { + EnclaveEvent::TestEvent { data, .. } => Some(data.msg), + _ => None, + }) + .collect(); + + Ok(msgs) + } + + let repository = Repository::new(get_in_mem_store()); + + for msg in ["before", "online"] { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + let addr1 = EvmEventReader::attach( + &provider, + test_event_extractor, + &contract.address().to_string(), + None, + &bus, + &repository, + ) + .await?; + + for msg in ["live", "events"] { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + // Ensure shutdown doesn't cause event to be lost. + sleep(Duration::from_millis(1)).await; + addr1.send(EnclaveEvent::from(Shutdown)).await?; + + for msg in ["these", "are", "not", "lost"] { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + sleep(Duration::from_millis(1)).await; + let msgs = get_msgs(&bus).await?; + assert_eq!(msgs, ["before", "online", "live", "events"]); + + let _ = EvmEventReader::attach( + &provider, + test_event_extractor, + &contract.address().to_string(), + None, + &bus, + &repository, + ) + .await?; + + sleep(Duration::from_millis(1)).await; + let msgs = get_msgs(&bus).await?; + assert_eq!( + msgs, + ["before", "online", "live", "events", "these", "are", "not", "lost"] + ); + + for msg in ["resumed", "data"] { + contract + .setValue(msg.to_string()) + .send() + .await? + .watch() + .await?; + } + + sleep(Duration::from_millis(1)).await; + let msgs = get_msgs(&bus).await?; + assert_eq!( + msgs, + ["before", "online", "live", "events", "these", "are", "not", "lost", "resumed", "data"] + ); + + Ok(()) +} diff --git a/packages/ciphernode/evm/tests/fixtures/emit_logs.sol b/packages/ciphernode/evm/tests/fixtures/emit_logs.sol index e9bad33c..1c57518e 100644 --- a/packages/ciphernode/evm/tests/fixtures/emit_logs.sol +++ b/packages/ciphernode/evm/tests/fixtures/emit_logs.sol @@ -2,10 +2,12 @@ pragma solidity >=0.4.24; contract EmitLogs { - event ValueChanged(address indexed author, string oldValue, string newValue); + event ValueChanged(address indexed author, uint256 count, string value); string _value; + uint256 count = 0; + constructor() { _value = ""; } @@ -15,7 +17,8 @@ contract EmitLogs { } function setValue(string memory value) public { - emit ValueChanged(msg.sender, _value, value); + count++; + emit ValueChanged(msg.sender, count, value); _value = value; } } diff --git a/packages/ciphernode/keyshare/src/keyshare.rs b/packages/ciphernode/keyshare/src/keyshare.rs index 20c6cf67..a2e93c0e 100644 --- a/packages/ciphernode/keyshare/src/keyshare.rs +++ b/packages/ciphernode/keyshare/src/keyshare.rs @@ -85,8 +85,8 @@ impl Snapshot for Keyshare { } impl Checkpoint for Keyshare { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } diff --git a/packages/ciphernode/p2p/src/libp2p_router.rs b/packages/ciphernode/p2p/src/libp2p_router.rs index 9ed1a8c8..d270402f 100644 --- a/packages/ciphernode/p2p/src/libp2p_router.rs +++ b/packages/ciphernode/p2p/src/libp2p_router.rs @@ -9,7 +9,6 @@ use std::time::Duration; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::{io, select}; use tracing::{error, info, trace}; -use tracing_subscriber::EnvFilter; #[derive(NetworkBehaviour)] pub struct MyBehaviour { @@ -58,19 +57,22 @@ impl EnclaveRouter { )) } - pub fn with_identiy(&mut self, keypair: identity::Keypair) { - self.identity = Some(keypair); + pub fn with_identity(&mut self, keypair: &identity::Keypair) { + self.identity = Some(keypair.clone()); } pub fn connect_swarm(&mut self, discovery_type: String) -> Result<&Self, Box> { match discovery_type.as_str() { "mdns" => { - let _ = tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .try_init(); - // TODO: Use key if assigned already - let mut swarm = libp2p::SwarmBuilder::with_new_identity() + + let swarm = self + .identity + .clone() + .map_or_else( + || libp2p::SwarmBuilder::with_new_identity(), + |id| libp2p::SwarmBuilder::with_existing_identity(id), + ) .with_tokio() .with_tcp( tcp::Config::default(), @@ -92,6 +94,7 @@ impl EnclaveRouter { })? .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); + self.swarm = Some(swarm); } _ => info!("Defaulting to MDNS discovery"), diff --git a/packages/ciphernode/p2p/src/p2p.rs b/packages/ciphernode/p2p/src/p2p.rs index 635ad8e5..53bbd559 100644 --- a/packages/ciphernode/p2p/src/p2p.rs +++ b/packages/ciphernode/p2p/src/p2p.rs @@ -65,14 +65,16 @@ impl P2p { /// Spawn a Libp2p instance. Calls spawn and listen pub fn spawn_libp2p( bus: Addr, - ) -> Result<(Addr, tokio::task::JoinHandle<()>), Box> { + ) -> Result<(Addr, tokio::task::JoinHandle<()>, String), Box> { let (mut libp2p, tx, rx) = EnclaveRouter::new()?; + let keypair = libp2p::identity::Keypair::generate_ed25519(); + libp2p.with_identity(&keypair); libp2p.connect_swarm("mdns".to_string())?; libp2p.join_topic("enclave-keygen-01")?; let p2p_addr = Self::spawn_and_listen(bus, tx, rx); let handle = tokio::spawn(async move { libp2p.start().await.unwrap() }); - Ok((p2p_addr, handle)) + Ok((p2p_addr, handle, keypair.public().to_peer_id().to_string())) } } diff --git a/packages/ciphernode/router/Cargo.toml b/packages/ciphernode/router/Cargo.toml index ce85d444..cb626737 100644 --- a/packages/ciphernode/router/Cargo.toml +++ b/packages/ciphernode/router/Cargo.toml @@ -11,6 +11,7 @@ fhe = { path = "../fhe" } data = { path = "../data" } keyshare = { path = "../keyshare" } aggregator = { path = "../aggregator" } +evm = { path = "../evm" } anyhow = { workspace = true } serde = { workspace = true } cipher = { path = "../cipher" } diff --git a/packages/ciphernode/router/src/context.rs b/packages/ciphernode/router/src/context.rs index c3643132..506e106c 100644 --- a/packages/ciphernode/router/src/context.rs +++ b/packages/ciphernode/router/src/context.rs @@ -143,7 +143,7 @@ impl E3RequestContext { impl RepositoriesFactory for E3RequestContext { fn repositories(&self) -> Repositories { - self.repository().into() + self.repository().clone().into() } } @@ -186,7 +186,7 @@ impl FromSnapshotWithParams for E3RequestContext { } impl Checkpoint for E3RequestContext { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } diff --git a/packages/ciphernode/router/src/e3_request_router.rs b/packages/ciphernode/router/src/e3_request_router.rs index 943a3691..0890d4e0 100644 --- a/packages/ciphernode/router/src/e3_request_router.rs +++ b/packages/ciphernode/router/src/e3_request_router.rs @@ -190,8 +190,8 @@ impl Snapshot for E3RequestRouter { } impl Checkpoint for E3RequestRouter { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } diff --git a/packages/ciphernode/router/src/repositories.rs b/packages/ciphernode/router/src/repositories.rs index 0d5f703a..e8557c18 100644 --- a/packages/ciphernode/router/src/repositories.rs +++ b/packages/ciphernode/router/src/repositories.rs @@ -2,6 +2,7 @@ use crate::{CommitteeMeta, E3RequestContextSnapshot, E3RequestRouterSnapshot}; use aggregator::{PlaintextAggregatorState, PublicKeyAggregatorState}; use data::{DataStore, Repository}; use enclave_core::E3id; +use evm::EvmEventReaderState; use fhe::FheSnapshot; use keyshare::KeyshareState; use sortition::SortitionModule; @@ -72,6 +73,19 @@ impl Repositories { pub fn eth_private_key(&self) -> Repository> { Repository::new(self.store.scope(format!("//eth_private_key"))) } + + pub fn enclave_sol_reader(&self, chain_id: u64) -> Repository { + Repository::new( + self.store + .scope(format!("//evm_readers/enclave/{chain_id}")), + ) + } + pub fn ciphernode_registry_reader(&self, chain_id: u64) -> Repository { + Repository::new( + self.store + .scope(format!("//evm_readers/ciphernode_registry/{chain_id}")), + ) + } } pub trait RepositoriesFactory { diff --git a/packages/ciphernode/sortition/src/sortition.rs b/packages/ciphernode/sortition/src/sortition.rs index 0f992c96..a41c9fa9 100644 --- a/packages/ciphernode/sortition/src/sortition.rs +++ b/packages/ciphernode/sortition/src/sortition.rs @@ -5,10 +5,11 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; use enclave_core::{ - BusError, CiphernodeAdded, CiphernodeRemoved, EnclaveErrorType, EnclaveEvent, EventBus, Seed, - Subscribe, + get_tag, BusError, CiphernodeAdded, CiphernodeRemoved, EnclaveErrorType, EnclaveEvent, + EventBus, Seed, Subscribe, }; use std::collections::HashSet; +use tracing::{info, instrument}; #[derive(Message, Clone, Debug, PartialEq, Eq)] #[rtype(result = "bool")] @@ -24,7 +25,7 @@ pub trait SortitionList { fn remove(&mut self, address: T); } -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct SortitionModule { nodes: HashSet, } @@ -35,6 +36,10 @@ impl SortitionModule { nodes: HashSet::new(), } } + + pub fn nodes(&self) -> &HashSet { + &self.nodes + } } impl Default for SortitionModule { @@ -87,6 +92,7 @@ pub struct Sortition { store: Repository, } +#[derive(Debug)] pub struct SortitionParams { pub bus: Addr, pub store: Repository, @@ -101,14 +107,30 @@ impl Sortition { } } - pub fn attach(bus: &Addr, store: Repository) -> Addr { - let addr = Sortition::new(SortitionParams { + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] + pub async fn attach( + bus: &Addr, + store: Repository, + ) -> Result> { + let addr = Sortition::load(SortitionParams { bus: bus.clone(), store, }) + .await? .start(); bus.do_send(Subscribe::new("CiphernodeAdded", addr.clone().into())); - addr + Ok(addr) + } + + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] + pub async fn load(params: SortitionParams) -> Result { + Ok(if let Some(snapshot) = params.store.read().await? { + info!("Loading from snapshot"); + Self::from_snapshot(params, snapshot).await? + } else { + info!("Loading from params"); + Self::new(params) + }) } pub fn get_nodes(&self) -> Vec { @@ -130,7 +152,14 @@ impl Snapshot for Sortition { #[async_trait] impl FromSnapshotWithParams for Sortition { type Params = SortitionParams; + + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { + info!("Loaded snapshot with {} nodes", snapshot.nodes().len()); + info!( + "Nodes:\n\n{:?}\n", + snapshot.nodes().into_iter().collect::>() + ); Ok(Sortition { bus: params.bus, store: params.store, @@ -140,8 +169,8 @@ impl FromSnapshotWithParams for Sortition { } impl Checkpoint for Sortition { - fn repository(&self) -> Repository { - self.store.clone() + fn repository(&self) -> &Repository { + &self.store } } @@ -158,20 +187,30 @@ impl Handler for Sortition { impl Handler for Sortition { type Result = (); + + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: CiphernodeAdded, _ctx: &mut Self::Context) -> Self::Result { + info!("Adding node: {}", msg.address); self.list.add(msg.address); + self.checkpoint(); } } impl Handler for Sortition { type Result = (); + + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: CiphernodeRemoved, _ctx: &mut Self::Context) -> Self::Result { + info!("Removing node: {}", msg.address); self.list.remove(msg.address); + self.checkpoint(); } } impl Handler for Sortition { type Result = bool; + + #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: GetHasNode, _ctx: &mut Self::Context) -> Self::Result { match self.list.contains(msg.seed, msg.size, msg.address) { Ok(val) => val, diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 1ac603bd..9e12fab6 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -52,7 +52,7 @@ async fn setup_local_ciphernode( let store = DataStore::from(&data_actor); let repositories = store.repositories(); // create ciphernode actor for managing ciphernode flow - let sortition = Sortition::attach(&bus, repositories.sortition()); + let sortition = Sortition::attach(&bus, repositories.sortition()).await?; CiphernodeSelector::attach(&bus, &sortition, addr); let router = E3RequestRouter::builder(&bus, store) diff --git a/tests/basic_integration/base.sh b/tests/basic_integration/base.sh new file mode 100755 index 00000000..3fcf7cae --- /dev/null +++ b/tests/basic_integration/base.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash + +set -eu # Exit immediately if a command exits with a non-zero status + +# Get the directory of the currently executing script +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +# Source the file from the same directory +source "$THIS_DIR/fns.sh" + +heading "Start the EVM node" + +launch_evm + +until curl -f -s "http://localhost:8545" > /dev/null; do + sleep 1 +done + +# Set the password for all ciphernodes +set_password cn1 "$CIPHERNODE_SECRET" +set_password cn2 "$CIPHERNODE_SECRET" +set_password cn3 "$CIPHERNODE_SECRET" +set_password cn4 "$CIPHERNODE_SECRET" +set_password ag "$CIPHERNODE_SECRET" +set_private_key ag "$PRIVATE_KEY" + +# Launch 4 ciphernodes +launch_ciphernode cn1 +launch_ciphernode cn2 +launch_ciphernode cn3 +launch_ciphernode cn4 +launch_aggregator ag + +waiton-files "$ROOT_DIR/packages/ciphernode/target/debug/enclave" "$ROOT_DIR/packages/ciphernode/target/debug/fake_encrypt" + +heading "Add ciphernode $CIPHERNODE_ADDRESS_1" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_1 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_2" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_2 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_3" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_3 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_4" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_4 --network localhost + +heading "Request Committee" + +ENCODED_PARAMS=0x$($SCRIPT_DIR/lib/pack_e3_params.sh --moduli 0x3FFFFFFF000001 --degree 2048 --plaintext-modulus 1032193) + +yarn committee:new --network localhost --duration 4 --e3-params "$ENCODED_PARAMS" + +waiton "$SCRIPT_DIR/output/pubkey.bin" +PUBLIC_KEY=$(xxd -p -c 10000000 "$SCRIPT_DIR/output/pubkey.bin") + +heading "Mock encrypted plaintext" +$SCRIPT_DIR/lib/fake_encrypt.sh --input "$SCRIPT_DIR/output/pubkey.bin" --output "$SCRIPT_DIR/output/output.bin" --plaintext $PLAINTEXT + +heading "Mock activate e3-id" +yarn e3:activate --e3-id 0 --public-key "0x$PUBLIC_KEY" --network localhost + +heading "Mock publish input e3-id" +yarn e3:publishInput --network localhost --e3-id 0 --data 0x12345678 + +sleep 4 # wait for input deadline to pass + +waiton "$SCRIPT_DIR/output/output.bin" + +heading "Publish ciphertext to EVM" +yarn e3:publishCiphertext --e3-id 0 --network localhost --data-file "$SCRIPT_DIR/output/output.bin" --proof 0x12345678 + +waiton "$SCRIPT_DIR/output/plaintext.txt" + +ACTUAL=$(cat $SCRIPT_DIR/output/plaintext.txt) + + +# Assume plaintext is shorter + +if [[ "$ACTUAL" != "$PLAINTEXT"* ]]; then + echo "Invalid plaintext decrypted: actual='$ACTUAL' expected='$PLAINTEXT'" + echo "Test FAILED" + exit 1 +fi + +heading "Test PASSED !" +echo -e "\033[32m + ██████ + ██████ + ██████ + ██████ + ██████ + ██████ + ██ ██████ + ████ ██████ + ██████ ██████ + ██████████ + ████████ + ██████ + ████ + ██ + \033[0m" + +pkill -15 -f "target/debug/enclave" || true +pkill -15 -f "target/debug/aggregator" || true + +sleep 4 + +cleanup 0 + diff --git a/tests/basic_integration/fns.sh b/tests/basic_integration/fns.sh new file mode 100644 index 00000000..78f45a8e --- /dev/null +++ b/tests/basic_integration/fns.sh @@ -0,0 +1,144 @@ + +# Get the script's location +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" +PLAINTEXT="1234,567890" +ID=$(date +%s) + +if [[ "$ROOT_DIR" != "$(pwd)" ]]; then + echo "This script must be run from the root" + exit 1 +fi + +# Environment variables +RPC_URL="ws://localhost:8545" + +PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" +CIPHERNODE_SECRET="We are the music makers and we are the dreamers of the dreams." + +# These contracts are based on the deterministic order of hardhat deploy +# We _may_ wish to get these off the hardhat environment somehow? +ENCLAVE_CONTRACT="0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" +REGISTRY_CONTRACT="0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" +REGISTRY_FILTER_CONTRACT="0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9" +INPUT_VALIDATOR_CONTRACT="0x8A791620dd6260079BF849Dc5567aDC3F2FdC318" + +# These are random addresses for now +CIPHERNODE_ADDRESS_1="0x2546BcD3c84621e976D8185a91A922aE77ECEc30" +CIPHERNODE_ADDRESS_2="0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" +CIPHERNODE_ADDRESS_3="0xdD2FD4581271e230360230F9337D5c0430Bf44C0" +CIPHERNODE_ADDRESS_4="0x8626f6940E2eb28930eFb4CeF49B2d1F2C9C1199" + + +# Function to clean up background processes +cleanup() { + echo "Cleaning up processes..." + kill $(jobs -p) 2>/dev/null + exit ${1:-1} +} + +heading() { + echo "" + echo "" + echo "--------------------------------------------------------------" + echo " $1 " + echo "--------------------------------------------------------------" + echo "" +} + +waiton() { + local file_path="$1" + until [ -f "$file_path" ]; do + sleep 1 + done +} + +waiton-files() { + local timeout=600 # 10 minutes timeout + local start_time=$(date +%s) + while true; do + all_exist=true + for file in "$@"; do + if [ ! -f "$file" ]; then + all_exist=false + break + fi + done + if $all_exist; then + break + fi + if [ $(($(date +%s) - start_time)) -ge $timeout ]; then + echo "Timeout waiting for files: $@" >&2 + return 1 + fi + sleep 1 + done +} + +set_password() { + local name="$1" + local password="$2" + yarn enclave password create \ + --config "$SCRIPT_DIR/lib/$name/config.yaml" \ + --password "$password" +} + +launch_ciphernode() { + local name="$1" + heading "Launch ciphernode $name" + yarn enclave start \ + --tag "$name" \ + --config "$SCRIPT_DIR/lib/$name/config.yaml" & echo $! > "/tmp/enclave.${ID}_${name}.pid" +} + +set_private_key() { + local name="$1" + local private_key="$2" + + yarn enclave wallet set \ + --config "$SCRIPT_DIR/lib/$name/config.yaml" \ + --private-key "$private_key" +} + +launch_aggregator() { + local name="$1" + heading "Launch aggregator $name" + + yarn enclave aggregator start \ + --tag "$name" \ + --config "$SCRIPT_DIR/lib/$name/config.yaml" \ + --pubkey-write-path "$SCRIPT_DIR/output/pubkey.bin" \ + --plaintext-write-path "$SCRIPT_DIR/output/plaintext.txt" & echo $! > "/tmp/enclave.${ID}_${name}.pid" + + ps aux | grep aggregator +} + +kill_proc() { + local name=$1 + local pid=$(ps aux | grep 'enclave' | grep "\--tag $name" | awk '{ print $2 }') + echo "Killing $pid" + kill $pid +} + +metallica() { + pkill -9 -f "target/debug/enclave" || true + pkill -9 -f "hardhat node" || true +} + +launch_evm() { + if [ ! -z "${SILENT_EVM:-}" ]; then + yarn evm:node &> /dev/null & + else + yarn evm:node & + fi +} + +metallica + +# Set up trap to catch errors and interrupts +trap 'cleanup $?' ERR INT TERM + +$SCRIPT_DIR/lib/clean_folders.sh "$SCRIPT_DIR" +$SCRIPT_DIR/lib/prebuild.sh + + diff --git a/tests/basic_integration/lib/ag/config.yaml b/tests/basic_integration/lib/ag/config.yaml index 55bdb526..68b919f1 100644 --- a/tests/basic_integration/lib/ag/config.yaml +++ b/tests/basic_integration/lib/ag/config.yaml @@ -1,6 +1,6 @@ config_dir: . data_dir: . -address: "0x8626f6940E2eb28930eFb4CeF49B2d1F2C9C1199" +address: "0x8626a6940E2eb28930eFb4CeF49B2d1F2C9C1199" chains: - name: "hardhat" rpc_url: "ws://localhost:8545" diff --git a/tests/basic_integration/lib/clean_folders.sh b/tests/basic_integration/lib/clean_folders.sh index 58a1195e..5c7686a0 100755 --- a/tests/basic_integration/lib/clean_folders.sh +++ b/tests/basic_integration/lib/clean_folders.sh @@ -4,6 +4,7 @@ clean_folders() { # Delete output artifacts rm -rf "$SCRIPT_DIR/output/"* + rm -rf "/tmp/enclave.*.pid" # Delete enclave artifacts for name in cn1 cn2 cn3 cn4 ag; do diff --git a/tests/basic_integration/lib/cn2/config.yaml b/tests/basic_integration/lib/cn2/config.yaml index 20d5183f..5cbce324 100644 --- a/tests/basic_integration/lib/cn2/config.yaml +++ b/tests/basic_integration/lib/cn2/config.yaml @@ -1,4 +1,5 @@ config_dir: . +data_dir: . address: "0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" chains: - name: "hardhat" diff --git a/tests/basic_integration/persist.sh b/tests/basic_integration/persist.sh new file mode 100755 index 00000000..42a2cb1b --- /dev/null +++ b/tests/basic_integration/persist.sh @@ -0,0 +1,121 @@ +#!/usr/bin/env bash + +set -eu # Exit immediately if a command exits with a non-zero status + +# Get the directory of the currently executing script +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +# Source the file from the same directory +source "$THIS_DIR/fns.sh" + +heading "Start the EVM node" + +launch_evm + +until curl -f -s "http://localhost:8545" > /dev/null; do + sleep 1 +done + +# Set the password for all ciphernodes +set_password cn1 "$CIPHERNODE_SECRET" +set_password cn2 "$CIPHERNODE_SECRET" +set_password cn3 "$CIPHERNODE_SECRET" +set_password cn4 "$CIPHERNODE_SECRET" +set_password ag "$CIPHERNODE_SECRET" +set_private_key ag "$PRIVATE_KEY" + +# Launch 4 ciphernodes +launch_ciphernode cn1 +launch_ciphernode cn2 +launch_ciphernode cn3 +launch_ciphernode cn4 +launch_aggregator ag + +waiton-files "$ROOT_DIR/packages/ciphernode/target/debug/enclave" "$ROOT_DIR/packages/ciphernode/target/debug/fake_encrypt" + +heading "Add ciphernode $CIPHERNODE_ADDRESS_1" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_1 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_2" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_2 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_3" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_3 --network localhost + +heading "Add ciphernode $CIPHERNODE_ADDRESS_4" +yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_4 --network localhost + +heading "Request Committee" + +ENCODED_PARAMS=0x$($SCRIPT_DIR/lib/pack_e3_params.sh --moduli 0x3FFFFFFF000001 --degree 2048 --plaintext-modulus 1032193) + +yarn committee:new --network localhost --duration 4 --e3-params "$ENCODED_PARAMS" + +waiton "$SCRIPT_DIR/output/pubkey.bin" +PUBLIC_KEY=$(xxd -p -c 10000000 "$SCRIPT_DIR/output/pubkey.bin") + + +# kill aggregator +kill_proc ag + +sleep 2 + +# relaunch the aggregator +launch_aggregator ag + +sleep 2 + +heading "Mock encrypted plaintext" +$SCRIPT_DIR/lib/fake_encrypt.sh --input "$SCRIPT_DIR/output/pubkey.bin" --output "$SCRIPT_DIR/output/output.bin" --plaintext $PLAINTEXT + +heading "Mock activate e3-id" +yarn e3:activate --e3-id 0 --public-key "0x$PUBLIC_KEY" --network localhost + +heading "Mock publish input e3-id" +yarn e3:publishInput --network localhost --e3-id 0 --data 0x12345678 + +sleep 4 # wait for input deadline to pass + +waiton "$SCRIPT_DIR/output/output.bin" + +heading "Publish ciphertext to EVM" +yarn e3:publishCiphertext --e3-id 0 --network localhost --data-file "$SCRIPT_DIR/output/output.bin" --proof 0x12345678 + +waiton "$SCRIPT_DIR/output/plaintext.txt" + +ACTUAL=$(cat $SCRIPT_DIR/output/plaintext.txt) + + +# Assume plaintext is shorter + +if [[ "$ACTUAL" != "$PLAINTEXT"* ]]; then + echo "Invalid plaintext decrypted: actual='$ACTUAL' expected='$PLAINTEXT'" + echo "Test FAILED" + exit 1 +fi + +heading "Test PASSED !" +echo -e "\033[32m + ██████ + ██████ + ██████ + ██████ + ██████ + ██████ + ██ ██████ + ████ ██████ + ██████ ██████ + ██████████ + ████████ + ██████ + ████ + ██ + \033[0m" + +pkill -15 -f "target/debug/enclave" || true +pkill -15 -f "target/debug/aggregator" || true + +sleep 4 + +cleanup 0 + diff --git a/tests/basic_integration/test.sh b/tests/basic_integration/test.sh index 1fb9e3f5..5377d3b7 100755 --- a/tests/basic_integration/test.sh +++ b/tests/basic_integration/test.sh @@ -2,229 +2,12 @@ set -eu # Exit immediately if a command exits with a non-zero status -# Get the script's location -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -ROOT_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" -PLAINTEXT="1234,567890" +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -if [[ "$ROOT_DIR" != "$(pwd)" ]]; then - echo "This script must be run from the root" - exit 1 +if [ $# -eq 0 ]; then + "$THIS_DIR/persist.sh" + "$THIS_DIR/base.sh" +else + "$THIS_DIR/$1.sh" fi -export RUST_LOG=info - -# Environment variables -RPC_URL="ws://localhost:8545" - -PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -CIPHERNODE_SECRET="We are the music makers and we are the dreamers of the dreams." - -# These contracts are based on the deterministic order of hardhat deploy -# We _may_ wish to get these off the hardhat environment somehow? -ENCLAVE_CONTRACT="0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" -REGISTRY_CONTRACT="0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" -REGISTRY_FILTER_CONTRACT="0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9" -INPUT_VALIDATOR_CONTRACT="0x8A791620dd6260079BF849Dc5567aDC3F2FdC318" - -# These are random addresses for now -CIPHERNODE_ADDRESS_1="0x2546BcD3c84621e976D8185a91A922aE77ECEc30" -CIPHERNODE_ADDRESS_2="0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" -CIPHERNODE_ADDRESS_3="0xdD2FD4581271e230360230F9337D5c0430Bf44C0" -CIPHERNODE_ADDRESS_4="0x8626f6940E2eb28930eFb4CeF49B2d1F2C9C1199" - - -# Function to clean up background processes -cleanup() { - echo "Cleaning up processes..." - kill $(jobs -p) 2>/dev/null - exit ${1:-1} -} - -heading() { - echo "" - echo "" - echo "--------------------------------------------------------------" - echo " $1 " - echo "--------------------------------------------------------------" - echo "" -} - -waiton() { - local file_path="$1" - until [ -f "$file_path" ]; do - sleep 1 - done -} - -waiton-files() { - local timeout=600 # 10 minutes timeout - local start_time=$(date +%s) - while true; do - all_exist=true - for file in "$@"; do - if [ ! -f "$file" ]; then - all_exist=false - break - fi - done - if $all_exist; then - break - fi - if [ $(($(date +%s) - start_time)) -ge $timeout ]; then - echo "Timeout waiting for files: $@" >&2 - return 1 - fi - sleep 1 - done -} - -set_password() { - local name="$1" - local password="$2" - yarn enclave password create \ - --config "$SCRIPT_DIR/lib/$name/config.yaml" \ - --password "$password" -} - -launch_ciphernode() { - local name="$1" - heading "Launch ciphernode $name" - yarn enclave start \ - --config "$SCRIPT_DIR/lib/$name/config.yaml" & -} - -set_private_key() { - local name="$1" - local private_key="$2" - - yarn enclave wallet set \ - --config "$SCRIPT_DIR/lib/$name/config.yaml" \ - --private-key "$private_key" -} - -launch_aggregator() { - local name="$1" - heading "Launch aggregator $name" - - yarn enclave aggregator start \ - --config "$SCRIPT_DIR/lib/$name/config.yaml" \ - --pubkey-write-path "$SCRIPT_DIR/output/pubkey.bin" \ - --plaintext-write-path "$SCRIPT_DIR/output/plaintext.txt" & -} - - - -pkill -9 -f "target/debug/enclave" || true -pkill -9 -f "hardhat node" || true - -# Set up trap to catch errors and interrupts -trap 'cleanup $?' ERR INT TERM - - -$SCRIPT_DIR/lib/clean_folders.sh "$SCRIPT_DIR" -$SCRIPT_DIR/lib/prebuild.sh - -heading "Start the EVM node" - -yarn evm:node & - -until curl -f -s "http://localhost:8545" > /dev/null; do - sleep 1 -done - -# Set the password for all ciphernodes -set_password cn1 "$CIPHERNODE_SECRET" -set_password cn2 "$CIPHERNODE_SECRET" -set_password cn3 "$CIPHERNODE_SECRET" -set_password cn4 "$CIPHERNODE_SECRET" -set_password ag "$CIPHERNODE_SECRET" - -set_private_key ag "$PRIVATE_KEY" - -# Launch 4 ciphernodes -launch_ciphernode cn1 -launch_ciphernode cn2 -launch_ciphernode cn3 -launch_ciphernode cn4 -launch_aggregator ag - -sleep 1 - -waiton-files "$ROOT_DIR/packages/ciphernode/target/debug/enclave" "$ROOT_DIR/packages/ciphernode/target/debug/fake_encrypt" - -heading "Add ciphernode $CIPHERNODE_ADDRESS_1" -yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_1 --network localhost - -heading "Add ciphernode $CIPHERNODE_ADDRESS_2" -yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_2 --network localhost - -heading "Add ciphernode $CIPHERNODE_ADDRESS_3" -yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_3 --network localhost - -heading "Add ciphernode $CIPHERNODE_ADDRESS_4" -yarn ciphernode:add --ciphernode-address $CIPHERNODE_ADDRESS_4 --network localhost - -heading "Request Committee" - -ENCODED_PARAMS=0x$($SCRIPT_DIR/lib/pack_e3_params.sh --moduli 0x3FFFFFFF000001 --degree 2048 --plaintext-modulus 1032193) - -yarn committee:new --network localhost --duration 4 --e3-params "$ENCODED_PARAMS" - -waiton "$SCRIPT_DIR/output/pubkey.bin" -PUBLIC_KEY=$(xxd -p -c 10000000 "$SCRIPT_DIR/output/pubkey.bin") - -heading "Mock encrypted plaintext" -$SCRIPT_DIR/lib/fake_encrypt.sh --input "$SCRIPT_DIR/output/pubkey.bin" --output "$SCRIPT_DIR/output/output.bin" --plaintext $PLAINTEXT - -heading "Mock activate e3-id" -yarn e3:activate --e3-id 0 --public-key "0x$PUBLIC_KEY" --network localhost - -heading "Mock publish input e3-id" -yarn e3:publishInput --network localhost --e3-id 0 --data 0x12345678 - -sleep 4 # wait for input deadline to pass - -waiton "$SCRIPT_DIR/output/output.bin" - -heading "Publish ciphertext to EVM" -yarn e3:publishCiphertext --e3-id 0 --network localhost --data-file "$SCRIPT_DIR/output/output.bin" --proof 0x12345678 - -waiton "$SCRIPT_DIR/output/plaintext.txt" - -ACTUAL=$(cat $SCRIPT_DIR/output/plaintext.txt) - - -# Assume plaintext is shorter - -if [[ "$ACTUAL" != "$PLAINTEXT"* ]]; then - echo "Invalid plaintext decrypted: actual='$ACTUAL' expected='$PLAINTEXT'" - echo "Test FAILED" - exit 1 -fi - -heading "Test PASSED !" -echo -e "\033[32m - ██████ - ██████ - ██████ - ██████ - ██████ - ██████ - ██ ██████ - ████ ██████ - ██████ ██████ - ██████████ - ████████ - ██████ - ████ - ██ - \033[0m" - -pkill -15 -f "target/debug/enclave" || true -pkill -15 -f "target/debug/aggregator" || true - -sleep 4 - -cleanup 0 -