Skip to content

Commit

Permalink
fix(consensus): reduce output-only messaging,fix jmt error
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 18, 2024
1 parent 70fcc69 commit 7f41097
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
);
info!(
target: LOG_TARGET,
"🧩 Processing FOREIGN PROPOSAL for block {}, justify_qc: {}",
"🧩 Processing FOREIGN PROPOSAL {}, justify_qc: {}",
proposal.block(),
proposal.justify_qc(),
);
Expand Down
19 changes: 13 additions & 6 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ where TConsensusSpec: ConsensusSpec
is_newview_propose: bool,
propose_epoch_end: bool,
) -> Result<(), HotStuffError> {
let _timer = TraceTimer::info(LOG_TARGET, "OnPropose");
if let Some(last_proposed) = self.store.with_read_tx(|tx| LastProposed::get(tx)).optional()? {
if last_proposed.epoch == leaf_block.epoch && last_proposed.height > leaf_block.height {
// is_newview_propose means that a NEWVIEW has reached quorum and nodes are expecting us to propose.
Expand Down Expand Up @@ -179,6 +180,13 @@ where TConsensusSpec: ConsensusSpec
let high_qc = HighQc::get(&**tx, epoch)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;

info!(
target: LOG_TARGET,
"🌿 PROPOSE local block with parent {}. HighQC: {}",
leaf_block,
high_qc_cert,
);

let next_block = on_propose.build_next_block(
tx,
epoch,
Expand Down Expand Up @@ -444,7 +452,7 @@ where TConsensusSpec: ConsensusSpec
// Each foreign proposal is "heavier" than a transaction command
.checked_sub(foreign_proposals.len() * 4 + burnt_utxos.len())
.filter(|n| *n > 0)
.map(|size| self.transaction_pool.get_batch_for_next_block(tx, size))
.map(|size| self.transaction_pool.get_batch_for_next_block(tx, size, parent_block.block_id()))
.transpose()?
.unwrap_or_default()
};
Expand Down Expand Up @@ -527,7 +535,7 @@ where TConsensusSpec: ConsensusSpec
}
timer.done();

// This relies on the UTXO commands being ordered last
// This relies on the UTXO commands being ordered after transaction commands
for utxo in burnt_utxos {
let id = VersionedSubstateId::new(utxo.substate_id.clone(), 0);
let shard = id.to_substate_address().to_shard(local_committee_info.num_preshards());
Expand All @@ -549,8 +557,7 @@ where TConsensusSpec: ConsensusSpec
);

let timer = TraceTimer::info(LOG_TARGET, "Propose calculate state root");
let pending_tree_diffs =
PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, high_qc_certificate.block_id())?;
let pending_tree_diffs = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, parent_block.block_id())?;

