From d3516c9652d402abc53101c8542a6f780b19078f Mon Sep 17 00:00:00 2001 From: Andrew Hariri Date: Mon, 27 Jan 2025 13:48:06 -0800 Subject: [PATCH] [exec-pool] BlockRetrievalRequest address Balaji comments --- consensus/src/block_storage/sync_manager.rs | 4 +- consensus/src/epoch_manager.rs | 44 ++++++++++--------- consensus/src/network.rs | 27 +++++++----- consensus/src/network_interface.rs | 13 +++--- consensus/src/network_tests.rs | 2 +- consensus/src/round_manager_test.rs | 44 +++++++++++-------- .../tests/staged/consensus.yaml | 4 +- 7 files changed, 75 insertions(+), 63 deletions(-) diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index b28bccfa11f4e2..13d3887fccf442 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -17,7 +17,7 @@ use crate::{ epoch_manager::LivenessStorageData, logging::{LogEvent, LogSchema}, monitor, - network::{IncomingBlockRetrievalRequest, NetworkSender}, + network::{DeprecatedIncomingBlockRetrievalRequest, NetworkSender}, network_interface::ConsensusMsg, payload_manager::TPayloadManager, persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData}, @@ -470,7 +470,7 @@ impl BlockStore { /// future possible changes. pub async fn process_block_retrieval( &self, - request: IncomingBlockRetrievalRequest, + request: DeprecatedIncomingBlockRetrievalRequest, ) -> anyhow::Result<()> { fail_point!("consensus::process_block_retrieval", |_| { Err(anyhow::anyhow!("Injected error in process_block_retrieval")) diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 69d8b5f441f533..4bf56e5027c2e5 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -31,8 +31,8 @@ use crate::{ metrics_safety_rules::MetricsSafetyRules, monitor, network::{ - IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, - IncomingBlockRetrievalRequestV2, IncomingDAGRequest, IncomingRandGenRequest, + DeprecatedIncomingBlockRetrievalRequest, IncomingBatchRetrievalRequest, + IncomingBlockRetrievalRequest, IncomingDAGRequest, IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender, }, network_interface::{ConsensusMsg, ConsensusNetworkClient}, @@ -60,7 +60,7 @@ use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig}; use aptos_consensus_types::{ - block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1}, + block_retrieval::BlockRetrievalRequest, common::{Author, Round}, epoch_retrieval::EpochRetrievalRequest, proof_of_store::ProofCache, @@ -155,7 +155,7 @@ pub struct EpochManager { round_manager_close_tx: Option>>, epoch_state: Option>, block_retrieval_tx: - Option>, + Option>, quorum_store_msg_tx: Option>, quorum_store_coordinator_tx: Option>, quorum_store_storage: Arc, @@ -563,11 +563,12 @@ impl EpochManager

{ block_store: Arc, max_blocks_allowed: u64, ) { - let (request_tx, mut request_rx) = aptos_channel::new::<_, IncomingBlockRetrievalRequest>( - QueueStyle::KLAST, - 10, - Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS), - ); + let (request_tx, mut request_rx) = + aptos_channel::new::<_, DeprecatedIncomingBlockRetrievalRequest>( + QueueStyle::KLAST, + 10, + Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS), + ); let task = async move { info!(epoch = epoch, "Block retrieval task starts"); while let Some(request) = request_rx.next().await { @@ -1691,7 +1692,7 @@ impl EpochManager

{ ensure!(matches!( request, IncomingRpcRequest::DeprecatedBlockRetrieval(_) - | IncomingRpcRequest::BlockRetrievalV2(_) + | IncomingRpcRequest::BlockRetrieval(_) )); }, _ => {}, @@ -1731,24 +1732,25 @@ impl EpochManager

{ bail!("Rand manager not started"); } }, - IncomingRpcRequest::BlockRetrievalV2(IncomingBlockRetrievalRequestV2 { + IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest { req, protocol, response_sender, }) => { if let Some(tx) = &self.block_retrieval_tx { - let checked_request: BlockRetrievalRequestV1 = match req { - BlockRetrievalRequest::V1(v1) => v1, + match req { + BlockRetrievalRequest::V1(v1) => { + tx.push(peer_id, DeprecatedIncomingBlockRetrievalRequest { + req: v1, + protocol, + response_sender, + }) + }, // TODO @bchocho @hariria implement after all nodes upgrade to release with enum BlockRetrievalRequest (not struct) - BlockRetrievalRequest::V2(_) => { - unimplemented!("Should not have received a BlockRetrievalRequestV2...") + BlockRetrievalRequest::V2(v2) => { + bail!("Should not have received a BlockRetrievalRequestV2 {:?} from peer with id {}", v2, peer_id); }, - }; - tx.push(peer_id, IncomingBlockRetrievalRequest { - req: checked_request, - protocol, - response_sender, - }) + } } else { error!("Round manager not started"); Ok(()) diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 9897db4b885fad..2d91327e84856e 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -92,15 +92,17 @@ impl RpcResponder { } } -/// NOTE: The [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) struct is being -/// deprecated in favor of [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2) which -/// supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field +/// NOTE: +/// 1. [`IncomingBlockRetrievalRequest`](DeprecatedIncomingBlockRetrievalRequest) struct was +/// renamed to `DeprecatedIncomingBlockRetrievalRequest`. +/// 2. `DeprecatedIncomingBlockRetrievalRequest` is being deprecated in favor of a new [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) +/// struct which supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field /// -/// Going forward, please use [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2) +/// Going forward, please use [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) /// For more details, see comments above [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1) /// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct) #[derive(Debug)] -pub struct IncomingBlockRetrievalRequest { +pub struct DeprecatedIncomingBlockRetrievalRequest { pub req: BlockRetrievalRequestV1, pub protocol: ProtocolId, pub response_sender: oneshot::Sender>, @@ -109,7 +111,7 @@ pub struct IncomingBlockRetrievalRequest { /// The block retrieval request is used internally for implementing RPC: the callback is executed /// for carrying the response #[derive(Debug)] -pub struct IncomingBlockRetrievalRequestV2 { +pub struct IncomingBlockRetrievalRequest { pub req: BlockRetrievalRequest, pub protocol: ProtocolId, pub response_sender: oneshot::Sender>, @@ -148,12 +150,12 @@ pub struct IncomingRandGenRequest { pub enum IncomingRpcRequest { /// NOTE: This is being phased out in two releases to accommodate `IncomingBlockRetrievalRequestV2` /// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct) - DeprecatedBlockRetrieval(IncomingBlockRetrievalRequest), + DeprecatedBlockRetrieval(DeprecatedIncomingBlockRetrievalRequest), BatchRetrieval(IncomingBatchRetrievalRequest), DAGRequest(IncomingDAGRequest), CommitRequest(IncomingCommitRequest), RandGenRequest(IncomingRandGenRequest), - BlockRetrievalV2(IncomingBlockRetrievalRequestV2), + BlockRetrieval(IncomingBlockRetrievalRequest), } impl IncomingRpcRequest { @@ -165,7 +167,7 @@ impl IncomingRpcRequest { IncomingRpcRequest::RandGenRequest(req) => Some(req.req.epoch()), IncomingRpcRequest::CommitRequest(req) => req.req.epoch(), IncomingRpcRequest::DeprecatedBlockRetrieval(_) => None, - IncomingRpcRequest::BlockRetrievalV2(_) => None, + IncomingRpcRequest::BlockRetrieval(_) => None, } } } @@ -254,7 +256,8 @@ impl NetworkSender { }); ensure!(from != self.author, "Retrieve block from self"); - let msg = ConsensusMsg::BlockRetrievalRequest(Box::new(retrieval_request.clone())); + let msg = + ConsensusMsg::DeprecatedBlockRetrievalRequest(Box::new(retrieval_request.clone())); counters::CONSENSUS_SENT_MSGS .with_label_values(&[msg.name()]) .inc(); @@ -824,7 +827,7 @@ impl NetworkTask { .with_label_values(&[msg.name()]) .inc(); let req = match msg { - ConsensusMsg::BlockRetrievalRequest(request) => { + ConsensusMsg::DeprecatedBlockRetrievalRequest(request) => { debug!( remote_peer = peer_id, event = LogEvent::ReceiveBlockRetrieval, @@ -832,7 +835,7 @@ impl NetworkTask { request ); IncomingRpcRequest::DeprecatedBlockRetrieval( - IncomingBlockRetrievalRequest { + DeprecatedIncomingBlockRetrievalRequest { req: *request, protocol, response_sender: callback, diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index ce8d02b97908a3..83a37c39036e00 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -35,11 +35,12 @@ use std::{collections::HashMap, time::Duration}; /// Network type for consensus #[derive(Clone, Debug, Deserialize, Serialize)] pub enum ConsensusMsg { - /// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequestV2`](ConsensusMsg::BlockRetrievalRequestV2) going forward + /// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequest`](ConsensusMsg::BlockRetrievalRequest) going forward + /// This variant was renamed from `BlockRetrievalRequest` to `DeprecatedBlockRetrievalRequest` /// RPC to get a chain of block of the given length starting from the given block id. /// Note: Naming here is `BlockRetrievalRequest` and not `DeprecatedBlockRetrievalRequest` /// to be consistent with the naming implementation in [`ConsensusMsg::name`](ConsensusMsg::name) - BlockRetrievalRequest(Box), + DeprecatedBlockRetrievalRequest(Box), /// Carries the returned blocks and the retrieval status. BlockRetrievalResponse(Box), /// Request to get a EpochChangeProof from current_epoch to target_epoch @@ -87,7 +88,7 @@ pub enum ConsensusMsg { /// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round. RoundTimeoutMsg(Box), /// RPC to get a chain of block of the given length starting from the given block id, using epoch and round. - BlockRetrievalRequestV2(Box), + BlockRetrievalRequest(Box), } /// Network type for consensus @@ -96,8 +97,7 @@ impl ConsensusMsg { /// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct) pub fn name(&self) -> &str { match self { - // TODO Not changing the naming. Double check if this is OK - ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest", + ConsensusMsg::DeprecatedBlockRetrievalRequest(_) => "DeprecatedBlockRetrievalRequest", ConsensusMsg::BlockRetrievalResponse(_) => "BlockRetrievalResponse", ConsensusMsg::EpochRetrievalRequest(_) => "EpochRetrievalRequest", ConsensusMsg::ProposalMsg(_) => "ProposalMsg", @@ -117,8 +117,7 @@ impl ConsensusMsg { ConsensusMsg::RandGenMessage(_) => "RandGenMessage", ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2", ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2", - // TODO should this also be BlockRetrievalRequest? What's the appropriate naming here? - ConsensusMsg::BlockRetrievalRequestV2(_) => "BlockRetrievalRequestV2", + ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest", } } } diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index f8fff9630d907a..96950120d12174 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -880,7 +880,7 @@ mod tests { .push((peer_id, protocol_id), bad_msg) .unwrap(); - let liveness_check_msg = ConsensusMsg::BlockRetrievalRequest(Box::new( + let liveness_check_msg = ConsensusMsg::DeprecatedBlockRetrievalRequest(Box::new( BlockRetrievalRequestV1::new(HashValue::random(), 1), )); diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs index e76b66ed815321..3fe18787a8cfe5 100644 --- a/consensus/src/round_manager_test.rs +++ b/consensus/src/round_manager_test.rs @@ -14,7 +14,9 @@ use crate::{ round_state::{ExponentialTimeInterval, RoundState}, }, metrics_safety_rules::MetricsSafetyRules, - network::{IncomingBlockRetrievalRequest, IncomingBlockRetrievalRequestV2, NetworkSender}, + network::{ + DeprecatedIncomingBlockRetrievalRequest, IncomingBlockRetrievalRequest, NetworkSender, + }, network_interface::{CommitMessage, ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, payload_manager::DirectMempoolPayloadManager, @@ -505,16 +507,20 @@ impl NodeSetup { } /// SOON TO BE DEPRECATED: Please use [`poll_block_retrieval_v2`](NodeSetup::poll_block_retrieval_v2) going forward - /// NOTE: [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) is being phased out over two releases + /// NOTE: [`IncomingBlockRetrievalRequest`](DeprecatedIncomingBlockRetrievalRequest) is being phased out over two releases /// After the first release, this can be deleted - pub async fn poll_block_retrieval(&mut self) -> Option { + pub async fn poll_block_retrieval( + &mut self, + ) -> Option { match self.poll_next_network_event() { Some(Event::RpcRequest(_, msg, protocol, response_sender)) => match msg { - ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequest { - req: *v, - protocol, - response_sender, - }), + ConsensusMsg::DeprecatedBlockRetrievalRequest(v) => { + Some(DeprecatedIncomingBlockRetrievalRequest { + req: *v, + protocol, + response_sender, + }) + }, msg => panic!( "Unexpected Consensus Message: {:?} on node {}", msg, @@ -530,15 +536,17 @@ impl NodeSetup { } } - pub async fn poll_block_retrieval_v2(&mut self) -> Option { + pub async fn poll_block_retrieval_v2(&mut self) -> Option { match self.poll_next_network_event() { Some(Event::RpcRequest(_, msg, protocol, response_sender)) => match msg { - ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequestV2 { - req: BlockRetrievalRequest::V1(*v), - protocol, - response_sender, - }), - ConsensusMsg::BlockRetrievalRequestV2(v) => Some(IncomingBlockRetrievalRequestV2 { + ConsensusMsg::DeprecatedBlockRetrievalRequest(v) => { + Some(IncomingBlockRetrievalRequest { + req: BlockRetrievalRequest::V1(*v), + protocol, + response_sender, + }) + }, + ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequest { req: *v, protocol, response_sender, @@ -1368,7 +1376,7 @@ fn response_on_block_retrieval() { // first verify that we can retrieve the block if it's in the tree let (tx1, rx1) = oneshot::channel(); - let single_block_request = IncomingBlockRetrievalRequest { + let single_block_request = DeprecatedIncomingBlockRetrievalRequest { req: BlockRetrievalRequestV1::new(block_id, 1), protocol: ProtocolId::ConsensusRpcBcs, response_sender: tx1, @@ -1391,7 +1399,7 @@ fn response_on_block_retrieval() { // verify that if a block is not there, return ID_NOT_FOUND let (tx2, rx2) = oneshot::channel(); - let missing_block_request = IncomingBlockRetrievalRequest { + let missing_block_request = DeprecatedIncomingBlockRetrievalRequest { req: BlockRetrievalRequestV1::new(HashValue::random(), 1), protocol: ProtocolId::ConsensusRpcBcs, response_sender: tx2, @@ -1415,7 +1423,7 @@ fn response_on_block_retrieval() { // if asked for many blocks, return NOT_ENOUGH_BLOCKS let (tx3, rx3) = oneshot::channel(); - let many_block_request = IncomingBlockRetrievalRequest { + let many_block_request = DeprecatedIncomingBlockRetrievalRequest { req: BlockRetrievalRequestV1::new(block_id, 3), protocol: ProtocolId::ConsensusRpcBcs, response_sender: tx3, diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index d325bc6448e02b..02aef37aade9e6 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -376,7 +376,7 @@ CommitVote: ConsensusMsg: ENUM: 0: - BlockRetrievalRequest: + DeprecatedBlockRetrievalRequest: NEWTYPE: TYPENAME: BlockRetrievalRequestV1 1: @@ -456,7 +456,7 @@ ConsensusMsg: NEWTYPE: TYPENAME: RoundTimeoutMsg 20: - BlockRetrievalRequestV2: + BlockRetrievalRequest: NEWTYPE: TYPENAME: BlockRetrievalRequest ContractEvent: