Skip to content

Commit

Permalink
[execution-pool] BlockRetrievalRequest deprecation
Browse files Browse the repository at this point in the history
  • Loading branch information
hariria committed Jan 25, 2025
1 parent 3e471a8 commit be5e4c3
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 34 deletions.
69 changes: 65 additions & 4 deletions consensus/consensus-types/src/block_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@ pub const NUM_PEERS_PER_RETRY: usize = 3;
pub const RETRY_INTERVAL_MSEC: u64 = 500;
pub const RPC_TIMEOUT_MSEC: u64 = 5000;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum BlockRetrievalRequest {
V1(BlockRetrievalRequestV1),
V2(BlockRetrievalRequestV2),
}

/// RPC to get a chain of block of the given length starting from the given block id.
/// TODO @bchocho @hariria fix comment after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
/// NOTE: The [`BlockRetrievalRequest`](BlockRetrievalRequest) struct is being renamed to
/// [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1) and deprecated in favor of a
/// [`BlockRetrievalRequest`](BlockRetrievalRequest) enum
///
/// Going forward, please use the [`BlockRetrievalRequest`](BlockRetrievalRequest) enum
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequest {
pub struct BlockRetrievalRequestV1 {
block_id: HashValue,
num_blocks: u64,
target_block_id: Option<HashValue>,
}

impl BlockRetrievalRequest {
impl BlockRetrievalRequestV1 {
pub fn new(block_id: HashValue, num_blocks: u64) -> Self {
Self {
block_id,
Expand Down Expand Up @@ -61,7 +73,7 @@ impl BlockRetrievalRequest {
}
}

impl fmt::Display for BlockRetrievalRequest {
impl fmt::Display for BlockRetrievalRequestV1 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -71,6 +83,55 @@ impl fmt::Display for BlockRetrievalRequest {
}
}

/// RPC to get a chain of block of the given length starting from the given block id.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequestV2 {
block_id: HashValue,
num_blocks: u64,
// TODO: remove the Option, if it's not too painful
target_epoch_and_round: Option<(u64, u64)>,
}

impl BlockRetrievalRequestV2 {
pub fn new(block_id: HashValue, num_blocks: u64) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_epoch_and_round: None,
}
}

pub fn new_with_target_round(
block_id: HashValue,
num_blocks: u64,
target_epoch: u64,
target_round: u64,
) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_epoch_and_round: Some((target_epoch, target_round)),
}
}

pub fn block_id(&self) -> HashValue {
self.block_id
}

pub fn num_blocks(&self) -> u64 {
self.num_blocks
}

pub fn target_epoch_and_round(&self) -> Option<(u64, u64)> {
self.target_epoch_and_round
}

