Skip to content

Commit

Permalink
chore(sequencing): remove ConsensusMessage from context
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 22, 2024
1 parent 28f0abb commit dad001e
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 184 deletions.
6 changes: 3 additions & 3 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::BroadcastConsensusMessageChannel;
use papyrus_consensus::types::BroadcastVoteChannel;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
Expand Down Expand Up @@ -91,7 +91,7 @@ fn build_consensus(
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastConsensusMessageChannel {
let broadcast_vote_channels = BroadcastVoteChannel {
broadcasted_messages_receiver: Box::new(network_receiver),
broadcast_topic_client: network_channels.broadcast_topic_client,
};
Expand All @@ -103,7 +103,7 @@ fn build_consensus(
consensus_config.validator_id,
consensus_config.consensus_delay,
consensus_config.timeouts.clone(),
broadcast_channels,
broadcast_vote_channels,
inbound_internal_receiver,
sync_receiver,
);
Expand Down
42 changes: 16 additions & 26 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@ use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use papyrus_protobuf::consensus::{ProposalInit, Vote};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
use crate::types::{
BroadcastConsensusMessageChannel,
ConsensusContext,
ConsensusError,
Decision,
ValidatorId,
};
use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decision, ValidatorId};

/// Run consensus indefinitely.
///
Expand Down Expand Up @@ -62,7 +56,7 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut vote_receiver: BroadcastConsensusMessageChannel,
mut vote_receiver: BroadcastVoteChannel,
mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
Expand Down Expand Up @@ -130,7 +124,7 @@ type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
#[derive(Debug, Default)]
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
cached_vote_messages: BTreeMap<u64, Vec<Vote>>,
// Mapping: { Height : { Round : (Init, Receiver)}}
cached_proposals: BTreeMap<u64, BTreeMap<u32, ProposalReceiverTuple<ContextT::ProposalPart>>>,
timeouts: TimeoutsConfig,
Expand All @@ -141,7 +135,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
pub(crate) fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self {
validator_id,
cached_messages: BTreeMap::new(),
cached_vote_messages: BTreeMap::new(),
cached_proposals: BTreeMap::new(),
timeouts,
}
Expand All @@ -167,7 +161,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context: &mut ContextT,
height: BlockNumber,
must_observer: bool,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
broadcast_channels: &mut BroadcastVoteChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
sync_receiver: &mut SyncReceiverT,
) -> Result<RunHeightRes, ConsensusError>
Expand Down Expand Up @@ -251,7 +245,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}

