chore(sequencing): remove ConsensusMessage from context
guy-starkware committed Dec 19, 2024
1 parent d32e83f commit c667343
Showing 12 changed files with 69 additions and 146 deletions.
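In short: proposals already reach consensus as a streamed sequence of parts over their own channel, so the shared `ConsensusMessage` wrapper is dropped and the context, network channel, and cache types now deal in `Vote` directly. A minimal before/after sketch of the API change (simplified, assumed signatures; the real definitions live in `papyrus_consensus::types` and `papyrus_protobuf::consensus`):

```rust
// Stub types so the sketch compiles on its own. Of the `Vote` fields,
// only `height` is confirmed by this diff; the rest are indicative.
struct Proposal;
struct Vote {
    height: u64,
    // round, block hash, voter, vote type, ...
}
struct ConsensusError;

// Before: one enum wrapped every whole message on the consensus topic.
enum ConsensusMessage {
    Proposal(Proposal), // unused for streamed proposals, hence removed
    Vote(Vote),
}

// Before: the context broadcast the enum.
trait OldConsensusContext {
    async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError>;
}

// After: votes are the only whole messages left, so the enum goes away.
trait NewConsensusContext {
    async fn broadcast(&mut self, message: Vote) -> Result<(), ConsensusError>;
}
```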
4 changes: 2 additions & 2 deletions crates/papyrus_node/src/bin/run_consensus.rs
@@ -9,7 +9,7 @@ use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
-use papyrus_consensus::types::BroadcastConsensusMessageChannel;
+use papyrus_consensus::types::BroadcastVoteChannel;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::NetworkManager;
@@ -80,7 +80,7 @@ fn build_consensus(
test_config.drop_probability,
test_config.invalid_probability,
);
-let broadcast_channels = BroadcastConsensusMessageChannel {
+let broadcast_channels = BroadcastVoteChannel {
broadcasted_messages_receiver: Box::new(network_receiver),
broadcast_topic_client: network_channels.broadcast_topic_client,
};
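For orientation, a compiling stand-in for the struct constructed above (field types simplified and assumed; in the crate the receiver is the boxed network or test receiver and the client is a gossip topic client). Only the message type changed: `Vote` where `ConsensusMessage` used to be.

```rust
use futures::channel::mpsc;
use futures::Stream;

struct Vote;
struct Metadata; // stands in for BroadcastedMessageMetadata
struct ConversionError; // stands in for ProtobufConversionError

// Assumed shape of BroadcastVoteChannel, mirroring the construction above.
struct BroadcastVoteChannel {
    broadcasted_messages_receiver:
        Box<dyn Stream<Item = (Result<Vote, ConversionError>, Metadata)> + Unpin>,
    broadcast_topic_client: mpsc::Sender<Vote>,
}
```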
32 changes: 11 additions & 21 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -18,20 +18,14 @@ use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
+use papyrus_protobuf::consensus::{ProposalInit, Vote};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
-use crate::types::{
-BroadcastConsensusMessageChannel,
-ConsensusContext,
-ConsensusError,
-Decision,
-ValidatorId,
-};
+use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decision, ValidatorId};

/// Run consensus indefinitely.
///
@@ -62,7 +56,7 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
-mut vote_receiver: BroadcastConsensusMessageChannel,
+mut vote_receiver: BroadcastVoteChannel,
mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
@@ -130,7 +124,7 @@ type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
#[derive(Debug, Default)]
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
-cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
+cached_messages: BTreeMap<u64, Vec<Vote>>,
// Mapping: { Height : { Round : (Init, Receiver)}}
cached_proposals: BTreeMap<u64, BTreeMap<u32, ProposalReceiverTuple<ContextT::ProposalPart>>>,
timeouts: TimeoutsConfig,
@@ -167,7 +161,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context: &mut ContextT,
height: BlockNumber,
must_observer: bool,
-broadcast_channels: &mut BroadcastConsensusMessageChannel,
+broadcast_channels: &mut BroadcastVoteChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
sync_receiver: &mut SyncReceiverT,
) -> Result<RunHeightRes, ConsensusError>
@@ -299,11 +293,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
-message: Option<(
-Result<ConsensusMessage, ProtobufConversionError>,
-BroadcastedMessageMetadata,
-)>,
-broadcast_channels: &mut BroadcastConsensusMessageChannel,
+message: Option<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
+broadcast_channels: &mut BroadcastVoteChannel,
) -> Result<ShcReturn, ConsensusError> {
let message = match message {
None => Err(ConsensusError::InternalNetworkError(
@@ -326,14 +317,13 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
-if message.height() != height.0 {
+if message.height != height.0 {
debug!("Received a message for a different height. {:?}", message);
-if message.height() > height.0 {
-self.cached_messages.entry(message.height()).or_default().push(message);
+if message.height > height.0 {
+self.cached_messages.entry(message.height).or_default().push(message);
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
-
shc.handle_message(context, message).await
}

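The caching rule above as a self-contained sketch (generic over the vote type; the real map is keyed by `message.height`, and same-height votes never reach this branch because of the guard):

```rust
use std::collections::BTreeMap;

// Cache votes for future heights; drop stale ones silently.
fn cache_if_future<V>(
    cache: &mut BTreeMap<u64, Vec<V>>,
    current_height: u64,
    vote_height: u64,
    vote: V,
) {
    if vote_height > current_height {
        cache.entry(vote_height).or_default().push(vote);
    }
    // vote_height < current_height: stale, ignored.
}
```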
@@ -367,7 +357,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// - returns all of the current height messages.
// - drops messages from earlier heights.
// - retains future messages in the cache.
-fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<ConsensusMessage> {
+fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<Vote> {
// Depends on `cached_messages` being sorted by height.
loop {
let Some(entry) = self.cached_messages.first_entry() else {
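The drain side documented above, sketched with the same semantics (return the current height's messages, drop earlier heights, retain future ones); details beyond the visible `first_entry` loop are assumed:

```rust
use std::cmp::Ordering;
use std::collections::BTreeMap;

fn get_current_height_messages<V>(cache: &mut BTreeMap<u64, Vec<V>>, height: u64) -> Vec<V> {
    loop {
        // `first_entry` yields the smallest cached height, so stale
        // entries are met (and removed) before the current one.
        let Some(entry) = cache.first_entry() else {
            return Vec::new();
        };
        match entry.key().cmp(&height) {
            Ordering::Less => {
                entry.remove(); // earlier height: drop its messages
            }
            Ordering::Equal => return entry.remove(), // current height: hand them out
            Ordering::Greater => return Vec::new(),   // future heights stay cached
        }
    }
}
```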
10 changes: 3 additions & 7 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -12,11 +12,7 @@ use papyrus_network::network_manager::test_utils::{
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{
-ConsensusMessage,
-ProposalFin,
-DEFAULT_VALIDATOR_ID,
-};
+use papyrus_protobuf::consensus::{ProposalFin, Vote, DEFAULT_VALIDATOR_ID};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
@@ -41,7 +37,7 @@ lazy_static! {

const CHANNEL_SIZE: usize = 10;

-async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg: ConsensusMessage) {
+async fn send(sender: &mut MockBroadcastedMessagesSender<Vote>, msg: Vote) {
let broadcasted_message_metadata =
BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
@@ -324,7 +320,7 @@ async fn test_timeouts() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
+.withf(move |msg: &Vote| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
.return_once(move |_| {
timeout_send.send(()).unwrap();
Ok(())
30 changes: 10 additions & 20 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -17,7 +17,7 @@ use std::time::Duration;
#[cfg(test)]
use enum_as_inner::EnumAsInner;
use futures::channel::{mpsc, oneshot};
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, VoteType};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, trace, warn};

@@ -268,19 +268,13 @@ impl SingleHeightConsensus {
pub(crate) async fn handle_message<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
-message: ConsensusMessage,
+message: Vote,
) -> Result<ShcReturn, ConsensusError> {
debug!("Received message: {:?}", message);
-match message {
-ConsensusMessage::Proposal(_) => {
-unimplemented!("Proposals should use `handle_proposal` due to fake streaming")
-}
-ConsensusMessage::Vote(vote) => {
-let ret = self.handle_vote(context, vote).await;
-context.set_height_and_round(self.height, self.state_machine.round()).await;
-ret
-}
-}
+
+let ret = self.handle_vote(context, message).await;
+context.set_height_and_round(self.height, self.state_machine.round()).await;
+ret
}

pub async fn handle_event<ContextT: ConsensusContext>(
@@ -305,7 +299,7 @@ impl SingleHeightConsensus {
if last_vote.round > round {
return Ok(ShcReturn::Tasks(Vec::new()));
}
-context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
+context.broadcast(last_vote.clone()).await?;
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
self.timeouts.prevote_timeout,
StateMachineEvent::Prevote(proposal_id, round),
@@ -318,7 +312,7 @@ impl SingleHeightConsensus {
if last_vote.round > round {
return Ok(ShcReturn::Tasks(Vec::new()));
}
-context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
+context.broadcast(last_vote.clone()).await?;
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
self.timeouts.precommit_timeout,
StateMachineEvent::Precommit(proposal_id, round),
@@ -401,11 +395,7 @@ impl SingleHeightConsensus {
Entry::Occupied(entry) => {
let old = entry.get();
if old.block_hash != vote.block_hash {
-return Err(ConsensusError::Equivocation(
-self.height,
-ConsensusMessage::Vote(old.clone()),
-ConsensusMessage::Vote(vote),
-));
+return Err(ConsensusError::Equivocation(self.height, old.clone(), vote));
} else {
// Replay, ignore.
return Ok(ShcReturn::Tasks(Vec::new()));
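The same unwrapping applies to evidence: `Equivocation` now carries the two conflicting `Vote`s directly instead of enum-wrapped copies. A sketch of the variant change (shape assumed beyond what the diff shows; stub types so it compiles):

```rust
#[derive(Debug)]
struct BlockNumber(u64);
#[derive(Debug)]
struct Vote; // the stored vote and the conflicting newcomer

#[derive(Debug)]
enum ConsensusError {
    // Before: Equivocation(BlockNumber, ConsensusMessage, ConsensusMessage)
    Equivocation(BlockNumber, Vote, Vote),
}
```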
@@ -567,7 +557,7 @@ impl SingleHeightConsensus {
// TODO(matan): Consider refactoring not to panic, rather log and return the error.
panic!("State machine should not send repeat votes: old={:?}, new={:?}", old, vote);
}
-context.broadcast(ConsensusMessage::Vote(vote.clone())).await?;
+context.broadcast(vote.clone()).await?;
if last_vote.as_ref().map_or(false, |last| round < last.round) {
return Ok(Vec::new());
}
crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
@@ -1,12 +1,7 @@
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use lazy_static::lazy_static;
-use papyrus_protobuf::consensus::{
-ConsensusMessage,
-ProposalFin,
-ProposalInit,
-DEFAULT_VALIDATOR_ID,
-};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, DEFAULT_VALIDATOR_ID};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
use test_case::test_case;
@@ -96,7 +91,7 @@ async fn proposer() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
.returning(move |_| Ok(()));
// Sends proposal and prevote.
let shc_ret = shc.start(&mut context).await.unwrap();
@@ -117,9 +112,7 @@ async fn proposer() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| {
-msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
-})
+.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
.returning(move |_| Ok(()));
// The Node got a Prevote quorum.
assert_eq!(
@@ -149,12 +142,7 @@ async fn proposer() {
panic!("Expected decision");
};
assert_eq!(decision.block, BLOCK.id);
-assert!(
-decision
-.precommits
-.into_iter()
-.all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-);
+assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
}

#[test_case(false; "single_proposal")]
@@ -182,9 +170,7 @@ async fn validator(repeat_proposal: bool) {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| {
-msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)
-})
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
.returning(move |_| Ok(()));
let shc_ret = handle_proposal(&mut shc, &mut context).await;
assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT);
@@ -205,9 +191,7 @@ async fn validator(repeat_proposal: bool) {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| {
-msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)
-})
+.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
.returning(move |_| Ok(()));
// The Node got a Prevote quorum.
assert_eq!(
@@ -230,12 +214,7 @@ async fn validator(repeat_proposal: bool) {
panic!("Expected decision");
};
assert_eq!(decision.block, BLOCK.id);
-assert!(
-decision
-.precommits
-.into_iter()
-.all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-);
+assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
}

#[test_case(true; "repeat")]
@@ -262,7 +241,7 @@ async fn vote_twice(same_vote: bool) {
context
.expect_broadcast()
.times(1) // Shows the repeat vote is ignored.
-.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
.returning(move |_| Ok(()));
let shc_ret = handle_proposal(&mut shc, &mut context).await;
assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT,);
@@ -277,7 +256,7 @@ async fn vote_twice(same_vote: bool) {
context
.expect_broadcast()
.times(1) // Shows the repeat vote is ignored.
-.withf(move |msg: &ConsensusMessage| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
+.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
.returning(move |_| Ok(()));
let res =
shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await;
@@ -332,7 +311,7 @@ async fn rebroadcast_votes() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
.returning(move |_| Ok(()));
// Sends proposal and prevote.
let shc_ret = shc.start(&mut context).await.unwrap();
@@ -353,7 +332,7 @@ async fn rebroadcast_votes() {
context
.expect_broadcast()
.times(2) // vote rebroadcast
-.withf(move |msg: &ConsensusMessage| {
+.withf(move |msg: &Vote| {
msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
})
.returning(move |_| Ok(()));
@@ -395,7 +374,7 @@ async fn repropose() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
.returning(move |_| Ok(()));
// Sends proposal and prevote.
shc.start(&mut context).await.unwrap();
@@ -411,9 +390,7 @@ async fn repropose() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| {
-msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
-})
+.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
.returning(move |_| Ok(()));
// The Node got a Prevote quorum, and set valid proposal.
assert_eq!(
@@ -436,7 +413,7 @@ async fn repropose() {
context
.expect_broadcast()
.times(1)
-.withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
+.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
.returning(move |_| Ok(()));
shc.handle_message(&mut context, precommits[2].clone()).await.unwrap();
shc.handle_event(
@@ -459,10 +436,5 @@ async fn repropose() {
panic!("Expected decision");
};
assert_eq!(decision.block, BLOCK.id);
-assert!(
-decision
-.precommits
-.into_iter()
-.all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-);
+assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
}
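A closing note on the repeated assert cleanup: `decision.precommits` and the test fixture are now both plain `Vote` collections, so membership checks need no `ConsensusMessage::Vote` wrapping. This relies on `Vote: PartialEq`, which the tests' `==` comparisons already assume. A minimal illustration:

```rust
#[derive(PartialEq)]
struct Vote {
    height: u64,
    round: u32,
}

// Each decided precommit must appear in the expected fixture.
fn decision_covered(decision_precommits: Vec<Vote>, expected: &[Vote]) -> bool {
    // Before the change each `item` had to be wrapped as
    // `ConsensusMessage::Vote(item)` to compare against the fixture.
    decision_precommits.into_iter().all(|item| expected.contains(&item))
}
```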
(7 of the 12 changed files are not shown above.)
