diff --git a/crates/papyrus_node/src/bin/run_consensus.rs b/crates/papyrus_node/src/bin/run_consensus.rs
index 236f825b85..62e5959a6b 100644
--- a/crates/papyrus_node/src/bin/run_consensus.rs
+++ b/crates/papyrus_node/src/bin/run_consensus.rs
@@ -10,7 +10,7 @@ use futures::stream::StreamExt;
 use papyrus_consensus::config::ConsensusConfig;
 use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
 use papyrus_consensus::stream_handler::StreamHandler;
-use papyrus_consensus::types::BroadcastConsensusMessageChannel;
+use papyrus_consensus::types::BroadcastVoteChannel;
 use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
 use papyrus_network::gossipsub_impl::Topic;
 use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
@@ -91,7 +91,7 @@ fn build_consensus(
         test_config.drop_probability,
         test_config.invalid_probability,
     );
-    let broadcast_channels = BroadcastConsensusMessageChannel {
+    let broadcast_vote_channels = BroadcastVoteChannel {
         broadcasted_messages_receiver: Box::new(network_receiver),
         broadcast_topic_client: network_channels.broadcast_topic_client,
     };
@@ -103,7 +103,7 @@
         consensus_config.validator_id,
         consensus_config.consensus_delay,
         consensus_config.timeouts.clone(),
-        broadcast_channels,
+        broadcast_vote_channels,
         inbound_internal_receiver,
         sync_receiver,
    );
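Reviewer note: after the rename, everything this binary pushes over the votes topic is a `Vote`. A minimal wiring sketch (the helper name `wire_vote_channel` is illustrative, not part of this PR), relying on the `From` impl updated in `types.rs` further down:

```rust
use papyrus_consensus::types::BroadcastVoteChannel;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::Vote;

// BroadcastTopicChannels<Vote> comes from registering the votes topic on the
// NetworkManager; the From impl in types.rs turns it into the consensus type.
fn wire_vote_channel(channels: BroadcastTopicChannels<Vote>) -> BroadcastVoteChannel {
    channels.into()
}
```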
diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index 8609ca247e..cc1579d022 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -18,20 +18,14 @@ use futures::{Stream, StreamExt};
 use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
 use papyrus_network::network_manager::BroadcastTopicClientTrait;
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
+use papyrus_protobuf::consensus::{ProposalInit, Vote};
 use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::BlockNumber;
 use tracing::{debug, info, instrument};
 
 use crate::config::TimeoutsConfig;
 use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
-use crate::types::{
-    BroadcastConsensusMessageChannel,
-    ConsensusContext,
-    ConsensusError,
-    Decision,
-    ValidatorId,
-};
+use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decision, ValidatorId};
 
 /// Run consensus indefinitely.
 ///
@@ -62,7 +56,7 @@ pub async fn run_consensus(
     validator_id: ValidatorId,
     consensus_delay: Duration,
     timeouts: TimeoutsConfig,
-    mut vote_receiver: BroadcastConsensusMessageChannel,
+    mut vote_receiver: BroadcastVoteChannel,
     mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
     mut sync_receiver: SyncReceiverT,
 ) -> Result<(), ConsensusError>
@@ -130,7 +124,7 @@ type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
 #[derive(Debug, Default)]
 struct MultiHeightManager<ContextT: ConsensusContext> {
     validator_id: ValidatorId,
-    cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
+    future_votes: BTreeMap<u64, Vec<Vote>>,
     // Mapping: { Height : { Round : (Init, Receiver)}}
     cached_proposals: BTreeMap<u64, BTreeMap<u32, ProposalReceiverTuple<ContextT::ProposalPart>>>,
     timeouts: TimeoutsConfig,
@@ -141,7 +135,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
     pub(crate) fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
         Self {
             validator_id,
-            cached_messages: BTreeMap::new(),
+            future_votes: BTreeMap::new(),
             cached_proposals: BTreeMap::new(),
             timeouts,
         }
@@ -167,7 +161,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         context: &mut ContextT,
         height: BlockNumber,
         must_observer: bool,
-        broadcast_channels: &mut BroadcastConsensusMessageChannel,
+        broadcast_channels: &mut BroadcastVoteChannel,
         proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
         sync_receiver: &mut SyncReceiverT,
     ) -> Result<Decision, ConsensusError>
@@ -199,7 +193,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         loop {
             let shc_return = tokio::select! {
                 message = broadcast_channels.broadcasted_messages_receiver.next() => {
-                    self.handle_message(
+                    self.handle_vote(
                         context, height, &mut shc, message, broadcast_channels).await?
                 },
                 Some(content_receiver) = proposal_receiver.next() => {
@@ -251,7 +245,7 @@
         }
 
         for msg in self.get_current_height_messages(height) {
-            match shc.handle_message(context, msg).await? {
+            match shc.handle_vote(context, msg).await? {
                 decision @ ShcReturn::Decision(_) => return Ok(decision),
                 ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks),
             }
@@ -299,18 +293,15 @@
     }
 
     // Handle a single consensus message.
-    async fn handle_message(
+    async fn handle_vote(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         shc: &mut SingleHeightConsensus,
-        message: Option<(
-            Result<ConsensusMessage, ProtobufConversionError>,
-            BroadcastedMessageMetadata,
-        )>,
-        broadcast_channels: &mut BroadcastConsensusMessageChannel,
+        vote: Option<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
+        broadcast_channels: &mut BroadcastVoteChannel,
     ) -> Result<ShcReturn, ConsensusError> {
-        let message = match message {
+        let message = match vote {
             None => Err(ConsensusError::InternalNetworkError(
                 "NetworkReceiver should never be closed".to_string(),
             )),
@@ -331,15 +322,14 @@
         // 1. Malicious - must be capped so a malicious peer can't DoS us.
         // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
         // In general I think we will want to only cache (H+1, 0) messages.
-        if message.height() != height.0 {
+        if message.height != height.0 {
             debug!("Received a message for a different height. {:?}", message);
-            if message.height() > height.0 {
-                self.cached_messages.entry(message.height()).or_default().push(message);
+            if message.height > height.0 {
+                self.future_votes.entry(message.height).or_default().push(message);
             }
             return Ok(ShcReturn::Tasks(Vec::new()));
         }
-
-        shc.handle_message(context, message).await
+        shc.handle_vote(context, message).await
     }
 
     // Checks if a cached proposal already exists (with correct height)
@@ -372,10 +362,10 @@
     // - returns all of the current height messages.
     // - drops messages from earlier heights.
    // - retains future messages in the cache.
-    fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<ConsensusMessage> {
-        // Depends on `cached_messages` being sorted by height.
+    fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<Vote> {
+        // Depends on `future_votes` being sorted by height.
         loop {
-            let Some(entry) = self.cached_messages.first_entry() else {
+            let Some(entry) = self.future_votes.first_entry() else {
                 return Vec::new();
             };
             match entry.key().cmp(&height.0) {
diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs
index 1f6d59f5fd..ea49229ab1 100644
--- a/crates/sequencing/papyrus_consensus/src/manager_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -12,7 +12,7 @@ use papyrus_network::network_manager::test_utils::{
     TestSubscriberChannels,
 };
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, DEFAULT_VALIDATOR_ID};
+use papyrus_protobuf::consensus::{ProposalFin, Vote, DEFAULT_VALIDATOR_ID};
 use papyrus_test_utils::{get_rng, GetTestInstance};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
@@ -37,7 +37,7 @@ lazy_static! {
 
 const CHANNEL_SIZE: usize = 10;
 
-async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg: ConsensusMessage) {
+async fn send(sender: &mut MockBroadcastedMessagesSender<Vote>, msg: Vote) {
     let broadcasted_message_metadata =
         BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
     sender.send((msg, broadcasted_message_metadata)).await.unwrap();
@@ -320,7 +320,7 @@ async fn test_timeouts() {
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
+        .withf(move |msg: &Vote| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
         .return_once(move |_| {
             timeout_send.send(()).unwrap();
             Ok(())
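Reviewer note: the `cached_messages` to `future_votes` rename also pins down the caching policy `handle_vote` applies. A standalone sketch of that routing (toy `Vote` type; the real logic lives in `MultiHeightManager`):

```rust
use std::cmp::Ordering;
use std::collections::BTreeMap;

// Toy stand-in for papyrus_protobuf::consensus::Vote.
struct Vote {
    height: u64,
}

#[derive(Default)]
struct FutureVotes(BTreeMap<u64, Vec<Vote>>);

impl FutureVotes {
    // Mirrors handle_vote: votes for future heights are cached, votes for the
    // current height are returned to the caller, and past-height votes drop.
    fn route(&mut self, current_height: u64, vote: Vote) -> Option<Vote> {
        match vote.height.cmp(&current_height) {
            Ordering::Greater => {
                self.0.entry(vote.height).or_default().push(vote);
                None
            }
            Ordering::Equal => Some(vote),
            Ordering::Less => None,
        }
    }
}
```

`get_current_height_messages` then drains this map from its lowest key upward, which is why it depends on the `BTreeMap` being sorted by height.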
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
index 4800ea83db..9380c731f2 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -17,7 +17,7 @@ use std::time::Duration;
 #[cfg(test)]
 use enum_as_inner::EnumAsInner;
 use futures::channel::{mpsc, oneshot};
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, VoteType};
 use starknet_api::block::BlockNumber;
 use tracing::{debug, info, instrument, trace, warn};
 
@@ -263,26 +263,6 @@ impl SingleHeightConsensus {
         self.handle_state_machine_events(context, sm_events).await
     }
 
-    /// Handle messages from peer nodes.
-    #[instrument(skip_all)]
-    pub(crate) async fn handle_message<ContextT: ConsensusContext>(
-        &mut self,
-        context: &mut ContextT,
-        message: ConsensusMessage,
-    ) -> Result<ShcReturn, ConsensusError> {
-        debug!("Received message: {:?}", message);
-        match message {
-            ConsensusMessage::Proposal(_) => {
-                unimplemented!("Proposals should use `handle_proposal` due to fake streaming")
-            }
-            ConsensusMessage::Vote(vote) => {
-                let ret = self.handle_vote(context, vote).await;
-                context.set_height_and_round(self.height, self.state_machine.round()).await;
-                ret
-            }
-        }
-    }
-
     pub async fn handle_event<ContextT: ConsensusContext>(
         &mut self,
         context: &mut ContextT,
@@ -305,7 +285,7 @@ impl SingleHeightConsensus {
                 if last_vote.round > round {
                     return Ok(ShcReturn::Tasks(Vec::new()));
                 }
-                context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
+                context.broadcast(last_vote.clone()).await?;
                 Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
                     self.timeouts.prevote_timeout,
                     StateMachineEvent::Prevote(proposal_id, round),
@@ -318,7 +298,7 @@ impl SingleHeightConsensus {
                 if last_vote.round > round {
                     return Ok(ShcReturn::Tasks(Vec::new()));
                 }
-                context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
+                context.broadcast(last_vote.clone()).await?;
                 Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
                     self.timeouts.precommit_timeout,
                     StateMachineEvent::Precommit(proposal_id, round),
@@ -374,8 +354,9 @@ impl SingleHeightConsensus {
         ret
     }
 
+    /// Handle vote messages from peer nodes.
     #[instrument(skip_all)]
-    async fn handle_vote<ContextT: ConsensusContext>(
+    pub(crate) async fn handle_vote<ContextT: ConsensusContext>(
         &mut self,
         context: &mut ContextT,
         vote: Vote,
@@ -401,11 +382,7 @@
             Entry::Occupied(entry) => {
                 let old = entry.get();
                 if old.block_hash != vote.block_hash {
-                    return Err(ConsensusError::Equivocation(
-                        self.height,
-                        ConsensusMessage::Vote(old.clone()),
-                        ConsensusMessage::Vote(vote),
-                    ));
+                    return Err(ConsensusError::Equivocation(self.height, old.clone(), vote));
                 } else {
                     // Replay, ignore.
                     return Ok(ShcReturn::Tasks(Vec::new()));
@@ -414,7 +391,9 @@
         }
         let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
         let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn);
-        self.handle_state_machine_events(context, sm_events).await
+        let ret = self.handle_state_machine_events(context, sm_events).await;
+        context.set_height_and_round(self.height, self.state_machine.round()).await;
+        ret
     }
 
     // Handle events output by the state machine.
@@ -570,7 +549,7 @@
             // TODO(matan): Consider refactoring not to panic, rather log and return the error.
             panic!("State machine should not send repeat votes: old={:?}, new={:?}", old, vote);
         }
-        context.broadcast(ConsensusMessage::Vote(vote.clone())).await?;
+        context.broadcast(vote.clone()).await?;
         if last_vote.as_ref().map_or(false, |last| round < last.round) {
             return Ok(Vec::new());
         }
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
index 7648c1d593..420d434a29 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
@@ -1,12 +1,7 @@
 use futures::channel::{mpsc, oneshot};
 use futures::SinkExt;
 use lazy_static::lazy_static;
-use papyrus_protobuf::consensus::{
-    ConsensusMessage,
-    ProposalFin,
-    ProposalInit,
-    DEFAULT_VALIDATOR_ID,
-};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, DEFAULT_VALIDATOR_ID};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
 use test_case::test_case;
@@ -96,7 +91,7 @@ async fn proposer() {
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
         .returning(move |_| Ok(()));
     // Sends proposal and prevote.
     let shc_ret = shc.start(&mut context).await.unwrap();
@@ -110,20 +105,18 @@
         Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)]))
     );
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await,
         Ok(ShcReturn::Tasks(Vec::new()))
     );
     // 3 of 4 Prevotes is enough to send a Precommit.
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| {
-            msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
-        })
+        .withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
         .returning(move |_| Ok(()));
     // The Node got a Prevote quorum.
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
         Ok(ShcReturn::Tasks(vec![timeout_prevote_task(0), precommit_task(Some(BLOCK.id.0), 0),]))
     );
@@ -134,27 +127,22 @@
         precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID),
     ];
     assert_eq!(
-        shc.handle_message(&mut context, precommits[0].clone()).await,
+        shc.handle_vote(&mut context, precommits[0].clone()).await,
         Ok(ShcReturn::Tasks(Vec::new()))
     );
     // The disagreeing vote counts towards the timeout, which uses a heterogeneous quorum, but not
     // the decision, which uses a homogenous quorum.
     assert_eq!(
-        shc.handle_message(&mut context, precommits[1].clone()).await,
+        shc.handle_vote(&mut context, precommits[1].clone()).await,
         Ok(ShcReturn::Tasks(vec![timeout_precommit_task(0),]))
     );
     let ShcReturn::Decision(decision) =
-        shc.handle_message(&mut context, precommits[2].clone()).await.unwrap()
+        shc.handle_vote(&mut context, precommits[2].clone()).await.unwrap()
     else {
         panic!("Expected decision");
     };
     assert_eq!(decision.block, BLOCK.id);
-    assert!(
-        decision
-            .precommits
-            .into_iter()
-            .all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-    );
+    assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
 }
 
 #[test_case(false; "single_proposal")]
@@ -182,9 +170,7 @@ async fn validator(repeat_proposal: bool) {
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| {
-            msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)
-        })
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
         .returning(move |_| Ok(()));
     let shc_ret = handle_proposal(&mut shc, &mut context).await;
     assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT);
@@ -198,20 +184,18 @@
         assert_eq!(shc_ret, ShcReturn::Tasks(Vec::new()));
     }
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await,
         Ok(ShcReturn::Tasks(Vec::new()))
     );
     // 3 of 4 Prevotes is enough to send a Precommit.
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| {
-            msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)
-        })
+        .withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
         .returning(move |_| Ok(()));
     // The Node got a Prevote quorum.
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
         Ok(ShcReturn::Tasks(vec![timeout_prevote_task(0), precommit_task(Some(BLOCK.id.0), 0)]))
     );
@@ -221,21 +205,16 @@
         precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1),
     ];
     assert_eq!(
-        shc.handle_message(&mut context, precommits[0].clone()).await,
+        shc.handle_vote(&mut context, precommits[0].clone()).await,
         Ok(ShcReturn::Tasks(Vec::new()))
     );
     let ShcReturn::Decision(decision) =
-        shc.handle_message(&mut context, precommits[1].clone()).await.unwrap()
+        shc.handle_vote(&mut context, precommits[1].clone()).await.unwrap()
     else {
         panic!("Expected decision");
     };
     assert_eq!(decision.block, BLOCK.id);
-    assert!(
-        decision
-            .precommits
-            .into_iter()
-            .all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-    );
+    assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
 }
 
 #[test_case(true; "repeat")]
@@ -262,7 +241,7 @@ async fn vote_twice(same_vote: bool) {
     context
         .expect_broadcast()
         .times(1) // Shows the repeat vote is ignored.
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
         .returning(move |_| Ok(()));
     let shc_ret = handle_proposal(&mut shc, &mut context).await;
     assert_eq!(shc_ret.as_tasks().unwrap()[0].as_validate_proposal().unwrap().0, &*PROPOSAL_INIT,);
@@ -271,16 +250,15 @@
         Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)]))
     );
 
-    let res = shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await;
+    let res = shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)).await;
     assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new())));
 
     context
         .expect_broadcast()
         .times(1) // Shows the repeat vote is ignored.
-        .withf(move |msg: &ConsensusMessage| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
+        .withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
         .returning(move |_| Ok(()));
-    let res =
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await;
+    let res = shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await;
     // The Node got a Prevote quorum.
     assert_eq!(
         res,
@@ -288,12 +266,12 @@
     );
 
     let first_vote = precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID);
-    let res = shc.handle_message(&mut context, first_vote.clone()).await;
+    let res = shc.handle_vote(&mut context, first_vote.clone()).await;
    assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new())));
 
     let second_vote =
         if same_vote { first_vote.clone() } else { precommit(Some(Felt::TWO), 0, 0, *PROPOSER_ID) };
-    let res = shc.handle_message(&mut context, second_vote.clone()).await;
+    let res = shc.handle_vote(&mut context, second_vote.clone()).await;
     if same_vote {
         assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new())));
     } else {
@@ -301,7 +279,7 @@
     }
 
     let ShcReturn::Decision(decision) = shc
-        .handle_message(&mut context, precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2))
+        .handle_vote(&mut context, precommit(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2))
         .await
         .unwrap()
     else {
@@ -332,7 +310,7 @@ async fn rebroadcast_votes() {
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
         .returning(move |_| Ok(()));
     // Sends proposal and prevote.
     let shc_ret = shc.start(&mut context).await.unwrap();
@@ -346,20 +324,20 @@
         Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0)]))
     );
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await,
         Ok(ShcReturn::Tasks(Vec::new()))
     );
     // 3 of 4 Prevotes is enough to send a Precommit.
     context
         .expect_broadcast()
         .times(2) // vote rebroadcast
-        .withf(move |msg: &ConsensusMessage| {
+        .withf(move |msg: &Vote| {
             msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
         })
         .returning(move |_| Ok(()));
     // The Node got a Prevote quorum.
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
         Ok(ShcReturn::Tasks(vec![timeout_prevote_task(0), precommit_task(Some(BLOCK.id.0), 0),]))
     );
     // Re-broadcast vote.
@@ -395,7 +373,7 @@ async fn repropose() {
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
         .returning(move |_| Ok(()));
     // Sends proposal and prevote.
     shc.start(&mut context).await.unwrap();
@@ -405,19 +383,15 @@
     )
     .await
     .unwrap();
-    shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1))
-        .await
-        .unwrap();
+    shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await.unwrap();
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| {
-            msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
-        })
+        .withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
         .returning(move |_| Ok(()));
     // The Node got a Prevote quorum, and set valid proposal.
     assert_eq!(
-        shc.handle_message(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
+        shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
         Ok(ShcReturn::Tasks(vec![timeout_prevote_task(0), precommit_task(Some(BLOCK.id.0), 0),]))
     );
     // Advance to the next round.
@@ -426,8 +400,8 @@
         precommit(None, 0, 0, *VALIDATOR_ID_2),
         precommit(None, 0, 0, *VALIDATOR_ID_3),
     ];
-    shc.handle_message(&mut context, precommits[0].clone()).await.unwrap();
-    shc.handle_message(&mut context, precommits[1].clone()).await.unwrap();
+    shc.handle_vote(&mut context, precommits[0].clone()).await.unwrap();
+    shc.handle_vote(&mut context, precommits[1].clone()).await.unwrap();
     // After NIL precommits, the proposer should re-propose.
     context.expect_repropose().returning(move |id, init| {
         assert_eq!(init.height, BlockNumber(0));
@@ -434,11 +408,11 @@
         *id
     });
     context
         .expect_broadcast()
         .times(1)
-        .withf(move |msg: &ConsensusMessage| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
+        .withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
         .returning(move |_| Ok(()));
-    shc.handle_message(&mut context, precommits[2].clone()).await.unwrap();
+    shc.handle_vote(&mut context, precommits[2].clone()).await.unwrap();
     shc.handle_event(
         &mut context,
         ShcEvent::TimeoutPrecommit(StateMachineEvent::TimeoutPrecommit(0)),
@@ -451,18 +425,13 @@
         precommit(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_2),
         precommit(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3),
     ];
-    shc.handle_message(&mut context, precommits[0].clone()).await.unwrap();
-    shc.handle_message(&mut context, precommits[1].clone()).await.unwrap();
+    shc.handle_vote(&mut context, precommits[0].clone()).await.unwrap();
+    shc.handle_vote(&mut context, precommits[1].clone()).await.unwrap();
     let ShcReturn::Decision(decision) =
-        shc.handle_message(&mut context, precommits[2].clone()).await.unwrap()
+        shc.handle_vote(&mut context, precommits[2].clone()).await.unwrap()
     else {
         panic!("Expected decision");
     };
     assert_eq!(decision.block, BLOCK.id);
-    assert!(
-        decision
-            .precommits
-            .into_iter()
-            .all(|item| precommits.contains(&ConsensusMessage::Vote(item)))
-    );
+    assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
 }
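Reviewer note: the `proposer` test above leans on the quorum distinction called out in its comment: a disagreeing precommit counts toward the timeout (heterogeneous quorum) but not toward the decision (homogeneous quorum). A toy illustration of that distinction (not code from this repo):

```rust
// A precommit is represented here only by the block hash it voted for
// (None = NIL vote).
fn timeout_quorum_reached(precommits: &[Option<u64>], quorum: usize) -> bool {
    // Heterogeneous: any mix of precommits for the round counts.
    precommits.len() >= quorum
}

fn decision_quorum_reached(precommits: &[Option<u64>], block: u64, quorum: usize) -> bool {
    // Homogeneous: only precommits for this exact block count.
    precommits.iter().filter(|hash| **hash == Some(block)).count() >= quorum
}
```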
diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs
index be41380d35..6a76f6cc95 100644
--- a/crates/sequencing/papyrus_consensus/src/test_utils.rs
+++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
 use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
 use mockall::mock;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, VoteType};
 use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
@@ -82,7 +82,7 @@ mock! {
 
     fn proposer(&self, height: BlockNumber, round: Round) -> ValidatorId;
 
-    async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError>;
+    async fn broadcast(&mut self, message: Vote) -> Result<(), ConsensusError>;
 
     async fn decision_reached(
         &mut self,
@@ -94,30 +94,14 @@ mock! {
     }
 }
 
-pub fn prevote(
-    block_felt: Option<Felt>,
-    height: u64,
-    round: u32,
-    voter: ValidatorId,
-) -> ConsensusMessage {
+pub fn prevote(block_felt: Option<Felt>, height: u64, round: u32, voter: ValidatorId) -> Vote {
     let block_hash = block_felt.map(BlockHash);
-    ConsensusMessage::Vote(Vote { vote_type: VoteType::Prevote, height, round, block_hash, voter })
+    Vote { vote_type: VoteType::Prevote, height, round, block_hash, voter }
 }
 
-pub fn precommit(
-    block_felt: Option<Felt>,
-    height: u64,
-    round: u32,
-    voter: ValidatorId,
-) -> ConsensusMessage {
+pub fn precommit(block_felt: Option<Felt>, height: u64, round: u32, voter: ValidatorId) -> Vote {
     let block_hash = block_felt.map(BlockHash);
-    ConsensusMessage::Vote(Vote {
-        vote_type: VoteType::Precommit,
-        height,
-        round,
-        block_hash,
-        voter,
-    })
+    Vote { vote_type: VoteType::Precommit, height, round, block_hash, voter }
 }
 
 pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit {
     ProposalInit { height: BlockNumber(height), round, proposer, ..Default::default() }
diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs
index c64e2fb253..48244cf8f2 100644
--- a/crates/sequencing/papyrus_consensus/src/types.rs
+++ b/crates/sequencing/papyrus_consensus/src/types.rs
@@ -11,7 +11,7 @@ use papyrus_network::network_manager::{
     GenericReceiver,
 };
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote};
+use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote};
 use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::ContractAddress;
@@ -100,7 +100,7 @@ pub trait ConsensusContext {
     /// Calculates the ID of the Proposer based on the inputs.
     fn proposer(&self, height: BlockNumber, round: Round) -> ValidatorId;
 
-    async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError>;
+    async fn broadcast(&mut self, message: Vote) -> Result<(), ConsensusError>;
 
     /// Update the context that a decision has been reached for a given height.
     /// - `block` identifies the decision.
@@ -132,17 +132,15 @@ impl Debug for Decision {
     }
 }
 
-pub struct BroadcastConsensusMessageChannel {
-    pub broadcasted_messages_receiver: GenericReceiver<(
-        Result<ConsensusMessage, ProtobufConversionError>,
-        BroadcastedMessageMetadata,
-    )>,
-    pub broadcast_topic_client: BroadcastTopicClient<ConsensusMessage>,
+pub struct BroadcastVoteChannel {
+    pub broadcasted_messages_receiver:
+        GenericReceiver<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
+    pub broadcast_topic_client: BroadcastTopicClient<Vote>,
 }
 
-impl From<BroadcastTopicChannels<ConsensusMessage>> for BroadcastConsensusMessageChannel {
-    fn from(broadcast_topic_channels: BroadcastTopicChannels<ConsensusMessage>) -> Self {
-        BroadcastConsensusMessageChannel {
+impl From<BroadcastTopicChannels<Vote>> for BroadcastVoteChannel {
+    fn from(broadcast_topic_channels: BroadcastTopicChannels<Vote>) -> Self {
+        BroadcastVoteChannel {
             broadcasted_messages_receiver: Box::new(
                 broadcast_topic_channels.broadcasted_messages_receiver,
             ),
@@ -165,7 +163,7 @@ pub enum ConsensusError {
     #[error(transparent)]
     SendError(#[from] mpsc::SendError),
     #[error("Conflicting messages for block {0}. Old: {1:?}, New: {2:?}")]
-    Equivocation(BlockNumber, ConsensusMessage, ConsensusMessage),
+    Equivocation(BlockNumber, Vote, Vote),
     // Indicates an error in communication between consensus and the node's networking component.
     // As opposed to an error between this node and peer nodes.
     #[error("{0}")]
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
index e7b833ffa2..b7cab99875 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
@@ -24,7 +24,6 @@ use papyrus_consensus::types::{
 };
 use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
 use papyrus_protobuf::consensus::{
-    ConsensusMessage,
     ProposalFin,
     ProposalInit,
     ProposalPart,
@@ -47,7 +46,7 @@ const CHANNEL_SIZE: usize = 100;
 
 pub struct PapyrusConsensusContext {
     storage_reader: StorageReader,
-    network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
+    network_broadcast_client: BroadcastTopicClient<Vote>,
     network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
     validators: Vec<ValidatorId>,
     sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
@@ -60,7 +59,7 @@ impl PapyrusConsensusContext {
     pub fn new(
         storage_reader: StorageReader,
-        network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
+        network_broadcast_client: BroadcastTopicClient<Vote>,
         network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
         num_validators: u64,
         sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
@@ -277,7 +276,7 @@ impl ConsensusContext for PapyrusConsensusContext {
         *self.validators.first().expect("there should be at least one validator")
     }
 
-    async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
+    async fn broadcast(&mut self, message: Vote) -> Result<(), ConsensusError> {
         debug!("Broadcasting message: {message:?}");
         self.network_broadcast_client.broadcast_message(message).await?;
         Ok(())
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
index ab2276001a..a22d92b0ed 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
@@ -11,7 +11,6 @@ use papyrus_network::network_manager::test_utils::{
 };
 use papyrus_network::network_manager::BroadcastTopicChannels;
 use papyrus_protobuf::consensus::{
-    ConsensusMessage,
     ProposalFin,
     ProposalInit,
     ProposalPart,
@@ -113,12 +112,8 @@ async fn decision() {
     assert_eq!(sync_network.messages_to_broadcast_receiver.next().await.unwrap(), precommit);
 }
 
-fn test_setup() -> (
-    Block,
-    PapyrusConsensusContext,
-    BroadcastNetworkMock<ConsensusMessage>,
-    BroadcastNetworkMock<Vote>,
-) {
+fn test_setup()
+-> (Block, PapyrusConsensusContext, BroadcastNetworkMock<Vote>, BroadcastNetworkMock<Vote>) {
     let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
     let block = get_test_block(5, None, None, None);
     let block_number = block.header.block_header_without_hash.block_number;
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
index 6dd27cc4d9..f9e07c10fd 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
@@ -21,7 +21,6 @@ use papyrus_consensus::types::{
 };
 use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
 use papyrus_protobuf::consensus::{
-    ConsensusMessage,
     ProposalFin,
     ProposalInit,
     ProposalPart,
@@ -119,7 +118,7 @@ pub struct SequencerConsensusContext {
         BTreeMap)>,
     outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
     // Used to broadcast votes to other consensus nodes.
-    vote_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
+    vote_broadcast_client: BroadcastTopicClient<Vote>,
     // Used to convert Transaction to ExecutableTransaction.
     chain_id: ChainId,
     cende_ambassador: Arc<dyn CendeContext>,
@@ -129,7 +128,7 @@ impl SequencerConsensusContext {
     pub fn new(
         batcher: Arc<dyn BatcherClient>,
         outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
-        vote_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
+        vote_broadcast_client: BroadcastTopicClient<Vote>,
         num_validators: u64,
         chain_id: ChainId,
         cende_ambassador: Arc<dyn CendeContext>,
@@ -304,7 +303,7 @@ impl ConsensusContext for SequencerConsensusContext {
             .expect("There should be at least one validator")
     }
 
-    async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
+    async fn broadcast(&mut self, message: Vote) -> Result<(), ConsensusError> {
         debug!("Broadcasting message: {message:?}");
         self.vote_broadcast_client.broadcast_message(message).await?;
         Ok(())
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
index ab438276da..b72fbe3900 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
@@ -14,12 +14,12 @@ use papyrus_network::network_manager::test_utils::{
 };
 use papyrus_network::network_manager::BroadcastTopicChannels;
 use papyrus_protobuf::consensus::{
-    ConsensusMessage,
     ProposalFin,
     ProposalInit,
     ProposalPart,
     StreamMessage,
     TransactionBatch,
+    Vote,
 };
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::{ChainId, Nonce, StateDiffCommitment};
@@ -67,7 +67,7 @@ fn generate_invoke_tx(nonce: u8) -> Transaction {
 
 // Structs which aren't utilized but should not be dropped.
 struct NetworkDependencies {
-    _vote_network: BroadcastNetworkMock<ConsensusMessage>,
+    _vote_network: BroadcastNetworkMock<Vote>,
     _new_proposal_network: BroadcastNetworkMock<StreamMessage<ProposalPart>>,
 }
diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs
index a40c502b5b..b5fcf4bdec 100644
--- a/crates/starknet_consensus_manager/src/consensus_manager.rs
+++ b/crates/starknet_consensus_manager/src/consensus_manager.rs
@@ -8,7 +8,7 @@ use papyrus_consensus_orchestrator::cende::CendeAmbassador;
 use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
 use papyrus_network::gossipsub_impl::Topic;
 use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage};
+use papyrus_protobuf::consensus::{ProposalPart, StreamMessage, Vote};
 use starknet_api::block::BlockNumber;
 use starknet_batcher_types::communication::SharedBatcherClient;
 use starknet_sequencer_infra::component_definitions::ComponentStarter;
@@ -51,7 +51,7 @@ impl ConsensusManager {
             .expect("Failed to register broadcast topic");
 
         let votes_broadcast_channels = network_manager
-            .register_broadcast_topic::<ConsensusMessage>(
+            .register_broadcast_topic::<Vote>(
                 Topic::new(CONSENSUS_VOTES_TOPIC),
                 BROADCAST_BUFFER_SIZE,
            )
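Reviewer note: for reference, the `Vote` shape this whole PR converges on, exactly as the updated `test_utils` helpers build it (values here are illustrative):

```rust
use papyrus_consensus::types::ValidatorId;
use papyrus_protobuf::consensus::{Vote, VoteType};
use starknet_api::block::BlockHash;
use starknet_types_core::felt::Felt;

fn example_precommit(voter: ValidatorId) -> Vote {
    Vote {
        vote_type: VoteType::Precommit,
        height: 1,                              // u64 block height
        round: 0,                               // u32 round
        block_hash: Some(BlockHash(Felt::ONE)), // None encodes a NIL vote
        voter,
    }
}
```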