Skip to content

Commit

Permalink
[execution-pool] BlockRetrievalRequest struct -> enum (#15812)
Browse files Browse the repository at this point in the history
* [execution-pool] BlockRetrievalRequest deprecation

* [execution-pool] BlockRetrievalRequest consensus.yaml

* [exec-pool] BlockRetrievalRequest address Balaji comments

* [exec-pool] Add a TODO for process_block_retrieval_v2
  • Loading branch information
hariria authored Jan 28, 2025
1 parent cbb92f0 commit 4e1eb2d
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 56 deletions.
67 changes: 63 additions & 4 deletions consensus/consensus-types/src/block_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,31 @@ pub const NUM_PEERS_PER_RETRY: usize = 3;
pub const RETRY_INTERVAL_MSEC: u64 = 500;
pub const RPC_TIMEOUT_MSEC: u64 = 5000;

/// Versioned block retrieval request: wraps either a [`BlockRetrievalRequestV1`]
/// or a [`BlockRetrievalRequestV2`] payload.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum BlockRetrievalRequest {
/// Request in the V1 format, which carries an optional target block id.
V1(BlockRetrievalRequestV1),
/// Request in the V2 format, which carries a target round instead of a target block id.
V2(BlockRetrievalRequestV2),
}

/// RPC to get a chain of blocks of the given length starting from the given block id.
/// TODO @bchocho @hariria fix comment after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
///
/// NOTE:
/// 1. The [`BlockRetrievalRequest`](BlockRetrievalRequest) struct was renamed to
/// [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1) and deprecated
/// 2. [`BlockRetrievalRequest`](BlockRetrievalRequest) enum was introduced to replace the old
/// [`BlockRetrievalRequest`](BlockRetrievalRequest) struct
///
/// Please use the [`BlockRetrievalRequest`](BlockRetrievalRequest) enum going forward once this enum
/// is introduced in the next release
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequest {
pub struct BlockRetrievalRequestV1 {
block_id: HashValue,
num_blocks: u64,
target_block_id: Option<HashValue>,
}

impl BlockRetrievalRequest {
impl BlockRetrievalRequestV1 {
pub fn new(block_id: HashValue, num_blocks: u64) -> Self {
Self {
block_id,
Expand Down Expand Up @@ -61,7 +77,7 @@ impl BlockRetrievalRequest {
}
}

impl fmt::Display for BlockRetrievalRequest {
impl fmt::Display for BlockRetrievalRequestV1 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -71,6 +87,48 @@ impl fmt::Display for BlockRetrievalRequest {
}
}

/// RPC to get a chain of blocks of the given length starting from the given block id.
///
/// Unlike the V1 request, which carries an optional target block id, this version
/// carries a `target_round` (a `u64` round number).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequestV2 {
/// Id of the block the requested chain starts from.
block_id: HashValue,
/// Length of the requested chain, in blocks.
num_blocks: u64,
/// Round that `match_target_round` compares candidate rounds against.
target_round: u64,
}

impl BlockRetrievalRequestV2 {
pub fn new(block_id: HashValue, num_blocks: u64, target_round: u64) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_round,
}
}

pub fn new_with_target_round(block_id: HashValue, num_blocks: u64, target_round: u64) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_round,
}
}

pub fn block_id(&self) -> HashValue {
self.block_id
}

pub fn num_blocks(&self) -> u64 {
self.num_blocks
}

pub fn target_round(&self) -> u64 {
self.target_round
}

