Skip to content

Commit

Permalink
fix(consensus): vote handling improvement, bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 25, 2024
1 parent 49c82f7 commit acdfd8f
Show file tree
Hide file tree
Showing 50 changed files with 599 additions and 592 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ConsensusConstants {
committee_size: 7,
max_base_layer_blocks_ahead: 5,
max_base_layer_blocks_behind: 5,
num_preshards: NumPreshards::P64,
num_preshards: NumPreshards::P256,
pacemaker_max_base_time: Duration::from_secs(10),
}
}
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
target: LOG_TARGET,
"Submitting transaction with hash {} to the validator node", tx_hash
);
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());
let transaction_substate_address = tx_hash.to_substate_address();

if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), 2, |mut client| {
Expand Down Expand Up @@ -122,7 +122,7 @@ where
&self,
transaction_id: TransactionId,
) -> Result<TransactionResultStatus, TransactionManagerError> {
let transaction_substate_address = SubstateAddress::for_transaction_receipt(transaction_id.into_array().into());
let transaction_substate_address = transaction_id.to_substate_address();
self.try_with_committee(iter::once(transaction_substate_address), 1, |mut client| async move {
client.get_finalized_transaction_result(transaction_id).await.optional()
})
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_swarm_daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async-trait = { workspace = true }
axum = { workspace = true, features = ["multipart"] }
axum-jrpc = { workspace = true }
base64 = "0.22.1"
clap = { workspace = true, features = ["derive"] }
clap = { workspace = true, features = ["derive", "env"] }
fern = { workspace = true, features = ["colored"] }
futures = { workspace = true }
humantime = { workspace = true }
Expand All @@ -43,7 +43,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "process", "time", "fs"] }
toml = "0.8.12"
tonic = { workspace = true }
tower-http = { workspace = true, features = ["fs"] }
tower-http = { workspace = true, features = ["fs", "cors"] }
url = { workspace = true }

[target.'cfg(unix)'.dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn cargo_build<P: AsRef<Path>>(working_dir: P, package: &str) -> io::Result<Chil
Command::new("cargo")
.args(["build", "--release", "--bin", package])
.current_dir(working_dir)
.kill_on_drop(true)
.spawn()
}

