diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 000129e9a..d3d367c92 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -87,6 +87,8 @@ impl DanNode { } async fn handle_hotstuff_event(&self, event: HotstuffEvent) -> Result<(), anyhow::Error> { + info!(target: LOG_TARGET, "🔥 consensus event: {event}"); + let HotstuffEvent::BlockCommitted { block_id, .. } = event else { return Ok(()); }; diff --git a/dan_layer/consensus/src/hotstuff/event.rs b/dan_layer/consensus/src/hotstuff/event.rs index 838104f11..744e3a662 100644 --- a/dan_layer/consensus/src/hotstuff/event.rs +++ b/dan_layer/consensus/src/hotstuff/event.rs @@ -2,14 +2,22 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_dan_common_types::NodeHeight; -use tari_dan_storage::consensus_models::BlockId; +use tari_dan_storage::consensus_models::{BlockId, LeafBlock}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, thiserror::Error)] pub enum HotstuffEvent { - /// A block has been committed + #[error("Block {block_id} has been committed at height {height}")] BlockCommitted { block_id: BlockId, height: NodeHeight }, - /// A critical failure occurred in consensus + #[error("Consensus failure: {message}")] Failure { message: String }, - /// A leader has timed out + #[error("Leader timeout: new height {new_height}")] LeaderTimeout { new_height: NodeHeight }, + #[error("Block {block} has been parked ({num_missing_txs} missing, {num_awaiting_txs} awaiting execution)")] + ProposedBlockParked { + block: LeafBlock, + num_missing_txs: usize, + num_awaiting_txs: usize, + }, + #[error("Parked block {block} is ready")] + ParkedBlockReady { block: LeafBlock }, } diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 85c30a0ec..0edf829a9 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -20,7 +20,10 @@ use tari_dan_storage::{ }; use tari_epoch_manager::EpochManagerReader; use tari_transaction::TransactionId; -use tokio::{sync::mpsc, time}; +use tokio::{ + sync::{broadcast, mpsc}, + time, +}; use super::config::HotstuffConfig; use crate::{ @@ -32,7 +35,7 @@ use crate::{ check_quorum_certificate, check_signature, }, - hotstuff::error::HotStuffError, + hotstuff::{error::HotStuffError, HotstuffEvent}, messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage}, traits::{ConsensusSpec, OutboundMessaging}, }; @@ -52,6 +55,7 @@ pub struct OnInboundMessage { tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>, message_buffer: MessageBuffer, transaction_pool: TransactionPool, + tx_events: broadcast::Sender, } impl OnInboundMessage @@ -66,6 +70,7 @@ where TConsensusSpec: ConsensusSpec vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, transaction_pool: TransactionPool, + tx_events: broadcast::Sender, ) -> Self { let (tx_msg_ready, rx_msg_ready) = mpsc::unbounded_channel(); Self { @@ -79,6 +84,7 @@ where TConsensusSpec: ConsensusSpec tx_msg_ready, message_buffer: MessageBuffer::new(rx_msg_ready), transaction_pool, + tx_events, } } @@ -223,6 +229,10 @@ where TConsensusSpec: ConsensusSpec .get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by()) .await?; + let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady { + block: unparked_block.as_leaf_block(), + }); + self.report_message_ready( vn.address, HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }), @@ -250,6 +260,12 @@ where TConsensusSpec: ConsensusSpec "🔥 Block {} has {} missing transactions and {} awaiting execution", block, missing_tx_ids.len(), awaiting_execution.len(), ); + let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked { + block: block.as_leaf_block(), + num_missing_txs: missing_tx_ids.len(), + num_awaiting_txs: awaiting_execution.len(), + }); + if !missing_tx_ids.is_empty() { let block_id = *block.id(); let epoch = block.epoch(); diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 9c0fe67b1..975d699ce 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -135,6 +135,7 @@ impl HotstuffWorker { signing_service.clone(), outbound_messaging.clone(), transaction_pool.clone(), + tx_events.clone(), ), on_next_sync_view: OnNextSyncViewHandler::new( diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index f38915fd0..cd7a06d37 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -162,8 +162,8 @@ impl Test { match event { HotstuffEvent::BlockCommitted { block_id, height } => return (address, block_id, height), HotstuffEvent::Failure { message } => panic!("[{}] Consensus failure: {}", address, message), - HotstuffEvent::LeaderTimeout { new_height } => { - log::info!("[{address}] Leader timeout. New height {new_height}"); + other => { + log::info!("[{}] Ignoring event: {:?}", address, other); continue; }, }