Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): reduce output-only messaging,fix jmt error #1155

Merged
merged 10 commits into from
Sep 24, 2024
2 changes: 1 addition & 1 deletion applications/tari_indexer/src/dry_run/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where TSubstateCache: SubstateCache + 'static
for (epoch, public_key) in claim_instructions {
let vn = self
.epoch_manager
.get_validator_node_by_public_key(epoch, &public_key)
.get_validator_node_by_public_key(epoch, public_key.clone())
.await?;
let address = VirtualSubstateId::UnclaimedValidatorFee {
epoch: epoch.as_u64(),
Expand Down
42 changes: 42 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/rpc/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,27 @@ use serde::{Deserialize, Serialize};

use crate::{config::InstanceType, process_manager::InstanceId, webserver::context::HandlerContext};

#[derive(Debug, Clone, Deserialize)]
pub struct StartAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StartAllResponse {
pub num_instances: u32,
}

pub async fn start_all(context: &HandlerContext, req: StartAllRequest) -> Result<StartAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().start_instance(instance.id).await?;
}

Ok(StartAllResponse { num_instances })
}

pub type StartInstanceRequest = String;

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -65,6 +86,27 @@ pub async fn stop(context: &HandlerContext, req: StopInstanceRequest) -> Result<
Ok(StopInstanceResponse { success: true })
}

#[derive(Debug, Clone, Deserialize)]
pub struct StopAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StopAllResponse {
pub num_instances: u32,
}

pub async fn stop_all(context: &HandlerContext, req: StopAllRequest) -> Result<StopAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().stop_instance(instance.id).await?;
}

Ok(StopAllResponse { num_instances })
}

#[derive(Debug, Clone, Deserialize)]
pub struct ListInstancesRequest {
pub by_type: Option<InstanceType>,
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ async fn json_rpc_handler(Extension(context): Extension<Arc<HandlerContext>>, va
"add_indexer" => call_handler(context, value, rpc::indexers::create).await,
"add_validator_node" => call_handler(context, value, rpc::validator_nodes::create).await,
"start" => call_handler(context, value, rpc::instances::start).await,
"start_all" => call_handler(context, value, rpc::instances::start_all).await,
"stop" => call_handler(context, value, rpc::instances::stop).await,
"stop_all" => call_handler(context, value, rpc::instances::stop_all).await,
"list_instances" => call_handler(context, value, rpc::instances::list).await,
"delete_data" => call_handler(context, value, rpc::instances::delete_data).await,
"burn_funds" => call_handler(context, value, rpc::minotari_wallets::burn_funds).await,
Expand Down
13 changes: 12 additions & 1 deletion applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }:
}
return (<>
<hr />
<h3>Pool transaction</h3>
<h3>Pool transactions {pool.length}</h3>
<table style={{
width: "100%",
}}>
Expand Down Expand Up @@ -493,8 +493,19 @@ export default function Main() {
console.log("resp", resp);
});
};

