diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 48d939cd2..2cd85210b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -66,6 +66,7 @@ jobs: --all-features \ --no-fail-fast \ --failure-output final \ + --exclude informalsystems-malachitebft-test \ --exclude informalsystems-malachitebft-starknet-test \ --exclude informalsystems-malachitebft-starknet-test-mbt \ --exclude informalsystems-malachitebft-discovery-test @@ -106,9 +107,45 @@ jobs: run: | cargo maelstrom \ --slots 16 \ - --include 'package.match(informalsystems-malachitebft-starknet-test) || package.match(informalsystems-malachitebft-discovery-test)' \ + --include 'package.equals(informalsystems-malachitebft-starknet-test) || package.equals(informalsystems-malachitebft-discovery-test)' \ --exclude 'package.match(informalsystems-malachitebft-starknet-test-mbt)' + integration_test_app: + name: Integration Tests (test app) + runs-on: ubuntu-latest + defaults: + run: + working-directory: code + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Setup Node + uses: actions/setup-node@v3 + with: + node-version: "18" + - name: Install Quint + run: npm install -g @informalsystems/quint + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + cache-workspaces: "code" + - name: Install cargo-maelstrom + uses: taiki-e/install-action@v2 + with: + tool: cargo-maelstrom + - name: Disable apparmor container restrictions + run: sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0 + - name: Run integration tests + continue-on-error: true + run: | + cargo maelstrom \ + --slots 8 \ + --include 'package.equals(informalsystems-malachitebft-test)' + no_std: name: no_std compatibility needs: changes diff --git a/code/Cargo.lock b/code/Cargo.lock index 3a1ae929c..36382c729 100644 --- a/code/Cargo.lock +++ 
b/code/Cargo.lock @@ -2118,8 +2118,10 @@ dependencies = [ "informalsystems-malachitebft-peer", "informalsystems-malachitebft-sync", "libp2p-identity", + "ractor", "rand 0.8.5", "serde", + "tokio", "tracing", ] @@ -2494,6 +2496,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "bytes", + "bytesize", "ed25519-consensus", "eyre", "hex", @@ -2506,6 +2509,8 @@ dependencies = [ "informalsystems-malachitebft-proto", "informalsystems-malachitebft-signing-ed25519", "informalsystems-malachitebft-sync", + "informalsystems-malachitebft-test-app", + "informalsystems-malachitebft-test-framework", "prost", "prost-build", "prost-types", @@ -2514,6 +2519,32 @@ dependencies = [ "serde_json", "sha3", "signature", + "tokio", + "tracing", +] + +[[package]] +name = "informalsystems-malachitebft-test-app" +version = "0.0.1" +dependencies = [ + "async-trait", + "bytes", + "color-eyre", + "derive-where", + "eyre", + "informalsystems-malachitebft-app-channel", + "informalsystems-malachitebft-proto", + "informalsystems-malachitebft-test", + "informalsystems-malachitebft-test-cli", + "informalsystems-malachitebft-test-framework", + "prost", + "rand 0.8.5", + "redb", + "serde_json", + "sha3", + "thiserror 2.0.11", + "tokio", + "tracing", ] [[package]] @@ -2540,6 +2571,29 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "informalsystems-malachitebft-test-framework" +version = "0.0.1" +dependencies = [ + "axum", + "bytesize", + "eyre", + "informalsystems-malachitebft-app-channel", + "informalsystems-malachitebft-config", + "informalsystems-malachitebft-core-consensus", + "informalsystems-malachitebft-core-types", + "informalsystems-malachitebft-engine", + "informalsystems-malachitebft-metrics", + "informalsystems-malachitebft-test", + "informalsystems-malachitebft-test-app", + "ractor", + "rand 0.8.5", + "tempfile", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "informalsystems-malachitebft-test-mbt" version = "0.0.1" diff --git a/code/Cargo.toml 
b/code/Cargo.toml index a87a1482e..4135fd9b5 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -24,9 +24,11 @@ members = [ # Test "crates/test", + "crates/test/app", "crates/test/cli", "crates/test/mbt", "crates/test/mempool", + "crates/test/framework", "crates/network/test", # Starknet @@ -64,7 +66,6 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } malachitebft-engine = { version = "0.0.1", package = "informalsystems-malachitebft-engine", path = "crates/engine" } malachitebft-app = { version = "0.0.1", package = "informalsystems-malachitebft-app", path = "crates/app" } malachitebft-app-channel = { version = "0.0.1", package = "informalsystems-malachitebft-app-channel", path = "crates/app-channel" } -malachitebft-test-cli = { version = "0.0.1", package = "informalsystems-malachitebft-test-cli", path = "crates/test/cli" } malachitebft-codec = { version = "0.0.1", package = "informalsystems-malachitebft-codec", path = "crates/codec" } malachitebft-config = { version = "0.0.1", package = "informalsystems-malachitebft-config", path = "crates/config" } malachitebft-core-consensus = { version = "0.0.1", package = "informalsystems-malachitebft-core-consensus", path = "crates/core-consensus" } @@ -83,8 +84,11 @@ malachitebft-wal = { version = "0.0.1", package = "informalsystem # Test malachitebft-test = { version = "0.0.1", package = "informalsystems-malachitebft-test", path = "crates/test" } +malachitebft-test-app = { version = "0.0.1", package = "informalsystems-malachitebft-test-app", path = "crates/test/app" } +malachitebft-test-cli = { version = "0.0.1", package = "informalsystems-malachitebft-test-cli", path = "crates/test/cli" } malachitebft-test-mbt = { version = "0.0.1", package = "informalsystems-malachitebft-test-mbt", path = "crates/test/mbt" } malachitebft-test-mempool = { version = "0.0.1", package = "informalsystems-malachitebft-test-mempool", path = "crates/test/mempool" } +malachitebft-test-framework = { version = "0.0.1", 
package = "informalsystems-malachitebft-test-framework", path = "crates/test/framework" } malachitebft-discovery-test = { version = "0.0.1", package = "informalsystems-malachitebft-discovery-test", path = "crates/network/test" } # Starknet diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs index 37c708f1c..f382394c6 100644 --- a/code/crates/app-channel/src/lib.rs +++ b/code/crates/app-channel/src/lib.rs @@ -19,4 +19,4 @@ mod msgs; pub use msgs::{AppMsg, Channels, ConsensusMsg, NetworkMsg, Reply}; mod run; -pub use run::run; +pub use run::{start_engine, EngineHandle}; diff --git a/code/crates/app-channel/src/msgs.rs b/code/crates/app-channel/src/msgs.rs index 793a49b80..b5892ec0e 100644 --- a/code/crates/app-channel/src/msgs.rs +++ b/code/crates/app-channel/src/msgs.rs @@ -2,12 +2,13 @@ use std::time::Duration; use bytes::Bytes; use derive_where::derive_where; -use malachitebft_app::consensus::VoteExtensionError; use tokio::sync::mpsc; use tokio::sync::oneshot; +use malachitebft_app::consensus::VoteExtensionError; use malachitebft_engine::consensus::Msg as ConsensusActorMsg; use malachitebft_engine::network::Msg as NetworkActorMsg; +use malachitebft_engine::util::events::TxEvent; use crate::app::types::core::{CommitCertificate, Context, Round, ValueId, VoteExtensions}; use crate::app::types::streaming::StreamMessage; @@ -22,6 +23,8 @@ pub struct Channels { pub consensus: mpsc::Receiver>, /// Channel for sending messages to the networking layer pub network: mpsc::Sender>, + /// Receiver of events, call `subscribe` to receive them + pub events: TxEvent, } /// Messages sent from consensus to the application. diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs index f6bd2e3f2..a19e61943 100644 --- a/code/crates/app-channel/src/run.rs +++ b/code/crates/app-channel/src/run.rs @@ -1,30 +1,37 @@ //! Run Malachite consensus with the given configuration and context. //! 
Provides the application with a channel for receiving messages from consensus. -use std::path::PathBuf; - use eyre::Result; +use malachitebft_app::spawn::{ + spawn_consensus_actor, spawn_node_actor, spawn_sync_actor, spawn_wal_actor, +}; +use malachitebft_engine::node::NodeRef; +use malachitebft_engine::util::events::TxEvent; +use tokio::task::JoinHandle; + +use crate::app; use crate::app::types::codec::{ConsensusCodec, SyncCodec, WalCodec}; use crate::app::types::config::Config as NodeConfig; use crate::app::types::core::Context; use crate::app::types::metrics::{Metrics, SharedRegistry}; use crate::spawn::{spawn_host_actor, spawn_network_actor}; -use crate::{app, Channels}; +use crate::Channels; -use malachitebft_app::{spawn_consensus_actor, spawn_sync_actor, spawn_wal_actor}; -use malachitebft_engine::util::events::TxEvent; +pub struct EngineHandle { + pub actor: NodeRef, + pub handle: JoinHandle<()>, +} #[tracing::instrument("node", skip_all, fields(moniker = %cfg.moniker))] -pub async fn run( +pub async fn start_engine( ctx: Ctx, codec: Codec, node: Node, cfg: NodeConfig, - private_key_file: PathBuf, start_height: Option, initial_validator_set: Ctx::ValidatorSet, -) -> Result> +) -> Result<(Channels, EngineHandle)> where Ctx: Context, Node: app::Node, @@ -37,7 +44,7 @@ where let registry = SharedRegistry::global().with_moniker(cfg.moniker.as_str()); let metrics = Metrics::register(®istry); - let private_key_file = node.load_private_key_file(private_key_file)?; + let private_key_file = node.load_private_key_file()?; let private_key = node.load_private_key(private_key_file); let public_key = node.get_public_key(&private_key); let address = node.get_address(&public_key); @@ -45,13 +52,13 @@ where let signing_provider = node.get_signing_provider(private_key); // Spawn consensus gossip - let (network, network_tx) = + let (network, tx_network) = spawn_network_actor(&cfg, keypair, ®istry, codec.clone()).await?; let wal = spawn_wal_actor(&ctx, codec, 
&node.get_home_dir(), ®istry).await?; // Spawn the host actor - let (connector, consensus_rx) = spawn_host_actor(metrics.clone()).await?; + let (connector, rx_consensus) = spawn_host_actor(metrics.clone()).await?; let sync = spawn_sync_actor( ctx.clone(), @@ -62,25 +69,37 @@ where ) .await?; + let tx_event = TxEvent::new(); + // Spawn consensus - let _consensus = spawn_consensus_actor( + let consensus = spawn_consensus_actor( start_height, initial_validator_set, address, - ctx, + ctx.clone(), cfg, Box::new(signing_provider), - network, - connector, - wal, + network.clone(), + connector.clone(), + wal.clone(), sync.clone(), metrics, - TxEvent::new(), + tx_event.clone(), ) .await?; - Ok(Channels { - consensus: consensus_rx, - network: network_tx, - }) + let (node, handle) = spawn_node_actor(ctx, network, consensus, wal, sync, connector).await?; + + let channels = Channels { + consensus: rx_consensus, + network: tx_network, + events: tx_event, + }; + + let handle = EngineHandle { + actor: node, + handle, + }; + + Ok((channels, handle)) } diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs index b93521a3f..39889f47d 100644 --- a/code/crates/app-channel/src/spawn.rs +++ b/code/crates/app-channel/src/spawn.rs @@ -3,6 +3,7 @@ use eyre::Result; use tokio::sync::mpsc; +use malachitebft_app as app; use malachitebft_app::types::metrics::SharedRegistry; use malachitebft_app::types::Keypair; use malachitebft_config::Config as NodeConfig; @@ -40,7 +41,7 @@ where { let (tx, mut rx) = mpsc::channel::>(1); - let actor_ref = malachitebft_app::spawn_network_actor(cfg, keypair, registry, codec).await?; + let actor_ref = app::spawn::spawn_network_actor(cfg, keypair, registry, codec).await?; tokio::spawn({ let actor_ref = actor_ref.clone(); diff --git a/code/crates/app/Cargo.toml b/code/crates/app/Cargo.toml index 12f005e71..baa545002 100644 --- a/code/crates/app/Cargo.toml +++ b/code/crates/app/Cargo.toml @@ -13,13 +13,13 @@ readme = 
"../../../README.md" all-features = true [dependencies] -malachitebft-engine.workspace = true malachitebft-codec.workspace = true -malachitebft-core-types.workspace = true malachitebft-config.workspace = true malachitebft-core-consensus.workspace = true -malachitebft-network.workspace = true +malachitebft-core-types.workspace = true +malachitebft-engine.workspace = true malachitebft-metrics.workspace = true +malachitebft-network.workspace = true malachitebft-peer.workspace = true malachitebft-sync.workspace = true @@ -27,8 +27,10 @@ async-trait = { workspace = true } derive-where = { workspace = true } eyre = { workspace = true } libp2p-identity = { workspace = true } +ractor = { workspace = true } rand = { workspace = true } serde = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } [lints] diff --git a/code/crates/app/src/lib.rs b/code/crates/app/src/lib.rs index eafae21ca..ae907d6ac 100644 --- a/code/crates/app/src/lib.rs +++ b/code/crates/app/src/lib.rs @@ -12,10 +12,12 @@ mod node; pub use node::Node; pub mod part_store; +pub mod spawn; pub mod types; -mod spawn; -pub use spawn::{spawn_consensus_actor, spawn_network_actor, spawn_sync_actor, spawn_wal_actor}; +pub mod events { + pub use malachitebft_engine::util::events::TxEvent; +} pub mod streaming { pub use malachitebft_engine::util::streaming::*; diff --git a/code/crates/app/src/node.rs b/code/crates/app/src/node.rs index 3b13981bc..38bff3031 100644 --- a/code/crates/app/src/node.rs +++ b/code/crates/app/src/node.rs @@ -31,7 +31,7 @@ pub trait Node { fn load_private_key(&self, file: Self::PrivateKeyFile) -> PrivateKey; - fn load_private_key_file(&self, path: impl AsRef) -> io::Result; + fn load_private_key_file(&self) -> io::Result; fn make_private_key_file(&self, private_key: PrivateKey) -> Self::PrivateKeyFile; diff --git a/code/crates/app/src/spawn.rs b/code/crates/app/src/spawn.rs index 875fb4394..d18c33306 100644 --- a/code/crates/app/src/spawn.rs +++ 
b/code/crates/app/src/spawn.rs @@ -4,23 +4,50 @@ use std::path::Path; use std::time::Duration; use eyre::Result; -use malachitebft_core_types::SigningProvider; +use tokio::task::JoinHandle; use tracing::Span; use malachitebft_engine::consensus::{Consensus, ConsensusCodec, ConsensusParams, ConsensusRef}; use malachitebft_engine::host::HostRef; use malachitebft_engine::network::{Network, NetworkRef}; +use malachitebft_engine::node::{Node, NodeRef}; use malachitebft_engine::sync::{Params as SyncParams, Sync, SyncCodec, SyncRef}; use malachitebft_engine::util::events::TxEvent; use malachitebft_engine::wal::{Wal, WalCodec, WalRef}; use malachitebft_network::{Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, Keypair}; use crate::types::config::{Config as NodeConfig, PubSubProtocol, SyncConfig, TransportProtocol}; -use crate::types::core::Context; +use crate::types::core::{Context, SigningProvider}; use crate::types::metrics::{Metrics, SharedRegistry}; use crate::types::sync; use crate::types::ValuePayload; +pub async fn spawn_node_actor( + ctx: Ctx, + network: NetworkRef, + consensus: ConsensusRef, + wal: WalRef, + sync: Option>, + host: HostRef, +) -> Result<(NodeRef, JoinHandle<()>)> +where + Ctx: Context, +{ + // Spawn the node actor + let node = Node::new( + ctx, + network, + consensus, + wal, + sync, + host, + tracing::Span::current(), + ); + + let (actor_ref, handle) = node.spawn().await?; + Ok((actor_ref, handle)) +} + pub async fn spawn_network_actor( cfg: &NodeConfig, keypair: Keypair, diff --git a/code/crates/config/src/lib.rs b/code/crates/config/src/lib.rs index 42d126d13..447b5e9c9 100644 --- a/code/crates/config/src/lib.rs +++ b/code/crates/config/src/lib.rs @@ -403,6 +403,22 @@ pub enum ValuePayload { ProposalAndParts, } +impl ValuePayload { + pub fn include_parts(&self) -> bool { + match self { + Self::ProposalOnly => false, + Self::PartsOnly | Self::ProposalAndParts => true, + } + } + + pub fn include_proposal(&self) -> bool { + match self { + 
Self::PartsOnly => false, + Self::ProposalOnly | Self::ProposalAndParts => true, + } + } +} + /// Timeouts #[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct TimeoutConfig { diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 70ce78e80..f7cd6284b 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -100,11 +100,11 @@ where /// Requests the application to re-stream a proposal that it has already seen. /// - /// The application MUST re-publish again to its pwers all + /// The application MUST re-publish again to its peers all /// the proposal parts pertaining to that value. /// /// Resume with: [`resume::Continue`] - RestreamValue( + RestreamProposal( /// Height of the value Ctx::Height, /// Round of the value diff --git a/code/crates/core-consensus/src/handle/driver.rs b/code/crates/core-consensus/src/handle/driver.rs index b15d14092..6cc6339f2 100644 --- a/code/crates/core-consensus/src/handle/driver.rs +++ b/code/crates/core-consensus/src/handle/driver.rs @@ -210,7 +210,7 @@ where if signed_proposal.pol_round().is_defined() { perform!( co, - Effect::RestreamValue( + Effect::RestreamProposal( signed_proposal.height(), signed_proposal.round(), signed_proposal.pol_round(), diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 74087b6cf..62301aee7 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -471,7 +471,7 @@ where NetworkEvent::ProposalPart(from, part) => { if state.consensus.params.value_payload.proposal_only() { - error!(%from, "Properly configured peer should never send block part messages in Proposal mode"); + error!(%from, "Properly configured peer should never send proposal part messages in Proposal mode"); return Ok(()); } @@ -961,7 +961,7 @@ where Ok(r.resume_with(validator_set)) } - Effect::RestreamValue(height, round, valid_round, address, 
value_id, r) => { + Effect::RestreamProposal(height, round, valid_round, address, value_id, r) => { self.host .cast(HostMsg::RestreamValue { height, diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index c45e008b8..432357751 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -29,7 +29,7 @@ pub enum HostMsg { proposer: Ctx::Address, }, - /// Request to build a local block/value from Driver + /// Request to build a local value to propose GetValue { height: Ctx::Height, round: Round, diff --git a/code/crates/engine/src/node.rs b/code/crates/engine/src/node.rs index c8d331873..db8f53d6a 100644 --- a/code/crates/engine/src/node.rs +++ b/code/crates/engine/src/node.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; +use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; use tokio::task::JoinHandle; use tracing::{error, info, warn}; @@ -20,9 +20,7 @@ pub struct Node { consensus: ConsensusRef, wal: WalRef, sync: Option>, - mempool: ActorCell, host: HostRef, - start_height: Ctx::Height, span: tracing::Span, } @@ -37,9 +35,7 @@ where consensus: ConsensusRef, wal: WalRef, sync: Option>, - mempool: ActorCell, host: HostRef, - start_height: Ctx::Height, span: tracing::Span, ) -> Self { Self { @@ -48,9 +44,7 @@ where consensus, wal, sync, - mempool, host, - start_height, span, } } @@ -77,7 +71,6 @@ where // Set ourselves as the supervisor of the other actors self.network.link(myself.get_cell()); self.consensus.link(myself.get_cell()); - self.mempool.link(myself.get_cell()); self.host.link(myself.get_cell()); self.wal.link(myself.get_cell()); diff --git a/code/crates/engine/src/sync.rs b/code/crates/engine/src/sync.rs index b9e78357f..64a237b2a 100644 --- a/code/crates/engine/src/sync.rs +++ b/code/crates/engine/src/sync.rs @@ -265,7 +265,7 @@ where .cast(NetworkMsg::OutgoingResponse(request_id, response))?; } - 
Effect::GetValue(request_id, height) => { + Effect::GetDecidedValue(request_id, height) => { self.host.call_and_forward( |reply_to| HostMsg::GetDecidedValue { height, reply_to }, myself, diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index a7ecb4430..d716d1afc 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -10,6 +10,7 @@ use malachitebft_core_types::{CommitCertificate, Context, Round, Timeout, ValueO pub type RxEvent = broadcast::Receiver>; +#[derive_where(Clone)] pub struct TxEvent { tx: broadcast::Sender>, } diff --git a/code/crates/engine/src/wal.rs b/code/crates/engine/src/wal.rs index e55d45b94..3b01fc3dd 100644 --- a/code/crates/engine/src/wal.rs +++ b/code/crates/engine/src/wal.rs @@ -185,6 +185,11 @@ where type Arguments = Args; type State = State; + #[tracing::instrument( + name = "wal.pre_start", + parent = &self.span, + skip_all, + )] async fn pre_start( &self, _myself: WalRef, @@ -196,7 +201,7 @@ where let (tx, rx) = mpsc::channel(100); // Spawn a system thread to perform blocking WAL operations. 
- let handle = self::thread::spawn(tracing::Span::current(), log, args.codec, rx); + let handle = self::thread::spawn(self.span.clone(), log, args.codec, rx); Ok(State { height: Ctx::Height::default(), @@ -224,6 +229,12 @@ where Ok(()) } + #[tracing::instrument( + name = "wal.post_stop", + parent = &self.span, + skip_all, + fields(height = %state.height), + )] async fn post_stop( &self, _: WalRef, diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index b7f7e7dbe..09355ce45 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -162,7 +162,7 @@ impl Host { address, value_id, } => { - on_restream_value( + on_restream_proposal( state, &self.network, height, @@ -211,6 +211,7 @@ impl Host { value_bytes, reply_to, } => on_process_synced_value(value_bytes, height, round, validator_address, reply_to), + HostMsg::PeerJoined { peer_id } => { debug!(%peer_id, "Peer joined the network"); Ok(()) @@ -430,7 +431,7 @@ async fn find_previously_built_value( Ok(proposed_value) } -async fn on_restream_value( +async fn on_restream_proposal( state: &mut HostState, network: &NetworkRef, height: Height, diff --git a/code/crates/starknet/host/src/node.rs b/code/crates/starknet/host/src/node.rs index e6851c205..0ce06bea2 100644 --- a/code/crates/starknet/host/src/node.rs +++ b/code/crates/starknet/host/src/node.rs @@ -88,11 +88,8 @@ impl Node for StarknetNode { file.private_key } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } @@ -123,10 +120,8 @@ impl Node for StarknetNode { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); - let priv_key_file = 
self.load_private_key_file(self.private_key_file.clone())?; - + let priv_key_file = self.load_private_key_file()?; let private_key = self.load_private_key(priv_key_file); - let genesis = self.load_genesis(self.genesis_file.clone())?; let start_height = self.start_height.map(|height| Height::new(height, 1)); diff --git a/code/crates/starknet/host/src/spawn.rs b/code/crates/starknet/host/src/spawn.rs index 2e9a777f8..1aeb99056 100644 --- a/code/crates/starknet/host/src/spawn.rs +++ b/code/crates/starknet/host/src/spawn.rs @@ -101,17 +101,7 @@ pub async fn spawn_node_actor( .await; // Spawn the node actor - let node = Node::new( - ctx, - network, - consensus, - wal, - sync, - mempool.get_cell(), - host, - start_height, - span, - ); + let node = Node::new(ctx, network, consensus, wal, sync, host, span); let (actor_ref, handle) = node.spawn().await.unwrap(); diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 39df3606a..6de126553 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use eyre::bail; use rand::rngs::StdRng; use rand::SeedableRng; +use tokio::sync::Mutex; use tokio::task::JoinSet; use tokio::time::{sleep, Duration}; use tracing::{debug, error, error_span, info, Instrument, Span}; @@ -404,12 +405,10 @@ where { let validator_set = self.validator_set.clone(); - let home_dir = tempfile::TempDir::with_prefix(format!( - "informalsystems-malachitebft-starknet-test-{}", - self.id - )) - .unwrap() - .into_path(); + let home_dir = + tempfile::TempDir::with_prefix(format!("malachitebft-starknet-test-{}", self.id)) + .unwrap() + .into_path(); set.spawn( async move { @@ -480,7 +479,7 @@ async fn run_node( let mut rx_event = tx_event.subscribe(); let rx_event_bg = tx_event.subscribe(); - let (mut actor_ref, mut handle) = spawn_node_actor( + let (mut actor_ref, _handle) = spawn_node_actor( config.clone(), home_dir.clone(), validator_set.clone(), @@ 
-493,12 +492,14 @@ async fn run_node( let decisions = Arc::new(AtomicUsize::new(0)); let current_height = Arc::new(AtomicUsize::new(0)); + let failure = Arc::new(Mutex::new(None)); let is_full_node = node.is_full_node(); let spawn_bg = |mut rx: RxEvent| { tokio::spawn({ let decisions = Arc::clone(&decisions); let current_height = Arc::clone(¤t_height); + let failure = Arc::clone(&failure); async move { while let Ok(event) = rx.recv().await { @@ -510,10 +511,15 @@ async fn run_node( decisions.fetch_add(1, Ordering::SeqCst); } Event::Published(msg) if is_full_node => { - panic!("Full nodes unexpectedly publish a consensus message: {msg:?}"); + failure.lock().await.replace(format!( + "Full node unexpectedly published a consensus message: {msg:?}" + )); } Event::WalReplayError(e) => { - panic!("WAL replay error: {e}"); + failure + .lock() + .await + .replace(format!("WAL replay error: {e}")); } _ => (), } @@ -528,6 +534,10 @@ async fn run_node( let mut bg = spawn_bg(rx_event_bg); for step in node.steps { + if let Some(failure) = failure.lock().await.take() { + return TestResult::Failure(failure); + } + match step { Step::WaitUntil(target_height) => { info!("Waiting until node reaches height {target_height}"); @@ -550,9 +560,7 @@ async fn run_node( sleep(after).await; actor_ref.kill_and_wait(None).await.expect("Node must stop"); - bg.abort(); - handle.abort(); } Step::ResetDb => { @@ -573,7 +581,8 @@ async fn run_node( let new_rx_event_bg = tx_event.subscribe(); info!("Spawning node"); - let (new_actor_ref, new_handle) = spawn_node_actor( + + let (new_actor_ref, _) = spawn_node_actor( config.clone(), home_dir.clone(), validator_set.clone(), @@ -589,7 +598,6 @@ async fn run_node( bg = spawn_bg(new_rx_event_bg); actor_ref = new_actor_ref; - handle = new_handle; rx_event = new_rx_event; } @@ -604,7 +612,6 @@ async fn run_node( } Err(e) => { actor_ref.stop(Some("Test failed".to_string())); - handle.abort(); bg.abort(); return TestResult::Failure(e.to_string()); @@ -617,7 
+624,6 @@ async fn run_node( let actual = decisions.load(Ordering::SeqCst); actor_ref.stop(Some("Test is over".to_string())); - handle.abort(); bg.abort(); if expected.check(actual) { @@ -637,7 +643,6 @@ async fn run_node( Step::Fail(reason) => { actor_ref.stop(Some("Test failed".to_string())); - handle.abort(); bg.abort(); return TestResult::Failure(reason); @@ -645,7 +650,12 @@ async fn run_node( } } - return TestResult::Success("OK".to_string()); + let failure = failure.lock().await.take(); + if let Some(failure) = failure { + TestResult::Failure(failure) + } else { + TestResult::Success("OK".to_string()) + } } pub fn init_logging(test_module: &str) { @@ -658,9 +668,9 @@ pub fn init_logging(test_module: &str) { .any(|(k, v)| std::env::var(k).as_deref() == Ok(v)); let directive = if enable_debug { - format!("informalsystems=info,{test_module}=debug,ractor=error,debug") + format!("{test_module}=debug,ractor=error,informalsystems_malachitebft=debug") } else { - format!("informalsystems=debug,{test_module}=debug,ractor=error,warn") + format!("{test_module}=debug,ractor=error,informalsystems_malachitefbft=info") }; let filter = EnvFilter::builder().parse(directive).unwrap(); diff --git a/code/crates/sync/src/handle.rs b/code/crates/sync/src/handle.rs index ec2f7dffe..304916a76 100644 --- a/code/crates/sync/src/handle.rs +++ b/code/crates/sync/src/handle.rs @@ -44,7 +44,7 @@ pub enum Effect { SendValueResponse(InboundRequestId, ValueResponse), /// Retrieve a value from the application - GetValue(InboundRequestId, Ctx::Height), + GetDecidedValue(InboundRequestId, Ctx::Height), /// Send a VoteSet request to a peer SendVoteSetRequest(PeerId, VoteSetRequest), @@ -212,7 +212,7 @@ where metrics.decided_value_request_received(request.height.as_u64()); - perform!(co, Effect::GetValue(request_id, request.height)); + perform!(co, Effect::GetDecidedValue(request_id, request.height)); Ok(()) } @@ -378,7 +378,7 @@ async fn request_value_from_peer( where Ctx: Context, { - 
debug!(sync.height = %height, %peer, "Requesting value from peer"); + info!(sync.height = %height, %peer, "Requesting sync value from peer"); perform!( co, diff --git a/code/crates/test/Cargo.toml b/code/crates/test/Cargo.toml index 1c0d22fd4..8562aca19 100644 --- a/code/crates/test/Cargo.toml +++ b/code/crates/test/Cargo.toml @@ -37,6 +37,14 @@ serde_json = { workspace = true } sha3 = { workspace = true } signature = { workspace = true } +[dev-dependencies] +malachitebft-test-app.workspace = true +malachitebft-test-framework.workspace = true + +bytesize.workspace = true +tokio.workspace = true +tracing.workspace = true + [build-dependencies] prost-build = { workspace = true } diff --git a/code/crates/test/app/Cargo.toml b/code/crates/test/app/Cargo.toml new file mode 100644 index 000000000..15e5b04cc --- /dev/null +++ b/code/crates/test/app/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "informalsystems-malachitebft-test-app" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true +publish = false + +[dependencies] +async-trait.workspace = true +bytes.workspace = true +color-eyre.workspace = true +derive-where.workspace = true +eyre.workspace = true +prost.workspace = true +rand.workspace = true +redb.workspace = true +serde_json.workspace = true +sha3.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true + +malachitebft-app-channel.workspace = true +malachitebft-proto.workspace = true +malachitebft-test.workspace = true +malachitebft-test-cli.workspace = true + +[dev-dependencies] +malachitebft-test-framework.workspace = true + +[lints] +workspace = true diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs new file mode 100644 index 000000000..07b082894 --- /dev/null +++ b/code/crates/test/app/src/app.rs @@ -0,0 +1,308 @@ +use std::time::Duration; + +use eyre::eyre; +use tokio::time::sleep; +use tracing::{error, 
info}; + +use malachitebft_app_channel::app::streaming::StreamContent; +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::core::{Round, Validity}; +use malachitebft_app_channel::app::types::sync::RawDecidedValue; +use malachitebft_app_channel::app::types::ProposedValue; +use malachitebft_app_channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{Genesis, Height, TestContext}; + +use crate::state::{decode_value, State}; + +pub async fn run( + genesis: Genesis, + state: &mut State, + channels: &mut Channels, +) -> eyre::Result<()> { + while let Some(msg) = channels.consensus.recv().await { + match msg { + // The first message to handle is the `ConsensusReady` message, signaling to the app + // that Malachite is ready to start consensus + AppMsg::ConsensusReady { reply } => { + let start_height = state + .store + .max_decided_value_height() + .await + .map(|height| height.increment()) + .unwrap_or_else(|| Height::new(1)); + + info!(%start_height, "Consensus is ready"); + + sleep(Duration::from_millis(200)).await; + + // We can simply respond by telling the engine to start consensus + // at the next height, and provide it with the genesis validator set + if reply + .send(ConsensusMsg::StartHeight( + start_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send ConsensusReady reply"); + } + } + + // The next message to handle is the `StartRound` message, signaling to the app + // that consensus has entered a new round (including the initial round 0) + AppMsg::StartedRound { + height, + round, + proposer, + } => { + info!(%height, %round, %proposer, "Started round"); + + // We can use that opportunity to update our internal state + state.current_height = height; + state.current_round = round; + state.current_proposer = Some(proposer); + } + + // At some point, we may end up being the proposer for that round, and the 
engine + // will then ask us for a value to propose to the other validators. + AppMsg::GetValue { + height, + round, + timeout: _, + reply, + } => { + // NOTE: We can ignore the timeout as we are building the value right away. + // If we were let's say reaping as many txes from a mempool and executing them, + // then we would need to respect the timeout and stop at a certain point. + + info!(%height, %round, "Consensus is requesting a value to propose"); + + // Here it is important that, if we have previously built a value for this height and round, + // we send back the very same value. + let proposal = match state.get_previously_built_value(height, round).await? { + Some(proposal) => { + info!(value = %proposal.value.id(), "Re-using previously built value"); + proposal + } + None => { + // If we have not previously built a value for that very same height and round, + // we need to create a new value to propose and send it back to consensus. + info!("Building a new value to propose"); + state.propose_value(height, round).await? + } + }; + + // Send it to consensus + if reply.send(proposal.clone()).is_err() { + error!("Failed to send GetValue reply"); + } + + if !state.config.consensus.value_payload.include_parts() { + return Ok(()); + } + + // Now what's left to do is to break down the value to propose into parts, + // and send those parts over the network to our peers, for them to re-assemble the full value. + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } + } + + // On the receiving end of these proposal parts (ie. when we are not the proposer), + // we need to process these parts and re-assemble the full value. + // To this end, we store each part that we receive and assemble the full value once we + // have all its constituent parts. 
Then we send that value back to consensus for it to + // consider and vote for or against it (ie. vote `nil`), depending on its validity. + AppMsg::ReceivedProposalPart { from, part, reply } => { + let part_type = match &part.content { + StreamContent::Data(part) => part.get_type(), + StreamContent::Fin => "end of stream", + }; + + info!(%from, %part.sequence, part.type = %part_type, "Received proposal part"); + + let proposed_value = state.received_proposal_part(from, part).await?; + + if reply.send(proposed_value).is_err() { + error!("Failed to send ReceivedProposalPart reply"); + } + } + + // In some cases, e.g. to verify the signature of a vote received at a higher height + // than the one we are at (e.g. because we are lagging behind a little bit), + // the engine may ask us for the validator set at that height. + // + // In our case, our validator set stays constant between heights so we can + // send back the validator set found in our genesis state. + AppMsg::GetValidatorSet { height: _, reply } => { + if reply.send(genesis.validator_set.clone()).is_err() { + error!("Failed to send GetValidatorSet reply"); + } + } + + // After some time, consensus will finally reach a decision on the value + // to commit for the current height, and will notify the application, + // providing it with a commit certificate which contains the ID of the value + // that was decided on as well as the set of commits for that value, + // ie. the precommits together with their (aggregated) signatures. 
+ AppMsg::Decided { + certificate, + extensions: _, + reply, + } => { + info!( + height = %certificate.height, round = %certificate.round, + value = %certificate.value_id, + "Consensus has decided on value" + ); + + // When that happens, we store the decided value in our store + state.commit(certificate).await?; + + sleep(Duration::from_millis(500)).await; + + // And then we instruct consensus to start the next height + if reply + .send(ConsensusMsg::StartHeight( + state.current_height, + genesis.validator_set.clone(), + )) + .is_err() + { + error!("Failed to send Decided reply"); + } + } + + // It may happen that our node is lagging behind its peers. In that case, + // a synchronization mechanism will automatically kick to try and catch up to + // our peers. When that happens, some of these peers will send us decided values + // for the heights in between the one we are currently at (included) and the one + // that they are at. When the engine receives such a value, it will forward to the application + // to decode it from its wire format and send back the decoded value to consensus. + AppMsg::ProcessSyncedValue { + height, + round, + proposer, + value_bytes, + reply, + } => { + info!(%height, %round, "Processing synced value"); + + let value = decode_value(value_bytes); + + let proposal = ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer, + value, + validity: Validity::Valid, + }; + + state.store_synced_value(proposal.clone()).await?; + + if reply.send(proposal).is_err() { + error!("Failed to send ProcessSyncedValue reply"); + } + } + + // If, on the other hand, we are not lagging behind but are instead asked by one of + // our peer to help them catch up because they are the one lagging behind, + // then the engine might ask the application to provide with the value + // that was decided at some lower height. In that case, we fetch it from our store + // and send it to consensus. 
+ AppMsg::GetDecidedValue { height, reply } => { + info!(%height, "Received sync request for decided value"); + + let decided_value = state.get_decided_value(height).await; + info!(%height, "Found decided value: {decided_value:?}"); + + let raw_decided_value = decided_value.map(|decided_value| RawDecidedValue { + certificate: decided_value.certificate, + value_bytes: ProtobufCodec.encode(&decided_value.value).unwrap(), // FIXME: unwrap + }); + + if reply.send(raw_decided_value).is_err() { + error!("Failed to send GetDecidedValue reply"); + } + } + + // In order to figure out if we can help a peer that is lagging behind, + // the engine may ask us for the height of the earliest available value in our store. + AppMsg::GetHistoryMinHeight { reply } => { + let min_height = state.get_earliest_height().await; + + if reply.send(min_height).is_err() { + error!("Failed to send GetHistoryMinHeight reply"); + } + } + + AppMsg::RestreamProposal { + height, + round, + valid_round, + address, + value_id, + } => { + if !state.config.consensus.value_payload.include_parts() { + return Ok(()); + } + + info!(%height, %round, %value_id, "Restreaming existing proposal..."); + + let Some(proposal) = state + .get_proposal(height, round, valid_round, address, value_id) + .await + else { + error!(%height, %round, %value_id, "Failed to find proposal to restream"); + return Ok(()); + }; + + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, %value_id, "Publishing proposal part: {stream_message:?}"); + + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } + } + + AppMsg::PeerJoined { peer_id } => { + info!(%peer_id, "Peer joined our local view of network"); + + // You might want to track connected peers in your state + state.peers.insert(peer_id); + } + + AppMsg::PeerLeft { peer_id } => { + info!(%peer_id, "Peer left our local view of network"); + + // Remove the peer from tracking + state.peers.remove(&peer_id); + } + + 
AppMsg::ExtendVote { reply, .. } => { + if reply.send(None).is_err() { + error!("Failed to send ExtendVote reply"); + } + } + + AppMsg::VerifyVoteExtension { reply, .. } => { + if reply.send(Ok(())).is_err() { + error!("Failed to send VerifyVoteExtension reply"); + } + } + } + } + + // If we get there, it can only be because the channel we use to receive message + // from consensus has been closed, meaning that the consensus actor has died. + // We can do nothing but return an error here. + Err(eyre!("Consensus channel closed unexpectedly")) +} diff --git a/code/crates/test/app/src/lib.rs b/code/crates/test/app/src/lib.rs new file mode 100644 index 000000000..9341da944 --- /dev/null +++ b/code/crates/test/app/src/lib.rs @@ -0,0 +1,5 @@ +pub mod app; +pub mod node; +pub mod state; +pub mod store; +pub mod streaming; diff --git a/code/crates/test/app/src/node.rs b/code/crates/test/app/src/node.rs new file mode 100644 index 000000000..f1ec838ab --- /dev/null +++ b/code/crates/test/app/src/node.rs @@ -0,0 +1,178 @@ +//! The Application (or Node) definition. The Node trait implements the Consensus context and the +//! cryptographic library used for signing. + +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use rand::{CryptoRng, RngCore}; +use tokio::task::JoinHandle; +use tracing::Instrument; + +use malachitebft_app_channel::app::events::TxEvent; +use malachitebft_app_channel::app::types::config::Config; // TODO: Move into test app +use malachitebft_app_channel::app::types::core::VotingPower; +use malachitebft_app_channel::app::types::Keypair; +use malachitebft_app_channel::app::Node; +use malachitebft_app_channel::EngineHandle; + +// Use the same types used for integration tests. +// A real application would use its own types and context instead. 
+use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{ + Address, Ed25519Provider, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, + ValidatorSet, +}; + +use crate::state::State; +use crate::store::Store; + +pub struct Handles { + pub app: JoinHandle<()>, + pub engine: EngineHandle, + pub tx_event: TxEvent, +} + +/// Main application struct implementing the consensus node functionality +#[derive(Clone)] +pub struct App { + pub config: Config, + pub home_dir: PathBuf, + pub validator_set: ValidatorSet, + pub private_key: PrivateKey, + pub start_height: Option, +} + +impl App { + pub async fn start(&self) -> eyre::Result { + let span = tracing::error_span!("node", moniker = %self.config.moniker); + let _guard = span.enter(); + + let public_key = self.get_public_key(&self.private_key); + let address = self.get_address(&public_key); + let private_key_file = self.load_private_key_file()?; + let private_key = self.load_private_key(private_key_file); + let signing_provider = self.get_signing_provider(private_key); + + let validators = self + .validator_set + .validators + .iter() + .map(|v| (v.public_key, v.voting_power)) + .collect(); + + let genesis = self.make_genesis(validators); + + let ctx = TestContext::new(); + let codec = ProtobufCodec; + + let (mut channels, engine_handle) = malachitebft_app_channel::start_engine( + ctx, + codec, + self.clone(), + self.config.clone(), + self.start_height, + self.validator_set.clone(), + ) + .await?; + + drop(_guard); + + let config = self.config.clone(); + let store = Store::open(self.get_home_dir().join("store.db"))?; + let start_height = self.start_height.unwrap_or_default(); + + let mut state = State::new( + ctx, + config, + genesis.clone(), + address, + start_height, + store, + signing_provider, + ); + + let tx_event = channels.events.clone(); + + let app_handle = tokio::spawn( + async move { + if let Err(e) = crate::app::run(genesis, &mut state, &mut channels).await { + 
tracing::error!("Application has failed with an error: {e}"); + } + } + .instrument(span), + ); + + Ok(Handles { + app: app_handle, + engine: engine_handle, + tx_event, + }) + } +} + +#[async_trait] +impl Node for App { + type Context = TestContext; + type Genesis = Genesis; + type PrivateKeyFile = PrivateKey; + type SigningProvider = Ed25519Provider; + + fn get_home_dir(&self) -> PathBuf { + self.home_dir.to_owned() + } + + fn get_signing_provider(&self, private_key: PrivateKey) -> Self::SigningProvider { + Ed25519Provider::new(private_key) + } + + fn generate_private_key(&self, rng: R) -> PrivateKey + where + R: RngCore + CryptoRng, + { + PrivateKey::generate(rng) + } + + fn get_address(&self, pk: &PublicKey) -> Address { + Address::from_public_key(pk) + } + + fn get_public_key(&self, pk: &PrivateKey) -> PublicKey { + pk.public_key() + } + + fn get_keypair(&self, pk: PrivateKey) -> Keypair { + Keypair::ed25519_from_bytes(pk.inner().to_bytes()).unwrap() + } + + fn load_private_key(&self, file: Self::PrivateKeyFile) -> PrivateKey { + file + } + + fn load_private_key_file(&self) -> std::io::Result { + Ok(self.private_key.clone()) + } + + fn make_private_key_file(&self, private_key: PrivateKey) -> Self::PrivateKeyFile { + private_key + } + + fn load_genesis(&self, path: impl AsRef) -> std::io::Result { + let genesis = std::fs::read_to_string(path)?; + serde_json::from_str(&genesis).map_err(|e| e.into()) + } + + fn make_genesis(&self, validators: Vec<(PublicKey, VotingPower)>) -> Self::Genesis { + let validators = validators + .into_iter() + .map(|(pk, vp)| Validator::new(pk, vp)); + + let validator_set = ValidatorSet::new(validators); + + Genesis { validator_set } + } + + async fn run(self) -> eyre::Result<()> { + let handles = self.start().await?; + handles.app.await.map_err(Into::into) + } +} diff --git a/code/crates/test/app/src/state.rs b/code/crates/test/app/src/state.rs new file mode 100644 index 000000000..15fc21927 --- /dev/null +++ 
b/code/crates/test/app/src/state.rs @@ -0,0 +1,479 @@ +//! Internal state of the application. This is a simplified abstract to keep it simple. +//! A regular application would have mempool implemented, a proper database and input methods like RPC. + +use std::collections::HashSet; + +use bytes::Bytes; +use eyre::eyre; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use sha3::Digest; +use tracing::{debug, error}; + +use malachitebft_app_channel::app::consensus::ProposedValue; +use malachitebft_app_channel::app::streaming::{StreamContent, StreamId, StreamMessage}; +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::config::Config; // TODO: Move into test app +use malachitebft_app_channel::app::types::core::{CommitCertificate, Round, Validity}; +use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId}; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::{ + Address, Ed25519Provider, Genesis, Height, ProposalData, ProposalFin, ProposalInit, + ProposalPart, TestContext, ValidatorSet, Value, ValueId, +}; + +use crate::store::{DecidedValue, Store}; +use crate::streaming::{PartStreamsMap, ProposalParts}; + +/// Number of historical values to keep in the store +const HISTORY_LENGTH: u64 = 500; + +/// Represents the internal state of the application node +/// Contains information about current height, round, proposals and blocks +pub struct State { + pub ctx: TestContext, + pub config: Config, + pub genesis: Genesis, + pub address: Address, + pub current_height: Height, + pub current_round: Round, + pub current_proposer: Option
, + pub peers: HashSet, + pub store: Store, + + signing_provider: Ed25519Provider, + streams_map: PartStreamsMap, + rng: StdRng, +} + +// Make up a seed for the rng based on our address in +// order for each node to likely propose different values at +// each round. +fn seed_from_address(address: &Address) -> u64 { + address.into_inner().chunks(8).fold(0u64, |acc, chunk| { + let term = chunk.iter().fold(0u64, |acc, &x| { + acc.wrapping_shl(8).wrapping_add(u64::from(x)) + }); + acc.wrapping_add(term) + }) +} + +impl State { + /// Creates a new State instance with the given validator address and starting height + pub fn new( + ctx: TestContext, + config: Config, + genesis: Genesis, + address: Address, + height: Height, + store: Store, + signing_provider: Ed25519Provider, + ) -> Self { + Self { + ctx, + config, + genesis, + address, + store, + signing_provider, + current_height: height, + current_round: Round::new(0), + current_proposer: None, + streams_map: PartStreamsMap::new(), + rng: StdRng::seed_from_u64(seed_from_address(&address)), + peers: HashSet::new(), + } + } + + /// Returns the set of validators. 
+ pub fn get_validator_set(&self) -> &ValidatorSet { + &self.genesis.validator_set + } + + /// Returns the earliest height available in the state + pub async fn get_earliest_height(&self) -> Height { + self.store + .min_decided_value_height() + .await + .unwrap_or_default() + } + + /// Processes and adds a new proposal to the state if it's valid + /// Returns Some(ProposedValue) if the proposal was accepted, None otherwise + pub async fn received_proposal_part( + &mut self, + from: PeerId, + part: StreamMessage, + ) -> eyre::Result>> { + let sequence = part.sequence; + + // Check if we have a full proposal + let Some(parts) = self.streams_map.insert(from, part) else { + return Ok(None); + }; + + // Check if the proposal is outdated + if parts.height < self.current_height { + debug!( + height = %self.current_height, + round = %self.current_round, + part.height = %parts.height, + part.round = %parts.round, + part.sequence = %sequence, + "Received outdated proposal part, ignoring" + ); + + return Ok(None); + } + + // Verify the proposal signature + match self.verify_proposal_signature(&parts) { + Ok(()) => { + // Signature verified successfully, continue processing + } + Err(SignatureVerificationError::MissingInitPart) => { + return Err(eyre!( + "Expected to have full proposal but `Init` proposal part is missing for proposer: {}", + parts.proposer + )); + } + Err(SignatureVerificationError::MissingFinPart) => { + return Err(eyre!( + "Expected to have full proposal but `Fin` proposal part is missing for proposer: {}", + parts.proposer + )); + } + Err(SignatureVerificationError::ProposerNotFound) => { + error!(proposer = %parts.proposer, "Proposer not found in validator set"); + return Ok(None); + } + Err(SignatureVerificationError::InvalidSignature) => { + error!(proposer = %parts.proposer, "Invalid signature in Fin part"); + return Ok(None); + } + } + + // Re-assemble the proposal from its parts + let value = assemble_value_from_parts(parts); + + 
self.store.store_undecided_proposal(value.clone()).await?; + + Ok(Some(value)) + } + + /// Retrieves a decided block at the given height + pub async fn get_decided_value(&self, height: Height) -> Option { + self.store.get_decided_value(height).await.ok().flatten() + } + + /// Commits a value with the given certificate, updating internal state + /// and moving to the next height + pub async fn commit( + &mut self, + certificate: CommitCertificate, + ) -> eyre::Result<()> { + let (height, round) = (certificate.height, certificate.round); + + let Ok(Some(proposal)) = self.store.get_undecided_proposal(height, round).await else { + return Err(eyre!( + "Trying to commit a value at height {height} and round {round} for which there is no proposal: {}", + certificate.value_id + )); + }; + + self.store + .store_decided_value(&certificate, proposal.value) + .await?; + + self.store.remove_undecided_proposal(height, round).await?; + + // Prune the store, keep the last HISTORY_LENGTH values + let retain_height = Height::new(height.as_u64().saturating_sub(HISTORY_LENGTH)); + self.store.prune(retain_height).await?; + + // Move to next height + self.current_height = self.current_height.increment(); + self.current_round = Round::new(0); + + Ok(()) + } + + pub async fn store_synced_value( + &mut self, + proposal: ProposedValue, + ) -> eyre::Result<()> { + self.store.store_undecided_proposal(proposal).await?; + Ok(()) + } + + /// Retrieves a previously built proposal value for the given height + pub async fn get_previously_built_value( + &self, + height: Height, + round: Round, + ) -> eyre::Result>> { + let Some(proposal) = self.store.get_undecided_proposal(height, round).await? 
else { + return Ok(None); + }; + + Ok(Some(LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + ))) + } + + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one + async fn create_proposal( + &mut self, + height: Height, + round: Round, + ) -> eyre::Result> { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + // We create a new value. + let value = self.make_value(); + + let proposal = ProposedValue { + height, + round, + valid_round: Round::Nil, + proposer: self.address, // We are the proposer + value, + validity: Validity::Valid, // Our proposals are de facto valid + }; + + // Insert the new proposal into the undecided proposals. + self.store + .store_undecided_proposal(proposal.clone()) + .await?; + + Ok(proposal) + } + + /// Make up a new value to propose + /// A real application would have a more complex logic here, + /// typically reaping transactions from a mempool and executing them against its state, + /// before computing the merkle root of the new app state. 
+ fn make_value(&mut self) -> Value { + let value = self.rng.gen_range(100..=100000); + Value::new(value) + } + + pub async fn get_proposal( + &self, + height: Height, + round: Round, + _valid_round: Round, + _proposer: Address, + value_id: ValueId, + ) -> Option> { + Some(LocallyProposedValue::new( + height, + round, + Value::new(value_id.as_u64()), + )) + } + + /// Creates a new proposal value for the given height + /// Returns either a previously built proposal or creates a new one + pub async fn propose_value( + &mut self, + height: Height, + round: Round, + ) -> eyre::Result> { + assert_eq!(height, self.current_height); + assert_eq!(round, self.current_round); + + let proposal = self.create_proposal(height, round).await?; + + Ok(LocallyProposedValue::new( + proposal.height, + proposal.round, + proposal.value, + )) + } + + fn stream_id(&self) -> StreamId { + let mut bytes = Vec::with_capacity(size_of::() + size_of::()); + bytes.extend_from_slice(&self.current_height.as_u64().to_be_bytes()); + bytes.extend_from_slice(&self.current_round.as_u32().unwrap().to_be_bytes()); + StreamId::new(bytes.into()) + } + + /// Creates a stream message containing a proposal part. + /// Updates internal sequence number and current proposal. 
+ pub fn stream_proposal( + &mut self, + value: LocallyProposedValue, + ) -> impl Iterator> { + let parts = self.value_to_parts(value); + let stream_id = self.stream_id(); + + let mut msgs = Vec::with_capacity(parts.len() + 1); + let mut sequence = 0; + + for part in parts { + let msg = StreamMessage::new(stream_id.clone(), sequence, StreamContent::Data(part)); + sequence += 1; + msgs.push(msg); + } + + msgs.push(StreamMessage::new(stream_id, sequence, StreamContent::Fin)); + + msgs.into_iter() + } + + fn value_to_parts(&self, value: LocallyProposedValue) -> Vec { + let mut hasher = sha3::Keccak256::new(); + let mut parts = Vec::new(); + + // Init + // Include metadata about the proposal + { + parts.push(ProposalPart::Init(ProposalInit::new( + value.height, + value.round, + self.address, + ))); + + hasher.update(value.height.as_u64().to_be_bytes().as_slice()); + hasher.update(value.round.as_i64().to_be_bytes().as_slice()); + } + + // Data + // Include each prime factor of the value as a separate proposal part + { + for factor in factor_value(value.value) { + parts.push(ProposalPart::Data(ProposalData::new(factor))); + + hasher.update(factor.to_be_bytes().as_slice()); + } + } + + // Fin + // Sign the hash of the proposal parts + { + let hash = hasher.finalize().to_vec(); + let signature = self.signing_provider.sign(&hash); + parts.push(ProposalPart::Fin(ProposalFin::new(signature))); + } + + parts + } + + /// Verifies the signature of the proposal. + /// Returns `Ok(())` if the signature is valid, or an appropriate `SignatureVerificationError`. 
+ fn verify_proposal_signature( + &self, + parts: &ProposalParts, + ) -> Result<(), SignatureVerificationError> { + let mut hasher = sha3::Keccak256::new(); + + let init = parts + .init() + .ok_or(SignatureVerificationError::MissingInitPart)?; + + let fin = parts + .fin() + .ok_or(SignatureVerificationError::MissingFinPart)?; + + let hash = { + hasher.update(init.height.as_u64().to_be_bytes()); + hasher.update(init.round.as_i64().to_be_bytes()); + + // The correctness of the hash computation relies on the parts being ordered by sequence + // number, which is guaranteed by the `PartStreamsMap`. + for part in parts.parts.iter().filter_map(|part| part.as_data()) { + hasher.update(part.factor.to_be_bytes()); + } + + hasher.finalize() + }; + + // Retrieve the the proposer + let proposer = self + .get_validator_set() + .get_by_address(&parts.proposer) + .ok_or(SignatureVerificationError::ProposerNotFound)?; + + // Verify the signature + if !self + .signing_provider + .verify(&hash, &fin.signature, &proposer.public_key) + { + return Err(SignatureVerificationError::InvalidSignature); + } + + Ok(()) + } +} + +/// Re-assemble a [`ProposedValue`] from its [`ProposalParts`]. +/// +/// This is done by multiplying all the factors in the parts. 
+fn assemble_value_from_parts(parts: ProposalParts) -> ProposedValue { + let value = parts + .parts + .iter() + .filter_map(|part| part.as_data()) + .fold(1, |acc, data| acc * data.factor); + + ProposedValue { + height: parts.height, + round: parts.round, + valid_round: Round::Nil, + proposer: parts.proposer, + value: Value::new(value), + validity: Validity::Valid, // TODO: Check signature in Fin part + } +} + +/// Decodes a Value from its byte representation using ProtobufCodec +pub fn decode_value(bytes: Bytes) -> Value { + ProtobufCodec.decode(bytes).unwrap() +} + +/// Returns the list of prime factors of the given value +/// +/// In a real application, this would typically split transactions +/// into chunks ino order to reduce bandwidth requirements due +/// to duplication of gossip messages. +fn factor_value(value: Value) -> Vec { + let mut factors = Vec::new(); + let mut n = value.value; + + let mut i = 2; + while i * i <= n { + if n % i == 0 { + factors.push(i); + n /= i; + } else { + i += 1; + } + } + + if n > 1 { + factors.push(n); + } + + factors +} + +/// Represents errors that can occur during the verification of a proposal's signature. +#[derive(Debug)] +enum SignatureVerificationError { + /// Indicates that the `Init` part of the proposal is unexpectedly missing. + MissingInitPart, + + /// Indicates that the `Fin` part of the proposal is unexpectedly missing. + MissingFinPart, + + /// Indicates that the proposer was not found in the validator set. + ProposerNotFound, + + /// Indicates that the signature in the `Fin` part is invalid. 
+ InvalidSignature, +} diff --git a/code/crates/test/app/src/store.rs b/code/crates/test/app/src/store.rs new file mode 100644 index 000000000..b67b1c8e1 --- /dev/null +++ b/code/crates/test/app/src/store.rs @@ -0,0 +1,330 @@ +use std::ops::RangeBounds; +use std::path::Path; +use std::sync::Arc; + +use bytes::Bytes; +use prost::Message; +use redb::ReadableTable; +use thiserror::Error; +use tracing::error; + +use malachitebft_app_channel::app::types::codec::Codec; +use malachitebft_app_channel::app::types::core::{CommitCertificate, Round}; +use malachitebft_app_channel::app::types::ProposedValue; +use malachitebft_proto::{Error as ProtoError, Protobuf}; +use malachitebft_test::codec::proto as codec; +use malachitebft_test::codec::proto::ProtobufCodec; +use malachitebft_test::proto; +use malachitebft_test::{Height, TestContext, Value}; + +mod keys; +use keys::{HeightKey, UndecidedValueKey}; + +#[derive(Clone, Debug)] +pub struct DecidedValue { + pub value: Value, + pub certificate: CommitCertificate, +} + +fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { + let proto = proto::CommitCertificate::decode(bytes)?; + codec::decode_certificate(proto) +} + +fn encode_certificate(certificate: &CommitCertificate) -> Result, ProtoError> { + let proto = codec::encode_certificate(certificate)?; + Ok(proto.encode_to_vec()) +} + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("Database error: {0}")] + Database(#[from] redb::DatabaseError), + + #[error("Storage error: {0}")] + Storage(#[from] redb::StorageError), + + #[error("Table error: {0}")] + Table(#[from] redb::TableError), + + #[error("Commit error: {0}")] + Commit(#[from] redb::CommitError), + + #[error("Transaction error: {0}")] + Transaction(#[from] redb::TransactionError), + + #[error("Failed to encode/decode Protobuf: {0}")] + Protobuf(#[from] ProtoError), + + #[error("Failed to join on task: {0}")] + TaskJoin(#[from] tokio::task::JoinError), +} + +const CERTIFICATES_TABLE: redb::TableDefinition> = 
+ redb::TableDefinition::new("certificates"); + +const DECIDED_VALUES_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("decided_values"); + +const UNDECIDED_PROPOSALS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("undecided_values"); + +struct Db { + db: redb::Database, +} + +impl Db { + fn new(path: impl AsRef) -> Result { + Ok(Self { + db: redb::Database::create(path).map_err(StoreError::Database)?, + }) + } + + fn get_decided_value(&self, height: Height) -> Result, StoreError> { + let tx = self.db.begin_read()?; + let value = { + let table = tx.open_table(DECIDED_VALUES_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| Value::from_bytes(&value.value()).ok()) + }; + let certificate = { + let table = tx.open_table(CERTIFICATES_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| decode_certificate(&value.value()).ok()) + }; + + let decided_value = value + .zip(certificate) + .map(|(value, certificate)| DecidedValue { value, certificate }); + + Ok(decided_value) + } + + fn insert_decided_value(&self, decided_value: DecidedValue) -> Result<(), StoreError> { + let height = decided_value.certificate.height; + + let tx = self.db.begin_write()?; + { + let mut values = tx.open_table(DECIDED_VALUES_TABLE)?; + values.insert(height, decided_value.value.to_bytes()?.to_vec())?; + } + { + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + certificates.insert(height, encode_certificate(&decided_value.certificate)?)?; + } + tx.commit()?; + + Ok(()) + } + + pub fn get_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result>, StoreError> { + let tx = self.db.begin_read()?; + let table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + + let value = if let Ok(Some(value)) = table.get(&(height, round)) { + Some( + ProtobufCodec + .decode(Bytes::from(value.value())) + .map_err(StoreError::Protobuf)?, + ) + } else { + None + }; + + Ok(value) + } + + fn insert_undecided_proposal( + &self, + 
proposal: ProposedValue, + ) -> Result<(), StoreError> { + let key = (proposal.height, proposal.round); + let value = ProtobufCodec.encode(&proposal)?; + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + table.insert(key, value.to_vec())?; + } + tx.commit()?; + Ok(()) + } + + pub fn remove_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result<(), StoreError> { + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + table.remove(&(height, round))?; + } + tx.commit()?; + Ok(()) + } + + fn height_range( + &self, + table: &Table, + range: impl RangeBounds, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + + fn undecided_proposals_range
( + &self, + table: &Table, + range: impl RangeBounds<(Height, Round)>, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + + fn prune(&self, retain_height: Height) -> Result, StoreError> { + let tx = self.db.begin_write().unwrap(); + let pruned = { + let mut undecided = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + let keys = self.undecided_proposals_range(&undecided, ..(retain_height, Round::Nil))?; + for key in keys { + undecided.remove(key)?; + } + + let mut decided = tx.open_table(DECIDED_VALUES_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + + let keys = self.height_range(&decided, ..retain_height)?; + for key in &keys { + decided.remove(key)?; + certificates.remove(key)?; + } + keys + }; + tx.commit()?; + + Ok(pruned) + } + + fn min_decided_value_height(&self) -> Option { + let tx = self.db.begin_read().unwrap(); + let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); + let (key, _) = table.first().ok()??; + Some(key.value()) + } + + fn max_decided_value_height(&self) -> Option { + let tx = self.db.begin_read().unwrap(); + let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); + let (key, _) = table.last().ok()??; + Some(key.value()) + } + + fn create_tables(&self) -> Result<(), StoreError> { + let tx = self.db.begin_write()?; + // Implicitly creates the tables if they do not exist yet + let _ = tx.open_table(DECIDED_VALUES_TABLE)?; + let _ = tx.open_table(CERTIFICATES_TABLE)?; + let _ = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + tx.commit()?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct Store { + db: Arc, +} + +impl Store { + pub fn open(path: impl AsRef) -> Result { + let db = Db::new(path)?; + db.create_tables()?; + + Ok(Self { db: Arc::new(db) }) + } + + pub async fn min_decided_value_height(&self) -> Option { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || 
db.min_decided_value_height()) + .await + .ok() + .flatten() + } + + pub async fn max_decided_value_height(&self) -> Option { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.max_decided_value_height()) + .await + .ok() + .flatten() + } + + pub async fn get_decided_value( + &self, + height: Height, + ) -> Result, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.get_decided_value(height)).await? + } + + pub async fn store_decided_value( + &self, + certificate: &CommitCertificate, + value: Value, + ) -> Result<(), StoreError> { + let decided_value = DecidedValue { + value, + certificate: certificate.clone(), + }; + + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_decided_value(decided_value)).await? + } + + pub async fn store_undecided_proposal( + &self, + value: ProposedValue, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_undecided_proposal(value)).await? + } + + pub async fn remove_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.remove_undecided_proposal(height, round)).await? + } + + pub async fn get_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result>, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.get_undecided_proposal(height, round)).await? + } + + pub async fn prune(&self, retain_height: Height) -> Result, StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.prune(retain_height)).await? 
+ } +} diff --git a/code/crates/test/app/src/store/keys.rs b/code/crates/test/app/src/store/keys.rs new file mode 100644 index 000000000..e45cc83c8 --- /dev/null +++ b/code/crates/test/app/src/store/keys.rs @@ -0,0 +1,83 @@ +use core::mem::size_of; + +use malachitebft_app_channel::app::types::core::Round; +use malachitebft_test::Height; + +pub type UndecidedValueKey = (HeightKey, RoundKey); + +#[derive(Copy, Clone, Debug)] +pub struct HeightKey; + +impl redb::Value for HeightKey { + type SelfType<'a> = Height; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let height = ::from_bytes(data); + + Height::new(height) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_u64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Height") + } +} + +impl redb::Key for HeightKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct RoundKey; + +impl redb::Value for RoundKey { + type SelfType<'a> = Round; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let round = ::from_bytes(data); + Round::from(round) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_i64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for RoundKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} diff --git a/code/crates/test/app/src/streaming.rs b/code/crates/test/app/src/streaming.rs new file mode 100644 index 000000000..143717eee --- /dev/null +++ 
b/code/crates/test/app/src/streaming.rs @@ -0,0 +1,150 @@ +use std::cmp::Ordering; +use std::collections::{BTreeMap, BinaryHeap, HashSet}; + +use malachitebft_app_channel::app::consensus::PeerId; +use malachitebft_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; +use malachitebft_app_channel::app::types::core::Round; +use malachitebft_test::{Address, Height, ProposalFin, ProposalInit, ProposalPart}; + +struct MinSeq(StreamMessage); + +impl PartialEq for MinSeq { + fn eq(&self, other: &Self) -> bool { + self.0.sequence == other.0.sequence + } +} + +impl Eq for MinSeq {} + +impl Ord for MinSeq { + fn cmp(&self, other: &Self) -> Ordering { + other.0.sequence.cmp(&self.0.sequence) + } +} + +impl PartialOrd for MinSeq { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +struct MinHeap(BinaryHeap>); + +impl Default for MinHeap { + fn default() -> Self { + Self(BinaryHeap::new()) + } +} + +impl MinHeap { + fn push(&mut self, msg: StreamMessage) { + self.0.push(MinSeq(msg)); + } + + fn len(&self) -> usize { + self.0.len() + } + + fn drain(&mut self) -> Vec { + self.0 + .drain() + .filter_map(|msg| msg.0.content.into_data()) + .collect() + } +} + +#[derive(Default)] +struct StreamState { + buffer: MinHeap, + init_info: Option, + seen_sequences: HashSet, + total_messages: usize, + fin_received: bool, +} + +impl StreamState { + fn is_done(&self) -> bool { + self.init_info.is_some() && self.fin_received && self.buffer.len() == self.total_messages + } + + fn insert(&mut self, msg: StreamMessage) -> Option { + if msg.is_first() { + self.init_info = msg.content.as_data().and_then(|p| p.as_init()).cloned(); + } + + if msg.is_fin() { + self.fin_received = true; + self.total_messages = msg.sequence as usize + 1; + } + + self.buffer.push(msg); + + if self.is_done() { + let init_info = self.init_info.take()?; + + Some(ProposalParts { + height: init_info.height, + round: init_info.round, + proposer: init_info.proposer, + parts: 
self.buffer.drain(), + }) + } else { + None + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ProposalParts { + pub height: Height, + pub round: Round, + pub proposer: Address, + pub parts: Vec, +} + +impl ProposalParts { + pub fn init(&self) -> Option<&ProposalInit> { + self.parts.iter().find_map(|p| p.as_init()) + } + + pub fn fin(&self) -> Option<&ProposalFin> { + self.parts.iter().find_map(|p| p.as_fin()) + } +} + +#[derive(Default)] +pub struct PartStreamsMap { + streams: BTreeMap<(PeerId, StreamId), StreamState>, +} + +impl PartStreamsMap { + pub fn new() -> Self { + Self::default() + } + + pub fn insert( + &mut self, + peer_id: PeerId, + msg: StreamMessage, + ) -> Option { + let stream_id = msg.stream_id.clone(); + + let state = self + .streams + .entry((peer_id, stream_id.clone())) + .or_default(); + + if !state.seen_sequences.insert(msg.sequence) { + // We have already seen a message with this sequence number. + return None; + } + + let result = state.insert(msg); + + if state.is_done() { + self.streams.remove(&(peer_id, stream_id)); + } + + result + } +} diff --git a/code/crates/test/framework/Cargo.toml b/code/crates/test/framework/Cargo.toml new file mode 100644 index 000000000..cb4d88a0b --- /dev/null +++ b/code/crates/test/framework/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "informalsystems-malachitebft-test-framework" +publish = false + +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +malachitebft-engine.workspace = true +malachitebft-core-types.workspace = true +malachitebft-config.workspace = true +malachitebft-core-consensus.workspace = true +malachitebft-metrics.workspace = true +malachitebft-test.workspace = true +malachitebft-test-app.workspace = true +malachitebft-app-channel.workspace = true + +axum.workspace = true +bytesize.workspace = true +eyre.workspace = true +rand.workspace = true +ractor.workspace = true 
+tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +tempfile.workspace = true + +[lints] +workspace = true diff --git a/code/crates/test/framework/src/lib.rs b/code/crates/test/framework/src/lib.rs new file mode 100644 index 000000000..f4b20e405 --- /dev/null +++ b/code/crates/test/framework/src/lib.rs @@ -0,0 +1,807 @@ +use core::fmt; +use std::fs::{create_dir_all, remove_dir_all}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use eyre::bail; +use rand::rngs::StdRng; +use rand::SeedableRng; +use tokio::sync::Mutex; +use tokio::task::JoinSet; +use tokio::time::error::Elapsed; +use tokio::time::{sleep, Duration}; +use tracing::{debug, error, error_span, info, Instrument}; + +use malachitebft_config::{ + Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, PubSubProtocol, SyncConfig, + TestConfig, TransportProtocol, +}; +use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg}; +use malachitebft_core_types::{SignedVote, VotingPower}; +use malachitebft_engine::util::events::{Event, RxEvent, TxEvent}; +use malachitebft_test::{Height, PrivateKey, TestContext, Validator, ValidatorSet}; +use malachitebft_test_app::node::App; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Expected { + Exactly(usize), + AtLeast(usize), + AtMost(usize), + LessThan(usize), + GreaterThan(usize), +} + +impl Expected { + pub fn check(&self, actual: usize) -> bool { + match self { + Expected::Exactly(expected) => actual == *expected, + Expected::AtLeast(expected) => actual >= *expected, + Expected::AtMost(expected) => actual <= *expected, + Expected::LessThan(expected) => actual < *expected, + Expected::GreaterThan(expected) => actual > *expected, + } + } +} + +impl fmt::Display for Expected { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Expected::Exactly(n) => write!(f, "exactly {n}"), + 
Expected::AtLeast(n) => write!(f, "at least {n}"), + Expected::AtMost(n) => write!(f, "at most {n}"), + Expected::LessThan(n) => write!(f, "less than {n}"), + Expected::GreaterThan(n) => write!(f, "greater than {n}"), + } + } +} + +pub struct TestParams { + pub enable_sync: bool, + pub protocol: PubSubProtocol, + pub block_size: ByteSize, + pub tx_size: ByteSize, + pub txs_per_part: usize, + pub vote_extensions: Option, + pub value_payload: ValuePayload, + pub max_retain_blocks: usize, + pub timeout_step: Duration, +} + +impl Default for TestParams { + fn default() -> Self { + Self { + enable_sync: false, + protocol: PubSubProtocol::default(), + block_size: ByteSize::mib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 256, + vote_extensions: None, + value_payload: ValuePayload::default(), + max_retain_blocks: 50, + timeout_step: Duration::from_secs(30), + } + } +} + +impl TestParams { + fn apply_to_config(&self, config: &mut Config) { + config.sync.enabled = self.enable_sync; + config.consensus.p2p.protocol = self.protocol; + config.consensus.max_block_size = self.block_size; + config.consensus.value_payload = self.value_payload; + config.test.tx_size = self.tx_size; + config.test.txs_per_part = self.txs_per_part; + config.test.vote_extensions.enabled = self.vote_extensions.is_some(); + config.test.vote_extensions.size = self.vote_extensions.unwrap_or_default(); + config.test.max_retain_blocks = self.max_retain_blocks; + config.consensus.timeouts.timeout_step = self.timeout_step; + } +} + +pub enum Step { + Crash(Duration), + ResetDb, + Restart(Duration), + WaitUntil(u64), + OnEvent(EventHandler), + Expect(Expected), + Success, + Fail(String), +} + +#[derive(Copy, Clone, Debug)] +pub enum HandlerResult { + WaitForNextEvent, + ContinueTest, +} + +pub type EventHandler = + Box, &mut S) -> Result + Send + Sync>; + +pub type NodeId = usize; + +pub struct TestNode { + pub id: NodeId, + pub voting_power: VotingPower, + pub start_height: Height, + pub start_delay: 
Duration, + pub steps: Vec>, + pub state: State, +} + +impl TestNode { + pub fn new(id: usize) -> Self + where + State: Default, + { + Self::new_with_state(id, State::default()) + } + + pub fn new_with_state(id: usize, state: State) -> Self { + Self { + id, + voting_power: 1, + start_height: Height::new(1), + start_delay: Duration::from_secs(0), + steps: vec![], + state, + } + } + + pub fn with_state(&mut self, state: State) -> &mut Self { + self.state = state; + self + } + + pub fn with_voting_power(&mut self, power: VotingPower) -> &mut Self { + self.voting_power = power; + self + } + + pub fn start(&mut self) -> &mut Self { + self.start_at(1) + } + + pub fn start_at(&mut self, height: u64) -> &mut Self { + self.start_after(height, Duration::from_secs(0)) + } + + pub fn start_after(&mut self, height: u64, delay: Duration) -> &mut Self { + self.start_height = Height::new(height); + self.start_delay = delay; + self + } + + pub fn crash(&mut self) -> &mut Self { + self.steps.push(Step::Crash(Duration::from_secs(0))); + self + } + + pub fn crash_after(&mut self, duration: Duration) -> &mut Self { + self.steps.push(Step::Crash(duration)); + self + } + + pub fn reset_db(&mut self) -> &mut Self { + self.steps.push(Step::ResetDb); + self + } + + pub fn restart_after(&mut self, delay: Duration) -> &mut Self { + self.steps.push(Step::Restart(delay)); + self + } + + pub fn wait_until(&mut self, height: u64) -> &mut Self { + self.steps.push(Step::WaitUntil(height)); + self + } + + pub fn on_event(&mut self, on_event: F) -> &mut Self + where + F: Fn(Event, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.steps.push(Step::OnEvent(Box::new(on_event))); + self + } + + pub fn expect_wal_replay(&mut self, at_height: u64) -> &mut Self { + self.on_event(move |event, _| { + let Event::WalReplayBegin(height, count) = event else { + return Ok(HandlerResult::WaitForNextEvent); + }; + + info!("Replaying WAL at height {height} with {count} messages"); + + if height.as_u64() 
!= at_height { + bail!("Unexpected WAL replay at height {height}, expected {at_height}") + } + + Ok(HandlerResult::ContinueTest) + }) + } + + pub fn expect_vote_set_request(&mut self, at_height: u64) -> &mut Self { + self.on_event(move |event, _| { + let Event::RequestedVoteSet(height, round) = event else { + return Ok(HandlerResult::WaitForNextEvent); + }; + + info!("Requested vote set for height {height} and round {round}"); + + if height.as_u64() != at_height { + bail!("Unexpected vote set request for height {height}, expected {at_height}") + } + + Ok(HandlerResult::ContinueTest) + }) + } + + pub fn on_proposed_value(&mut self, f: F) -> &mut Self + where + F: Fn(LocallyProposedValue, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.on_event(move |event, state| { + if let Event::ProposedValue(value) = event { + f(value, state) + } else { + Ok(HandlerResult::WaitForNextEvent) + } + }) + } + + pub fn on_vote(&mut self, f: F) -> &mut Self + where + F: Fn(SignedVote, &mut State) -> Result + + Send + + Sync + + 'static, + { + self.on_event(move |event, state| { + if let Event::Published(SignedConsensusMsg::Vote(vote)) = event { + f(vote, state) + } else { + Ok(HandlerResult::WaitForNextEvent) + } + }) + } + + pub fn expect_decisions(&mut self, expected: Expected) -> &mut Self { + self.steps.push(Step::Expect(expected)); + self + } + + pub fn success(&mut self) -> &mut Self { + self.steps.push(Step::Success); + self + } + + pub fn full_node(&mut self) -> &mut Self { + self.voting_power = 0; + self + } + + pub fn is_full_node(&self) -> bool { + self.voting_power == 0 + } +} + +fn unique_id() -> usize { + use std::sync::atomic::{AtomicUsize, Ordering}; + static ID: AtomicUsize = AtomicUsize::new(1); + ID.fetch_add(1, Ordering::SeqCst) +} + +pub struct TestBuilder { + nodes: Vec>, +} + +impl Default for TestBuilder { + fn default() -> Self { + Self { nodes: Vec::new() } + } +} + +impl TestBuilder +where + S: Send + Sync + 'static, +{ + pub fn new() -> Self 
{ + Self::default() + } + + pub fn add_node(&mut self) -> &mut TestNode + where + S: Default, + { + let node = TestNode::new(self.nodes.len() + 1); + self.nodes.push(node); + self.nodes.last_mut().unwrap() + } + + pub fn build(self) -> Test { + Test::new(self.nodes) + } +} + +pub struct Test { + pub id: usize, + pub nodes: Vec>, + pub private_keys: Vec, + pub validator_set: ValidatorSet, + pub consensus_base_port: usize, + pub mempool_base_port: usize, + pub metrics_base_port: usize, +} + +impl Test +where + S: Send + Sync + 'static, +{ + pub fn new(nodes: Vec>) -> Self { + let (validators, private_keys) = make_validators(voting_powers(&nodes)); + let validator_set = ValidatorSet::new(validators); + let id = unique_id(); + let base_port = 20_000 + id * 1000; + + Self { + id, + nodes, + private_keys, + validator_set, + consensus_base_port: base_port, + mempool_base_port: base_port + 100, + metrics_base_port: base_port + 200, + } + } + + pub fn generate_default_configs(&self) -> Vec { + (0..self.nodes.len()) + .map(|i| make_node_config(self, i)) + .collect() + } + + pub fn generate_custom_configs(&self, params: TestParams) -> Vec { + let mut configs = self.generate_default_configs(); + for config in &mut configs { + params.apply_to_config(config); + } + configs + } + + pub async fn run(self, timeout: Duration) { + let configs = self.generate_default_configs(); + self.run_with_config(configs, timeout).await + } + + pub async fn run_with_custom_config(self, timeout: Duration, params: TestParams) { + let configs = self.generate_custom_configs(params); + self.run_with_config(configs, timeout).await + } + + pub async fn run_with_config(self, configs: Vec, timeout: Duration) { + let _span = error_span!("test", id = %self.id).entered(); + + let mut set = JoinSet::new(); + + for ((node, config), private_key) in self + .nodes + .into_iter() + .zip(configs.into_iter()) + .zip(self.private_keys.into_iter()) + { + let validator_set = self.validator_set.clone(); + + let home_dir 
= tempfile::TempDir::with_prefix(format!( + "informalsystems-malachitebft-starknet-test-{}", + self.id + )) + .unwrap() + .into_path(); + + let id = node.id; + let task = run_node(node, home_dir, config, validator_set, private_key); + + set.spawn( + async move { + let result = tokio::time::timeout(timeout, task).await; + (id, result) + } + .in_current_span(), + ); + } + + let results = set.join_all().await; + check_results(results); + } +} + +fn check_results(results: Vec<(NodeId, Result)>) { + let mut errors = 0; + + for (id, result) in results { + let _span = tracing::error_span!("node", %id).entered(); + + match result { + Ok(TestResult::Success(reason)) => { + info!("Test succeeded: {reason}"); + } + Ok(TestResult::Failure(reason)) => { + errors += 1; + error!("Test failed: {reason}"); + } + Err(_) => { + errors += 1; + error!("Test timed out"); + } + } + } + + if errors > 0 { + error!("Test failed with {errors} errors"); + std::process::exit(1); + } +} + +#[derive(Debug)] +pub enum TestResult { + Success(String), + Failure(String), +} + +#[tracing::instrument("node", skip_all, fields(id = %node.id))] +async fn run_node( + mut node: TestNode, + home_dir: PathBuf, + config: Config, + validator_set: ValidatorSet, + private_key: PrivateKey, +) -> TestResult { + sleep(node.start_delay).await; + + info!("Spawning node with voting power {}", node.voting_power); + + let app = App { + config, + home_dir: home_dir.clone(), + private_key, + validator_set, + start_height: Some(node.start_height), + }; + + let mut handles = app.start().await.unwrap(); + + let mut rx_event = handles.tx_event.subscribe(); + let rx_event_bg = handles.tx_event.subscribe(); + + let decisions = Arc::new(AtomicUsize::new(0)); + let current_height = Arc::new(AtomicUsize::new(0)); + let failure = Arc::new(Mutex::new(None)); + let is_full_node = node.is_full_node(); + + let spawn_bg = |mut rx: RxEvent| { + tokio::spawn({ + let decisions = Arc::clone(&decisions); + let current_height = 
Arc::clone(&current_height); + let failure = Arc::clone(&failure); + + async move { + while let Ok(event) = rx.recv().await { + match &event { + Event::StartedHeight(height) => { + current_height.store(height.as_u64() as usize, Ordering::SeqCst); + } + Event::Decided(_) => { + decisions.fetch_add(1, Ordering::SeqCst); + } + Event::Published(msg) if is_full_node => { + error!("Full node unexpectedly published a consensus message: {msg:?}"); + *failure.lock().await = Some(format!( + "Full node unexpectedly published a consensus message: {msg:?}" + )); + } + Event::WalReplayError(e) => { + error!("WAL replay error: {e}"); + *failure.lock().await = Some(format!("WAL replay error: {e}")); + } + _ => (), + } + + debug!("Event: {event}"); + } + } + .in_current_span() + }) + }; + + let mut bg = spawn_bg(rx_event_bg); + + for step in node.steps { + if let Some(failure) = failure.lock().await.take() { + return TestResult::Failure(failure); + } + + match step { + Step::WaitUntil(target_height) => { + info!("Waiting until node reaches height {target_height}"); + + 'inner: while let Ok(event) = rx_event.recv().await { + if let Some(failure) = failure.lock().await.take() { + return TestResult::Failure(failure); + } + + let Event::StartedHeight(height) = event else { + continue 'inner; + }; + + info!("Node started height {height}"); + + if height.as_u64() == target_height { + break 'inner; + } + } + } + + Step::Crash(after) => { + let height = current_height.load(Ordering::SeqCst); + + info!("Node will crash at height {height}"); + sleep(after).await; + + handles + .engine + .actor + .kill_and_wait(None) + .await + .expect("Node must stop"); + + bg.abort(); + handles.app.abort(); + handles.engine.handle.abort(); + } + + Step::ResetDb => { + info!("Resetting database"); + + let db_path = home_dir.join("db"); + let _ = remove_dir_all(&db_path); + create_dir_all(&db_path).expect("Database must be created"); + } + + Step::Restart(after) => { + info!("Node will restart in {after:?}"); + + 
sleep(after).await; + + let tx_event = TxEvent::new(); + let new_rx_event = tx_event.subscribe(); + let new_rx_event_bg = tx_event.subscribe(); + + info!("Spawning node"); + let new_handles = app.start().await.unwrap(); + + info!("Spawned"); + + bg = spawn_bg(new_rx_event_bg); + handles = new_handles; + rx_event = new_rx_event; + } + + Step::OnEvent(on_event) => { + 'inner: while let Ok(event) = rx_event.recv().await { + match on_event(event, &mut node.state) { + Ok(HandlerResult::WaitForNextEvent) => { + continue 'inner; + } + Ok(HandlerResult::ContinueTest) => { + break 'inner; + } + Err(e) => { + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + return TestResult::Failure(e.to_string()); + } + } + } + } + + Step::Expect(expected) => { + let actual = decisions.load(Ordering::SeqCst); + + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + if expected.check(actual) { + break; + } else { + return TestResult::Failure(format!( + "Incorrect number of decisions: got {actual}, expected: {expected}" + )); + } + } + + Step::Success => { + break; + } + + Step::Fail(reason) => { + bg.abort(); + handles.engine.actor.stop(Some("Test failed".to_string())); + handles.app.abort(); + handles.engine.handle.abort(); + + return TestResult::Failure(reason); + } + } + } + + let failure = failure.lock().await.take(); + if let Some(failure) = failure { + TestResult::Failure(failure) + } else { + TestResult::Success("OK".to_string()) + } +} + +pub fn init_logging(test_module: &str) { + use tracing_subscriber::util::SubscriberInitExt; + use tracing_subscriber::{EnvFilter, FmtSubscriber}; + + let debug_vars = &[("ACTIONS_RUNNER_DEBUG", "true"), ("MALACHITE_DEBUG", "1")]; + let enable_debug = debug_vars + .iter() + .any(|(k, v)| std::env::var(k).as_deref() == Ok(v)); + + let directive = if enable_debug { + 
format!("{test_module}=debug,informalsystems_malachitebft=trace,informalsystems_malachitebft_discovery=error,libp2p=warn,ractor=warn") + } else { + format!("{test_module}=debug,informalsystems_malachitebft=info,informalsystems_malachitebft_discovery=error,libp2p=warn,ractor=warn") + }; + + let filter = EnvFilter::builder().parse(directive).unwrap(); + + pub fn enable_ansi() -> bool { + use std::io::IsTerminal; + std::io::stdout().is_terminal() && std::io::stderr().is_terminal() + } + + // Construct a tracing subscriber with the supplied filter and enable reloading. + let builder = FmtSubscriber::builder() + .with_target(false) + .with_env_filter(filter) + .with_test_writer() + .with_ansi(enable_ansi()) + .with_thread_ids(false); + + let subscriber = builder.finish(); + + if let Err(e) = subscriber.try_init() { + eprintln!("Failed to initialize logging: {e}"); + } +} + +use bytesize::ByteSize; + +use malachitebft_config::{ + ConsensusConfig, MempoolConfig, MetricsConfig, P2pConfig, RuntimeConfig, TimeoutConfig, + ValuePayload, +}; + +fn transport_from_env(default: TransportProtocol) -> TransportProtocol { + if let Ok(protocol) = std::env::var("MALACHITE_TRANSPORT") { + TransportProtocol::from_str(&protocol).unwrap_or(default) + } else { + default + } +} + +pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { + let transport = transport_from_env(TransportProtocol::Tcp); + let protocol = PubSubProtocol::default(); + + NodeConfig { + moniker: format!("node-{}", test.nodes[i].id), + logging: LoggingConfig::default(), + consensus: ConsensusConfig { + max_block_size: ByteSize::mib(1), + value_payload: ValuePayload::default(), + timeouts: TimeoutConfig::default(), + p2p: P2pConfig { + transport, + protocol, + discovery: DiscoveryConfig::default(), + listen_addr: transport.multiaddr("127.0.0.1", test.consensus_base_port + i), + persistent_peers: (0..test.nodes.len()) + .filter(|j| i != *j) + .map(|j| transport.multiaddr("127.0.0.1", test.consensus_base_port + j)) 
+ .collect(), + ..Default::default() + }, + }, + mempool: MempoolConfig { + p2p: P2pConfig { + transport, + protocol, + listen_addr: transport.multiaddr("127.0.0.1", test.mempool_base_port + i), + persistent_peers: (0..test.nodes.len()) + .filter(|j| i != *j) + .map(|j| transport.multiaddr("127.0.0.1", test.mempool_base_port + j)) + .collect(), + ..Default::default() + }, + max_tx_count: 10000, + gossip_batch_size: 100, + }, + sync: SyncConfig { + enabled: true, + status_update_interval: Duration::from_secs(2), + request_timeout: Duration::from_secs(5), + }, + metrics: MetricsConfig { + enabled: false, + listen_addr: format!("127.0.0.1:{}", test.metrics_base_port + i) + .parse() + .unwrap(), + }, + runtime: RuntimeConfig::single_threaded(), + test: TestConfig::default(), + } +} + +fn voting_powers(nodes: &[TestNode]) -> Vec { + nodes.iter().map(|node| node.voting_power).collect() +} + +pub fn make_validators(voting_powers: Vec) -> (Vec, Vec) { + let mut rng = StdRng::seed_from_u64(0x42); + + let mut validators = Vec::with_capacity(voting_powers.len()); + let mut private_keys = Vec::with_capacity(voting_powers.len()); + + for vp in voting_powers { + let sk = PrivateKey::generate(&mut rng); + + if vp > 0 { + let val = Validator::new(sk.public_key(), vp); + validators.push(val); + } + + private_keys.push(sk); + } + + (validators, private_keys) +} + +use axum::routing::get; +use axum::Router; +use tokio::net::TcpListener; + +#[tracing::instrument(name = "metrics", skip_all)] +async fn serve_metrics(listen_addr: SocketAddr) { + let app = Router::new().route("/metrics", get(get_metrics)); + let listener = TcpListener::bind(listen_addr).await.unwrap(); + let address = listener.local_addr().unwrap(); + + async fn get_metrics() -> String { + let mut buf = String::new(); + malachitebft_metrics::export(&mut buf); + buf + } + + info!(%address, "Serving metrics"); + axum::serve(listener, app).await.unwrap(); +} diff --git a/code/crates/test/mempool/src/lib.rs 
b/code/crates/test/mempool/src/lib.rs index d39034561..32511671e 100644 --- a/code/crates/test/mempool/src/lib.rs +++ b/code/crates/test/mempool/src/lib.rs @@ -163,7 +163,7 @@ pub async fn spawn( let (tx_ctrl, rx_ctrl) = mpsc::channel(32); let peer_id = swarm.local_peer_id(); - let span = error_span!("mempool-network", peer = %peer_id); + let span = error_span!("mempool.network", peer = %peer_id); let task_handle = tokio::task::spawn(run(config, metrics, swarm, rx_ctrl, tx_event).instrument(span)); diff --git a/code/crates/test/src/node.rs b/code/crates/test/src/node.rs index 5662696b3..a1c1a3d83 100644 --- a/code/crates/test/src/node.rs +++ b/code/crates/test/src/node.rs @@ -53,11 +53,8 @@ impl Node for TestNode { file } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } diff --git a/code/crates/test/src/proposal_part.rs b/code/crates/test/src/proposal_part.rs index 414e3d7c5..de39601a8 100644 --- a/code/crates/test/src/proposal_part.rs +++ b/code/crates/test/src/proposal_part.rs @@ -60,6 +60,13 @@ impl ProposalPart { } } + pub fn as_fin(&self) -> Option<&ProposalFin> { + match self { + Self::Fin(fin) => Some(fin), + _ => None, + } + } + pub fn to_sign_bytes(&self) -> Bytes { proto::Protobuf::to_bytes(self).unwrap() } diff --git a/code/crates/test/tests/it/full_nodes.rs b/code/crates/test/tests/it/full_nodes.rs new file mode 100644 index 000000000..60964ab6a --- /dev/null +++ b/code/crates/test/tests/it/full_nodes.rs @@ -0,0 +1,165 @@ +use std::time::Duration; + +use malachitebft_test_framework::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn basic_full_node() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + // Add 3 validators 
with different voting powers + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + + // Add 2 full nodes that should follow consensus but not participate + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn full_node_crash_and_sync() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add validators + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a full node that crashes and needs to sync + test.add_node() + .full_node() + .start() + .wait_until(3) + .crash() + .reset_db() + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(60)).await +} + +#[tokio::test] +pub async fn late_starting_full_node() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add validators that start immediately + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a full node that starts late + test.add_node() + .full_node() + .start_after(1, Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + 
test.build().run(Duration::from_secs(60)).await +} + +#[tokio::test] +pub async fn mixed_validator_and_full_node_failures() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add stable validators + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a validator that crashes + test.add_node() + .with_voting_power(20) + .start() + .wait_until(5) + .crash() + .restart_after(Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + // Add full nodes - one stable, one that crashes + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .full_node() + .start() + .wait_until(6) + .crash() + .restart_after(Duration::from_secs(15)) + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(60)).await +} diff --git a/code/crates/test/tests/it/main.rs b/code/crates/test/tests/it/main.rs new file mode 100644 index 000000000..4d1ee7a67 --- /dev/null +++ b/code/crates/test/tests/it/main.rs @@ -0,0 +1,8 @@ +mod full_nodes; +mod n3f0; +mod n3f0_consensus_mode; +mod n3f0_pubsub_protocol; +mod n3f1; +mod value_sync; +mod vote_sync; +mod wal; diff --git a/code/crates/test/tests/it/n3f0.rs b/code/crates/test/tests/it/n3f0.rs new file mode 100644 index 000000000..17ba5f970 --- /dev/null +++ b/code/crates/test/tests/it/n3f0.rs @@ -0,0 +1,18 @@ +use std::time::Duration; + +use malachitebft_test_framework::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn all_correct_nodes() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build().run(Duration::from_secs(30)).await +} diff --git 
a/code/crates/test/tests/it/n3f0_consensus_mode.rs b/code/crates/test/tests/it/n3f0_consensus_mode.rs new file mode 100644 index 000000000..379fde7a4 --- /dev/null +++ b/code/crates/test/tests/it/n3f0_consensus_mode.rs @@ -0,0 +1,52 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +async fn run_test(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build() + .run_with_custom_config(Duration::from_secs(30), params) + .await +} + +#[tokio::test] +pub async fn parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + run_test(params).await +} + +// This functionality is not fully implemented yet +#[tokio::test] +#[ignore] +pub async fn proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + run_test(params).await +} diff --git a/code/crates/test/tests/it/n3f0_pubsub_protocol.rs b/code/crates/test/tests/it/n3f0_pubsub_protocol.rs new file mode 100644 index 000000000..0bbf522e4 --- /dev/null +++ b/code/crates/test/tests/it/n3f0_pubsub_protocol.rs @@ -0,0 +1,77 @@ +use std::time::Duration; + +use bytesize::ByteSize; +use malachitebft_config::{GossipSubConfig, PubSubProtocol}; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +async fn run_test(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + 
test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.build() + .run_with_custom_config(Duration::from_secs(30), params) + .await +} + +#[tokio::test] +pub async fn broadcast_custom_config_1ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::Broadcast, + block_size: ByteSize::kib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn broadcast_custom_config_2ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::Broadcast, + block_size: ByteSize::kib(2), + tx_size: ByteSize::kib(2), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn gossip_custom_config_1ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::GossipSub(GossipSubConfig::default()), + block_size: ByteSize::kib(1), + tx_size: ByteSize::kib(1), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} + +#[tokio::test] +pub async fn gossip_custom_config_2ktx() { + let params = TestParams { + enable_sync: false, + protocol: PubSubProtocol::GossipSub(GossipSubConfig::default()), + block_size: ByteSize::kib(2), + tx_size: ByteSize::kib(2), + txs_per_part: 1, + ..Default::default() + }; + + run_test(params).await +} diff --git a/code/crates/test/tests/it/n3f1.rs b/code/crates/test/tests/it/n3f1.rs new file mode 100644 index 000000000..95558c394 --- /dev/null +++ b/code/crates/test/tests/it/n3f1.rs @@ -0,0 +1,113 @@ +use std::time::Duration; + +use malachitebft_test_framework::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn proposer_fails_to_start() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().with_voting_power(1).success(); + + 
test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn one_node_fails_to_start() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node().with_voting_power(1).success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn proposer_crashes_at_height_2() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(1) + .start() + .wait_until(2) + .crash() + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn one_node_crashes_at_height_3() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(1) + .start() + .wait_until(3) + .crash() + .success(); + + test.build().run(Duration::from_secs(30)).await +} diff --git a/code/crates/test/tests/it/value_sync.rs b/code/crates/test/tests/it/value_sync.rs new file mode 100644 index 000000000..ccc094e07 --- /dev/null +++ b/code/crates/test/tests/it/value_sync.rs @@ -0,0 +1,205 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use 
malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +pub async fn crash_restart_from_start(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Node 1 starts with 10 voting power. + test.add_node() + .with_voting_power(10) + .start() + // Wait until it reaches height 10 + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + // Node 2 starts with 10 voting power, in parallel with node 1 and with the same behaviour + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + // Node 3 starts with 5 voting power, in parallel with node 1 and 2. + test.add_node() + .with_voting_power(5) + .start() + // Wait until the node reaches height 2... + .wait_until(2) + // ...and then kills it + .crash() + // Reset the database so that the node has to do Sync from height 1 + .reset_db() + // After that, it waits 5 seconds before restarting the node + .restart_after(Duration::from_secs(5)) + // Wait until the node reached the expected height + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + ..params + }, + ) + .await +} + +#[tokio::test] +pub async fn crash_restart_from_start_parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +#[ignore] // Not fully implemented yet +pub async fn crash_restart_from_start_proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_and_parts() { + let params = TestParams { + value_payload: 
ValuePayload::ProposalAndParts, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_latest() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(2) + .crash() + // We do not reset the database so that the node can restart from the latest height + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn aggressive_pruning() { + init_logging(module_path!()); + + const HEIGHT: u64 = 15; + + let mut test = TestBuilder::<()>::new(); + + // Node 1 starts with 10 voting power. 
+ test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + + test.add_node() + .with_voting_power(5) + .start() + .wait_until(2) + .crash() + .reset_db() + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + max_retain_blocks: 10, // Prune blocks older than 10 + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn start_late() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT * 2) + .success(); + + test.add_node() + .with_voting_power(5) + .start_after(1, Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(30), + TestParams { + enable_sync: true, + ..Default::default() + }, + ) + .await +} diff --git a/code/crates/test/tests/it/vote_sync.rs b/code/crates/test/tests/it/vote_sync.rs new file mode 100644 index 000000000..910956d03 --- /dev/null +++ b/code/crates/test/tests/it/vote_sync.rs @@ -0,0 +1,139 @@ +use std::time::Duration; + +use malachitebft_config::ValuePayload; +use malachitebft_test_framework::{init_logging, TestBuilder, TestParams}; + +// NOTE: These tests are very similar to the Sync tests, with the difference that +// all nodes have the same voting power and therefore get stuck when one of them dies. 
+ +pub async fn crash_restart_from_start(params: TestParams) { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + + test.add_node() + .start() + // Wait until the node reaches height 4... + .wait_until(4) + // ...then kill it + .crash() + // Reset the database so that the node has to do Sync from height 1 + .reset_db() + // After that, it waits 5 seconds before restarting the node + .restart_after(Duration::from_secs(5)) + // Expect a vote set request for height 4 + .expect_vote_set_request(4) + // Wait until the node reached the expected height + .wait_until(HEIGHT) + // Record a successful test for this node + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), // Timeout for the whole test + TestParams { + enable_sync: true, // Enable Sync + timeout_step: Duration::from_secs(5), + ..params + }, + ) + .await +} + +#[tokio::test] +pub async fn crash_restart_from_start_parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +#[ignore] // Test app only supports parts-only mode +pub async fn crash_restart_from_start_proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +#[ignore] // Test app only supports parts-only mode +pub async fn crash_restart_from_start_proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_latest() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + 
test.add_node().start().wait_until(HEIGHT).success(); + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node() + .start() + .wait_until(2) + .crash() + // We do not reset the database so that the node can restart from the latest height + .restart_after(Duration::from_secs(5)) + // Expect a vote set request for height 2 + .expect_vote_set_request(2) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + timeout_step: Duration::from_secs(5), + ..Default::default() + }, + ) + .await +} + +#[tokio::test] +pub async fn start_late() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT * 2).success(); + test.add_node().start().wait_until(HEIGHT * 2).success(); + test.add_node() + .start_after(1, Duration::from_secs(10)) + // Expect a vote set request for height 1 + .expect_vote_set_request(1) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + timeout_step: Duration::from_secs(5), + ..Default::default() + }, + ) + .await +} diff --git a/code/crates/test/tests/it/wal.rs b/code/crates/test/tests/it/wal.rs new file mode 100644 index 000000000..1f79ba6ab --- /dev/null +++ b/code/crates/test/tests/it/wal.rs @@ -0,0 +1,198 @@ +use std::time::Duration; + +use eyre::bail; +use tracing::info; + +use informalsystems_malachitebft_test as malachitebft_test; + +use malachitebft_config::ValuePayload; +use malachitebft_core_consensus::LocallyProposedValue; +use malachitebft_core_types::SignedVote; +use malachitebft_engine::util::events::Event; +use malachitebft_test::TestContext; +use malachitebft_test_framework::{init_logging, HandlerResult, TestBuilder, TestParams}; + +#[tokio::test] +async fn proposer_crashes_after_proposing_parts_only() { + proposer_crashes_after_proposing(TestParams { + 
value_payload: ValuePayload::PartsOnly, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +#[ignore] // Test app only supports parts-only mode +async fn proposer_crashes_after_proposing_proposal_and_parts() { + proposer_crashes_after_proposing(TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +#[ignore] // Not fully implemented yet +async fn proposer_crashes_after_proposing_proposal_only() { + proposer_crashes_after_proposing(TestParams { + value_payload: ValuePayload::ProposalOnly, + ..TestParams::default() + }) + .await +} + +async fn proposer_crashes_after_proposing(params: TestParams) { + init_logging(module_path!()); + + #[derive(Clone, Debug, Default)] + struct State { + first_proposed_value: Option<LocallyProposedValue<TestContext>>, + } + + const CRASH_HEIGHT: u64 = 4; + + let mut test = TestBuilder::<State>::new(); + + test.add_node().with_voting_power(10).start().success(); + test.add_node().with_voting_power(10).start().success(); + + test.add_node() + .with_voting_power(40) + .start() + .wait_until(CRASH_HEIGHT) + // Wait until this node proposes a value + .on_event(|event, state| match event { + Event::ProposedValue(value) => { + info!("Proposer proposed block: {:?}", value.value); + state.first_proposed_value = Some(value); + Ok(HandlerResult::ContinueTest) + } + _ => Ok(HandlerResult::WaitForNextEvent), + }) + // Crash right after + .crash() + // Restart after 5 seconds + .restart_after(Duration::from_secs(5)) + // Check that we replay messages from the WAL + .expect_wal_replay(CRASH_HEIGHT) + // Wait until it proposes a value again, while replaying WAL + // Check that it is the same value as the first time + .on_proposed_value(|value, state| { + let Some(first_value) = state.first_proposed_value.as_ref() else { + bail!("Proposer did not propose a block"); + }; + + if first_value.value == value.value { + info!("Proposer re-proposed the same block: {:?}", value.value); + Ok(HandlerResult::ContinueTest) + }
else { + bail!( + "Proposer just equivocated: expected {:?}, got {:?}", + first_value.value, + value.value + ) + } + }) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: false, + ..params + }, + ) + .await +} + +#[tokio::test] +async fn non_proposer_crashes_after_voting_parts_only() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::PartsOnly, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +async fn non_proposer_crashes_after_voting_proposal_and_parts() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..TestParams::default() + }) + .await +} + +#[tokio::test] +#[ignore] // Not fully implemented yet +async fn non_proposer_crashes_after_voting_proposal_only() { + non_proposer_crashes_after_voting(TestParams { + value_payload: ValuePayload::ProposalOnly, + ..TestParams::default() + }) + .await +} + +async fn non_proposer_crashes_after_voting(params: TestParams) { + init_logging(module_path!()); + + #[derive(Clone, Debug, Default)] + struct State { + first_vote: Option<SignedVote<TestContext>>, + } + + const CRASH_HEIGHT: u64 = 3; + + let mut test = TestBuilder::<State>::new(); + + test.add_node() + .with_voting_power(40) + .start() + .wait_until(CRASH_HEIGHT) + // Wait until this node votes + .on_vote(|vote, state| { + info!("Non-proposer voted"); + state.first_vote = Some(vote); + + Ok(HandlerResult::ContinueTest) + }) + // Crash right after + .crash() + // Restart after 5 seconds + .restart_after(Duration::from_secs(5)) + // Check that we replay messages from the WAL + .expect_wal_replay(CRASH_HEIGHT) + // Wait until it votes again, while replaying WAL + // Check that it votes for the same value as the first time + .on_vote(|vote, state| { + let Some(first_vote) = state.first_vote.as_ref() else { + bail!("Non-proposer did not vote") + }; + + if first_vote.value == vote.value { + info!("Non-proposer voted the same way: 
{first_vote:?}"); + Ok(HandlerResult::ContinueTest) + } else { + bail!( + "Non-proposer just equivocated: expected {:?}, got {:?}", + first_vote.value, + vote.value + ) + } + }) + .success(); + + test.add_node().with_voting_power(10).start().success(); + test.add_node().with_voting_power(10).start().success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: false, + ..params + }, + ) + .await +} diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index 5bb5bbec9..fbfb1583f 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use eyre::eyre; +use tokio::time::sleep; use tracing::{error, info}; use malachitebft_app_channel::app::streaming::StreamContent; @@ -8,7 +11,7 @@ use malachitebft_app_channel::app::types::sync::RawDecidedValue; use malachitebft_app_channel::app::types::ProposedValue; use malachitebft_app_channel::{AppMsg, Channels, ConsensusMsg, NetworkMsg}; use malachitebft_test::codec::proto::ProtobufCodec; -use malachitebft_test::TestContext; +use malachitebft_test::{Height, TestContext}; use crate::state::{decode_value, State}; @@ -18,13 +21,22 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr // The first message to handle is the `ConsensusReady` message, signaling to the app // that Malachite is ready to start consensus AppMsg::ConsensusReady { reply } => { - info!("Consensus is ready"); + let start_height = state + .store + .max_decided_value_height() + .await + .map(|height| height.increment()) + .unwrap_or_else(|| Height::new(1)); + + info!(%start_height, "Consensus is ready"); + + sleep(Duration::from_millis(200)).await; // We can simply respond by telling the engine to start consensus // at the current height, which is initially 1 if reply .send(ConsensusMsg::StartHeight( - state.current_height, + start_height, state.get_validator_set().clone(), )) .is_err() @@ -63,22 
+75,19 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr info!(%height, %round, "Consensus is requesting a value to propose"); // Here it is important that, if we have previously built a value for this height and round, - // we send back the very same value. We will not go into details here but this has to do - // with crash recovery and is not strictly necessary in this example app since all our state - // is kept in-memory and therefore is not crash tolerant at all. - if let Some(proposal) = state.get_previously_built_value(height, round).await? { - info!(value = %proposal.value.id(), "Re-using previously built value"); - - if reply.send(proposal).is_err() { - error!("Failed to send GetValue reply"); + // we send back the very same value. + let proposal = match state.get_previously_built_value(height, round).await? { + Some(proposal) => { + info!(value = %proposal.value.id(), "Re-using previously built value"); + proposal } - - return Ok(()); - } - - // If we have not previously built a value for that very same height and round, - // we need to create a new value to propose and send it back to consensus. - let proposal = state.propose_value(height, round).await?; + None => { + // If we have not previously built a value for that very same height and round, + // we need to create a new value to propose and send it back to consensus. + info!("Building a new value to propose"); + state.propose_value(height, round).await? + } + }; // Send it to consensus if reply.send(proposal.clone()).is_err() { @@ -89,17 +98,12 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr // and send those parts over the network to our peers, for them to re-assemble the full value. 
for stream_message in state.stream_proposal(proposal) { info!(%height, %round, "Streaming proposal part: {stream_message:?}"); + channels .network .send(NetworkMsg::PublishProposalPart(stream_message)) .await?; } - - // NOTE: In this tutorial, the value is simply an integer and therefore results in a very small - // message to gossip over the network, but if we were building a real application, - // say building blocks containing thousands of transactions, the proposal would typically only - // carry the block hash and the full block itself would be split into parts in order to - // avoid blowing up the bandwidth requirements by gossiping a single huge message. } AppMsg::ExtendVote { @@ -229,7 +233,10 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr // that was decided at some lower height. In that case, we fetch it from our store // and send it to consensus. AppMsg::GetDecidedValue { height, reply } => { + info!(%height, "Received sync request for decided value"); + let decided_value = state.get_decided_value(height).await; + info!(%height, "Found decided value: {decided_value:?}"); let raw_decided_value = decided_value.map(|decided_value| RawDecidedValue { certificate: decided_value.certificate, @@ -251,8 +258,31 @@ pub async fn run(state: &mut State, channels: &mut Channels) -> eyr } } - AppMsg::RestreamProposal { .. 
} => { - error!("RestreamProposal not implemented"); + AppMsg::RestreamProposal { + height, + round, + valid_round, + address, + value_id, + } => { + info!(%height, %round, "Restreaming existing proposal..."); + + let Some(proposal) = state + .get_proposal(height, round, valid_round, address, value_id) + .await + else { + error!(%height, %round, "Failed to find proposal to restream"); + return Ok(()); + }; + + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, "Publishing proposal part: {stream_message:?}"); + + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } } AppMsg::PeerJoined { peer_id } => { diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 19bf785c2..3ce0ba775 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -69,11 +69,8 @@ impl Node for App { file } - fn load_private_key_file( - &self, - path: impl AsRef, - ) -> std::io::Result { - let private_key = std::fs::read_to_string(path)?; + fn load_private_key_file(&self) -> std::io::Result { + let private_key = std::fs::read_to_string(&self.private_key_file)?; serde_json::from_str(&private_key).map_err(|e| e.into()) } @@ -104,24 +101,23 @@ impl Node for App { let span = tracing::error_span!("node", moniker = %self.config.moniker); let _enter = span.enter(); - let private_key_file = self.load_private_key_file(&self.private_key_file)?; + let private_key_file = self.load_private_key_file()?; let private_key = self.load_private_key(private_key_file); let public_key = self.get_public_key(&private_key); let address = self.get_address(&public_key); let signing_provider = self.get_signing_provider(private_key); let ctx = TestContext::new(); - let genesis = self.load_genesis(self.genesis_file.clone())?; + let genesis = self.load_genesis(&self.genesis_file)?; let initial_validator_set = genesis.validator_set.clone(); let codec = ProtobufCodec; - let mut channels = 
malachitebft_app_channel::run( + let (mut channels, _handle) = malachitebft_app_channel::start_engine( ctx, codec, self.clone(), self.config.clone(), - self.private_key_file.clone(), self.start_height, initial_validator_set, ) diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 952ad0513..3db3f89a4 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -20,12 +20,15 @@ use malachitebft_app_channel::app::types::{LocallyProposedValue, PeerId}; use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Ed25519Provider, Genesis, Height, ProposalData, ProposalFin, ProposalInit, - ProposalPart, TestContext, ValidatorSet, Value, + ProposalPart, TestContext, ValidatorSet, Value, ValueId, }; use crate::store::{DecidedValue, Store}; use crate::streaming::{PartStreamsMap, ProposalParts}; +/// Number of historical values to keep in the store +const HISTORY_LENGTH: u64 = 100; + /// Represents the internal state of the application node /// Contains information about current height, round, proposals and blocks pub struct State { @@ -34,11 +37,11 @@ pub struct State { signing_provider: Ed25519Provider, genesis: Genesis, address: Address, - store: Store, vote_extensions: HashMap>, streams_map: PartStreamsMap, rng: StdRng, + pub store: Store, pub current_height: Height, pub current_round: Round, pub current_proposer: Option
, @@ -48,7 +51,10 @@ pub struct State { /// Represents errors that can occur during the verification of a proposal's signature. #[derive(Debug)] enum SignatureVerificationError { - /// Indicates that the `Fin` part of the proposal is missing. + /// Indicates that the `Init` part of the proposal is unexpectedly missing. + MissingInitPart, + + /// Indicates that the `Fin` part of the proposal is unexpectedly missing. MissingFinPart, /// Indicates that the proposer was not found in the validator set. @@ -118,11 +124,31 @@ impl State { return Ok(None); }; + // Check if the proposal is outdated + if parts.height < self.current_height { + debug!( + height = %self.current_height, + round = %self.current_round, + part.height = %parts.height, + part.round = %parts.round, + part.sequence = %sequence, + "Received outdated proposal part, ignoring" + ); + + return Ok(None); + } + // Verify the proposal signature match self.verify_proposal_signature(&parts) { Ok(()) => { // Signature verified successfully, continue processing } + Err(SignatureVerificationError::MissingInitPart) => { + return Err(eyre!( + "Expected to have full proposal but `Init` proposal part is missing for proposer: {}", + parts.proposer + )); + } Err(SignatureVerificationError::MissingFinPart) => { return Err(eyre!( "Expected to have full proposal but `Fin` proposal part is missing for proposer: {}", @@ -139,20 +165,6 @@ impl State { } } - // Check if the proposal is outdated - if parts.height < self.current_height { - debug!( - height = %self.current_height, - round = %self.current_round, - part.height = %parts.height, - part.round = %parts.round, - part.sequence = %sequence, - "Received outdated proposal part, ignoring" - ); - - return Ok(None); - } - // Re-assemble the proposal from its parts let value = assemble_value_from_parts(parts); @@ -173,29 +185,26 @@ impl State { certificate: CommitCertificate, extensions: VoteExtensions, ) -> eyre::Result<()> { - // Store extensions for use at next height if we 
are the proposer - self.vote_extensions - .insert(certificate.height.increment(), extensions); + let (height, round) = (certificate.height, certificate.round); - let Ok(Some(proposal)) = self - .store - .get_undecided_proposal(certificate.height, certificate.round) - .await - else { - error!( - height = %certificate.height, - "Trying to commit a value that is not decided" - ); + // Store extensions for use at next height if we are the proposer + self.vote_extensions.insert(height.increment(), extensions); - return Ok(()); // FIXME: Return an actual error and handle in caller + let Ok(Some(proposal)) = self.store.get_undecided_proposal(height, round).await else { + return Err(eyre!( + "Trying to commit a value at height {height} and round {round} for which there is no proposal: {}", + certificate.value_id + )); }; self.store .store_decided_value(&certificate, proposal.value) .await?; - // Prune the store, keep the last 5 heights - let retain_height = Height::new(certificate.height.as_u64().saturating_sub(5)); + self.store.remove_undecided_proposal(height, round).await?; + + // Prune the store, keep the last HISTORY_LENGTH values + let retain_height = Height::new(height.as_u64().saturating_sub(HISTORY_LENGTH)); self.store.prune(retain_height).await?; // Move to next height @@ -363,6 +372,21 @@ impl State { parts } + pub async fn get_proposal( + &self, + height: Height, + round: Round, + _valid_round: Round, + _proposer: Address, + value_id: ValueId, + ) -> Option> { + Some(LocallyProposedValue::new( + height, + round, + Value::new(value_id.as_u64()), + )) + } + /// Returns the set of validators. 
pub fn get_validator_set(&self) -> &ValidatorSet { &self.genesis.validator_set @@ -375,37 +399,39 @@ impl State { parts: &ProposalParts, ) -> Result<(), SignatureVerificationError> { let mut hasher = sha3::Keccak256::new(); - let mut signature = None; - - // Recreate the hash and extract the signature during traversal - for part in &parts.parts { - match part { - ProposalPart::Init(init) => { - hasher.update(init.height.as_u64().to_be_bytes()); - hasher.update(init.round.as_i64().to_be_bytes()); - } - ProposalPart::Data(data) => { - hasher.update(data.factor.to_be_bytes()); - } - ProposalPart::Fin(fin) => { - signature = Some(&fin.signature); - } + + let init = parts + .init() + .ok_or(SignatureVerificationError::MissingInitPart)?; + + let fin = parts + .fin() + .ok_or(SignatureVerificationError::MissingFinPart)?; + + let hash = { + hasher.update(init.height.as_u64().to_be_bytes()); + hasher.update(init.round.as_i64().to_be_bytes()); + + // The correctness of the hash computation relies on the parts being ordered by sequence + // number, which is guaranteed by the `PartStreamsMap`. 
+ for part in parts.parts.iter().filter_map(|part| part.as_data()) { + hasher.update(part.factor.to_be_bytes()); } - } - let hash = hasher.finalize(); - let signature = signature.ok_or(SignatureVerificationError::MissingFinPart)?; + hasher.finalize() + }; - // Retrieve the public key of the proposer - let public_key = self + // Retrieve the proposer + let proposer = self .get_validator_set() .get_by_address(&parts.proposer) - .map(|v| v.public_key); - - let public_key = public_key.ok_or(SignatureVerificationError::ProposerNotFound)?; + .ok_or(SignatureVerificationError::ProposerNotFound)?; // Verify the signature - if !self.signing_provider.verify(&hash, signature, &public_key) { + if !self + .signing_provider + .verify(&hash, &fin.signature, &proposer.public_key) + { return Err(SignatureVerificationError::InvalidSignature); } diff --git a/code/examples/channel/src/store.rs b/code/examples/channel/src/store.rs index 900b31337..b33d56606 100644 --- a/code/examples/channel/src/store.rs +++ b/code/examples/channel/src/store.rs @@ -207,6 +207,20 @@ impl Db { Ok(()) } + pub fn remove_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result<(), StoreError> { + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + table.remove(&(height, round))?; + } + tx.commit()?; + Ok(()) + } + fn height_range
( &self, table: &Table, @@ -281,12 +295,12 @@ impl Db { Some(key.value()) } - // fn max_decided_value_height(&self) -> Option { - // let tx = self.db.begin_read().unwrap(); - // let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); - // let (key, _) = table.last().ok()??; - // Some(key.value()) - // } + fn max_decided_value_height(&self) -> Option { + let tx = self.db.begin_read().unwrap(); + let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); + let (key, _) = table.last().ok()??; + Some(key.value()) + } fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; @@ -323,13 +337,13 @@ impl Store { .flatten() } - // pub async fn max_decided_value_height(&self) -> Option { - // let db = Arc::clone(&self.db); - // tokio::task::spawn_blocking(move || db.max_decided_value_height()) - // .await - // .ok() - // .flatten() - // } + pub async fn max_decided_value_height(&self) -> Option { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.max_decided_value_height()) + .await + .ok() + .flatten() + } pub async fn get_decided_value( &self, @@ -362,6 +376,15 @@ impl Store { tokio::task::spawn_blocking(move || db.insert_undecided_proposal(value)).await? } + pub async fn remove_undecided_proposal( + &self, + height: Height, + round: Round, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.remove_undecided_proposal(height, round)).await? 
+ } + pub async fn get_undecided_proposal( &self, height: Height, diff --git a/code/examples/channel/src/streaming.rs b/code/examples/channel/src/streaming.rs index 036262391..09e6afcca 100644 --- a/code/examples/channel/src/streaming.rs +++ b/code/examples/channel/src/streaming.rs @@ -4,7 +4,7 @@ use std::collections::{BTreeMap, BinaryHeap, HashSet}; use malachitebft_app_channel::app::consensus::PeerId; use malachitebft_app_channel::app::streaming::{Sequence, StreamId, StreamMessage}; use malachitebft_app_channel::app::types::core::Round; -use malachitebft_test::{Address, Height, ProposalInit, ProposalPart}; +use malachitebft_test::{Address, Height, ProposalFin, ProposalInit, ProposalPart}; struct MinSeq(StreamMessage); @@ -102,6 +102,16 @@ pub struct ProposalParts { pub parts: Vec, } +impl ProposalParts { + pub fn init(&self) -> Option<&ProposalInit> { + self.parts.iter().find_map(|p| p.as_init()) + } + + pub fn fin(&self) -> Option<&ProposalFin> { + self.parts.iter().find_map(|p| p.as_fin()) + } +} + #[derive(Default)] pub struct PartStreamsMap { streams: BTreeMap<(PeerId, StreamId), StreamState>,