diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index ae2f2abebd4..6ff6a7cd23d 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -54,7 +54,7 @@ pub enum StreamMessageBody { Fin, } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct StreamMessage> + TryFrom, Error = ProtobufConversionError>> { pub message: StreamMessageBody, pub stream_id: u64, @@ -62,7 +62,7 @@ pub struct StreamMessage> + TryFrom, Error = ProtobufCon } /// This message must be sent first when proposing a new block. -#[derive(Default, Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, Default, PartialEq)] pub struct ProposalInit { /// The height of the consensus (block number). pub height: BlockNumber, diff --git a/crates/papyrus_protobuf/src/converters/consensus_test.rs b/crates/papyrus_protobuf/src/converters/consensus_test.rs index 57e328a7fac..422c6a98107 100644 --- a/crates/papyrus_protobuf/src/converters/consensus_test.rs +++ b/crates/papyrus_protobuf/src/converters/consensus_test.rs @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() { let proposal_init = ProposalInit::get_test_instance(&mut rng); - let bytes_data: Vec = proposal_init.clone().into(); + let bytes_data: Vec = proposal_init.into(); let res_data = ProposalInit::try_from(bytes_data).unwrap(); assert_eq!(proposal_init, res_data); } diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 4d8fd04f4a1..0dc091036ae 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -50,9 +50,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver; @@ -112,7 +110,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 1. context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); block_receiver @@ -139,7 +137,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 2. context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::TWO)).unwrap(); block_receiver @@ -166,7 +164,7 @@ async fn run_consensus_sync() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::TWO)).unwrap(); block_receiver @@ -230,7 +228,7 @@ async fn run_consensus_sync_cancellation_safety() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); block_receiver @@ -308,7 +306,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _, _| ()); - context.expect_validate_proposal().returning(move |_, _, _, _, _| { + context.expect_validate_proposal().returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index f5bddfd1359..0f1beb5e7c1 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -241,13 +241,7 @@ impl SingleHeightConsensus { // twice in parallel. This could be caused by a network repeat or a malicious spam attack. proposal_entry.insert(None); let block_receiver = context - .validate_proposal( - self.height, - init.round, - init.proposer, - self.timeouts.proposal_timeout, - p2p_messages_receiver, - ) + .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; context .set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default()) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index e391174bec9..6d400c3ebd0 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -69,7 +69,7 @@ async fn handle_proposal( shc.handle_proposal( context, - PROPOSAL_INIT.clone(), + *PROPOSAL_INIT, mpsc::channel(1).1, // content - ignored by SHC. fin_receiver, ) @@ -174,7 +174,7 @@ async fn validator(repeat_proposal: bool) { ); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BLOCK.id).unwrap(); block_receiver @@ -253,7 +253,7 @@ async fn vote_twice(same_vote: bool) { ); context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BLOCK.id).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index ff455b0e0b8..d49400eed2f 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -40,9 +40,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index b4cf1a2c0d2..0768ce05901 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -74,9 +74,7 @@ pub trait ConsensusContext { /// by ConsensusContext. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index d1046a20f67..047b2e98595 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -129,7 +129,7 @@ impl ConsensusContext for PapyrusConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(Self::ProposalPart::Init(proposal_init.clone())) + .send(Self::ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); proposal_sender @@ -164,9 +164,7 @@ impl ConsensusContext for PapyrusConsensusContext { async fn validate_proposal( &mut self, - height: BlockNumber, - _round: Round, - _proposer: ValidatorId, + proposal_init: ProposalInit, _timeout: Duration, mut content: mpsc::Receiver, ) -> oneshot::Receiver { @@ -179,14 +177,19 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); + wait_for_block(&storage_reader, proposal_init.height) + .await + .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }); for tx in transactions.iter() { @@ -204,10 +207,13 @@ impl ConsensusContext for PapyrusConsensusContext { } let block_hash = txn - .get_block_header(height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }) .block_hash; @@ -215,12 +221,12 @@ impl ConsensusContext for PapyrusConsensusContext { .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(height).or_default().insert(block_hash, transactions); + proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions); // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. // This can happen as a result of sync interrupting `run_height`. fin_sender.send(block_hash).unwrap_or_else(|_| { - warn!("Failed to send block to consensus. height={height}"); + warn!("Failed to send block to consensus. height={}", proposal_init.height); }) } .instrument(debug_span!("consensus_validate_proposal")), diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index cb63a53562f..11a5a332831 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -60,9 +60,12 @@ async fn validate_proposal_success() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::default(), + ProposalInit { + height: block_number, + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, Duration::MAX, validate_receiver, ) @@ -87,9 +90,12 @@ async fn validate_proposal_fail() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::default(), + ProposalInit { + height: block_number, + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, Duration::MAX, validate_receiver, ) diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 47f0118515c..f9027fb4327 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -195,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(ProposalPart::Init(proposal_init.clone())) + .send(ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); tokio::spawn( @@ -218,23 +218,28 @@ impl ConsensusContext for SequencerConsensusContext { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + proposal_init: ProposalInit, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver { - assert_eq!(Some(height), self.current_height); + assert_eq!(Some(proposal_init.height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); - match round.cmp(&self.current_round) { + match proposal_init.round.cmp(&self.current_round) { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { - self.queued_proposals.insert(round, ((height, timeout, content), fin_sender)); + self.queued_proposals.insert( + proposal_init.round, + ((proposal_init.height, timeout, content), fin_sender), + ); fin_receiver } std::cmp::Ordering::Equal => { self.validate_current_round_proposal( - height, proposer, timeout, content, fin_sender, + proposal_init.height, + proposal_init.proposer, + timeout, + content, + fin_sender, ) .await; fin_receiver diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 60775933782..12d057d9aa4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -179,7 +179,16 @@ async fn validate_proposal_success() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); @@ -233,7 +242,16 @@ async fn repropose() { let txs = vec![generate_invoke_tx(Felt::TWO)]; content_sender.send(txs.clone()).await.unwrap(); let fin_receiver = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); @@ -305,7 +323,16 @@ async fn proposals_from_different_rounds() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver_past_round = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert!(fin_receiver_past_round.await.is_err()); @@ -314,7 +341,16 @@ async fn proposals_from_different_rounds() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver_curr_round = context - .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 1, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert_eq!(fin_receiver_curr_round.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); @@ -323,7 +359,16 @@ async fn proposals_from_different_rounds() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver_future_round = context - .validate_proposal(BlockNumber(0), 2, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 2, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert!(fin_receiver_future_round.now_or_never().is_none()); @@ -391,13 +436,31 @@ async fn interrupt_active_proposal() { // without needing interrupt. let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE); let fin_receiver_0 = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender_1.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver_1 = context - .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender_1.close_channel(); // Move the context to the next round.