From f61dcd47ff2910f3e5acf8d9f3358fa719459d62 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 21 Jun 2024 18:59:29 +0400 Subject: [PATCH] feat(consensus): proposed block parked and ready event (#1055) Description --- Adds ProposedBlockParked and ParkedBlockReady events to consensus Motivation and Context --- Notifies event consumers of a proposed block that needs to be parked or has been unparked. Currently this is simply logged but further tracking features may be implemented for these events. How Has This Been Tested? --- Manually What process can a PR reviewer use to test or verify this change? --- Check logs for parked blocks Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --- .../tari_validator_node/src/dan_node.rs | 2 ++ dan_layer/consensus/src/hotstuff/event.rs | 18 ++++++++++++----- .../src/hotstuff/on_inbound_message.rs | 20 +++++++++++++++++-- dan_layer/consensus/src/hotstuff/worker.rs | 1 + .../consensus_tests/src/support/harness.rs | 4 ++-- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 000129e9a..d3d367c92 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -87,6 +87,8 @@ impl DanNode { } async fn handle_hotstuff_event(&self, event: HotstuffEvent) -> Result<(), anyhow::Error> { + info!(target: LOG_TARGET, "🔥 consensus event: {event}"); + let HotstuffEvent::BlockCommitted { block_id, .. } = event else { return Ok(()); }; diff --git a/dan_layer/consensus/src/hotstuff/event.rs b/dan_layer/consensus/src/hotstuff/event.rs index 838104f11..744e3a662 100644 --- a/dan_layer/consensus/src/hotstuff/event.rs +++ b/dan_layer/consensus/src/hotstuff/event.rs @@ -2,14 +2,22 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_dan_common_types::NodeHeight; -use tari_dan_storage::consensus_models::BlockId; +use tari_dan_storage::consensus_models::{BlockId, LeafBlock}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, thiserror::Error)] pub enum HotstuffEvent { - /// A block has been committed + #[error("Block {block_id} has been committed at height {height}")] BlockCommitted { block_id: BlockId, height: NodeHeight }, - /// A critical failure occurred in consensus + #[error("Consensus failure: {message}")] Failure { message: String }, - /// A leader has timed out + #[error("Leader timeout: new height {new_height}")] LeaderTimeout { new_height: NodeHeight }, + #[error("Block {block} has been parked ({num_missing_txs} missing, {num_awaiting_txs} awaiting execution)")] + ProposedBlockParked { + block: LeafBlock, + num_missing_txs: usize, + num_awaiting_txs: usize, + }, + #[error("Parked block {block} is ready")] + ParkedBlockReady { block: LeafBlock }, } diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 85c30a0ec..0edf829a9 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -20,7 +20,10 @@ use tari_dan_storage::{ }; use tari_epoch_manager::EpochManagerReader; use tari_transaction::TransactionId; -use tokio::{sync::mpsc, time}; +use tokio::{ + sync::{broadcast, mpsc}, + time, +}; use super::config::HotstuffConfig; use crate::{ @@ -32,7 +35,7 @@ use crate::{ check_quorum_certificate, check_signature, }, - hotstuff::error::HotStuffError, + hotstuff::{error::HotStuffError, HotstuffEvent}, messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage}, traits::{ConsensusSpec, OutboundMessaging}, }; @@ -52,6 +55,7 @@ pub struct OnInboundMessage { tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>, message_buffer: MessageBuffer, transaction_pool: TransactionPool, + tx_events: broadcast::Sender, } impl OnInboundMessage @@ -66,6 +70,7 @@ where TConsensusSpec: ConsensusSpec vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, transaction_pool: TransactionPool, + tx_events: broadcast::Sender, ) -> Self { let (tx_msg_ready, rx_msg_ready) = mpsc::unbounded_channel(); Self { @@ -79,6 +84,7 @@ where TConsensusSpec: ConsensusSpec tx_msg_ready, message_buffer: MessageBuffer::new(rx_msg_ready), transaction_pool, + tx_events, } } @@ -223,6 +229,10 @@ where TConsensusSpec: ConsensusSpec .get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by()) .await?; + let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady { + block: unparked_block.as_leaf_block(), + }); + self.report_message_ready( vn.address, HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }), @@ -250,6 +260,12 @@ where TConsensusSpec: ConsensusSpec "🔥 Block {} has {} missing transactions and {} awaiting execution", block, missing_tx_ids.len(), awaiting_execution.len(), ); + let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked { + block: block.as_leaf_block(), + num_missing_txs: missing_tx_ids.len(), + num_awaiting_txs: awaiting_execution.len(), + }); + if !missing_tx_ids.is_empty() { let block_id = *block.id(); let epoch = block.epoch(); diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 9c0fe67b1..975d699ce 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -135,6 +135,7 @@ impl HotstuffWorker { signing_service.clone(), outbound_messaging.clone(), transaction_pool.clone(), + tx_events.clone(), ), on_next_sync_view: OnNextSyncViewHandler::new( diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index f38915fd0..cd7a06d37 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -162,8 +162,8 @@ impl Test { match event { HotstuffEvent::BlockCommitted { block_id, height } => return (address, block_id, height), HotstuffEvent::Failure { message } => panic!("[{}] Consensus failure: {}", address, message), - HotstuffEvent::LeaderTimeout { new_height } => { - log::info!("[{address}] Leader timeout. New height {new_height}"); + other => { + log::info!("[{}] Ignoring event: {:?}", address, other); continue; }, }