pub fn match_target_round(&self, round: u64) -> bool {
round <= self.target_round()
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum BlockRetrievalStatus {
// Successfully fill in the request.
Expand Down Expand Up @@ -103,9 +161,10 @@ impl BlockRetrievalResponse {
&self.blocks
}

/// TODO @bchocho @hariria change `retrieval_request` after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
pub fn verify(
&self,
retrieval_request: BlockRetrievalRequest,
retrieval_request: BlockRetrievalRequestV1,
sig_verifier: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(
Expand Down
27 changes: 23 additions & 4 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use crate::{
epoch_manager::LivenessStorageData,
logging::{LogEvent, LogSchema},
monitor,
network::{IncomingBlockRetrievalRequest, NetworkSender},
network::{
DeprecatedIncomingBlockRetrievalRequest, IncomingBlockRetrievalRequest, NetworkSender,
},
network_interface::ConsensusMsg,
payload_manager::TPayloadManager,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
Expand All @@ -27,7 +29,7 @@ use anyhow::{anyhow, bail, Context};
use aptos_consensus_types::{
block::Block,
block_retrieval::{
BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
BlockRetrievalRequestV1, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
NUM_RETRIES, RETRY_INTERVAL_MSEC, RPC_TIMEOUT_MSEC,
},
common::Author,
Expand Down Expand Up @@ -470,7 +472,7 @@ impl BlockStore {
/// future possible changes.
pub async fn process_block_retrieval(
&self,
request: IncomingBlockRetrievalRequest,
request: DeprecatedIncomingBlockRetrievalRequest,
) -> anyhow::Result<()> {
fail_point!("consensus::process_block_retrieval", |_| {
Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
Expand Down Expand Up @@ -505,6 +507,23 @@ impl BlockStore {
.send(Ok(response_bytes.into()))
.map_err(|_| anyhow::anyhow!("Failed to send block retrieval response"))
}

/// TODO @bchocho @hariria to implement in upcoming PR
/// Retrieve a n chained blocks from the block store starting from
/// an initial parent id, returning with <n (as many as possible) if
/// id or its ancestors can not be found.
///
/// The current version of the function is not really async, but keeping it this way for
/// future possible changes.
pub async fn process_block_retrieval_v2(
&self,
request: IncomingBlockRetrievalRequest,
) -> anyhow::Result<()> {
bail!(
"Unexpected request {:?} for process_block_retrieval_v2",
request.req
)
}
}

/// BlockRetriever is used internally to retrieve blocks
Expand Down Expand Up @@ -576,7 +595,7 @@ impl BlockRetriever {
.boxed(),
)
}
let request = BlockRetrievalRequest::new_with_target_block_id(
let request = BlockRetrievalRequestV1::new_with_target_block_id(
block_id,
retrieve_batch_size,
target_block_id,
Expand Down
88 changes: 71 additions & 17 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::{
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest,
IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender,
DeprecatedIncomingBlockRetrievalRequest, IncomingBatchRetrievalRequest,
IncomingBlockRetrievalRequest, IncomingDAGRequest, IncomingRandGenRequest,
IncomingRpcRequest, NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::{
Expand All @@ -59,6 +60,7 @@ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
block_retrieval::BlockRetrievalRequest,
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
proof_of_store::ProofCache,
Expand Down Expand Up @@ -569,18 +571,50 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let task = async move {
info!(epoch = epoch, "Block retrieval task starts");
while let Some(request) = request_rx.next().await {
if request.req.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
request.req.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval",
block_store.process_block_retrieval(request).await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
match request.req {
// TODO @bchocho @hariria deprecate once BlockRetrievalRequest enum release is complete
BlockRetrievalRequest::V1(v1) => {
if v1.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
v1.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval",
block_store
.process_block_retrieval(DeprecatedIncomingBlockRetrievalRequest {
req: v1,
protocol: request.protocol,
response_sender: request.response_sender,
})
.await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
}
},
BlockRetrievalRequest::V2(v2) => {
if v2.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
v2.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval_v2",
block_store
.process_block_retrieval_v2(IncomingBlockRetrievalRequest {
req: BlockRetrievalRequest::V2(v2),
protocol: request.protocol,
response_sender: request.response_sender,
})
.await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
}
},
}
}
info!(epoch = epoch, "Block retrieval task stops");
Expand Down Expand Up @@ -1666,6 +1700,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
}
}

/// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
fn process_rpc_request(
&mut self,
peer_id: Author,
Expand All @@ -1684,15 +1719,26 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
return Ok(());
},
None => {
ensure!(matches!(request, IncomingRpcRequest::BlockRetrieval(_)));
// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
ensure!(matches!(
request,
IncomingRpcRequest::DeprecatedBlockRetrieval(_)
| IncomingRpcRequest::BlockRetrieval(_)
));
},
_ => {},
}

match request {
IncomingRpcRequest::BlockRetrieval(request) => {
// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
IncomingRpcRequest::DeprecatedBlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
let incoming_block_retrieval_request = IncomingBlockRetrievalRequest {
req: BlockRetrievalRequest::V1(request.req),
protocol: request.protocol,
response_sender: request.response_sender,
};
tx.push(peer_id, incoming_block_retrieval_request)
} else {
error!("Round manager not started");
Ok(())
Expand Down Expand Up @@ -1722,6 +1768,14 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
bail!("Rand manager not started");
}
},
IncomingRpcRequest::BlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
error!("Round manager not started");
Ok(())
}
},
}
}

Expand Down
Loading

0 comments on commit 4e1eb2d

Please sign in to comment.