chore(consensus): update tracing in manager
There are also upcoming tracing changes to the other modules, which will complement these changes.
matan-starkware committed Dec 25, 2024
1 parent 3b486e4 commit c4b6d93
Showing 1 changed file with 53 additions and 27 deletions.
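For context, this is the tracing pattern the commit applies to `run_consensus`: a minimal, hypothetical sketch (the function body, the `u64` ID type, and the `tokio`/`tracing-subscriber` setup are assumptions, not the crate's real types). Recording the span at `level = "error"` keeps it enabled under almost any filter, and `fields(%validator_id)` attaches the validator ID (via `Display`) to every event emitted inside the span.

```rust
use tracing::{info, instrument};

// Hypothetical stand-in for the real `run_consensus`; only the attribute
// mirrors the commit. `skip_all` avoids recording arguments wholesale, while
// `fields(%validator_id)` re-adds the one field we always want to see.
#[instrument(skip_all, fields(%validator_id), level = "error")]
async fn run_consensus(validator_id: u64) {
    // Printed roughly as: run_consensus{validator_id=7}: Running consensus
    info!("Running consensus");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    run_consensus(7).await;
}
```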
80 changes: 53 additions & 27 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -21,7 +21,7 @@ use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ProposalInit, Vote};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};
use tracing::{debug, info, instrument, trace, warn};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -46,7 +46,8 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis
/// represented as streams (ProposalInit, Content.*, ProposalFin).
// TODO(dvir): add test for this.
// TODO(Asmaa): Update documentation when we update for the real sync.
#[instrument(skip_all, level = "info")]
// Always print the validator ID since some tests collate multiple consensus logs in a single file.
#[instrument(skip_all, fields(%validator_id), level = "error")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
pub async fn run_consensus<ContextT, SyncReceiverT>(
@@ -65,13 +66,9 @@ where
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
"Running consensus, start_active_height={}, start_observe_height={}, validator_id={}, \
consensus_delay={}, timeouts={:?}",
start_active_height,
start_observe_height,
validator_id,
"Running consensus, start_active_height={start_active_height}, \
start_observe_height={start_observe_height}, consensus_delay={}, timeouts={timeouts:?}",
consensus_delay.as_secs(),
timeouts
);

// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
@@ -96,10 +93,14 @@ where
.await?
{
RunHeightRes::Decision(decision) => {
// We expect there to be under 100 validators, so this is a reasonable number of
// precommits to print.
info!("Decision reached. {:?}", decision);
context.decision_reached(decision.block, decision.precommits).await?;
current_height = current_height.unchecked_next();
}
RunHeightRes::Sync(sync_height) => {
info!("Sync to height: {}. current_height={}", sync_height, current_height);
metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
current_height = sync_height.unchecked_next();
}
@@ -114,6 +115,7 @@ pub enum RunHeightRes {
/// Decision reached.
Decision(Decision),
/// Sync protocol returned a future height.
// TODO(Asmaa): Remove BlockNumber since sync is only for the current block.
Sync(BlockNumber),
}

@@ -155,7 +157,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Inputs - see [`run_consensus`].
/// - `must_observer`: Whether the node must observe or if it is allowed to be active (assuming
/// it is in the validator set).
#[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
#[instrument(skip_all)]
pub(crate) async fn run_height<SyncReceiverT>(
&mut self,
context: &mut ContextT,
@@ -170,7 +172,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
{
let validators = context.validators(height).await;
let is_observer = must_observer || !validators.contains(&self.validator_id);
info!("running consensus for height {height:?} with validator set {validators:?}");
info!(
"running consensus for height {height:?}. is_observer: {is_observer}, validators: \
{validators:?}"
);
let mut shc = SingleHeightConsensus::new(
height,
is_observer,
@@ -207,7 +212,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
return Err(ConsensusError::SyncError("Sync receiver closed".to_string()))
};
if sync_height >= height {
info!("Sync to height: {}. current_height={}", sync_height, height);
return Ok(RunHeightRes::Sync(sync_height));
}
debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
@@ -226,25 +230,36 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}

#[instrument(skip_all)]
async fn start_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
) -> Result<ShcReturn, ConsensusError> {
let mut tasks = match shc.start(context).await? {
decision @ ShcReturn::Decision(_) => return Ok(decision),
decision @ ShcReturn::Decision(_) => {
// Start should generate either TimeoutProposal (validator) or GetProposal
// (proposer). We do not enforce this since the Manager is
// intentionally not meant to understand consensus in detail.
warn!("Decision reached at start of height. {:?}", decision);
return Ok(decision);
}
ShcReturn::Tasks(tasks) => tasks,
};

for (init, content_receiver) in self.get_current_proposal(height) {
let cached_proposals = self.get_current_height_proposals(height);
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
for (init, content_receiver) in cached_proposals {
match shc.handle_proposal(context, init, content_receiver).await? {
decision @ ShcReturn::Decision(_) => return Ok(decision),
ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks),
}
}

for msg in self.get_current_height_messages(height) {
let cached_votes = self.get_current_height_votes(height);
trace!("Cached votes for height {}: {:?}", height, cached_votes);
for msg in cached_votes {
match shc.handle_vote(context, msg).await? {
decision @ ShcReturn::Decision(_) => return Ok(decision),
ShcReturn::Tasks(new_tasks) => tasks.extend(new_tasks),
@@ -255,6 +270,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}

// Handle a new proposal receiver from the network.
#[instrument(skip_all)]
async fn handle_proposal(
&mut self,
context: &mut ContextT,
@@ -277,22 +293,29 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
};
let proposal_init: ProposalInit = first_part.try_into()?;

if proposal_init.height != height {
debug!("Received a proposal for a different height or round. {:?}", proposal_init);
if proposal_init.height > height {
match proposal_init.height.cmp(&height) {
std::cmp::Ordering::Greater => {
debug!("Received a proposal for a future height. {:?}", proposal_init);
// Note: new proposals with the same height/round will be ignored.
self.cached_proposals
.entry(proposal_init.height.0)
.or_default()
.entry(proposal_init.round)
.or_insert((proposal_init, content_receiver));
Ok(ShcReturn::Tasks(Vec::new()))
}
std::cmp::Ordering::Less => {
trace!("Drop proposal from past height. {:?}", proposal_init);
Ok(ShcReturn::Tasks(Vec::new()))
}
std::cmp::Ordering::Equal => {
shc.handle_proposal(context, proposal_init, content_receiver).await
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init, content_receiver).await
}

// Handle a single consensus message.
#[instrument(skip_all)]
async fn handle_vote(
&mut self,
context: &mut ContextT,
@@ -321,23 +344,26 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
if message.height != height.0 {
debug!("Received a message for a different height. {:?}", message);
if message.height > height.0 {
match message.height.cmp(&height.0) {
std::cmp::Ordering::Greater => {
debug!("Cache message for a future height. {:?}", message);
self.future_votes.entry(message.height).or_default().push(message);
Ok(ShcReturn::Tasks(Vec::new()))
}
std::cmp::Ordering::Less => {
trace!("Drop message from past height. {:?}", message);
Ok(ShcReturn::Tasks(Vec::new()))
}
return Ok(ShcReturn::Tasks(Vec::new()));
std::cmp::Ordering::Equal => shc.handle_vote(context, message).await,
}
shc.handle_vote(context, message).await
}

// Checks if a cached proposal already exists (with correct height)
// - returns the proposal if it exists and removes it from the cache.
// - returns None if no proposal exists.
// - cleans up any proposals from earlier heights.
// - for a given height, returns the proposal with the lowest round (and removes it).
fn get_current_proposal(
fn get_current_height_proposals(
&mut self,
height: BlockNumber,
) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
@@ -362,7 +388,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// - returns all of the current height messages.
// - drops messages from earlier heights.
// - retains future messages in the cache.
fn get_current_height_messages(&mut self, height: BlockNumber) -> Vec<Vote> {
fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec<Vote> {
// Depends on `future_votes` being sorted by height.
loop {
let Some(entry) = self.future_votes.first_entry() else {
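The largest reshaping in the diff is that `handle_proposal` and `handle_vote` now branch on `std::cmp::Ordering` instead of nested `if`s: future heights are cached, past heights are dropped at `trace` level, and only the current height is forwarded to the single-height consensus. Below is a simplified, hypothetical sketch of that routing; the `Vote` stand-in, the `Routed` result type, and the cache shape are assumptions for illustration, not the crate's real API.

```rust
use std::cmp::Ordering;
use std::collections::BTreeMap;

#[derive(Debug)]
struct Vote {
    height: u64,
}

#[derive(Default)]
struct Manager {
    // Sorted by height, mirroring the manager's `future_votes` cache.
    future_votes: BTreeMap<u64, Vec<Vote>>,
}

enum Routed {
    HandleNow(Vote), // same height: forward to SingleHeightConsensus
    Cached,          // future height: keep for later
    Dropped,         // past height: discard
}

impl Manager {
    fn route_vote(&mut self, current_height: u64, vote: Vote) -> Routed {
        match vote.height.cmp(&current_height) {
            Ordering::Greater => {
                self.future_votes.entry(vote.height).or_default().push(vote);
                Routed::Cached
            }
            Ordering::Less => Routed::Dropped,
            Ordering::Equal => Routed::HandleNow(vote),
        }
    }
}

fn main() {
    let mut manager = Manager::default();
    assert!(matches!(manager.route_vote(10, Vote { height: 11 }), Routed::Cached));
    assert!(matches!(manager.route_vote(10, Vote { height: 9 }), Routed::Dropped));
    assert!(matches!(manager.route_vote(10, Vote { height: 10 }), Routed::HandleNow(_)));
}
```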