let (state_root, _) = calculate_state_merkle_root(
tx,
Expand Down Expand Up @@ -728,15 +735,15 @@ where TConsensusSpec: ConsensusSpec
// foreign inputs/outputs.
tx_rec.set_local_decision(Decision::Commit);
// Set partial evidence using local inputs and known outputs.
tx_rec.set_evidence(multishard.to_initial_evidence(
tx_rec.evidence_mut().update(&multishard.to_initial_evidence(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
));
}
},
Decision::Abort => {
// CASE: The transaction was ABORTed due to a lock conflict
let execution = multishard.into_execution().expect("Abort should have execution");
let execution = multishard.into_execution().expect("Abort must have execution");
tx_rec.update_from_execution(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
Expand Down
24 changes: 13 additions & 11 deletions dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tari_dan_common_types::{
committee::CommitteeInfo,
optional::Optional,
Epoch,
ShardGroup,
ToSubstateAddress,
VersionedSubstateId,
};
Expand Down Expand Up @@ -108,7 +109,7 @@ where TConsensusSpec: ConsensusSpec
valid_block: &ValidBlock,
local_committee_info: &CommitteeInfo,
can_propose_epoch_end: bool,
foreign_committee_infos: HashMap<BlockId, CommitteeInfo>,
foreign_committee_infos: HashMap<ShardGroup, CommitteeInfo>,
) -> Result<BlockDecision, HotStuffError> {
let _timer =
TraceTimer::info(LOG_TARGET, "Decide on local block").with_iterations(valid_block.block().commands().len());
Expand Down Expand Up @@ -247,7 +248,7 @@ where TConsensusSpec: ConsensusSpec
local_committee_info: &CommitteeInfo,
valid_block: &ValidBlock,
can_propose_epoch_end: bool,
foreign_committee_infos: &HashMap<BlockId, CommitteeInfo>,
foreign_committee_infos: &HashMap<ShardGroup, CommitteeInfo>,
proposed_block_change_set: &mut ProposedBlockChangeSet,
) -> Result<(), HotStuffError> {
if !self.should_vote(tx, valid_block.block())? {
Expand Down Expand Up @@ -299,7 +300,7 @@ where TConsensusSpec: ConsensusSpec
block: &Block,
local_committee_info: &CommitteeInfo,
can_propose_epoch_end: bool,
foreign_committee_infos: &HashMap<BlockId, CommitteeInfo>,
foreign_committee_infos: &HashMap<ShardGroup, CommitteeInfo>,
proposed_block_change_set: &mut ProposedBlockChangeSet,
) -> Result<(), HotStuffError> {
// Store used for transactions that have inputs without specific versions.
Expand Down Expand Up @@ -407,11 +408,12 @@ where TConsensusSpec: ConsensusSpec
}
},
Command::ForeignProposal(fp_atom) => {
let Some(foreign_committee_info) = foreign_committee_infos.get(&fp_atom.block_id) else {
let Some(foreign_committee_info) = foreign_committee_infos.get(&fp_atom.shard_group) else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: ForeignProposal command in block {} but no foreign proposal found",
"❌ NO VOTE: ForeignProposal command in block {} {} but no foreign proposal found",
fp_atom.block_id,
fp_atom.shard_group,
);
proposed_block_change_set.no_vote(NoVoteReason::ForeignProposalCommandInBlockMissing);
return Ok(());
Expand Down Expand Up @@ -481,7 +483,7 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

let pending = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?;
let pending = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, block.parent())?;
let (expected_merkle_root, tree_diffs) = calculate_state_merkle_root(
tx,
block.shard_group(),
Expand Down Expand Up @@ -713,7 +715,7 @@ where TConsensusSpec: ConsensusSpec

info!(
target: LOG_TARGET,
"👨‍🔧 PREPARE: Executing transaction {} in block {}",
"👨‍🔧 PREPARE: Transaction {} in block {}",
tx_rec.transaction_id(),
block,
);
Expand Down Expand Up @@ -785,7 +787,7 @@ where TConsensusSpec: ConsensusSpec
// foreign inputs/outputs.
tx_rec.set_local_decision(Decision::Commit);
// Set partial evidence for local inputs using what we know.
tx_rec.set_evidence(multishard.to_initial_evidence(
tx_rec.evidence_mut().update(&multishard.to_initial_evidence(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
));
Expand Down Expand Up @@ -1481,7 +1483,7 @@ where TConsensusSpec: ConsensusSpec
{
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Foreign proposal for block {block_id} has already been proposed in this block.",
"❌ NO VOTE: Foreign proposal {block_id} has already been proposed in this block.",
block_id = fp_atom.block_id,
);
return Ok(Some(NoVoteReason::ForeignProposalAlreadyProposed));
Expand All @@ -1490,7 +1492,7 @@ where TConsensusSpec: ConsensusSpec
let Some(fp) = fp_atom.get_proposal(tx).optional()? else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Foreign proposal for block {block_id} has not been received.",
"❌ NO VOTE: Foreign proposal {block_id} has not been received.",
block_id = fp_atom.block_id,
);
return Ok(Some(NoVoteReason::ForeignProposalNotReceived));
Expand All @@ -1502,7 +1504,7 @@ where TConsensusSpec: ConsensusSpec
if matches!(fp.status(), ForeignProposalStatus::Confirmed) {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Foreign proposal for block {block_id} has status {status}.",
"❌ NO VOTE: Foreign proposal {block_id} has status {status}.",
block_id = fp_atom.block_id,
status = fp.status(),
);
Expand Down
42 changes: 20 additions & 22 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,6 @@ where TConsensusSpec: ConsensusSpec
local_committee_info: &CommitteeInfo,
) -> Result<(), HotStuffError> {
let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveForeignProposal");
let foreign_committee_info = self
.epoch_manager
.get_committee_info_by_validator_public_key(message.block.epoch(), message.block.proposed_by())
.await?;
self.validate_and_save(message, local_committee_info, &foreign_committee_info)?;
Ok(())
}

pub fn validate_and_save(
&mut self,
message: ForeignProposalMessage,
local_committee_info: &CommitteeInfo,
foreign_committee_info: &CommitteeInfo,
) -> Result<(), HotStuffError> {
let proposal = ForeignProposal::from(message);

if self.store.with_read_tx(|tx| proposal.exists(tx))? {
Expand All @@ -72,10 +58,24 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

let foreign_committee_info = self
.epoch_manager
.get_committee_info_by_validator_public_key(proposal.block.epoch(), proposal.block.proposed_by())
.await?;
self.store
.with_write_tx(|tx| self.validate_and_save(tx, proposal, local_committee_info, &foreign_committee_info))?;
Ok(())
}

pub fn validate_and_save(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
proposal: ForeignProposal,
local_committee_info: &CommitteeInfo,
foreign_committee_info: &CommitteeInfo,
) -> Result<(), HotStuffError> {
// TODO: validate justify_qc
let mut foreign_receive_counter = self
.store
.with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?;
let mut foreign_receive_counter = ForeignReceiveCounters::get_or_default(&**tx)?;

if let Err(err) = self.validate_proposed_block(
proposal.block(),
Expand Down Expand Up @@ -107,15 +107,13 @@ where TConsensusSpec: ConsensusSpec

info!(
target: LOG_TARGET,
"🧩 Receive FOREIGN PROPOSAL for block {}, justify_qc: {}",
"🧩 Receive FOREIGN PROPOSAL {}, justify_qc: {}",
proposal.block(),
proposal.justify_qc(),
);

self.store.with_write_tx(|tx| {
foreign_receive_counter.save(tx)?;
proposal.upsert(tx, None)
})?;
foreign_receive_counter.save(tx)?;
proposal.upsert(tx, None)?;

// Foreign proposals to propose
self.pacemaker.on_beat();
Expand Down
Loading

0 comments on commit 7f41097

Please sign in to comment.