From 40b17d6137621a803aa03aebac488979c35526e9 Mon Sep 17 00:00:00 2001
From: Asmaa Magdoub
Date: Tue, 24 Dec 2024 17:17:08 +0200
Subject: [PATCH] feat(consensus): try sync using context instead of
 sync_receiver

---
 crates/papyrus_node/src/run.rs                |   2 +-
 .../papyrus_consensus/src/manager.rs          |  32 ++---
 .../papyrus_consensus/src/manager_test.rs     | 120 +++---------
 .../src/consensus_manager.rs                  |   2 +-
 4 files changed, 35 insertions(+), 121 deletions(-)

diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs
index 1bfdeae756..ed9d81c948 100644
--- a/crates/papyrus_node/src/run.rs
+++ b/crates/papyrus_node/src/run.rs
@@ -220,9 +220,9 @@ fn spawn_consensus(
             config.validator_id,
             config.consensus_delay,
             config.timeouts.clone(),
+            config.sync_retry_interval,
             network_channels.into(),
             inbound_internal_receiver,
-            futures::stream::pending(),
         )
         .await?)
     }))
diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index 4c3d05f50e..b4e3e62216 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -14,7 +14,7 @@ use std::time::Duration;
 
 use futures::channel::mpsc;
 use futures::stream::FuturesUnordered;
-use futures::{Stream, StreamExt};
+use futures::StreamExt;
 use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
 use papyrus_network::network_manager::BroadcastTopicClientTrait;
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
@@ -40,6 +40,7 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis
 /// - `validator_id`: The ID of this node.
 /// - `consensus_delay`: delay before starting consensus; allowing the network to connect to peers.
 /// - `timeouts`: The timeouts for the consensus algorithm.
+/// - `sync_retry_interval`: The interval to wait between sync retries.
 /// - `vote_receiver`: The channels to receive votes from the network. These are self contained
 ///   messages.
 /// - `proposal_receiver`: The channel to receive proposals from the network. Proposals are
@@ -49,20 +50,19 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis
 #[instrument(skip_all, level = "info")]
 #[allow(missing_docs)]
 #[allow(clippy::too_many_arguments)]
-pub async fn run_consensus<ContextT, SyncReceiverT>(
+pub async fn run_consensus<ContextT>(
     mut context: ContextT,
     start_active_height: BlockNumber,
     start_observe_height: BlockNumber,
     validator_id: ValidatorId,
     consensus_delay: Duration,
     timeouts: TimeoutsConfig,
+    sync_retry_interval: Duration,
     mut vote_receiver: BroadcastVoteChannel,
     mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
-    mut sync_receiver: SyncReceiverT,
 ) -> Result<(), ConsensusError>
 where
     ContextT: ConsensusContext,
-    SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
 {
     info!(
         "Running consensus, start_active_height={}, start_observe_height={}, validator_id={}, \
@@ -89,9 +89,9 @@ where
             &mut context,
             current_height,
             must_observer,
+            sync_retry_interval,
             &mut vote_receiver,
             &mut proposal_receiver,
-            &mut sync_receiver,
         )
         .await?
         {
@@ -155,20 +155,17 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
     /// Inputs - see [`run_consensus`].
     /// - `must_observer`: Whether the node must observe or if it is allowed to be active (assuming
     ///   it is in the validator set).
-    #[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
+    #[instrument(skip(self, context, broadcast_channels), level = "info")]
     #[allow(clippy::too_many_arguments)]
-    pub(crate) async fn run_height<SyncReceiverT>(
+    pub(crate) async fn run_height(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         must_observer: bool,
+        sync_retry_interval: Duration,
         broadcast_channels: &mut BroadcastVoteChannel,
         proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
-        sync_receiver: &mut SyncReceiverT,
-    ) -> Result<RunHeightRes, ConsensusError>
-    where
-        SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
-    {
+    ) -> Result<RunHeightRes, ConsensusError> {
         let validators = context.validators(height).await;
         let is_observer = must_observer || !validators.contains(&self.validator_id);
         info!("running consensus for height {height:?} with validator set {validators:?}");
@@ -203,15 +200,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             Some(shc_event) = shc_events.next() => {
                 shc.handle_event(context, shc_event).await?
             },
-            sync_height = sync_receiver.next() => {
-                let Some(sync_height) = sync_height else {
-                    return Err(ConsensusError::SyncError("Sync receiver closed".to_string()))
-                };
-                if sync_height >= height {
-                    info!("Sync to height: {}. current_height={}", sync_height, height);
-                    return Ok(RunHeightRes::Sync(sync_height));
+            _ = tokio::time::sleep(sync_retry_interval) => {
+                if context.try_sync(height).await {
+                    return Ok(RunHeightRes::Sync(height));
                 }
-                debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
                 continue;
             }
         };
diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs
index 6fbaeff15a..33506ff41c 100644
--- a/crates/sequencing/papyrus_consensus/src/manager_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -1,11 +1,9 @@
-use std::sync::Arc;
 use std::time::Duration;
 use std::vec;
 
 use futures::channel::{mpsc, oneshot};
 use futures::SinkExt;
 use lazy_static::lazy_static;
-use mockall::predicate::eq;
 use papyrus_network::network_manager::test_utils::{
     mock_register_broadcast_topic,
     MockBroadcastedMessagesSender,
@@ -16,12 +14,11 @@ use papyrus_network::network_manager::test_utils::{
 use papyrus_protobuf::consensus::{ProposalFin, Vote, DEFAULT_VALIDATOR_ID};
 use papyrus_test_utils::{get_rng, GetTestInstance};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
-use tokio::sync::Notify;
 
 use super::{run_consensus, MultiHeightManager, RunHeightRes};
 use crate::config::TimeoutsConfig;
 use crate::test_utils::{precommit, prevote, proposal_init, MockTestContext, TestProposalPart};
-use crate::types::{ConsensusError, ValidatorId};
+use crate::types::ValidatorId;
 
 lazy_static! {
     static ref PROPOSER_ID: ValidatorId = DEFAULT_VALIDATOR_ID.into();
@@ -36,6 +33,7 @@ lazy_static! {
 }
 
 const CHANNEL_SIZE: usize = 10;
+const SYNC_RETRY_INTERVAL: Duration = Duration::from_millis(100);
 
 async fn send(sender: &mut MockBroadcastedMessagesSender<Vote>, msg: Vote) {
     let broadcasted_message_metadata =
@@ -118,9 +116,9 @@ async fn manager_multiple_heights_unordered() {
             &mut context,
             BlockNumber(1),
             false,
+            SYNC_RETRY_INTERVAL,
             &mut subscriber_channels,
             &mut proposal_receiver_receiver,
-            &mut futures::stream::pending(),
         )
         .await
         .unwrap();
@@ -133,9 +131,9 @@ async fn manager_multiple_heights_unordered() {
             &mut context,
             BlockNumber(2),
             false,
+            SYNC_RETRY_INTERVAL,
             &mut subscriber_channels,
             &mut proposal_receiver_receiver,
-            &mut futures::stream::pending(),
         )
         .await
         .unwrap();
@@ -155,12 +153,19 @@ async fn run_consensus_sync() {
     context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
     context.expect_set_height_and_round().returning(move |_, _| ());
     context.expect_broadcast().returning(move |_| Ok(()));
-    context.expect_decision_reached().return_once(move |block, votes| {
-        assert_eq!(block, BlockHash(Felt::TWO));
-        assert_eq!(votes[0].height, 2);
-        decision_tx.send(()).unwrap();
-        Ok(())
-    });
+    context
+        .expect_decision_reached()
+        .withf(move |block, votes| *block == BlockHash(Felt::TWO) && votes[0].height == 2)
+        .return_once(move |_, _| {
+            decision_tx.send(()).unwrap();
+            Ok(())
+        });
+    context
+        .expect_try_sync()
+        .withf(move |height| *height == BlockNumber(1))
+        .times(1)
+        .returning(|_| true);
+    context.expect_try_sync().returning(|_| false);
 
     // Send messages for height 2.
     send_proposal(
         &mut proposal_receiver_sender,
         vec![TestProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
     )
     .await;
     send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
     send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
 
     // Start at height 1.
-    let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
-    let consensus_handle = tokio::spawn(async move {
+    tokio::spawn(async move {
         run_consensus(
             context,
             BlockNumber(1),
             BlockNumber(1),
             *VALIDATOR_ID,
             Duration::ZERO,
             TIMEOUTS.clone(),
+            SYNC_RETRY_INTERVAL,
             subscriber_channels.into(),
             proposal_receiver_receiver,
-            &mut sync_receiver,
         )
         .await
     });
 
-    // Send sync for height 1.
-    sync_sender.send(BlockNumber(1)).await.unwrap();
-    // Make sure the sync is processed before the upcoming messages.
-    tokio::time::sleep(Duration::from_millis(100)).await;
-
     // Decision for height 2.
     decision_rx.await.unwrap();
-
-    // Drop the sender to close consensus and gracefully shut down.
-    drop(sync_sender);
-    assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
-}
-
-// Check for cancellation safety when ignoring old heights. If the current height check was done
-// within the select branch this test would hang.
-#[tokio::test]
-async fn run_consensus_sync_cancellation_safety() {
-    let mut context = MockTestContext::new();
-    let proposal_handled = Arc::new(Notify::new());
-    let (decision_tx, decision_rx) = oneshot::channel();
-
-    let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
-
-    expect_validate_proposal(&mut context, Felt::ONE);
-    context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
-    context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
-    context.expect_set_height_and_round().returning(move |_, _| ());
-    let proposal_handled_clone = Arc::clone(&proposal_handled);
-    context
-        .expect_broadcast()
-        .with(eq(prevote(Some(Felt::ONE), 1, 0, *VALIDATOR_ID)))
-        // May occur repeatedly due to re-broadcasting.
-        .returning(move |_| {
-            proposal_handled_clone.notify_one();
-            Ok(())
-        });
-    context.expect_broadcast().returning(move |_| Ok(()));
-    context.expect_decision_reached().return_once(|block, votes| {
-        assert_eq!(block, BlockHash(Felt::ONE));
-        assert_eq!(votes[0].height, 1);
-        decision_tx.send(()).unwrap();
-        Ok(())
-    });
-
-    let TestSubscriberChannels { mock_network, subscriber_channels } =
-        mock_register_broadcast_topic().unwrap();
-    let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
-
-    let consensus_handle = tokio::spawn(async move {
-        run_consensus(
-            context,
-            BlockNumber(1),
-            BlockNumber(1),
-            *VALIDATOR_ID,
-            Duration::ZERO,
-            TIMEOUTS.clone(),
-            subscriber_channels.into(),
-            proposal_receiver_receiver,
-            &mut sync_receiver,
-        )
-        .await
-    });
-    let mut network_sender = mock_network.broadcasted_messages_sender;
-
-    // Send a proposal for height 1.
-    send_proposal(
-        &mut proposal_receiver_sender,
-        vec![TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
-    )
-    .await;
-    proposal_handled.notified().await;
-
-    // Send an old sync. This should not cancel the current height.
-    sync_sender.send(BlockNumber(0)).await.unwrap();
-    // Make sure the sync is processed before the upcoming messages.
-    tokio::time::sleep(Duration::from_millis(100)).await;
-
-    // Finished messages for 1
-    send(&mut network_sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
-    send(&mut network_sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
-    decision_rx.await.unwrap();
-
-    // Drop the sender to close consensus and gracefully shut down.
-    drop(sync_sender);
-    assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
-}
 
 #[tokio::test]
@@ -310,6 +231,7 @@ async fn test_timeouts() {
         .expect_validators()
         .returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]);
     context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
+    context.expect_try_sync().returning(|_| false);
 
     let (timeout_send, timeout_receive) = oneshot::channel();
     // Node handled Timeout events and responded with NIL vote.
@@ -330,9 +252,9 @@ async fn test_timeouts() {
             &mut context,
             BlockNumber(1),
             false,
+            SYNC_RETRY_INTERVAL,
             &mut subscriber_channels.into(),
             &mut proposal_receiver_receiver,
-            &mut futures::stream::pending(),
         )
         .await
         .unwrap();
diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs
index c1aa115932..7920e7f711 100644
--- a/crates/starknet_consensus_manager/src/consensus_manager.rs
+++ b/crates/starknet_consensus_manager/src/consensus_manager.rs
@@ -98,9 +98,9 @@ impl ConsensusManager {
             self.config.consensus_config.validator_id,
             self.config.consensus_config.consensus_delay,
             self.config.consensus_config.timeouts.clone(),
+            self.config.consensus_config.sync_retry_interval,
             votes_broadcast_channels.into(),
             inbound_internal_receiver,
-            futures::stream::pending(),
         );
 
         tokio::select! {
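
Reviewer note: the core behavioral change above replaces the push-based `sync_receiver` stream with polling. `run_height` now wakes every `sync_retry_interval` and asks the context whether the current height has already been decided externally (e.g. by the sync service). The sketch below is a minimal, self-contained illustration of that polling shape, not code from this patch: `MockSyncContext`, its `watch`-channel plumbing, and `main` are hypothetical stand-ins; only the `tokio::select!` branch mirrors the new `run_height` logic.

// Standalone illustration only. `MockSyncContext` and `synced_height` are
// hypothetical; the real code calls `ConsensusContext::try_sync` instead.
use std::time::Duration;

use tokio::sync::watch;

/// Hypothetical stand-in for the part of a context that `try_sync` consults.
struct MockSyncContext {
    /// Highest height already decided via sync, updated by a separate sync task.
    synced_height: watch::Receiver<u64>,
}

impl MockSyncContext {
    /// Mirrors the patch's `context.try_sync(height)`: returns true when `height`
    /// has already been reached by sync, letting consensus skip ahead.
    async fn try_sync(&self, height: u64) -> bool {
        *self.synced_height.borrow() >= height
    }
}

#[tokio::main]
async fn main() {
    let (sync_tx, sync_rx) = watch::channel(0u64);
    let context = MockSyncContext { synced_height: sync_rx };
    let sync_retry_interval = Duration::from_millis(100);
    let height = 1u64;

    // Simulate a sync service that reaches height 1 after a while.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(350)).await;
        let _ = sync_tx.send(1);
    });

    // The shape of the new `run_height` select branch: instead of awaiting a
    // sync_receiver stream, wake every `sync_retry_interval` and poll the context.
    loop {
        tokio::select! {
            // ...the real loop also has branches for proposals, votes, and SHC events...
            _ = tokio::time::sleep(sync_retry_interval) => {
                if context.try_sync(height).await {
                    println!("synced to height {height}; run_height would return Sync");
                    return;
                }
                // Not synced yet; wait for messages or the next retry tick.
                continue;
            }
        }
    }
}

Compared with the removed stream, polling trades a small fixed latency (at most one `sync_retry_interval`, 100ms in these tests) for a simpler surface: the height check lives inside the context call rather than in the select branch, which is presumably why the cancellation-safety test for old sync heights could be deleted, as there is no longer a stale sync message that can race the current-height check. Tests that never sync now need a catch-all `expect_try_sync(...).returning(|_| false)`, as `test_timeouts` shows.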