Skip to content

Commit

Permalink
feat(consensus): proposed block parked and ready event (#1055)
Browse files Browse the repository at this point in the history
Description
---
Adds ProposedBlockParked and ParkedBlockReady events to consensus

Motivation and Context
---
Notifies event consumers of a proposed block that needs to be parked or
has been unparked.
Currently this is simply logged but further tracking features may be
implemented for these events.

How Has This Been Tested?
---
Manually

What process can a PR reviewer use to test or verify this change?
---
Check logs for parked blocks

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Jun 21, 2024
1 parent 7b523be commit f61dcd4
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 9 deletions.
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl DanNode {
}

async fn handle_hotstuff_event(&self, event: HotstuffEvent) -> Result<(), anyhow::Error> {
info!(target: LOG_TARGET, "🔥 consensus event: {event}");

let HotstuffEvent::BlockCommitted { block_id, .. } = event else {
return Ok(());
};
Expand Down
18 changes: 13 additions & 5 deletions dan_layer/consensus/src/hotstuff/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::NodeHeight;
use tari_dan_storage::consensus_models::BlockId;
use tari_dan_storage::consensus_models::{BlockId, LeafBlock};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, thiserror::Error)]
pub enum HotstuffEvent {
/// A block has been committed
#[error("Block {block_id} has been committed at height {height}")]
BlockCommitted { block_id: BlockId, height: NodeHeight },
/// A critical failure occurred in consensus
#[error("Consensus failure: {message}")]
Failure { message: String },
/// A leader has timed out
#[error("Leader timeout: new height {new_height}")]
LeaderTimeout { new_height: NodeHeight },
#[error("Block {block} has been parked ({num_missing_txs} missing, {num_awaiting_txs} awaiting execution)")]
ProposedBlockParked {
block: LeafBlock,
num_missing_txs: usize,
num_awaiting_txs: usize,
},
#[error("Parked block {block} is ready")]
ParkedBlockReady { block: LeafBlock },
}
20 changes: 18 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use tari_dan_storage::{
};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::{sync::mpsc, time};
use tokio::{
sync::{broadcast, mpsc},
time,
};

use super::config::HotstuffConfig;
use crate::{
Expand All @@ -32,7 +35,7 @@ use crate::{
check_quorum_certificate,
check_signature,
},
hotstuff::error::HotStuffError,
hotstuff::{error::HotStuffError, HotstuffEvent},
messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
traits::{ConsensusSpec, OutboundMessaging},
};
Expand All @@ -52,6 +55,7 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
message_buffer: MessageBuffer<TConsensusSpec::Addr>,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
tx_events: broadcast::Sender<HotstuffEvent>,
}

impl<TConsensusSpec> OnInboundMessage<TConsensusSpec>
Expand All @@ -66,6 +70,7 @@ where TConsensusSpec: ConsensusSpec
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
tx_events: broadcast::Sender<HotstuffEvent>,
) -> Self {
let (tx_msg_ready, rx_msg_ready) = mpsc::unbounded_channel();
Self {
Expand All @@ -79,6 +84,7 @@ where TConsensusSpec: ConsensusSpec
tx_msg_ready,
message_buffer: MessageBuffer::new(rx_msg_ready),
transaction_pool,
tx_events,
}
}

Expand Down Expand Up @@ -223,6 +229,10 @@ where TConsensusSpec: ConsensusSpec
.get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by())
.await?;

let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady {
block: unparked_block.as_leaf_block(),
});

self.report_message_ready(
vn.address,
HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
Expand Down Expand Up @@ -250,6 +260,12 @@ where TConsensusSpec: ConsensusSpec
"🔥 Block {} has {} missing transactions and {} awaiting execution", block, missing_tx_ids.len(), awaiting_execution.len(),
);

let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked {
block: block.as_leaf_block(),
num_missing_txs: missing_tx_ids.len(),
num_awaiting_txs: awaiting_execution.len(),
});

if !missing_tx_ids.is_empty() {
let block_id = *block.id();
let epoch = block.epoch();
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
signing_service.clone(),
outbound_messaging.clone(),
transaction_pool.clone(),
tx_events.clone(),
),

on_next_sync_view: OnNextSyncViewHandler::new(
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/support/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ impl Test {
match event {
HotstuffEvent::BlockCommitted { block_id, height } => return (address, block_id, height),
HotstuffEvent::Failure { message } => panic!("[{}] Consensus failure: {}", address, message),
HotstuffEvent::LeaderTimeout { new_height } => {
log::info!("[{address}] Leader timeout. New height {new_height}");
other => {
log::info!("[{}] Ignoring event: {:?}", address, other);
continue;
},
}
Expand Down

0 comments on commit f61dcd4

Please sign in to comment.