diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service/mod.rs similarity index 84% rename from core/src/blockchain_rpc/service.rs rename to core/src/blockchain_rpc/service/mod.rs index 4b1d8e2df..5a4da0a2e 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service/mod.rs @@ -1,3 +1,5 @@ +mod util; + use std::num::{NonZeroU32, NonZeroU64}; use std::sync::Arc; @@ -5,6 +7,7 @@ use anyhow::Context; use bytes::{Buf, Bytes}; use everscale_types::models::BlockId; use futures_util::Future; +use metrics::Label; use serde::{Deserialize, Serialize}; use tycho_network::{try_handle_prefix, InboundRequestMeta, Response, Service, ServiceRequest}; use tycho_storage::{ArchiveId, BlockConnection, KeyBlocksDirection, PersistentStateKind, Storage}; @@ -161,7 +164,19 @@ impl Service for BlockchainRpcService { } }; - tycho_network::match_tl_request!(body, tag = constructor, { + let method = util::Constructor::from_tl_id(constructor); + let label = vec![Label::new( + "method", + method.map_or("unknown", |m| m.as_str()), + )]; + let timer = + move || HistogramGuard::begin_with_labels_owned(RPC_METHOD_TIMINGS_METRIC, label); + + let inner = self.inner.clone(); + + // NOTE: update `constructor_to_string` after adding new methods + tycho_network::match_tl_request!(body, tag = constructor, + { overlay::Ping as _ => BoxFutureOrNoop::future(async { Some(Response::from_tl(overlay::Pong)) }), @@ -171,65 +186,44 @@ impl Service for BlockchainRpcService { max_size = req.max_size, "getNextKeyBlockIds", ); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_next_key_block_ids(&req); - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer, async move { + Some(Response::from_tl(inner.handle_get_next_key_block_ids(&req))) }) }, rpc::GetBlockFull as req => { tracing::debug!(block_id = %req.block_id, "getBlockFull"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_block_full(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_block_full(&req).await)) }) }, rpc::GetNextBlockFull as req => { tracing::debug!(prev_block_id = %req.prev_block_id, "getNextBlockFull"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_next_block_full(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_next_block_full(&req).await)) }) }, rpc::GetBlockDataChunk as req => { tracing::debug!(block_id = %req.block_id, offset = %req.offset, "getBlockDataChunk"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_block_data_chunk(&req); - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_block_data_chunk(&req))) }) }, rpc::GetKeyBlockProof as req => { tracing::debug!(block_id = %req.block_id, "getKeyBlockProof"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_key_block_proof(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_key_block_proof(&req).await)) }) }, rpc::GetPersistentShardStateInfo as req => { tracing::debug!(block_id = %req.block_id, "getPersistentShardStateInfo"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_persistent_state_info(&req); - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_persistent_state_info(&req))) }) }, rpc::GetPersistentQueueStateInfo as req => { tracing::debug!(block_id = %req.block_id, "getPersistentQueueStateInfo"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_queue_persistent_state_info(&req); - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_queue_persistent_state_info(&req))) }) }, rpc::GetPersistentShardStateChunk as req => { @@ -238,11 +232,8 @@ impl Service for BlockchainRpcService { offset = %req.offset, "getPersistentShardStateChunk" ); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_persistent_shard_state_chunk(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_persistent_shard_state_chunk(&req).await)) }) }, rpc::GetPersistentQueueStateChunk as req => { @@ -251,20 +242,14 @@ impl Service for BlockchainRpcService { offset = %req.offset, "getPersistentQueueStateChunk" ); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_persistent_queue_state_chunk(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_persistent_queue_state_chunk(&req).await)) }) }, rpc::GetArchiveInfo as req => { tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo"); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_archive_info(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_archive_info(&req).await)) }) }, rpc::GetArchiveChunk as req => { @@ -273,11 +258,8 @@ impl Service for BlockchainRpcService { offset = %req.offset, "getArchiveChunk" ); - - let inner = self.inner.clone(); - BoxFutureOrNoop::future(async move { - let res = inner.handle_get_archive_chunk(&req).await; - Some(Response::from_tl(res)) + BoxFutureOrNoop::with_pre_action(timer,async move { + Some(Response::from_tl(inner.handle_get_archive_chunk(&req).await)) }) }, }, e => { @@ -359,9 +341,6 @@ impl Inner { &self, req: &rpc::GetNextKeyBlockIds, ) -> overlay::Response { - let label = [("method", "getNextKeyBlockIds")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - let block_handle_storage = self.storage().block_handle_storage(); let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len); @@ -405,9 +384,6 @@ impl Inner { } async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response { - let label = [("method", "getBlockFull")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - match self.get_block_full(&req.block_id).await { Ok(block_full) => overlay::Response::Ok(block_full), Err(e) => { @@ -421,9 +397,6 @@ impl Inner { &self, req: &rpc::GetNextBlockFull, ) -> overlay::Response { - let label = [("method", "getNextBlockFull")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - let block_handle_storage = self.storage().block_handle_storage(); let block_connection_storage = self.storage().block_connection_storage(); @@ -448,9 +421,6 @@ impl Inner { } fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response { - let label = [("method", "getBlockDataChunk")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - let block_storage = self.storage.block_storage(); match block_storage.get_block_data_chunk(&req.block_id, req.offset) { Ok(Some(data)) => overlay::Response::Ok(Data { @@ -468,9 +438,6 @@ impl Inner { &self, req: &rpc::GetKeyBlockProof, ) -> overlay::Response { - let label = [("method", "getKeyBlockProof")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - let block_handle_storage = self.storage().block_handle_storage(); let block_storage = self.storage().block_storage(); @@ -502,9 +469,6 @@ impl Inner { let mc_seqno = req.mc_seqno; let node_state = self.storage.node_state(); - let label = [("method", "getArchiveInfo")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - match node_state.load_last_mc_block_id() { Some(last_applied_mc_block) => { if mc_seqno > last_applied_mc_block.seqno { @@ -540,9 +504,6 @@ impl Inner { &self, req: &rpc::GetArchiveChunk, ) -> overlay::Response { - let label = [("method", "getArchiveChunk")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); - let block_storage = self.storage.block_storage(); let get_archive_chunk = || async { @@ -578,8 +539,6 @@ impl Inner { &self, req: &rpc::GetPersistentQueueStateInfo, ) -> overlay::Response { - let label = [("method", "getQueuePersistentStateInfo")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue); overlay::Response::Ok(res) } @@ -588,8 +547,6 @@ impl Inner { &self, req: &rpc::GetPersistentShardStateChunk, ) -> overlay::Response { - let label = [("method", "getPersistentShardStateChunk")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard) .await } @@ -598,8 +555,6 @@ impl Inner { &self, req: &rpc::GetPersistentQueueStateChunk, ) -> overlay::Response { - let label = [("method", "getPersistentQueueStateChunk")]; - let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue) .await } diff --git a/core/src/blockchain_rpc/service/util.rs b/core/src/blockchain_rpc/service/util.rs new file mode 100644 index 000000000..738624546 --- /dev/null +++ b/core/src/blockchain_rpc/service/util.rs @@ -0,0 +1,43 @@ +use crate::proto::blockchain::*; +use crate::proto::overlay; + +macro_rules! constructor_to_string { + ($($ty:path as $name:ident),* $(,)?) => { + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum Constructor { + $($name),* + } + + impl Constructor { + pub fn from_tl_id(id: u32) -> Option { + match id { + $(<$ty>::TL_ID => Some(Self::$name)),*, + _ => None + } + } + + pub fn as_str(&self) -> &'static str { + match self { + $(Self::$name => stringify!($name)),* + } + } + + } + }; +} + +// update list in `def core_blockchain_rpc_per_method_stats() -> RowPanel:` after changing this +constructor_to_string! { + overlay::Ping as Ping, + rpc::GetNextKeyBlockIds as GetNextKeyBlockIds, + rpc::GetBlockFull as GetBlockFull, + rpc::GetNextBlockFull as GetNextBlockFull, + rpc::GetBlockDataChunk as GetBlockDataChunk, + rpc::GetKeyBlockProof as GetKeyBlockProof, + rpc::GetPersistentShardStateInfo as GetPersistentShardStateInfo, + rpc::GetPersistentQueueStateInfo as GetPersistentQueueStateInfo, + rpc::GetPersistentShardStateChunk as GetPersistentShardStateChunk, + rpc::GetPersistentQueueStateChunk as GetPersistentQueueStateChunk, + rpc::GetArchiveInfo as GetArchiveInfo, + rpc::GetArchiveChunk as GetArchiveChunk +} diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index a8695d3b1..da877892e 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -519,18 +519,7 @@ def net_traffic() -> RowPanel: return create_row("network: Traffic", metrics) -def core_blockchain_rpc() -> RowPanel: - methods = [ - "getNextKeyBlockIds", - "getBlockFull", - "getBlockDataChunk", - "getNextBlockFull", - "getKeyBlockProof", - "getArchiveInfo", - "getArchiveChunk", - "getPersistentStateInfo", - "getPersistentStatePart", - ] +def core_blockchain_rpc_general() -> RowPanel: metrics = [ create_gauge_panel( "tycho_core_overlay_client_validators_to_resolve", @@ -556,7 +545,36 @@ def core_blockchain_rpc() -> RowPanel: legend_format="{{instance}} - {{kind}}", ), ] - metrics += [ + return create_row("blockchain: RPC - General Stats", metrics) + + +def core_blockchain_rpc_per_method_stats() -> RowPanel: + methods = [ + "Ping", + "GetNextKeyBlockIds", + "GetBlockFull", + "GetNextBlockFull", + "GetBlockDataChunk", + "GetKeyBlockProof", + "GetPersistentShardStateInfo", + "GetPersistentQueueStateInfo", + "GetPersistentShardStateChunk", + "GetPersistentQueueStateChunk", + "GetArchiveInfo", + "GetArchiveChunk", + ] + + counter_panels = [ + create_counter_panel( + expr="tycho_blockchain_rpc_method_time_count", + title=f"Blockchain RPC {method} calls/s", + labels_selectors=[f'method="{method}"'], + legend_format="{{instance}}", + ) + for method in methods + ] + + heatmap_panels = [ create_heatmap_panel( "tycho_blockchain_rpc_method_time", f"Blockchain RPC {method} time", @@ -564,7 +582,8 @@ def core_blockchain_rpc() -> RowPanel: ) for method in methods ] - return create_row("blockchain: RPC", metrics) + + return create_row("blockchain: RPC - Method Stats", counter_panels + heatmap_panels) def net_conn_manager() -> RowPanel: @@ -881,9 +900,7 @@ def storage() -> RowPanel: create_gauge_panel( "tycho_core_mc_blocks_gc_lag", "Blocks GC lag", unit_format="Blocks" ), - create_gauge_panel( - "tycho_core_blocks_gc_tail_len", "GC diffs tail len" - ), + create_gauge_panel("tycho_core_blocks_gc_tail_len", "GC diffs tail len"), create_heatmap_panel( "tycho_storage_move_into_archive_time", "Time to move into archive" ), @@ -1277,7 +1294,7 @@ def collator_message_metrics() -> RowPanel: "tycho_do_collate_msgs_read_count_ext_by_partitions", "Read Ext msgs count (by partitions)", labels_selectors=['workchain=~"$workchain"', 'par_id=~"$partition"'], - by_labels=['instance', 'par_id'], + by_labels=["instance", "par_id"], ), create_counter_panel( "tycho_do_collate_msgs_exec_count_ext", @@ -1307,7 +1324,7 @@ def collator_message_metrics() -> RowPanel: "tycho_do_collate_msgs_read_count_int_by_partitions", "Read Int msgs count (by partitions)", labels_selectors=['workchain=~"$workchain"', 'par_id=~"$partition"'], - by_labels=['instance', 'par_id'], + by_labels=["instance", "par_id"], ), create_counter_panel( "tycho_do_collate_msgs_exec_count_int", @@ -1337,7 +1354,7 @@ def collator_message_metrics() -> RowPanel: "tycho_do_collate_msgs_read_count_new_int_by_partitions", "Read NewInt msgs count (by partitions)", labels_selectors=['workchain=~"$workchain"', 'par_id=~"$partition"'], - by_labels=['instance', 'par_id'], + by_labels=["instance", "par_id"], ), create_counter_panel( "tycho_do_collate_msgs_exec_count_new_int", @@ -1373,22 +1390,28 @@ def collator_queue_metrics() -> RowPanel: "tycho_internal_queue_gc_execute_task_time", "GC execute time" ), create_gauge_panel( - "tycho_internal_queue_gc_state_size", "Total GC state size", + "tycho_internal_queue_gc_state_size", + "Total GC state size", ), create_heatmap_panel( - "tycho_internal_queue_commited_state_iterator_create_time", "Commited iterator init time" + "tycho_internal_queue_commited_state_iterator_create_time", + "Commited iterator init time", ), create_heatmap_panel( - "tycho_internal_queue_uncommited_state_iterator_create_time", "Uncommitted iterator init time" + "tycho_internal_queue_uncommited_state_iterator_create_time", + "Uncommitted iterator init time", ), create_heatmap_panel( - "tycho_internal_queue_uncommitted_statistics_load_time", "Uncommited statistics load time" + "tycho_internal_queue_uncommitted_statistics_load_time", + "Uncommited statistics load time", ), create_heatmap_panel( - "tycho_internal_queue_committed_statistics_load_time", "Committed statistics load time" + "tycho_internal_queue_committed_statistics_load_time", + "Committed statistics load time", ), create_heatmap_panel( - "tycho_internal_queue_apply_diff_add_statistics_time", "Apply statistics time" + "tycho_internal_queue_apply_diff_add_statistics_time", + "Apply statistics time", ), create_heatmap_panel( "tycho_internal_queue_apply_diff_add_messages_time", "Apply messages time" @@ -1397,16 +1420,15 @@ def collator_queue_metrics() -> RowPanel: "tycho_internal_queue_apply_diff_add_statistics_accounts_count", "Statistics accounts count", legend_format=legend_format_partition, - by_labels=["instance", "partition"] - ), - create_heatmap_panel( - "tycho_internal_queue_snapshot_time", "Snapshot time" + by_labels=["instance", "partition"], ), + create_heatmap_panel("tycho_internal_queue_snapshot_time", "Snapshot time"), create_heatmap_panel( "tycho_internal_queue_create_iterator_time", "Create iterator time" ), create_heatmap_panel( - "tycho_internal_queue_add_messages_with_statistics_write_time", "Write uncommited data time" + "tycho_internal_queue_add_messages_with_statistics_write_time", + "Write uncommited data time", ), create_counter_panel( "tycho_collator_queue_adapter_iterators_count", "Iterators count" @@ -2351,7 +2373,8 @@ def templates() -> Templating: blockchain_stats(), core_bc(), core_block_strider(), - core_blockchain_rpc(), + core_blockchain_rpc_general(), + core_blockchain_rpc_per_method_stats(), storage(), collator_params_metrics(), collation_metrics(), diff --git a/util/src/futures/box_future_or_noop.rs b/util/src/futures/box_future_or_noop.rs index 66fd9c5fa..f859f7524 100644 --- a/util/src/futures/box_future_or_noop.rs +++ b/util/src/futures/box_future_or_noop.rs @@ -20,6 +20,20 @@ impl BoxFutureOrNoop { Err(f) => BoxFutureOrNoop::Boxed(f.boxed()), } } + + #[inline] + pub fn with_pre_action(t: Timer, f: F) -> Self + where + F: Future + Send + 'static, + Timer: FnOnce() -> TimerRet + Send + 'static, + TimerRet: Drop + Send + 'static, + { + let future = async move { + let _timer = t(); + f.await + }; + Self::future(future) + } } impl Future for BoxFutureOrNoop<()> { diff --git a/util/src/metrics/histogram_guard.rs b/util/src/metrics/histogram_guard.rs index 135dde75e..b4e8c8990 100644 --- a/util/src/metrics/histogram_guard.rs +++ b/util/src/metrics/histogram_guard.rs @@ -24,6 +24,16 @@ impl HistogramGuard { HistogramGuardWithLabels::begin(name, labels) } + pub fn begin_with_labels_owned( + name: &'static str, + labels: T, + ) -> HistogramGuardWithLabelsOwned + where + T: metrics::IntoLabels, + { + HistogramGuardWithLabelsOwned::begin(name, labels) + } + pub fn finish(mut self) -> Duration { let duration = self.started_at.elapsed(); if let Some(name) = self.name.take() { @@ -82,3 +92,47 @@ where } } } + +#[must_use = "The guard is used to update the histogram when it is dropped"] +pub struct HistogramGuardWithLabelsOwned +where + T: metrics::IntoLabels, +{ + name: Option<&'static str>, + started_at: Instant, + labels: Option, +} + +impl HistogramGuardWithLabelsOwned +where + T: metrics::IntoLabels, +{ + pub fn begin(name: &'static str, labels: T) -> Self { + Self { + name: Some(name), + started_at: Instant::now(), + labels: Some(labels), + } + } + + pub fn finish(mut self) -> Duration { + let duration = self.started_at.elapsed(); + if let Some(name) = self.name.take() { + let labels = self.labels.take().unwrap(); + metrics::histogram!(name, labels).record(duration); + } + duration + } +} + +impl Drop for HistogramGuardWithLabelsOwned +where + T: metrics::IntoLabels, +{ + fn drop(&mut self) { + if let Some(name) = self.name.take() { + let labels = self.labels.take().unwrap(); + metrics::histogram!(name, labels).record(self.started_at.elapsed()); + } + } +}