Skip to content

Commit

Permalink
[exec-pool] BlockRetrievalRequest address Balaji comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hariria committed Jan 27, 2025
1 parent 902382e commit d3516c9
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 63 deletions.
4 changes: 2 additions & 2 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
epoch_manager::LivenessStorageData,
logging::{LogEvent, LogSchema},
monitor,
network::{IncomingBlockRetrievalRequest, NetworkSender},
network::{DeprecatedIncomingBlockRetrievalRequest, NetworkSender},
network_interface::ConsensusMsg,
payload_manager::TPayloadManager,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
Expand Down Expand Up @@ -470,7 +470,7 @@ impl BlockStore {
/// future possible changes.
pub async fn process_block_retrieval(
&self,
request: IncomingBlockRetrievalRequest,
request: DeprecatedIncomingBlockRetrievalRequest,
) -> anyhow::Result<()> {
fail_point!("consensus::process_block_retrieval", |_| {
Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
Expand Down
44 changes: 23 additions & 21 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::{
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest,
IncomingBlockRetrievalRequestV2, IncomingDAGRequest, IncomingRandGenRequest,
DeprecatedIncomingBlockRetrievalRequest, IncomingBatchRetrievalRequest,
IncomingBlockRetrievalRequest, IncomingDAGRequest, IncomingRandGenRequest,
IncomingRpcRequest, NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
Expand Down Expand Up @@ -60,7 +60,7 @@ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1},
block_retrieval::BlockRetrievalRequest,
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
proof_of_store::ProofCache,
Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
round_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
epoch_state: Option<Arc<EpochState>>,
block_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>>,
Option<aptos_channel::Sender<AccountAddress, DeprecatedIncomingBlockRetrievalRequest>>,
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, (Author, VerifiedEvent)>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
quorum_store_storage: Arc<dyn QuorumStoreStorage>,
Expand Down Expand Up @@ -563,11 +563,12 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
block_store: Arc<BlockStore>,
max_blocks_allowed: u64,
) {
let (request_tx, mut request_rx) = aptos_channel::new::<_, IncomingBlockRetrievalRequest>(
QueueStyle::KLAST,
10,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
let (request_tx, mut request_rx) =
aptos_channel::new::<_, DeprecatedIncomingBlockRetrievalRequest>(
QueueStyle::KLAST,
10,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
let task = async move {
info!(epoch = epoch, "Block retrieval task starts");
while let Some(request) = request_rx.next().await {
Expand Down Expand Up @@ -1691,7 +1692,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
ensure!(matches!(
request,
IncomingRpcRequest::DeprecatedBlockRetrieval(_)
| IncomingRpcRequest::BlockRetrievalV2(_)
| IncomingRpcRequest::BlockRetrieval(_)
));
},
_ => {},
Expand Down Expand Up @@ -1731,24 +1732,25 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
bail!("Rand manager not started");
}
},
IncomingRpcRequest::BlockRetrievalV2(IncomingBlockRetrievalRequestV2 {
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req,
protocol,
response_sender,
}) => {
if let Some(tx) = &self.block_retrieval_tx {
let checked_request: BlockRetrievalRequestV1 = match req {
BlockRetrievalRequest::V1(v1) => v1,
match req {
BlockRetrievalRequest::V1(v1) => {
tx.push(peer_id, DeprecatedIncomingBlockRetrievalRequest {
req: v1,
protocol,
response_sender,
})
},
// TODO @bchocho @hariria implement after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
BlockRetrievalRequest::V2(_) => {
unimplemented!("Should not have received a BlockRetrievalRequestV2...")
BlockRetrievalRequest::V2(v2) => {
bail!("Should not have received a BlockRetrievalRequestV2 {:?} from peer with id {}", v2, peer_id);
},
};
tx.push(peer_id, IncomingBlockRetrievalRequest {
req: checked_request,
protocol,
response_sender,
})
}
} else {
error!("Round manager not started");
Ok(())
Expand Down
27 changes: 15 additions & 12 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,17 @@ impl RpcResponder {
}
}

/// NOTE: The [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) struct is being
/// deprecated in favor of [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2) which
/// supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field
/// NOTE:
/// 1. [`IncomingBlockRetrievalRequest`](DeprecatedIncomingBlockRetrievalRequest) struct was
/// renamed to `DeprecatedIncomingBlockRetrievalRequest`.
/// 2. `DeprecatedIncomingBlockRetrievalRequest` is being deprecated in favor of a new [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest)
/// struct which supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field
///
/// Going forward, please use [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2)
/// Going forward, please use [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest)
/// For more details, see comments above [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1)
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
pub struct DeprecatedIncomingBlockRetrievalRequest {
pub req: BlockRetrievalRequestV1,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
Expand All @@ -109,7 +111,7 @@ pub struct IncomingBlockRetrievalRequest {
/// The block retrieval request is used internally for implementing RPC: the callback is executed
/// for carrying the response
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequestV2 {
pub struct IncomingBlockRetrievalRequest {
pub req: BlockRetrievalRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
Expand Down Expand Up @@ -148,12 +150,12 @@ pub struct IncomingRandGenRequest {
pub enum IncomingRpcRequest {
/// NOTE: This is being phased out in two releases to accommodate `IncomingBlockRetrievalRequestV2`
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
DeprecatedBlockRetrieval(IncomingBlockRetrievalRequest),
DeprecatedBlockRetrieval(DeprecatedIncomingBlockRetrievalRequest),
BatchRetrieval(IncomingBatchRetrievalRequest),
DAGRequest(IncomingDAGRequest),
CommitRequest(IncomingCommitRequest),
RandGenRequest(IncomingRandGenRequest),
BlockRetrievalV2(IncomingBlockRetrievalRequestV2),
BlockRetrieval(IncomingBlockRetrievalRequest),
}

impl IncomingRpcRequest {
Expand All @@ -165,7 +167,7 @@ impl IncomingRpcRequest {
IncomingRpcRequest::RandGenRequest(req) => Some(req.req.epoch()),
IncomingRpcRequest::CommitRequest(req) => req.req.epoch(),
IncomingRpcRequest::DeprecatedBlockRetrieval(_) => None,
IncomingRpcRequest::BlockRetrievalV2(_) => None,
IncomingRpcRequest::BlockRetrieval(_) => None,
}
}
}
Expand Down Expand Up @@ -254,7 +256,8 @@ impl NetworkSender {
});

ensure!(from != self.author, "Retrieve block from self");
let msg = ConsensusMsg::BlockRetrievalRequest(Box::new(retrieval_request.clone()));
let msg =
ConsensusMsg::DeprecatedBlockRetrievalRequest(Box::new(retrieval_request.clone()));
counters::CONSENSUS_SENT_MSGS
.with_label_values(&[msg.name()])
.inc();
Expand Down Expand Up @@ -824,15 +827,15 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
let req = match msg {
ConsensusMsg::BlockRetrievalRequest(request) => {
ConsensusMsg::DeprecatedBlockRetrievalRequest(request) => {
debug!(
remote_peer = peer_id,
event = LogEvent::ReceiveBlockRetrieval,
"{}",
request
);
IncomingRpcRequest::DeprecatedBlockRetrieval(
IncomingBlockRetrievalRequest {
DeprecatedIncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
Expand Down
13 changes: 6 additions & 7 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ use std::{collections::HashMap, time::Duration};
/// Network type for consensus
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
/// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequestV2`](ConsensusMsg::BlockRetrievalRequestV2) going forward
/// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequest`](ConsensusMsg::BlockRetrievalRequest) going forward
/// This variant was renamed from `BlockRetrievalRequest` to `DeprecatedBlockRetrievalRequest`
/// RPC to get a chain of block of the given length starting from the given block id.
/// Note: Naming here is `BlockRetrievalRequest` and not `DeprecatedBlockRetrievalRequest`
/// to be consistent with the naming implementation in [`ConsensusMsg::name`](ConsensusMsg::name)
BlockRetrievalRequest(Box<BlockRetrievalRequestV1>),
DeprecatedBlockRetrievalRequest(Box<BlockRetrievalRequestV1>),
/// Carries the returned blocks and the retrieval status.
BlockRetrievalResponse(Box<BlockRetrievalResponse>),
/// Request to get a EpochChangeProof from current_epoch to target_epoch
Expand Down Expand Up @@ -87,7 +88,7 @@ pub enum ConsensusMsg {
/// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round.
RoundTimeoutMsg(Box<RoundTimeoutMsg>),
/// RPC to get a chain of block of the given length starting from the given block id, using epoch and round.
BlockRetrievalRequestV2(Box<BlockRetrievalRequest>),
BlockRetrievalRequest(Box<BlockRetrievalRequest>),
}

/// Network type for consensus
Expand All @@ -96,8 +97,7 @@ impl ConsensusMsg {
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
pub fn name(&self) -> &str {
match self {
// TODO Not changing the naming. Double check if this is OK
ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest",
ConsensusMsg::DeprecatedBlockRetrievalRequest(_) => "DeprecatedBlockRetrievalRequest",
ConsensusMsg::BlockRetrievalResponse(_) => "BlockRetrievalResponse",
ConsensusMsg::EpochRetrievalRequest(_) => "EpochRetrievalRequest",
ConsensusMsg::ProposalMsg(_) => "ProposalMsg",
Expand All @@ -117,8 +117,7 @@ impl ConsensusMsg {
ConsensusMsg::RandGenMessage(_) => "RandGenMessage",
ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2",
ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2",
// TODO should this also be BlockRetrievalRequest? What's the appropriate naming here?
ConsensusMsg::BlockRetrievalRequestV2(_) => "BlockRetrievalRequestV2",
ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest",
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ mod tests {
.push((peer_id, protocol_id), bad_msg)
.unwrap();

let liveness_check_msg = ConsensusMsg::BlockRetrievalRequest(Box::new(
let liveness_check_msg = ConsensusMsg::DeprecatedBlockRetrievalRequest(Box::new(
BlockRetrievalRequestV1::new(HashValue::random(), 1),
));

Expand Down
44 changes: 26 additions & 18 deletions consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use crate::{
round_state::{ExponentialTimeInterval, RoundState},
},
metrics_safety_rules::MetricsSafetyRules,
network::{IncomingBlockRetrievalRequest, IncomingBlockRetrievalRequestV2, NetworkSender},
network::{
DeprecatedIncomingBlockRetrievalRequest, IncomingBlockRetrievalRequest, NetworkSender,
},
network_interface::{CommitMessage, ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
network_tests::{NetworkPlayground, TwinId},
payload_manager::DirectMempoolPayloadManager,
Expand Down Expand Up @@ -505,16 +507,20 @@ impl NodeSetup {
}

/// SOON TO BE DEPRECATED: Please use [`poll_block_retrieval_v2`](NodeSetup::poll_block_retrieval_v2) going forward
/// NOTE: [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) is being phased out over two releases
/// NOTE: [`IncomingBlockRetrievalRequest`](DeprecatedIncomingBlockRetrievalRequest) is being phased out over two releases
/// After the first release, this can be deleted
pub async fn poll_block_retrieval(&mut self) -> Option<IncomingBlockRetrievalRequest> {
pub async fn poll_block_retrieval(
&mut self,
) -> Option<DeprecatedIncomingBlockRetrievalRequest> {
match self.poll_next_network_event() {
Some(Event::RpcRequest(_, msg, protocol, response_sender)) => match msg {
ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequest {
req: *v,
protocol,
response_sender,
}),
ConsensusMsg::DeprecatedBlockRetrievalRequest(v) => {
Some(DeprecatedIncomingBlockRetrievalRequest {
req: *v,
protocol,
response_sender,
})
},
msg => panic!(
"Unexpected Consensus Message: {:?} on node {}",
msg,
Expand All @@ -530,15 +536,17 @@ impl NodeSetup {
}
}

pub async fn poll_block_retrieval_v2(&mut self) -> Option<IncomingBlockRetrievalRequestV2> {
pub async fn poll_block_retrieval_v2(&mut self) -> Option<IncomingBlockRetrievalRequest> {
match self.poll_next_network_event() {
Some(Event::RpcRequest(_, msg, protocol, response_sender)) => match msg {
ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequestV2 {
req: BlockRetrievalRequest::V1(*v),
protocol,
response_sender,
}),
ConsensusMsg::BlockRetrievalRequestV2(v) => Some(IncomingBlockRetrievalRequestV2 {
ConsensusMsg::DeprecatedBlockRetrievalRequest(v) => {
Some(IncomingBlockRetrievalRequest {
req: BlockRetrievalRequest::V1(*v),
protocol,
response_sender,
})
},
ConsensusMsg::BlockRetrievalRequest(v) => Some(IncomingBlockRetrievalRequest {
req: *v,
protocol,
response_sender,
Expand Down Expand Up @@ -1368,7 +1376,7 @@ fn response_on_block_retrieval() {

// first verify that we can retrieve the block if it's in the tree
let (tx1, rx1) = oneshot::channel();
let single_block_request = IncomingBlockRetrievalRequest {
let single_block_request = DeprecatedIncomingBlockRetrievalRequest {
req: BlockRetrievalRequestV1::new(block_id, 1),
protocol: ProtocolId::ConsensusRpcBcs,
response_sender: tx1,
Expand All @@ -1391,7 +1399,7 @@ fn response_on_block_retrieval() {

// verify that if a block is not there, return ID_NOT_FOUND
let (tx2, rx2) = oneshot::channel();
let missing_block_request = IncomingBlockRetrievalRequest {
let missing_block_request = DeprecatedIncomingBlockRetrievalRequest {
req: BlockRetrievalRequestV1::new(HashValue::random(), 1),
protocol: ProtocolId::ConsensusRpcBcs,
response_sender: tx2,
Expand All @@ -1415,7 +1423,7 @@ fn response_on_block_retrieval() {

// if asked for many blocks, return NOT_ENOUGH_BLOCKS
let (tx3, rx3) = oneshot::channel();
let many_block_request = IncomingBlockRetrievalRequest {
let many_block_request = DeprecatedIncomingBlockRetrievalRequest {
req: BlockRetrievalRequestV1::new(block_id, 3),
protocol: ProtocolId::ConsensusRpcBcs,
response_sender: tx3,
Expand Down
4 changes: 2 additions & 2 deletions testsuite/generate-format/tests/staged/consensus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ CommitVote:
ConsensusMsg:
ENUM:
0:
BlockRetrievalRequest:
DeprecatedBlockRetrievalRequest:
NEWTYPE:
TYPENAME: BlockRetrievalRequestV1
1:
Expand Down Expand Up @@ -456,7 +456,7 @@ ConsensusMsg:
NEWTYPE:
TYPENAME: RoundTimeoutMsg
20:
BlockRetrievalRequestV2:
BlockRetrievalRequest:
NEWTYPE:
TYPENAME: BlockRetrievalRequest
ContractEvent:
Expand Down

0 comments on commit d3516c9

Please sign in to comment.