Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockhain rpc limits #605

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod util;

use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc;

Expand Down Expand Up @@ -161,6 +163,13 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
}
};

let method = util::Constructor::from_tl_id(constructor);
let label = [("method", method.map_or("unknown", |x| x.as_str()))];
let _guard = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let inner = self.inner.clone();

// NOTE: update `constructor_to_string` after adding new methods
tycho_network::match_tl_request!(body, tag = constructor, {
overlay::Ping as _ => BoxFutureOrNoop::future(async {
Some(Response::from_tl(overlay::Pong))
Expand All @@ -171,65 +180,44 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
max_size = req.max_size,
"getNextKeyBlockIds",
);

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_next_key_block_ids(&req);
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_next_key_block_ids(&req)))
})
},
rpc::GetBlockFull as req => {
tracing::debug!(block_id = %req.block_id, "getBlockFull");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_block_full(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_block_full(&req).await))
})
},
rpc::GetNextBlockFull as req => {
tracing::debug!(prev_block_id = %req.prev_block_id, "getNextBlockFull");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_next_block_full(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_next_block_full(&req).await))
})
},
rpc::GetBlockDataChunk as req => {
tracing::debug!(block_id = %req.block_id, offset = %req.offset, "getBlockDataChunk");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_block_data_chunk(&req);
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_block_data_chunk(&req)))
})
},
rpc::GetKeyBlockProof as req => {
tracing::debug!(block_id = %req.block_id, "getKeyBlockProof");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_key_block_proof(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_key_block_proof(&req).await))
})
},
rpc::GetPersistentShardStateInfo as req => {
tracing::debug!(block_id = %req.block_id, "getPersistentShardStateInfo");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_persistent_state_info(&req);
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_persistent_state_info(&req)))
})
},
rpc::GetPersistentQueueStateInfo as req => {
tracing::debug!(block_id = %req.block_id, "getPersistentQueueStateInfo");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_queue_persistent_state_info(&req);
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_queue_persistent_state_info(&req)))
})
},
rpc::GetPersistentShardStateChunk as req => {
Expand All @@ -238,11 +226,8 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
offset = %req.offset,
"getPersistentShardStateChunk"
);

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_persistent_shard_state_chunk(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_persistent_shard_state_chunk(&req).await))
})
},
rpc::GetPersistentQueueStateChunk as req => {
Expand All @@ -251,20 +236,14 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
offset = %req.offset,
"getPersistentQueueStateChunk"
);

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_persistent_queue_state_chunk(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_persistent_queue_state_chunk(&req).await))
})
},
rpc::GetArchiveInfo as req => {
tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_archive_info(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_archive_info(&req).await))
})
},
rpc::GetArchiveChunk as req => {
Expand All @@ -273,11 +252,8 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
offset = %req.offset,
"getArchiveChunk"
);

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_archive_chunk(&req).await;
Some(Response::from_tl(res))
Some(Response::from_tl(inner.handle_get_archive_chunk(&req).await))
})
},
}, e => {
Expand Down Expand Up @@ -359,9 +335,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetNextKeyBlockIds,
) -> overlay::Response<KeyBlockIds> {
let label = [("method", "getNextKeyBlockIds")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_handle_storage = self.storage().block_handle_storage();

let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
Expand Down Expand Up @@ -405,9 +378,6 @@ impl<B> Inner<B> {
}

async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
let label = [("method", "getBlockFull")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

match self.get_block_full(&req.block_id).await {
Ok(block_full) => overlay::Response::Ok(block_full),
Err(e) => {
Expand All @@ -421,9 +391,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetNextBlockFull,
) -> overlay::Response<BlockFull> {
let label = [("method", "getNextBlockFull")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_handle_storage = self.storage().block_handle_storage();
let block_connection_storage = self.storage().block_connection_storage();

Expand All @@ -448,9 +415,6 @@ impl<B> Inner<B> {
}

fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response<Data> {
let label = [("method", "getBlockDataChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_storage = self.storage.block_storage();
match block_storage.get_block_data_chunk(&req.block_id, req.offset) {
Ok(Some(data)) => overlay::Response::Ok(Data {
Expand All @@ -468,9 +432,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetKeyBlockProof,
) -> overlay::Response<KeyBlockProof> {
let label = [("method", "getKeyBlockProof")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_handle_storage = self.storage().block_handle_storage();
let block_storage = self.storage().block_storage();

Expand Down Expand Up @@ -502,9 +463,6 @@ impl<B> Inner<B> {
let mc_seqno = req.mc_seqno;
let node_state = self.storage.node_state();

let label = [("method", "getArchiveInfo")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

match node_state.load_last_mc_block_id() {
Some(last_applied_mc_block) => {
if mc_seqno > last_applied_mc_block.seqno {
Expand Down Expand Up @@ -540,9 +498,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetArchiveChunk,
) -> overlay::Response<Data> {
let label = [("method", "getArchiveChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_storage = self.storage.block_storage();

let get_archive_chunk = || async {
Expand Down Expand Up @@ -578,8 +533,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetPersistentQueueStateInfo,
) -> overlay::Response<PersistentStateInfo> {
let label = [("method", "getQueuePersistentStateInfo")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
overlay::Response::Ok(res)
}
Expand All @@ -588,8 +541,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetPersistentShardStateChunk,
) -> overlay::Response<Data> {
let label = [("method", "getPersistentShardStateChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
.await
}
Expand All @@ -598,8 +549,6 @@ impl<B> Inner<B> {
&self,
req: &rpc::GetPersistentQueueStateChunk,
) -> overlay::Response<Data> {
let label = [("method", "getPersistentQueueStateChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
.await
}
Expand Down
43 changes: 43 additions & 0 deletions core/src/blockchain_rpc/service/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::proto::blockchain::*;
use crate::proto::overlay;

macro_rules! constructor_to_string {
($($ty:path as $name:ident),* $(,)?) => {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Constructor {
$($name),*
}

impl Constructor {
pub fn from_tl_id(id: u32) -> Option<Self> {
match id {
$(<$ty>::TL_ID => Some(Self::$name)),*,
_ => None
}
}

pub fn as_str(&self) -> &'static str {
match self {
$(Self::$name => stringify!($name)),*
}
}

}
};
}

// update list in `def core_blockchain_rpc_per_method_stats() -> RowPanel:` after changing this
constructor_to_string! {
overlay::Ping as Ping,
rpc::GetNextKeyBlockIds as GetNextKeyBlockIds,
rpc::GetBlockFull as GetBlockFull,
rpc::GetNextBlockFull as GetNextBlockFull,
rpc::GetBlockDataChunk as GetBlockDataChunk,
rpc::GetKeyBlockProof as GetKeyBlockProof,
rpc::GetPersistentShardStateInfo as GetPersistentShardStateInfo,
rpc::GetPersistentQueueStateInfo as GetPersistentQueueStateInfo,
rpc::GetPersistentShardStateChunk as GetPersistentShardStateChunk,
rpc::GetPersistentQueueStateChunk as GetPersistentQueueStateChunk,
rpc::GetArchiveInfo as GetArchiveInfo,
rpc::GetArchiveChunk as GetArchiveChunk
}
Loading