Skip to content

Commit

Permalink
reduce/simplify prepare phase evidence, quicker epoch change
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 20, 2024
1 parent a7c7b4f commit 219fe8a
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 106 deletions.
12 changes: 0 additions & 12 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,6 @@ impl CommitteeInfo {
.into_iter()
.filter(|substate_address| self.includes_substate_address(substate_address.borrow()))
}

/// Calculates the number of distinct shard groups for the given addresses
pub fn count_distinct_shard_groups<B: Borrow<SubstateAddress>, I: IntoIterator<Item = B>>(
&self,
addresses: I,
) -> usize {
addresses
.into_iter()
.map(|addr| addr.borrow().to_shard_group(self.num_shards, self.num_committees))
.collect::<std::collections::HashSet<_>>()
.len()
}
}

#[derive(Debug, Clone, Serialize)]
Expand Down
16 changes: 5 additions & 11 deletions dan_layer/common_types/src/versioned_substate_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl SubstateRequirement {
.map(|v| SubstateAddress::from_substate_id(self.substate_id(), v))
}

pub fn to_substate_address_zero_version(&self) -> SubstateAddress {
SubstateAddress::from_substate_id(self.substate_id(), 0)
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is a fixed division of the 256-bit shard space.
/// If the substate version is not known, None is returned.
Expand Down Expand Up @@ -118,7 +122,7 @@ impl Display for SubstateRequirement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.version {
Some(v) => write!(f, "{}:{}", self.substate_id, v),
None => write!(f, "{}", self.substate_id),
None => write!(f, "{}:?", self.substate_id),
}
}
}
Expand Down Expand Up @@ -180,16 +184,6 @@ impl VersionedSubstateId {
self.version
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is an equal division of the 256-bit shard space.
pub fn to_shard(&self, num_shards: NumPreshards) -> Shard {
self.to_substate_address().to_shard(num_shards)
}

pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup {
self.to_substate_address().to_shard_group(num_shards, num_committees)
}

pub fn to_previous_version(&self) -> Option<Self> {
self.version
.checked_sub(1)
Expand Down
8 changes: 7 additions & 1 deletion dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ pub struct BlockDecision {
pub end_of_epoch: Option<Epoch>,
}

impl BlockDecision {
pub fn is_accept(&self) -> bool {
matches!(self.quorum_decision, Some(QuorumDecision::Accept))
}
}

#[derive(Debug, Clone)]
pub struct ProposedBlockChangeSet {
block: LeafBlock,
Expand Down Expand Up @@ -222,7 +228,7 @@ impl ProposedBlockChangeSet {
.entry(*transaction.transaction_id())
.or_default();

let ready_now = transaction.is_ready_for_next_stage();
let ready_now = transaction.is_ready_for_pending_stage();
change_mut.next_update = Some(TransactionPoolStatusUpdate::new(transaction, ready_now));
Ok(self)
}
Expand Down
16 changes: 12 additions & 4 deletions dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
for cmd in block.commands() {
match cmd {
Command::LocalPrepare(atom) => {
if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) {
if atom
.evidence
.shard_groups_iter()
.all(|sg| *sg != local_committee_info.shard_group())
{
debug!(
target: LOG_TARGET,
"🧩 FOREIGN PROPOSAL: Command: LocalPrepare({}, {}), block: {} not relevant to local committee",
Expand Down Expand Up @@ -213,7 +217,11 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
}
},
Command::LocalAccept(atom) => {
if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) {
if atom
.evidence
.shard_groups_iter()
.all(|sg| *sg != local_committee_info.shard_group())
{
continue;
}

Expand Down Expand Up @@ -325,7 +333,7 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(
}
proposed_block_change_set.set_next_transaction_update(tx_rec)?;
}
} else if tx_rec.current_stage().is_local_prepared() && tx_rec.is_ready_for_next_stage() {
} else if tx_rec.current_stage().is_local_prepared() && tx_rec.is_ready_for_pending_stage() {
info!(
target: LOG_TARGET,
"🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_PREPARED({}, {}) Local Stage: {}",
Expand All @@ -336,7 +344,7 @@ pub fn process_foreign_block<TTx: StateStoreReadTransaction>(

tx_rec.set_next_stage(TransactionPoolStage::LocalPrepared)?;
proposed_block_change_set.set_next_transaction_update(tx_rec)?;
} else if tx_rec.current_stage().is_local_accepted() && tx_rec.is_ready_for_next_stage() {
} else if tx_rec.current_stage().is_local_accepted() && tx_rec.is_ready_for_pending_stage() {
info!(
target: LOG_TARGET,
"🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_ACCEPT({}, {}) Local Stage: {}",
Expand Down
15 changes: 6 additions & 9 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ where TConsensusSpec: ConsensusSpec
// Leader thinks that all local nodes agree that all shard groups have prepared, we are ready to accept
// locally
TransactionPoolStage::AllPrepared => Ok(Some(Command::LocalAccept(
self.get_transaction_atom_with_leader_fee(local_committee_info, &mut tx_rec)?,
self.get_transaction_atom_with_leader_fee(&mut tx_rec)?,
))),
// Leader thinks local nodes are ready to accept an ABORT
TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(tx_rec.get_current_transaction_atom()))),
Expand Down Expand Up @@ -749,10 +749,9 @@ where TConsensusSpec: ConsensusSpec
// foreign inputs/outputs.
tx_rec.set_local_decision(Decision::Commit);
// Set partial evidence using local inputs and known outputs.
tx_rec.evidence_mut().update(&multishard.to_initial_evidence(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
));
tx_rec
.evidence_mut()
.update(&multishard.to_initial_evidence(local_committee_info));
}
},
Decision::Abort => {
Expand Down Expand Up @@ -871,18 +870,16 @@ where TConsensusSpec: ConsensusSpec
*tx_rec.transaction_id(),
&filter_diff_for_committee(local_committee_info, diff),
)?;
let atom = self.get_transaction_atom_with_leader_fee(local_committee_info, tx_rec)?;
let atom = self.get_transaction_atom_with_leader_fee(tx_rec)?;
Ok(Some(Command::AllAccept(atom)))
}

fn get_transaction_atom_with_leader_fee(
&self,
local_committee_info: &CommitteeInfo,
tx_rec: &mut TransactionPoolRecord,
) -> Result<TransactionAtom, HotStuffError> {
if tx_rec.current_decision().is_commit() {
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let num_involved_shard_groups = tx_rec.evidence().num_shard_groups();
let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"PROPOSE: Transaction {} involves zero shard groups",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ where TConsensusSpec: ConsensusSpec
}

if !pool_tx.is_ready() {
if pool_tx.current_stage().is_local_prepared() && pool_tx.is_ready_for_next_stage() {
if pool_tx.current_stage().is_local_prepared() && pool_tx.is_ready_for_pending_stage() {
pool_tx.set_next_stage(TransactionPoolStage::LocalPrepared)?;
} else if pool_tx.current_stage().is_local_accepted() && pool_tx.is_ready_for_next_stage() {
} else if pool_tx.current_stage().is_local_accepted() && pool_tx.is_ready_for_pending_stage() {
pool_tx.set_next_stage(TransactionPoolStage::LocalAccepted)?;
} else {
// Nothing
Expand Down Expand Up @@ -378,14 +378,9 @@ where TConsensusSpec: ConsensusSpec
}
},
Command::LocalAccept(atom) => {
if let Some(reason) = self.evaluate_local_accept_command(
tx,
block,
&locked_block,
atom,
local_committee_info,
proposed_block_change_set,
)? {
if let Some(reason) =
self.evaluate_local_accept_command(tx, block, &locked_block, atom, proposed_block_change_set)?
{
proposed_block_change_set.no_vote(reason);
return Ok(());
}
Expand Down Expand Up @@ -793,10 +788,9 @@ where TConsensusSpec: ConsensusSpec
// foreign inputs/outputs.
tx_rec.set_local_decision(Decision::Commit);
// Set partial evidence for local inputs using what we know.
tx_rec.evidence_mut().update(&multishard.to_initial_evidence(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
));
tx_rec
.evidence_mut()
.update(&multishard.to_initial_evidence(local_committee_info));
}
},
Decision::Abort => {
Expand Down Expand Up @@ -1160,7 +1154,6 @@ where TConsensusSpec: ConsensusSpec
block: &Block,
locked_block: &LockedBlock,
atom: &TransactionAtom,
local_committee_info: &CommitteeInfo,
proposed_block_change_set: &mut ProposedBlockChangeSet,
) -> Result<Option<NoVoteReason>, HotStuffError> {
let Some(mut tx_rec) =
Expand Down Expand Up @@ -1233,8 +1226,7 @@ where TConsensusSpec: ConsensusSpec

// Check the leader fee in the local accept phase. The fee only applied (is added to the block fee) for
// AllAccept
let num_involved_shard_groups =
local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter());
let num_involved_shard_groups = tx_rec.evidence().num_shard_groups();
let involved = NonZeroU64::new(num_involved_shard_groups as u64)
.ok_or_else(|| HotStuffError::InvariantError("Number of involved shard groups is 0".to_string()))?;
let calculated_leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR);
Expand Down
11 changes: 8 additions & 3 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
) -> Result<(), HotStuffError> {
let em_epoch = self.epoch_manager.current_epoch().await?;
let can_propose_epoch_end = em_epoch > current_epoch;
let is_epoch_end = valid_block.block().is_epoch_end();

let mut on_ready_to_vote_on_local_block = self.on_ready_to_vote_on_local_block.clone();
let (block_decision, valid_block) = task::spawn_blocking({
Expand All @@ -245,6 +246,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
})
.await??;

let is_accept_decision = block_decision.is_accept();
if let Some(decision) = block_decision.quorum_decision {
self.pacemaker
.update_view(valid_block.epoch(), valid_block.height(), high_qc.block_height())
Expand Down Expand Up @@ -308,7 +310,6 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
// TODO: We should exit consensus to sync for the epoch - when this is implemented, we will not
// need to create the genesis, set the pacemaker, etc.
self.pacemaker.set_epoch(next_epoch).await?;
self.pacemaker.on_beat();
} else {
info!(
target: LOG_TARGET,
Expand All @@ -317,6 +318,11 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
}
}

// Propose quickly for the end of epoch chain
if is_accept_decision && is_epoch_end {
self.pacemaker.on_beat();
}

Ok(())
}

Expand Down Expand Up @@ -763,8 +769,7 @@ async fn broadcast_foreign_proposal_if_required<TConsensusSpec: ConsensusSpec>(
.filter(|atom| !atom.evidence.is_committee_output_only(local_committee_info))
.or_else(|| c.local_accept())
})
.flat_map(|p| p.evidence.substate_addresses_iter())
.map(|addr| addr.to_shard_group(num_preshards, num_committees))
.flat_map(|p| p.evidence.shard_groups_iter().copied())
.filter(|shard_group| local_shard_group != *shard_group)
.collect::<HashSet<_>>();
if non_local_shard_groups.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<TStateStore: StateStore, TExecutor: BlockTransactionExecutor<TStateStore>>
Ok(executed)
}

fn execute_or_fetch(
pub fn execute_or_fetch(
&self,
store: &mut PendingSubstateStore<TStateStore>,
transaction: Transaction,
Expand Down
51 changes: 26 additions & 25 deletions dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashSet;

use indexmap::IndexMap;
use tari_dan_common_types::{NumPreshards, SubstateRequirement, VersionedSubstateId};
use tari_dan_common_types::{committee::CommitteeInfo, SubstateRequirement, VersionedSubstateId};
use tari_dan_storage::consensus_models::{Decision, Evidence, TransactionExecution, VersionedSubstateIdLockIntent};

use crate::hotstuff::substate_store::LockStatus;
Expand Down Expand Up @@ -101,50 +101,51 @@ impl MultiShardPreparedTransaction {
&self.local_inputs
}

pub fn outputs(&self) -> &HashSet<VersionedSubstateId> {
pub fn known_outputs(&self) -> &HashSet<VersionedSubstateId> {
&self.outputs
}

pub fn into_execution(self) -> Option<TransactionExecution> {
self.execution
}

pub fn to_initial_evidence(&self, num_preshards: NumPreshards, num_committees: u32) -> Evidence {
// if let Some(ref execution) = self.execution {
// return Evidence::from_inputs_and_outputs(execution.resolved_inputs(), execution.resulting_outputs());
// }
//
// // CASE: One or more local inputs are not found, so the transaction is aborted.
// if self.current_decision().is_abort() {
// return Evidence::from_inputs_and_outputs(
// self.execution
// .transaction()
// .all_inputs_iter()
// .map(|input| VersionedSubstateIdLockIntent::from_requirement(input, SubstateLockType::Read)),
// self.outputs
// .iter()
// .map(|id| VersionedSubstateIdLockIntent::output(id.clone())),
// );
// }

pub fn to_initial_evidence(&self, local_committee_info: &CommitteeInfo) -> Evidence {
// TODO: We do not know if the inputs locks required are Read/Write. Either we allow the user to
// specify this or we can correct the locks after execution. Currently, this limitation
// prevents concurrent multi-shard read locks.
let inputs = self
.local_inputs()
.iter()
.map(|(requirement, version)| VersionedSubstateId::new(requirement.substate_id.clone(), *version))
// TODO(correctness): to_zero_version is error prone when used in evidence and the correctness depends how it is used.
// e.g. using it to determining which shard is involved is fine, but loading substate by the address is incorrect (v0 may or may not be the actual pledged substate)
.chain(self.foreign_inputs().iter().map(|r| r.clone().or_zero_version()))
.map(|id| VersionedSubstateIdLockIntent::write(id, true));

let outputs = self
.outputs()
.known_outputs()
.iter()
.cloned()
.map(VersionedSubstateIdLockIntent::output);

Evidence::from_inputs_and_outputs(num_preshards, num_committees, inputs, outputs)
let mut evidence = Evidence::from_inputs_and_outputs(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
inputs,
outputs,
);

// Add foreign involved shard groups without adding any substates (because we do not know the pledged version
// yet)
self.foreign_inputs()
.iter()
.map(|r| {
r.to_substate_address_zero_version().to_shard_group(
local_committee_info.num_preshards(),
local_committee_info.num_committees(),
)
})
.for_each(|sg| {
evidence.add_shard_group(sg);
});

evidence
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ impl<TStateStore: StateStore> BlockTransactionExecutor<TStateStore> for TestBloc
.inputs
.into_iter()
.map(|spec| {
let substate = resolved_inputs[spec.substate_requirement()].clone();
let substate = resolved_inputs.get(spec.substate_requirement()).unwrap_or_else(|| {
panic!(
"Missing input substate for transaction {} with requirement {}",
id,
spec.substate_requirement()
)
});
VersionedSubstateIdLockIntent::new(
VersionedSubstateId::new(spec.substate_id().clone(), substate.version()),
spec.lock_type(),
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/storage/src/consensus_models/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ impl Block {
.filter_map(|cmd| cmd.transaction())
.filter(|t| {
t.evidence
.substate_addresses_iter()
.any(|addr| committee_info.includes_substate_address(addr))
.shard_groups_iter()
.any(|sg| *sg == committee_info.shard_group())
})
.map(|t| t.id())
}
Expand Down
Loading

0 comments on commit 219fe8a

Please sign in to comment.