for msg in self.get_current_height_messages(height) {
match shc.handle_message(context, msg).await? {
match shc.handle_vote(context, msg).await? {
decision @ ShcReturn::Decision(_) => return Ok(decision),
ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks),
}
Expand Down Expand Up @@ -299,11 +293,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: Option<(
Result<ConsensusMessage, ProtobufConversionError>,
BroadcastedMessageMetadata,
)>,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
message: Option<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
broadcast_channels: &mut BroadcastVoteChannel,
) -> Result<ShcReturn, ConsensusError> {
let message = match message {
None => Err(ConsensusError::InternalNetworkError(
Expand All @@ -326,15 +317,14 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
if message.height() != height.0 {
if message.height != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height() > height.0 {
self.cached_messages.entry(message.height()).or_default().push(message);
if message.height > height.0 {
self.cached_vote_messages.entry(message.height).or_default().push(message);
}
return Ok(ShcReturn::Tasks(Vec::new()));
}

shc.handle_message(context, message).await
shc.handle_vote(context, message).await
}

// Checks if a cached proposal already exists (with correct height)
Expand Down Expand Up @@ -367,10 +357,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// - returns all of the current height messages.
// - drops messages from earlier heights.
// - retains future messages in the cache.
fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<ConsensusMessage> {
// Depends on `cached_messages` being sorted by height.
fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<Vote> {
// Depends on `cached_vote_messages` being sorted by height.
loop {
let Some(entry) = self.cached_messages.first_entry() else {
let Some(entry) = self.cached_vote_messages.first_entry() else {
return Vec::new();
};
match entry.key().cmp(&height.0) {
Expand Down
6 changes: 3 additions & 3 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use papyrus_network::network_manager::test_utils::{
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, DEFAULT_VALIDATOR_ID};
use papyrus_protobuf::consensus::{ProposalFin, Vote, DEFAULT_VALIDATOR_ID};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
Expand All @@ -37,7 +37,7 @@ lazy_static! {

const CHANNEL_SIZE: usize = 10;

async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg: ConsensusMessage) {
async fn send(sender: &mut MockBroadcastedMessagesSender<Vote>, msg: Vote) {
let broadcasted_message_metadata =
BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
Expand Down Expand Up @@ -320,7 +320,7 @@ async fn test_timeouts() {
context
.expect_broadcast()
.times(1)
.withf(move |msg: &ConsensusMessage| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
.withf(move |msg: &Vote| msg == &prevote(None, 1, 1, *VALIDATOR_ID))
.return_once(move |_| {
timeout_send.send(()).unwrap();
Ok(())
Expand Down
51 changes: 22 additions & 29 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;
#[cfg(test)]
use enum_as_inner::EnumAsInner;
use futures::channel::{mpsc, oneshot};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote, VoteType};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, trace, warn};

Expand Down Expand Up @@ -264,25 +264,20 @@ impl SingleHeightConsensus {
}

/// Handle messages from peer nodes.
#[instrument(skip_all)]
pub(crate) async fn handle_message<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError> {
debug!("Received message: {:?}", message);
match message {
ConsensusMessage::Proposal(_) => {
unimplemented!("Proposals should use `handle_proposal` due to fake streaming")
}
ConsensusMessage::Vote(vote) => {
let ret = self.handle_vote(context, vote).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
ret
}
}
}
// #[instrument(skip_all)]
// pub(crate) async fn handle_message<ContextT: ConsensusContext>(
// &mut self,
// context: &mut ContextT,
// message: Vote,
// ) -> Result<ShcReturn, ConsensusError> {
// debug!("Received message: {:?}", message);

// let ret = self.handle_vote(context, message).await;
// context.set_height_and_round(self.height, self.state_machine.round()).await;
// ret
// }

/// Handle vote messages from peer nodes.
pub async fn handle_event<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand All @@ -305,7 +300,7 @@ impl SingleHeightConsensus {
if last_vote.round > round {
return Ok(ShcReturn::Tasks(Vec::new()));
}
context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
context.broadcast(last_vote.clone()).await?;
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
self.timeouts.prevote_timeout,
StateMachineEvent::Prevote(proposal_id, round),
Expand All @@ -318,7 +313,7 @@ impl SingleHeightConsensus {
if last_vote.round > round {
return Ok(ShcReturn::Tasks(Vec::new()));
}
context.broadcast(ConsensusMessage::Vote(last_vote.clone())).await?;
context.broadcast(last_vote.clone()).await?;
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
self.timeouts.precommit_timeout,
StateMachineEvent::Precommit(proposal_id, round),
Expand Down Expand Up @@ -375,7 +370,7 @@ impl SingleHeightConsensus {
}

#[instrument(skip_all)]
async fn handle_vote<ContextT: ConsensusContext>(
pub async fn handle_vote<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
vote: Vote,
Expand All @@ -401,11 +396,7 @@ impl SingleHeightConsensus {
Entry::Occupied(entry) => {
let old = entry.get();
if old.block_hash != vote.block_hash {
return Err(ConsensusError::Equivocation(
self.height,
ConsensusMessage::Vote(old.clone()),
ConsensusMessage::Vote(vote),
));
return Err(ConsensusError::Equivocation(self.height, old.clone(), vote));
} else {
// Replay, ignore.
return Ok(ShcReturn::Tasks(Vec::new()));
Expand All @@ -414,7 +405,9 @@ impl SingleHeightConsensus {
}
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn);
self.handle_state_machine_events(context, sm_events).await
let ret = self.handle_state_machine_events(context, sm_events).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
ret
}

// Handle events output by the state machine.
Expand Down Expand Up @@ -567,7 +560,7 @@ impl SingleHeightConsensus {
// TODO(matan): Consider refactoring not to panic, rather log and return the error.
panic!("State machine should not send repeat votes: old={:?}, new={:?}", old, vote);
}
context.broadcast(ConsensusMessage::Vote(vote.clone())).await?;
context.broadcast(vote.clone()).await?;
if last_vote.as_ref().map_or(false, |last| round < last.round) {
return Ok(Vec::new());
}
Expand Down
Loading

0 comments on commit dad001e

Please sign in to comment.