const stopAll = () => {
jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

const startAll = () => {
jsonRpc("start_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

return (
<div className="main">
<button onClick={() => stopAll()}>Stop all VNs</button>
<button onClick={() => startAll()}>Start all VNs</button>
<button onClick={() => setShowLogs(!showLogs)}>{showLogs && "Hide" || "Show"} logs</button>
<button onClick={() => setAutoRefresh(!autoRefresh)}>{autoRefresh && "Disable" || "Enable"} autorefresh
</button>
Expand Down
3 changes: 3 additions & 0 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ pub async fn spawn_services(
state_store.clone(),
mempool.clone(),
virtual_substate_manager,
consensus_handle.clone(),
)
.await?;
// Save final node identity after comms has initialized. This is required because the public_address can be
Expand Down Expand Up @@ -434,6 +435,7 @@ async fn spawn_p2p_rpc(
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
) -> anyhow::Result<()> {
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(config.validator_node.rpc.max_simultaneous_sessions)
Expand All @@ -444,6 +446,7 @@ async fn spawn_p2p_rpc(
shard_store_store,
mempool,
virtual_substate_manager,
consensus,
));

let (notify_tx, notify_rx) = mpsc::unbounded_channel();
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_validator_node/src/consensus/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent};
use tari_dan_common_types::Epoch;
use tari_transaction::Transaction;
use tokio::sync::{broadcast, mpsc, watch};

Expand Down Expand Up @@ -30,6 +31,10 @@ impl ConsensusHandle {
}
}

pub fn current_epoch(&self) -> Epoch {
self.current_view.get_epoch()
}

pub async fn notify_new_transaction(
&self,
transaction: Transaction,
Expand Down
8 changes: 7 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@ use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_state_store_sqlite::SqliteStateStore;
use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcServer;

use crate::{p2p::services::mempool::MempoolHandle, virtual_substate::VirtualSubstateManager};
use crate::{
consensus::ConsensusHandle,
p2p::services::mempool::MempoolHandle,
virtual_substate::VirtualSubstateManager,
};

pub fn create_tari_validator_node_rpc_service(
epoch_manager: EpochManagerHandle<PeerAddress>,
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
) -> ValidatorNodeRpcServer<ValidatorNodeRpcServiceImpl> {
ValidatorNodeRpcServer::new(ValidatorNodeRpcServiceImpl::new(
epoch_manager,
shard_store_store,
mempool,
virtual_substate_manager,
consensus,
))
}
10 changes: 5 additions & 5 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcService;
use tokio::{sync::mpsc, task};

use crate::{
consensus::ConsensusHandle,
p2p::{
rpc::{block_sync_task::BlockSyncTask, state_sync_task::StateSyncTask},
services::mempool::MempoolHandle,
Expand All @@ -80,6 +81,7 @@ pub struct ValidatorNodeRpcServiceImpl {
shard_state_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
}

impl ValidatorNodeRpcServiceImpl {
Expand All @@ -91,12 +93,14 @@ impl ValidatorNodeRpcServiceImpl {
SqliteStateStore<PeerAddress>,
EpochManagerHandle<PeerAddress>,
>,
consensus: ConsensusHandle,
) -> Self {
Self {
epoch_manager,
shard_state_store,
mempool,
virtual_substate_manager,
consensus,
}
}
}
Expand Down Expand Up @@ -340,11 +344,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
request: Request<GetCheckpointRequest>,
) -> Result<Response<GetCheckpointResponse>, RpcStatus> {
let msg = request.into_message();
let current_epoch = self
.epoch_manager
.current_epoch()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let current_epoch = self.consensus.current_epoch();
if msg.current_epoch != current_epoch {
// This may occur if one of the nodes has not fully scanned the base layer
return Err(RpcStatus::bad_request(format!(
Expand Down
12 changes: 0 additions & 12 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,6 @@ impl CommitteeInfo {
.into_iter()
.filter(|substate_address| self.includes_substate_address(substate_address.borrow()))
}

/// Calculates the number of distinct shard groups for the given addresses
pub fn count_distinct_shard_groups<B: Borrow<SubstateAddress>, I: IntoIterator<Item = B>>(
&self,
addresses: I,
) -> usize {
addresses
.into_iter()
.map(|addr| addr.borrow().to_shard_group(self.num_shards, self.num_committees))
.collect::<std::collections::HashSet<_>>()
.len()
}
}

#[derive(Debug, Clone, Serialize)]
Expand Down
16 changes: 5 additions & 11 deletions dan_layer/common_types/src/versioned_substate_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl SubstateRequirement {
.map(|v| SubstateAddress::from_substate_id(self.substate_id(), v))
}

pub fn to_substate_address_zero_version(&self) -> SubstateAddress {
SubstateAddress::from_substate_id(self.substate_id(), 0)
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is a fixed division of the 256-bit shard space.
/// If the substate version is not known, None is returned.
Expand Down Expand Up @@ -118,7 +122,7 @@ impl Display for SubstateRequirement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.version {
Some(v) => write!(f, "{}:{}", self.substate_id, v),
None => write!(f, "{}", self.substate_id),
None => write!(f, "{}:?", self.substate_id),
}
}
}
Expand Down Expand Up @@ -180,16 +184,6 @@ impl VersionedSubstateId {
self.version
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is an equal division of the 256-bit shard space.
pub fn to_shard(&self, num_shards: NumPreshards) -> Shard {
self.to_substate_address().to_shard(num_shards)
}

pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup {
self.to_substate_address().to_shard_group(num_shards, num_committees)
}

pub fn to_previous_version(&self) -> Option<Self> {
self.version
.checked_sub(1)
Expand Down
7 changes: 4 additions & 3 deletions dan_layer/consensus/src/block_validations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn check_proposal<TConsensusSpec: ConsensusSpec>(
check_sidechain_id(block, config)?;
check_hash_and_height(block)?;
let committee_for_block = epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by().clone())
.await?;
check_proposed_by_leader(leader_strategy, &committee_for_block, block)?;
check_signature(block)?;
Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn check_quorum_certificate<TConsensusSpec: ConsensusSpec>(
let mut vns = vec![];
for signature in qc.signatures() {
let vn = epoch_manager
.get_validator_node_by_public_key(qc.epoch(), signature.public_key())
.get_validator_node_by_public_key(qc.epoch(), signature.public_key().clone())
.await?;
let committee_info = epoch_manager
.get_committee_info_for_substate(qc.epoch(), vn.shard_key)
Expand Down Expand Up @@ -209,7 +209,8 @@ pub async fn check_quorum_certificate<TConsensusSpec: ConsensusSpec>(
qc.signatures()
.first()
.ok_or::<HotStuffError>(ProposalValidationError::QuorumWasNotReached { qc: qc.clone() }.into())?
.public_key(),
.public_key()
.clone(),
)
.await?;

Expand Down
Loading
Loading