pub fn match_target_round(&self, epoch: u64, round: u64) -> bool {
self.target_epoch_and_round()
.map_or(false, |target| (epoch, round) <= target)
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum BlockRetrievalStatus {
// Successfully fill in the request.
Expand Down Expand Up @@ -105,7 +166,7 @@ impl BlockRetrievalResponse {

pub fn verify(
&self,
retrieval_request: BlockRetrievalRequest,
retrieval_request: BlockRetrievalRequestV1,
sig_verifier: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use anyhow::{anyhow, bail, Context};
use aptos_consensus_types::{
block::Block,
block_retrieval::{
BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
BlockRetrievalRequestV1, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
NUM_RETRIES, RETRY_INTERVAL_MSEC, RPC_TIMEOUT_MSEC,
},
common::Author,
Expand Down Expand Up @@ -576,7 +576,7 @@ impl BlockRetriever {
.boxed(),
)
}
let request = BlockRetrievalRequest::new_with_target_block_id(
let request = BlockRetrievalRequestV1::new_with_target_block_id(
block_id,
retrieve_batch_size,
target_block_id,
Expand Down
40 changes: 36 additions & 4 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::{
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest,
IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender,
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest,
IncomingBlockRetrievalRequestV2, IncomingDAGRequest, IncomingRandGenRequest,
IncomingRpcRequest, NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::{
Expand All @@ -59,6 +60,7 @@ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1},
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
proof_of_store::ProofCache,
Expand Down Expand Up @@ -1666,6 +1668,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
}
}

/// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
fn process_rpc_request(
&mut self,
peer_id: Author,
Expand All @@ -1684,13 +1687,19 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
return Ok(());
},
None => {
ensure!(matches!(request, IncomingRpcRequest::BlockRetrieval(_)));
// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
ensure!(matches!(
request,
IncomingRpcRequest::DeprecatedBlockRetrieval(_)
| IncomingRpcRequest::BlockRetrievalV2(_)
));
},
_ => {},
}

match request {
IncomingRpcRequest::BlockRetrieval(request) => {
// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
IncomingRpcRequest::DeprecatedBlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Expand Down Expand Up @@ -1722,6 +1731,29 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
bail!("Rand manager not started");
}
},
IncomingRpcRequest::BlockRetrievalV2(IncomingBlockRetrievalRequestV2 {
req,
protocol,
response_sender,
}) => {
if let Some(tx) = &self.block_retrieval_tx {
let checked_request: BlockRetrievalRequestV1 = match req {
BlockRetrievalRequest::V1(v1) => v1,
// TODO @bchocho @hariria implement after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
BlockRetrievalRequest::V2(_) => {
unimplemented!("Should not have received a BlockRetrievalRequestV2...")
},
};
tx.push(peer_id, IncomingBlockRetrievalRequest {
req: checked_request,
protocol,
response_sender,
})
} else {
error!("Round manager not started");
Ok(())
}
},
}
}

Expand Down
41 changes: 31 additions & 10 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::{anyhow, bail, ensure};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkId;
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1, BlockRetrievalResponse},
common::Author,
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
Expand Down Expand Up @@ -92,10 +92,24 @@ impl RpcResponder {
}
}

/// NOTE: The [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) struct is being
/// deprecated in favor of [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2) which
/// supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field
///
/// Going forward, please use [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2)
/// For more details, see comments above [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1)
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
pub req: BlockRetrievalRequestV1,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

