Skip to content

Commit

Permalink
wip: Add PolkaCertificate
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Feb 18, 2025
1 parent 8b5aa01 commit f6b9dae
Show file tree
Hide file tree
Showing 27 changed files with 409 additions and 118 deletions.
2 changes: 1 addition & 1 deletion code/crates/core-consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ all-features = true

[features]
default = ["std", "metrics"]
std = []
std = ["malachitebft-core-driver/std"]
metrics = ["std", "dep:malachitebft-metrics"]
debug = ["std", "malachitebft-core-driver/debug"]

Expand Down
1 change: 1 addition & 0 deletions code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ where
Ctx::Height,
Round,
VoteSet<Ctx>,
Option<PolkaCertificate<Ctx>>,
resume::Continue,
),

Expand Down
4 changes: 2 additions & 2 deletions code/crates/core-consensus/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ where
Input::VoteSetRequest(request_id, height, round) => {
on_vote_set_request(co, state, metrics, request_id, height, round).await
}
Input::VoteSetResponse(vote_set) => {
on_vote_set_response(co, state, metrics, vote_set).await
Input::VoteSetResponse(vote_set, polka_certificate) => {
on_vote_set_response(co, state, metrics, vote_set, polka_certificate).await
}
}
}
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/handle/decide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
// Look for an existing certificate
let (certificate, extensions) = state
.driver
.get_certificate(proposal_round, value.id())
.get_commit_certificate(proposal_round, value.id())
.cloned()
.map(|certificate| (certificate, VoteExtensions::default()))
.unwrap_or_else(|| {
Expand Down
14 changes: 13 additions & 1 deletion code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,19 @@ where
DriverInput::CommitCertificate(certificate) => {
if certificate.height != state.driver.height() {
warn!(
"Ignoring certificate for height {}, current height: {}",
"Ignoring commit certificate for height {}, current height: {}",
certificate.height,
state.driver.height()
);

return Ok(());
}
}

DriverInput::PolkaCertificate(certificate) => {
if certificate.height != state.driver.height() {
warn!(
"Ignoring polka certificate for height {}, current height: {}",
certificate.height,
state.driver.height()
);
Expand Down
37 changes: 27 additions & 10 deletions code/crates/core-consensus/src/handle/vote_set.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::handle::driver::apply_driver_input;
use crate::handle::vote::on_vote;
use crate::input::RequestId;
use crate::prelude::*;
Expand All @@ -15,14 +16,17 @@ where
{
debug!(%height, %round, %request_id, "Received vote set request, retrieve the votes and send response if set is not empty");

let votes = state.restore_votes(height, round);

if !votes.is_empty() {
let vote_set = VoteSet::new(votes);

if let Some((votes, polka_certificate)) = state.restore_votes(height, round) {
perform!(
co,
Effect::SendVoteSetResponse(request_id, height, round, vote_set, Default::default())
Effect::SendVoteSetResponse(
request_id,
height,
round,
VoteSet::new(votes),
polka_certificate,
Default::default()
)
);
}

Expand All @@ -33,18 +37,31 @@ pub async fn on_vote_set_response<Ctx>(
co: &Co<Ctx>,
state: &mut State<Ctx>,
metrics: &Metrics,
response: VoteSet<Ctx>,
vote_set: VoteSet<Ctx>,
polka_certificate: Option<PolkaCertificate<Ctx>>,
) -> Result<(), Error<Ctx>>
where
Ctx: Context,
{
debug!(
height = %state.height(), round = %state.round(), votes.count = %response.len(),
height = %state.height(), round = %state.round(),
votes.count = %vote_set.len(),
polka.certificate = %polka_certificate.is_some(),
"Received vote set response"
);

for vote in response.votes {
let _ = on_vote(co, state, metrics, vote).await;
if let Some(certificate) = polka_certificate {
apply_driver_input(
co,
state,
metrics,
DriverInput::PolkaCertificate(certificate),
)
.await?;
} else {
for vote in vote_set.votes {
on_vote(co, state, metrics, vote).await?;
}
}

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions code/crates/core-consensus/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derive_where::derive_where;
use malachitebft_core_types::{
CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, ValueOrigin, VoteSet,
CommitCertificate, Context, PolkaCertificate, Round, SignedProposal, SignedVote, Timeout,
ValueOrigin, VoteSet,
};

use crate::types::ProposedValue;
Expand Down Expand Up @@ -40,5 +41,5 @@ where
VoteSetRequest(RequestId, Ctx::Height, Round),

/// Vote set to be sent to peer
VoteSetResponse(VoteSet<Ctx>),
VoteSetResponse(VoteSet<Ctx>, Option<PolkaCertificate<Ctx>>),
}
27 changes: 20 additions & 7 deletions code/crates/core-consensus/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,30 @@ where
.collect()
}

pub fn restore_votes(&mut self, height: Ctx::Height, round: Round) -> Vec<SignedVote<Ctx>> {
// TODO optimization - get votes for all rounds higher than or equal to `round`
#[allow(clippy::type_complexity)]
pub fn restore_votes(
&mut self,
height: Ctx::Height,
round: Round,
) -> Option<(Vec<SignedVote<Ctx>>, Option<PolkaCertificate<Ctx>>)> {
// TODO: optimization - get votes for all rounds higher than or equal to `round`
if height != self.driver.height() {
return vec![];
return None;
}

if let Some(per_round) = self.driver.votes().per_round(round) {
per_round.received_votes().iter().cloned().collect()
} else {
vec![]
let per_round = self.driver.votes().per_round(round)?;
let votes = per_round
.received_votes()
.iter()
.cloned()
.collect::<Vec<_>>();

if votes.is_empty() {
return None;
}

let polka_certificate = self.driver.get_polka_certificate(round).cloned();
Some((votes, polka_certificate))
}

pub fn full_proposal_at_round_and_value(
Expand Down
95 changes: 81 additions & 14 deletions code/crates/core-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use malachitebft_core_state_machine::output::Output as RoundOutput;
use malachitebft_core_state_machine::state::{RoundValue, State as RoundState, Step};
use malachitebft_core_state_machine::state_machine::Info;
use malachitebft_core_types::{
CommitCertificate, Context, Proposal, Round, SignedProposal, SignedVote, Timeout, TimeoutKind,
Validator, ValidatorSet, Validity, ValueId, Vote, VoteType,
CommitCertificate, Context, NilOrVal, PolkaCertificate, Proposal, Round, SignedProposal,
SignedVote, Timeout, TimeoutKind, Validator, ValidatorSet, Validity, Value, ValueId, Vote,
VoteType,
};
use malachitebft_core_votekeeper::keeper::Output as VKOutput;
use malachitebft_core_votekeeper::keeper::VoteKeeper;

use crate::input::Input;
Expand Down Expand Up @@ -46,8 +48,11 @@ where
/// The vote keeper.
pub(crate) vote_keeper: VoteKeeper<Ctx>,

/// The certificate keeper
pub(crate) certificates: Vec<CommitCertificate<Ctx>>,
/// The commit certificates
pub(crate) commit_certificates: Vec<CommitCertificate<Ctx>>,

/// The polka certificates
pub(crate) polka_certificates: Vec<PolkaCertificate<Ctx>>,

/// The state of the round state machine.
pub(crate) round_state: RoundState<Ctx>,
Expand Down Expand Up @@ -89,7 +94,8 @@ where
round_state,
proposer: None,
pending_inputs: vec![],
certificates: vec![],
commit_certificates: vec![],
polka_certificates: vec![],
last_prevote: None,
last_precommit: None,
}
Expand All @@ -112,7 +118,8 @@ where
self.vote_keeper = vote_keeper;
self.round_state = round_state;
self.pending_inputs = vec![];
self.certificates = vec![];
self.commit_certificates = vec![];
self.polka_certificates = vec![];
self.last_prevote = None;
self.last_precommit = None;
}
Expand Down Expand Up @@ -197,16 +204,24 @@ where
}

/// Get a commit certificate for the given round and value id.
pub fn get_certificate(
pub fn get_commit_certificate(
&self,
round: Round,
value_id: ValueId<Ctx>,
) -> Option<&CommitCertificate<Ctx>> {
self.certificates
self.commit_certificates
.iter()
.find(|c| c.round == round && c.value_id == value_id)
}

/// Get a polka certificate for the given round.
pub fn get_polka_certificate(&self, round: Round) -> Option<&PolkaCertificate<Ctx>> {
self.polka_certificates
.iter()
.filter(|c| c.round <= round)
.last()
}

/// Store the last vote that we have cast
fn set_last_vote_cast(&mut self, vote: &Ctx::Vote) {
assert_eq!(vote.height(), self.height());
Expand Down Expand Up @@ -274,18 +289,28 @@ where
// - That vote is for a higher height than our last vote
// - That vote is for a higher round than our last vote
// - That vote is the same as our last vote
// Precommits have the additional constraint that the value must match the valid value
let can_vote = match vote.vote_type() {
VoteType::Prevote => self.last_prevote.as_ref().map_or(true, |prev| {
prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote
}),
VoteType::Precommit => self.last_precommit.as_ref().map_or(true, |prev| {
prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote
}),
VoteType::Precommit => {
let good_precommit = self.last_precommit.as_ref().map_or(true, |prev| {
prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote
});
let match_valid = self.round_state.valid.as_ref().map_or(true, |valid| {
if let NilOrVal::Val(value_id) = vote.value() {
&valid.value.id() == value_id
} else {
true
}
});
good_precommit && match_valid
}
};

if can_vote {
self.set_last_vote_cast(&vote);

outputs.push(Output::Vote(vote));
}
}
Expand All @@ -294,6 +319,7 @@ where
fn apply(&mut self, input: Input<Ctx>) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
match input {
Input::CommitCertificate(certificate) => self.apply_commit_certificate(certificate),
Input::PolkaCertificate(certificate) => self.apply_polka_certificate(certificate),
Input::NewRound(height, round, proposer) => {
self.apply_new_round(height, round, proposer)
}
Expand All @@ -317,12 +343,29 @@ where

let round = certificate.round;

match self.store_and_multiplex_certificate(certificate) {
match self.store_and_multiplex_commit_certificate(certificate) {
Some(round_input) => self.apply_input(round, round_input),
None => Ok(None),
}
}

fn apply_polka_certificate(
&mut self,
certificate: PolkaCertificate<Ctx>,
) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
if self.height() != certificate.height {
return Err(Error::InvalidCertificateHeight {
certificate_height: certificate.height,
consensus_height: self.height(),
});
}

match dbg!(self.store_and_multiplex_polka_certificate(certificate)) {
Some(round_input) => dbg!(self.apply_input(self.round(), round_input)),
None => Ok(None),
}
}

fn apply_new_round(
&mut self,
height: Ctx::Height,
Expand Down Expand Up @@ -395,15 +438,39 @@ where
return Ok(None);
};

let round_input = self.multiplex_vote_threshold(output, vote_round);
if let VKOutput::PolkaValue(val) = &output {
self.store_polka_certificate(vote_round, val);
}

let round_input = self.multiplex_vote_threshold(output, vote_round);
if round_input == RoundInput::NoInput {
return Ok(None);
}

self.apply_input(vote_round, round_input)
}

fn store_polka_certificate(&mut self, vote_round: Round, value_id: &ValueId<Ctx>) {
let Some(per_round) = self.vote_keeper.per_round(vote_round) else {
return;
};

self.polka_certificates.push(PolkaCertificate {
height: self.height(),
round: vote_round,
value_id: value_id.clone(),
votes: per_round
.received_votes()
.iter()
.filter(|v| {
v.vote_type() == VoteType::Prevote
&& v.value().as_ref() == NilOrVal::Val(value_id)
})
.cloned()
.collect(),
})
}

fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundOutput<Ctx>>, Error<Ctx>> {
let input = match timeout.kind {
TimeoutKind::Propose => RoundInput::TimeoutPropose,
Expand Down
6 changes: 5 additions & 1 deletion code/crates/core-driver/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use malachitebft_core_types::{
CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, Validity,
CommitCertificate, Context, PolkaCertificate, Round, SignedProposal, SignedVote, Timeout,
Validity,
};

use derive_where::derive_where;
Expand All @@ -25,6 +26,9 @@ where
/// Received a commit certificate
CommitCertificate(CommitCertificate<Ctx>),

/// Received a polka certificate
PolkaCertificate(PolkaCertificate<Ctx>),

/// Receive a timeout
TimeoutElapsed(Timeout),
}
Loading

0 comments on commit f6b9dae

Please sign in to comment.