Expand Down
29 changes: 10 additions & 19 deletions applications/tari_validator_node/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use prometheus::{core::Collector, IntCounter, IntGauge, IntGaugeVec, Opts, Regis
use tari_consensus::{hotstuff::HotStuffError, messages::HotstuffMessage, traits::hooks::ConsensusHooks};
use tari_dan_common_types::{NodeHeight, PeerAddress};
use tari_dan_storage::{
consensus_models::{Decision, QuorumDecision, TransactionAtom, TransactionPool, ValidBlock},
consensus_models::{Decision, QuorumDecision, TransactionAtom, ValidBlock},
StateStore,
};
use tari_state_store_sqlite::SqliteStateStore;
Expand All @@ -17,7 +17,7 @@ use crate::metrics::{CollectorRegister, LabelledCollector};

#[derive(Debug, Clone)]
pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
state_store: S,
_state_store: S,
local_blocks_received: IntCounter,
blocks_accepted: IntCounter,
blocks_rejected: IntCounter,
Expand All @@ -33,7 +33,7 @@ pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
pacemaker_leader_failures: IntCounter,
needs_sync: IntCounter,

transactions_pool_size: IntGauge,
_transactions_pool_size: IntGauge,
transactions_ready_for_consensus: IntCounter,
transactions_finalized_committed: IntCounter,
transactions_finalized_aborted: IntCounter,
Expand All @@ -42,7 +42,7 @@ pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
impl<S: StateStore> PrometheusConsensusMetrics<S> {
pub fn new(state_store: S, registry: &Registry) -> Self {
Self {
state_store,
_state_store: state_store,
local_blocks_received: IntCounter::new("consensus_blocks_received", "Number of blocks added")
.unwrap()
.register_at(registry),
Expand Down Expand Up @@ -97,9 +97,12 @@ impl<S: StateStore> PrometheusConsensusMetrics<S> {
)
.unwrap()
.register_at(registry),
transactions_pool_size: IntGauge::new("consensus_transactions_pool_size", "Number of transactions in pool")
.unwrap()
.register_at(registry),
_transactions_pool_size: IntGauge::new(
"consensus_transactions_pool_size",
"Number of transactions in pool",
)
.unwrap()
.register_at(registry),
}
}

Expand Down Expand Up @@ -168,18 +171,6 @@ impl<S: StateStore> ConsensusHooks for PrometheusConsensusMetrics<S> {
self.pacemaker_leader_failures.inc()
}

fn on_beat(&mut self) {
let Some(count) = self
.state_store
.with_read_tx(|tx| TransactionPool::<S>::new().count(tx))
.ok()
else {
return;
};

self.transactions_pool_size.set(count as i64);
}

fn on_needs_sync(&mut self, _local_height: NodeHeight, _remote_qc_height: NodeHeight) {
self.needs_sync.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use rand::rngs::OsRng;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_common_types::types::PublicKey;
use tari_consensus::traits::{ValidatorSignatureService, VoteSignatureService};
use tari_dan_app_utilities::keypair::RistrettoKeypair;
use tari_dan_storage::consensus_models::{BlockId, QuorumDecision, ValidatorSchnorrSignature, ValidatorSignature};
Expand All @@ -29,14 +29,8 @@ impl ValidatorSignatureService for TariSignatureService {
}

impl VoteSignatureService for TariSignatureService {
fn verify(
&self,
signature: &ValidatorSignature,
leaf_hash: &FixedHash,
block_id: &BlockId,
decision: &QuorumDecision,
) -> bool {
let message = self.create_message(leaf_hash, block_id, decision);
fn verify(&self, signature: &ValidatorSignature, block_id: &BlockId, decision: &QuorumDecision) -> bool {
let message = self.create_message(block_id, decision);
signature.verify(message)
}
}
6 changes: 1 addition & 5 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
}

async fn get_high_qc(&self, _request: Request<GetHighQcRequest>) -> Result<Response<GetHighQcResponse>, RpcStatus> {
let current_epoch = self
.epoch_manager
.current_epoch()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let current_epoch = self.consensus.current_epoch();
let high_qc = self
.shard_state_store
.with_read_tx(|tx| {
Expand Down
44 changes: 24 additions & 20 deletions applications/tari_validator_node/src/p2p/services/mempool/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;
use std::{collections::HashSet, iter};

use log::*;
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, SubstateAddress};
use tari_dan_p2p::{proto, DanMessage};
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};

use crate::p2p::services::{mempool::MempoolError, messaging::Gossip};
Expand All @@ -30,7 +30,7 @@ impl MempoolGossip<PeerAddress> {
}
}

pub async fn next_message(&mut self) -> Option<Result<(PeerAddress, DanMessage), MempoolError>> {
pub async fn next_message(&mut self) -> Option<Result<(PeerAddress, DanMessage, usize), MempoolError>> {
self.gossip
.next_message()
.await
Expand Down Expand Up @@ -79,19 +79,33 @@ impl MempoolGossip<PeerAddress> {
Ok(())
}

pub async fn forward_to_foreign_replicas<T: Into<DanMessage>>(
pub fn get_num_incoming_messages(&self) -> usize {
self.gossip.get_num_incoming_messages()
}

pub async fn forward_to_foreign_replicas(
&mut self,
epoch: Epoch,
substate_addresses: HashSet<SubstateAddress>,
msg: T,
msg: NewTransactionMessage,
exclude_shard_group: Option<ShardGroup>,
) -> Result<(), MempoolError> {
let n = self.epoch_manager.get_num_committees(epoch).await?;
let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
let local_shard_group = committee_shard.shard_group();
let shard_groups = substate_addresses
.into_iter()
.map(|s| s.to_shard_group(self.num_preshards, n))
let shard_groups = msg
.transaction
.all_inputs_iter()
.map(|s| {
s.or_zero_version()
.to_substate_address()
.to_shard_group(self.num_preshards, n)
})
.chain(iter::once(
msg.transaction
.id()
.to_substate_address()
.to_shard_group(self.num_preshards, n),
))
.filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group)
.collect::<HashSet<_>>();

Expand All @@ -108,16 +122,6 @@ impl MempoolGossip<PeerAddress> {

Ok(())
}

pub async fn gossip_to_foreign_replicas<T: Into<DanMessage>>(
&mut self,
epoch: Epoch,
addresses: HashSet<SubstateAddress>,
msg: T,
) -> Result<(), MempoolError> {
self.forward_to_foreign_replicas(epoch, addresses, msg, None).await?;
Ok(())
}
}

fn shard_group_to_topic(shard_group: ShardGroup) -> String {
Expand Down
Loading

0 comments on commit acdfd8f

Please sign in to comment.