/// The block retrieval request is used internally for implementing RPC: the callback is executed
/// for carrying the response
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
pub struct IncomingBlockRetrievalRequestV2 {
pub req: BlockRetrievalRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
Expand Down Expand Up @@ -132,21 +146,26 @@ pub struct IncomingRandGenRequest {

#[derive(Debug)]
pub enum IncomingRpcRequest {
BlockRetrieval(IncomingBlockRetrievalRequest),
/// NOTE: This is being phased out in two releases to accommodate `IncomingBlockRetrievalRequestV2`
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
DeprecatedBlockRetrieval(IncomingBlockRetrievalRequest),
BatchRetrieval(IncomingBatchRetrievalRequest),
DAGRequest(IncomingDAGRequest),
CommitRequest(IncomingCommitRequest),
RandGenRequest(IncomingRandGenRequest),
BlockRetrievalV2(IncomingBlockRetrievalRequestV2),
}

impl IncomingRpcRequest {
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
pub fn epoch(&self) -> Option<u64> {
match self {
IncomingRpcRequest::BatchRetrieval(req) => Some(req.req.epoch()),
IncomingRpcRequest::DAGRequest(req) => Some(req.req.epoch()),
IncomingRpcRequest::RandGenRequest(req) => Some(req.req.epoch()),
IncomingRpcRequest::CommitRequest(req) => req.req.epoch(),
IncomingRpcRequest::BlockRetrieval(_) => None,
IncomingRpcRequest::DeprecatedBlockRetrieval(_) => None,
IncomingRpcRequest::BlockRetrievalV2(_) => None,
}
}
}
Expand Down Expand Up @@ -223,7 +242,7 @@ impl NetworkSender {
/// returns a future that is fulfilled with BlockRetrievalResponse.
pub async fn request_block(
&self,
retrieval_request: BlockRetrievalRequest,
retrieval_request: BlockRetrievalRequestV1,
from: Author,
timeout: Duration,
) -> anyhow::Result<BlockRetrievalResponse> {
Expand Down Expand Up @@ -812,11 +831,13 @@ impl NetworkTask {
"{}",
request
);
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
})
IncomingRpcRequest::DeprecatedBlockRetrieval(
IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
},
)
},
ConsensusMsg::BatchRequestMsg(request) => {
debug!(
Expand Down
12 changes: 9 additions & 3 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1, BlockRetrievalResponse},
epoch_retrieval::EpochRetrievalRequest,
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
Expand All @@ -35,8 +35,11 @@ use std::{collections::HashMap, time::Duration};
/// Network type for consensus
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
/// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequestV2`](ConsensusMsg::BlockRetrievalRequestV2) going forward
/// RPC to get a chain of block of the given length starting from the given block id.
BlockRetrievalRequest(Box<BlockRetrievalRequest>),
/// Note: Naming here is `BlockRetrievalRequest` and not `DeprecatedBlockRetrievalRequest`
/// to be consistent with the naming implementation in [`ConsensusMsg::name`](ConsensusMsg::name)
BlockRetrievalRequest(Box<BlockRetrievalRequestV1>),
/// Carries the returned blocks and the retrieval status.
BlockRetrievalResponse(Box<BlockRetrievalResponse>),
/// Request to get a EpochChangeProof from current_epoch to target_epoch
Expand Down Expand Up @@ -83,12 +86,14 @@ pub enum ConsensusMsg {
OrderVoteMsg(Box<OrderVoteMsg>),
/// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round.
RoundTimeoutMsg(Box<RoundTimeoutMsg>),
/// RPC to get a chain of block of the given length starting from the given block id, using epoch and round.
BlockRetrievalRequestV2(Box<BlockRetrievalRequest>),
}

/// Network type for consensus
impl ConsensusMsg {
/// ConsensusMsg type in string
///
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
pub fn name(&self) -> &str {
match self {
ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest",
Expand All @@ -111,6 +116,7 @@ impl ConsensusMsg {
ConsensusMsg::RandGenMessage(_) => "RandGenMessage",
ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2",
ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2",
ConsensusMsg::BlockRetrievalRequestV2(_) => "BlockRetrievalRequestV2",
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ mod tests {
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus},
block_retrieval::{BlockRetrievalRequestV1, BlockRetrievalResponse, BlockRetrievalStatus},
common::Payload,
};
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -824,8 +824,9 @@ mod tests {
BlockRetrievalResponse::new(BlockRetrievalStatus::IdNotFound, vec![]);
let response = ConsensusMsg::BlockRetrievalResponse(Box::new(response));
let bytes = Bytes::from(serde_json::to_vec(&response).unwrap());
// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
match request {
IncomingRpcRequest::BlockRetrieval(request) => {
IncomingRpcRequest::DeprecatedBlockRetrieval(request) => {
request.response_sender.send(Ok(bytes)).unwrap()
},
_ => panic!("unexpected message"),
Expand All @@ -837,7 +838,7 @@ mod tests {
timed_block_on(&runtime, async {
let response = nodes[0]
.request_block(
BlockRetrievalRequest::new(HashValue::zero(), 1),
BlockRetrievalRequestV1::new(HashValue::zero(), 1),
peer,
Duration::from_secs(5),
)
Expand Down Expand Up @@ -880,7 +881,7 @@ mod tests {
.unwrap();

let liveness_check_msg = ConsensusMsg::BlockRetrievalRequest(Box::new(
BlockRetrievalRequest::new(HashValue::random(), 1),
BlockRetrievalRequestV1::new(HashValue::random(), 1),
));

let protocol_id = ProtocolId::ConsensusRpcJson;
Expand Down
Loading

0 comments on commit be5e4c3

Please sign in to comment.