Skip to content

Commit

Permalink
feat(consensus): try sync using context instead of sync_receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Dec 26, 2024
1 parent ca7ae7d commit 6ca2b9c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 134 deletions.
2 changes: 1 addition & 1 deletion crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ fn spawn_consensus(
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
config.sync_retry_interval,
network_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
)
.await?)
}))
Expand Down
32 changes: 12 additions & 20 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;

use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
Expand All @@ -40,6 +40,7 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis
/// - `validator_id`: The ID of this node.
/// - `consensus_delay`: delay before starting consensus; allowing the network to connect to peers.
/// - `timeouts`: The timeouts for the consensus algorithm.
/// - `sync_retry_interval`: The interval to wait between sync retries.
/// - `vote_receiver`: The channels to receive votes from the network. These are self contained
/// messages.
/// - `proposal_receiver`: The channel to receive proposals from the network. Proposals are
Expand All @@ -49,20 +50,19 @@ use crate::types::{BroadcastVoteChannel, ConsensusContext, ConsensusError, Decis
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
pub async fn run_consensus<ContextT, SyncReceiverT>(
pub async fn run_consensus<ContextT>(
mut context: ContextT,
start_active_height: BlockNumber,
start_observe_height: BlockNumber,
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
sync_retry_interval: Duration,
mut vote_receiver: BroadcastVoteChannel,
mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
"Running consensus, start_active_height={}, start_observe_height={}, validator_id={}, \
Expand All @@ -89,9 +89,9 @@ where
&mut context,
current_height,
must_observer,
sync_retry_interval,
&mut vote_receiver,
&mut proposal_receiver,
&mut sync_receiver,
)
.await?
{
Expand Down Expand Up @@ -155,20 +155,17 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Inputs - see [`run_consensus`].
/// - `must_observer`: Whether the node must observe or if it is allowed to be active (assuming
/// it is in the validator set).
#[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
#[instrument(skip(self, context, broadcast_channels), level = "info")]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_height<SyncReceiverT>(
pub(crate) async fn run_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
must_observer: bool,
sync_retry_interval: Duration,
broadcast_channels: &mut BroadcastVoteChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
sync_receiver: &mut SyncReceiverT,
) -> Result<RunHeightRes, ConsensusError>
where
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
) -> Result<RunHeightRes, ConsensusError> {
let validators = context.validators(height).await;
let is_observer = must_observer || !validators.contains(&self.validator_id);
info!("running consensus for height {height:?} with validator set {validators:?}");
Expand Down Expand Up @@ -203,15 +200,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
sync_height = sync_receiver.next() => {
let Some(sync_height) = sync_height else {
return Err(ConsensusError::SyncError("Sync receiver closed".to_string()))
};
if sync_height >= height {
info!("Sync to height: {}. current_height={}", sync_height, height);
return Ok(RunHeightRes::Sync(sync_height));
_ = tokio::time::sleep(sync_retry_interval) => {
if context.try_sync(height).await {
return Ok(RunHeightRes::Sync(height));
}
debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
continue;
}
};
Expand Down
140 changes: 28 additions & 112 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::sync::Arc;
use std::time::Duration;
use std::vec;

use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use lazy_static::lazy_static;
use mockall::predicate::eq;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
MockBroadcastedMessagesSender,
Expand All @@ -16,12 +14,11 @@ use papyrus_protobuf::consensus::{ProposalFin, Vote, DEFAULT_VALIDATOR_ID};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
use tokio::sync::Notify;

use super::{run_consensus, MultiHeightManager, RunHeightRes};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal_init, MockTestContext, TestProposalPart};
use crate::types::{ConsensusError, ValidatorId};
use crate::types::ValidatorId;

lazy_static! {
static ref PROPOSER_ID: ValidatorId = DEFAULT_VALIDATOR_ID.into();
Expand All @@ -36,6 +33,7 @@ lazy_static! {
}

const CHANNEL_SIZE: usize = 10;
const SYNC_RETRY_INTERVAL: Duration = Duration::from_millis(100);

async fn send(sender: &mut MockBroadcastedMessagesSender<Vote>, msg: Vote) {
let broadcasted_message_metadata =
Expand All @@ -54,10 +52,10 @@ async fn send_proposal(
}
}

fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt, times: usize) {
context
.expect_validate_proposal()
.return_once(move |_, _, _| {
.returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand All @@ -67,7 +65,7 @@ fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
.unwrap();
block_receiver
})
.times(1);
.times(times);
}

fn assert_decision(res: RunHeightRes, id: Felt) {
Expand Down Expand Up @@ -105,7 +103,7 @@ async fn manager_multiple_heights_unordered() {

let mut context = MockTestContext::new();
// Run the manager for height 1.
expect_validate_proposal(&mut context, Felt::ONE);
expect_validate_proposal(&mut context, Felt::ONE, 1);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
Expand All @@ -118,24 +116,24 @@ async fn manager_multiple_heights_unordered() {
&mut context,
BlockNumber(1),
false,
SYNC_RETRY_INTERVAL,
&mut subscriber_channels,
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
assert_decision(decision, Felt::ONE);

// Run the manager for height 2.
expect_validate_proposal(&mut context, Felt::TWO);
expect_validate_proposal(&mut context, Felt::TWO, 1);
let decision = manager
.run_height(
&mut context,
BlockNumber(2),
false,
SYNC_RETRY_INTERVAL,
&mut subscriber_channels,
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
Expand All @@ -150,17 +148,24 @@ async fn run_consensus_sync() {

let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

expect_validate_proposal(&mut context, Felt::TWO);
expect_validate_proposal(&mut context, Felt::TWO, 1);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision_reached().return_once(move |block, votes| {
assert_eq!(block, BlockHash(Felt::TWO));
assert_eq!(votes[0].height, 2);
decision_tx.send(()).unwrap();
Ok(())
});
context
.expect_decision_reached()
.withf(move |block, votes| *block == BlockHash(Felt::TWO) && votes[0].height == 2)
.return_once(move |_, _| {
decision_tx.send(()).unwrap();
Ok(())
});
context
.expect_try_sync()
.withf(move |height| *height == BlockNumber(1))
.times(1)
.returning(|_| true);
context.expect_try_sync().returning(|_| false);

// Send messages for height 2.
send_proposal(
Expand All @@ -175,107 +180,23 @@ async fn run_consensus_sync() {
send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

// Start at height 1.
let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();
let consensus_handle = tokio::spawn(async move {
tokio::spawn(async move {
run_consensus(
context,
BlockNumber(1),
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
TIMEOUTS.clone(),
SYNC_RETRY_INTERVAL,
subscriber_channels.into(),
proposal_receiver_receiver,
&mut sync_receiver,
)
.await
});

// Send sync for height 1.
sync_sender.send(BlockNumber(1)).await.unwrap();
// Make sure the sync is processed before the upcoming messages.
tokio::time::sleep(Duration::from_millis(100)).await;

// Decision for height 2.
decision_rx.await.unwrap();

// Drop the sender to close consensus and gracefully shut down.
drop(sync_sender);
assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
}

// Check for cancellation safety when ignoring old heights. If the current height check was done
// within the select branch this test would hang.
#[tokio::test]
async fn run_consensus_sync_cancellation_safety() {
let mut context = MockTestContext::new();
let proposal_handled = Arc::new(Notify::new());
let (decision_tx, decision_rx) = oneshot::channel();

let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

expect_validate_proposal(&mut context, Felt::ONE);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
let proposal_handled_clone = Arc::clone(&proposal_handled);
context
.expect_broadcast()
.with(eq(prevote(Some(Felt::ONE), 1, 0, *VALIDATOR_ID)))
// May occur repeatedly due to re-broadcasting.
.returning(move |_| {
proposal_handled_clone.notify_one();
Ok(())
});
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision_reached().return_once(|block, votes| {
assert_eq!(block, BlockHash(Felt::ONE));
assert_eq!(votes[0].height, 1);
decision_tx.send(()).unwrap();
Ok(())
});

let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let (mut sync_sender, mut sync_receiver) = mpsc::unbounded();

let consensus_handle = tokio::spawn(async move {
run_consensus(
context,
BlockNumber(1),
BlockNumber(1),
*VALIDATOR_ID,
Duration::ZERO,
TIMEOUTS.clone(),
subscriber_channels.into(),
proposal_receiver_receiver,
&mut sync_receiver,
)
.await
});
let mut network_sender = mock_network.broadcasted_messages_sender;

// Send a proposal for height 1.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
)
.await;
proposal_handled.notified().await;

// Send an old sync. This should not cancel the current height.
sync_sender.send(BlockNumber(0)).await.unwrap();
// Make sure the sync is processed before the upcoming messages.
tokio::time::sleep(Duration::from_millis(100)).await;

// Finished messages for 1
send(&mut network_sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
decision_rx.await.unwrap();

// Drop the sender to close consensus and gracefully shut down.
drop(sync_sender);
assert!(matches!(consensus_handle.await.unwrap(), Err(ConsensusError::SyncError(_))));
}

#[tokio::test]
Expand All @@ -299,17 +220,12 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
.unwrap();
block_receiver
});
expect_validate_proposal(&mut context, Felt::ONE, 2);
context
.expect_validators()
.returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_try_sync().returning(|_| false);

let (timeout_send, timeout_receive) = oneshot::channel();
// Node handled Timeout events and responded with NIL vote.
Expand All @@ -330,9 +246,9 @@ async fn test_timeouts() {
&mut context,
BlockNumber(1),
false,
SYNC_RETRY_INTERVAL,
&mut subscriber_channels.into(),
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ impl ConsensusManager {
self.config.consensus_config.validator_id,
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
self.config.consensus_config.sync_retry_interval,
votes_broadcast_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
);

tokio::select! {
Expand Down

0 comments on commit 6ca2b9c

Please sign in to comment.