diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index cc1579d022..8d49a5cbf0 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -21,7 +21,7 @@ use papyrus_network_types::network_types::BroadcastedMessageMetadata; use papyrus_protobuf::consensus::{ProposalInit, Vote}; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::BlockNumber; -use tracing::{debug, info, instrument}; +use tracing::{debug, info, instrument, trace, warn}; use crate::config::TimeoutsConfig; use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus}; @@ -46,7 +46,8 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis /// represented as streams (ProposalInit, Content.*, ProposalFin). // TODO(dvir): add test for this. // TODO(Asmaa): Update documentation when we update for the real sync. -#[instrument(skip_all, level = "info")] +// Always print the validator ID since some tests collate multiple consensus logs in a single file. +#[instrument(skip_all, fields(%validator_id), level = "error")] #[allow(missing_docs)] #[allow(clippy::too_many_arguments)] pub async fn run_consensus( @@ -65,13 +66,9 @@ where SyncReceiverT: Stream + Unpin, { info!( - "Running consensus, start_active_height={}, start_observe_height={}, validator_id={}, \ - consensus_delay={}, timeouts={:?}", - start_active_height, - start_observe_height, - validator_id, + "Running consensus, start_active_height={start_active_height}, \ + start_observe_height={start_observe_height}, consensus_delay={}, timeouts={timeouts:?}", consensus_delay.as_secs(), - timeouts ); // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error @@ -96,10 +93,14 @@ where .await? { RunHeightRes::Decision(decision) => { + // We expect there to be under 100 validators, so this is a reasonable number of + // precommits to print. + info!("Decision reached. {:?}", decision); context.decision_reached(decision.block, decision.precommits).await?; current_height = current_height.unchecked_next(); } RunHeightRes::Sync(sync_height) => { + info!("Sync to height: {}. current_height={}", sync_height, current_height); metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT); current_height = sync_height.unchecked_next(); } @@ -114,6 +115,7 @@ pub enum RunHeightRes { /// Decision reached. Decision(Decision), /// Sync protocol returned a future height. + // TODO(Asmaa): Remove BlockNumber since sync is only for the current block. Sync(BlockNumber), } @@ -155,7 +157,7 @@ impl MultiHeightManager { /// Inputs - see [`run_consensus`]. /// - `must_observer`: Whether the node must observe or if it is allowed to be active (assuming /// it is in the validator set). - #[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")] + #[instrument(skip_all)] pub(crate) async fn run_height( &mut self, context: &mut ContextT, @@ -170,7 +172,10 @@ impl MultiHeightManager { { let validators = context.validators(height).await; let is_observer = must_observer || !validators.contains(&self.validator_id); - info!("running consensus for height {height:?} with validator set {validators:?}"); + info!( + "running consensus for height {height:?}. is_observer: {is_observer}, validators: \ + {validators:?}" + ); let mut shc = SingleHeightConsensus::new( height, is_observer, @@ -207,7 +212,6 @@ impl MultiHeightManager { return Err(ConsensusError::SyncError("Sync receiver closed".to_string())) }; if sync_height >= height { - info!("Sync to height: {}. current_height={}", sync_height, height); return Ok(RunHeightRes::Sync(sync_height)); } debug!("Ignoring sync to height: {}. current_height={}", sync_height, height); @@ -226,6 +230,7 @@ impl MultiHeightManager { } } + #[instrument(skip_all)] async fn start_height( &mut self, context: &mut ContextT, @@ -233,18 +238,28 @@ impl MultiHeightManager { shc: &mut SingleHeightConsensus, ) -> Result { let mut tasks = match shc.start(context).await? { - decision @ ShcReturn::Decision(_) => return Ok(decision), + decision @ ShcReturn::Decision(_) => { + // Start should generate either TimeoutProposal (validator) or GetProposal + // (proposer). We do not enforce this since the Manager is + // intentionally not meant to understand consensus in detail. + warn!("Decision reached at start of height. {:?}", decision); + return Ok(decision); + } ShcReturn::Tasks(tasks) => tasks, }; - for (init, content_receiver) in self.get_current_proposal(height) { + let cached_proposals = self.get_current_height_proposals(height); + trace!("Cached proposals for height {}: {:?}", height, cached_proposals); + for (init, content_receiver) in cached_proposals { match shc.handle_proposal(context, init, content_receiver).await? { decision @ ShcReturn::Decision(_) => return Ok(decision), ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks), } } - for msg in self.get_current_height_messages(height) { + let cached_votes = self.get_current_height_votes(height); + trace!("Cached votes for height {}: {:?}", height, cached_votes); + for msg in cached_votes { match shc.handle_vote(context, msg).await? { decision @ ShcReturn::Decision(_) => return Ok(decision), ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks), @@ -255,6 +270,7 @@ impl MultiHeightManager { } // Handle a new proposal receiver from the network. + #[instrument(skip_all)] async fn handle_proposal( &mut self, context: &mut ContextT, @@ -277,22 +293,29 @@ impl MultiHeightManager { }; let proposal_init: ProposalInit = first_part.try_into()?; - if proposal_init.height != height { - debug!("Received a proposal for a different height or round. {:?}", proposal_init); - if proposal_init.height > height { + match proposal_init.height.cmp(&height) { + std::cmp::Ordering::Greater => { + debug!("Received a proposal for a future height. {:?}", proposal_init); // Note: new proposals with the same height/round will be ignored. self.cached_proposals .entry(proposal_init.height.0) .or_default() .entry(proposal_init.round) .or_insert((proposal_init, content_receiver)); + Ok(ShcReturn::Tasks(Vec::new())) + } + std::cmp::Ordering::Less => { + trace!("Drop proposal from past height. {:?}", proposal_init); + Ok(ShcReturn::Tasks(Vec::new())) + } + std::cmp::Ordering::Equal => { + shc.handle_proposal(context, proposal_init, content_receiver).await } - return Ok(ShcReturn::Tasks(Vec::new())); } - shc.handle_proposal(context, proposal_init, content_receiver).await } // Handle a single consensus message. + #[instrument(skip_all)] async fn handle_vote( &mut self, context: &mut ContextT, @@ -321,15 +344,18 @@ impl MultiHeightManager { // TODO(matan): We need to figure out an actual caching strategy under 2 constraints: // 1. Malicious - must be capped so a malicious peer can't DoS us. // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0). - // In general I think we will want to only cache (H+1, 0) messages. - if message.height != height.0 { - debug!("Received a message for a different height. {:?}", message); - if message.height > height.0 { + match message.height.cmp(&height.0) { + std::cmp::Ordering::Greater => { + debug!("Cache message for a future height. {:?}", message); self.future_votes.entry(message.height).or_default().push(message); + Ok(ShcReturn::Tasks(Vec::new())) + } + std::cmp::Ordering::Less => { + trace!("Drop message from past height. {:?}", message); + Ok(ShcReturn::Tasks(Vec::new())) } - return Ok(ShcReturn::Tasks(Vec::new())); + std::cmp::Ordering::Equal => shc.handle_vote(context, message).await, } - shc.handle_vote(context, message).await } // Checks if a cached proposal already exists (with correct height) @@ -337,7 +363,7 @@ impl MultiHeightManager { // - returns None if no proposal exists. // - cleans up any proposals from earlier heights. // - for a given height, returns the proposal with the lowest round (and removes it). - fn get_current_proposal( + fn get_current_height_proposals( &mut self, height: BlockNumber, ) -> Vec<(ProposalInit, mpsc::Receiver)> { @@ -362,7 +388,7 @@ impl MultiHeightManager { // - returns all of the current height messages. // - drops messages from earlier heights. // - retains future messages in the cache. - fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec { + fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec { // Depends on `future_votes` being sorted by height. loop { let Some(entry) = self.future_votes.first_entry() else {