Skip to content

Commit

Permalink
feat(core): add metrics subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 10, 2024
1 parent 4252648 commit 52f4410
Show file tree
Hide file tree
Showing 15 changed files with 217 additions and 273 deletions.
8 changes: 5 additions & 3 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tycho_collator::validator::config::ValidatorConfig;
use tycho_collator::validator::validator::ValidatorStdImplFactory;
use tycho_core::block_strider::{
BlockProvider, BlockStrider, BlockchainBlockProvider, BlockchainBlockProviderConfig,
OptionalBlockStuff, PersistentBlockStriderState, StateSubscriber, StateSubscriberContext,
StateSubscriberExt, StorageBlockProvider,
MetricsSubscriber, OptionalBlockStuff, PersistentBlockStriderState, StateSubscriber,
StateSubscriberContext, StateSubscriberExt, StorageBlockProvider,
};
use tycho_core::blockchain_rpc::{
BlockchainRpcClient, BlockchainRpcService, BlockchainRpcServiceConfig, BroadcastListener,
Expand Down Expand Up @@ -666,7 +666,9 @@ impl Node {
.with_state_subscriber(
self.state_tracker.clone(),
self.storage.clone(),
collator_state_subscriber.chain(rpc_state),
collator_state_subscriber
.chain(rpc_state)
.chain(MetricsSubscriber),
)
.build();

Expand Down
5 changes: 0 additions & 5 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,6 @@ impl CollatorStdImpl {
// metrics
let labels = [("workchain", self.shard_id.workchain().to_string())];

metrics::counter!("tycho_do_collate_msgs_exec_count_all_total", &labels)
.increment(collation_data.execute_count_all as _);
metrics::counter!("tycho_do_collate_msgs_exec_count_ext_total", &labels)
.increment(collation_data.execute_count_ext as _);

metrics::counter!("tycho_do_collate_tx_total", &labels)
.increment(collation_data.tx_count as _);

Expand Down
14 changes: 7 additions & 7 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub use self::state_applier::ShardStateApplier;
#[cfg(any(test, feature = "test"))]
pub use self::subscriber::test::PrintSubscriber;
pub use self::subscriber::{
BlockSubscriber, BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, NoopSubscriber,
StateSubscriber, StateSubscriberContext, StateSubscriberExt,
BlockSubscriber, BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber,
MetricsSubscriber, NoopSubscriber, StateSubscriber, StateSubscriberContext, StateSubscriberExt,
};

mod provider;
Expand Down Expand Up @@ -126,7 +126,7 @@ where
while let Some(next) = self.fetch_next_master_block().await {
let started_at = Instant::now();
self.process_mc_block(next.data, next.archive_data).await?;
metrics::histogram!("tycho_process_mc_block_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_process_mc_block_time").record(started_at.elapsed());
}

tracing::info!("block strider loop finished");
Expand Down Expand Up @@ -154,11 +154,11 @@ where
while let Some(blocks) = download_futures.next().await.transpose()? {
process_futures.push(Box::pin(self.process_shard_blocks(&mc_block_id, blocks)));
}
metrics::histogram!("tycho_download_shard_blocks_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());

// Wait for all shard blocks to be processed
while process_futures.next().await.transpose()?.is_some() {}
metrics::histogram!("tycho_process_shard_blocks_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());

// Process masterchain block
let cx = BlockSubscriberContext {
Expand Down Expand Up @@ -189,7 +189,7 @@ where
tracing::debug!(block_id = %top_block_id, "fetched shard block");
debug_assert_eq!(block.id(), &top_block_id);

metrics::histogram!("tycho_fetch_shard_block_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_fetch_sc_block_time").record(started_at.elapsed());

// Parse info in advance to make borrow checker happy
let info = block.data.load_info()?;
Expand Down Expand Up @@ -234,7 +234,7 @@ where

let started_at = Instant::now();
self.subscriber.handle_block(&cx).await?;
metrics::histogram!("tycho_process_shard_block_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_process_sc_block_time").record(started_at.elapsed());

self.state.commit_shard(&block_id);
}
Expand Down
16 changes: 8 additions & 8 deletions core/src/block_strider/state_applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,22 @@ where
prev_root_cell,
)
.await?;
metrics::histogram!("tycho_apply_block_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_apply_block_time").record(started_at.elapsed());

// Update metrics
let gen_utime = handle.meta().gen_utime() as f64;
let seqno = handle.id().seqno as f64;
let now = tycho_util::time::now_millis() as f64 / 1000.0;

if cx.block.id().is_masterchain() {
metrics::gauge!("tycho_last_mc_block_utime").set(gen_utime);
metrics::gauge!("tycho_last_mc_block_seqno").set(seqno);
metrics::gauge!("tycho_last_mc_block_applied").set(now);
metrics::gauge!("tycho_core_last_mc_block_utime").set(gen_utime);
metrics::gauge!("tycho_core_last_mc_block_seqno").set(seqno);
metrics::gauge!("tycho_core_last_mc_block_applied").set(now);
} else {
// TODO: only store max
metrics::gauge!("tycho_last_shard_block_utime").set(gen_utime);
metrics::gauge!("tycho_last_shard_block_seqno").set(seqno);
metrics::gauge!("tycho_last_shard_block_applied").set(now);
metrics::gauge!("tycho_core_last_sc_block_utime").set(gen_utime);
metrics::gauge!("tycho_core_last_sc_block_seqno").set(seqno);
metrics::gauge!("tycho_core_last_sc_block_applied").set(now);
}

// Process state
Expand All @@ -127,7 +127,7 @@ where
state,
};
self.inner.state_subscriber.handle_state(&cx).await?;
metrics::histogram!("tycho_subscriber_handle_block_time").record(started_at.elapsed());
metrics::histogram!("tycho_core_subscriber_handle_block_time").record(started_at.elapsed());

// Mark block as applied
handle_storage.store_block_applied(&handle);
Expand Down
133 changes: 133 additions & 0 deletions core/src/block_strider/subscriber/metrics_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use anyhow::Result;
use everscale_types::models::{AccountStatus, ComputePhase, InMsg, MsgInfo, OutMsg, TxInfo};
use tycho_block_util::block::BlockStuff;

use crate::block_strider::{
BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
};

#[derive(Debug, Clone, Copy)]
pub struct MetricsSubscriber;

impl BlockSubscriber for MetricsSubscriber {
type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;

fn handle_block(&self, cx: &BlockSubscriberContext) -> Self::HandleBlockFut<'_> {
if let Err(e) = handle_block(&cx.block) {
tracing::error!("failed to handle block: {e:?}");
}
futures_util::future::ready(Ok(()))
}
}

impl StateSubscriber for MetricsSubscriber {
type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;

fn handle_state(&self, cx: &StateSubscriberContext) -> Self::HandleStateFut<'_> {
if let Err(e) = handle_block(&cx.block) {
tracing::error!("failed to handle block: {e:?}");
}
futures_util::future::ready(Ok(()))
}
}

fn handle_block(block: &BlockStuff) -> Result<()> {
let block_id = block.id();
let info = block.as_ref().load_info()?;
let extra = block.as_ref().load_extra()?;

let mut in_msg_count: u32 = 0;
for descr in extra.in_msg_description.load()?.iter() {
let (_, _, in_msg) = descr?;
in_msg_count += matches!(
in_msg,
InMsg::External(_) | InMsg::Immediate(_) | InMsg::Final(_)
) as u32;
}

let mut out_msgs_count: u32 = 0;
for descr in extra.out_msg_description.load()?.iter() {
let (_, _, out_msg) = descr?;
out_msgs_count += matches!(out_msg, OutMsg::New(_) | OutMsg::Immediate(_)) as u32;
}

let mut transaction_count = 0u32;
let mut message_count = 0u32;
let mut ext_message_count = 0u32;
let mut account_blocks_count = 0u32;
let mut contract_deployments = 0u32;
let mut contract_destructions = 0u32;
let mut total_gas_used = 0;

let account_blocks = extra.account_blocks.load()?;
for entry in account_blocks.iter() {
let (_, _, account_block) = entry?;
account_blocks_count += 1;

for entry in account_block.transactions.iter() {
let (_, _, tx) = entry?;
let tx = tx.load()?;

transaction_count += 1;
message_count += tx.in_msg.is_some() as u32 + tx.out_msg_count.into_inner() as u32;

if let Some(in_msg) = tx.load_in_msg()? {
ext_message_count += matches!(&in_msg.info, MsgInfo::ExtIn(_)) as u32;
}

let was_active = tx.orig_status == AccountStatus::Active;
let is_active = tx.end_status == AccountStatus::Active;

contract_deployments += (!was_active && is_active) as u32;
contract_destructions += (was_active && !is_active) as u32;

total_gas_used += 'gas: {
match tx.load_info()? {
TxInfo::Ordinary(info) => {
if let ComputePhase::Executed(phase) = &info.compute_phase {
break 'gas phase.gas_used.into_inner();
}
}
TxInfo::TickTock(info) => {
if let ComputePhase::Executed(phase) = &info.compute_phase {
break 'gas phase.gas_used.into_inner();
}
}
};

0
};
}
}

let out_in_message_ratio = if in_msg_count > 0 {
out_msgs_count as f64 / in_msg_count as f64
} else {
0.0
};
let out_message_account_ratio = if account_blocks_count > 0 {
out_msgs_count as f64 / account_blocks_count as f64
} else {
0.0
};

let labels = &[("workchain", block_id.shard.workchain().to_string())];
metrics::histogram!("tycho_bc_software_version", labels).record(info.gen_software.version);
metrics::histogram!("tycho_bc_in_msg_count", labels).record(in_msg_count);
metrics::histogram!("tycho_bc_out_msg_count", labels).record(out_msgs_count);

metrics::counter!("tycho_bc_txs_total", labels).increment(transaction_count as _);
metrics::counter!("tycho_bc_msgs_total", labels).increment(message_count as _);
metrics::counter!("tycho_bc_ext_msgs_total", labels).increment(ext_message_count as _);

metrics::counter!("tycho_bc_contract_deploy_total", labels)
.increment(contract_deployments as _);
metrics::counter!("tycho_bc_contract_delete_total", labels)
.increment(contract_destructions as _);

metrics::histogram!("tycho_bc_total_gas_used", labels).record(total_gas_used as f64);
metrics::histogram!("tycho_bc_out_in_msg_ratio", labels).record(out_in_message_ratio);
metrics::histogram!("tycho_bc_out_msg_acc_ratio", labels).record(out_message_account_ratio);

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use tycho_block_util::archive::ArchiveData;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::ShardStateStuff;

pub use self::metrics_subscriber::MetricsSubscriber;

mod metrics_subscriber;

// === trait BlockSubscriber ===

pub struct BlockSubscriberContext {
Expand Down
35 changes: 0 additions & 35 deletions network/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,39 +42,6 @@ const METRIC_IN_REQ_FIND_VALUE_TOTAL: &str = "tycho_net_dht_in_req_find_value_to
const METRIC_IN_REQ_GET_NODE_INFO_TOTAL: &str = "tycho_net_dht_in_req_get_node_info_total";
const METRIC_IN_REQ_STORE_TOTAL: &str = "tycho_net_dht_in_req_store_value_total";

// Registered in `DhtServiceBuilder::build`
fn describe_metrics() {
metrics::describe_counter!(
METRIC_IN_REQ_TOTAL,
"Number of incoming DHT requests over time"
);
metrics::describe_counter!(
METRIC_IN_REQ_FAIL_TOTAL,
"Number of failed incoming DHT requests over time"
);

metrics::describe_counter!(
METRIC_IN_REQ_WITH_PEER_INFO_TOTAL,
"Number of incoming DHT requests with peer info over time"
);
metrics::describe_counter!(
METRIC_IN_REQ_FIND_NODE_TOTAL,
"Number of incoming DHT FindNode requests over time"
);
metrics::describe_counter!(
METRIC_IN_REQ_FIND_VALUE_TOTAL,
"Number of incoming DHT FindValue requests over time"
);
metrics::describe_counter!(
METRIC_IN_REQ_GET_NODE_INFO_TOTAL,
"Number of incoming DHT GetNodeInfo requests over time"
);
metrics::describe_counter!(
METRIC_IN_REQ_STORE_TOTAL,
"Number of incoming DHT Store requests over time"
);
}

#[derive(Clone)]
pub struct DhtClient {
inner: Arc<DhtInner>,
Expand Down Expand Up @@ -319,8 +286,6 @@ impl DhtServiceBuilder {
}

pub fn build(self) -> (DhtServiceBackgroundTasks, DhtService) {
describe_metrics();

let config = self.config.unwrap_or_default();

let storage = {
Expand Down
48 changes: 0 additions & 48 deletions network/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,54 +41,6 @@ const METRIC_CONNECTIONS_PENDING_DIALS: &str = "tycho_net_conn_pending_dials";
const METRIC_ACTIVE_PEERS: &str = "tycho_net_active_peers";
const METRIC_KNOWN_PEERS: &str = "tycho_net_known_peers";

pub fn describe_metrics() {
metrics::describe_histogram!(
METRIC_CONNECTION_OUT_TIME,
"Time taken to establish an outgoing connection"
);
metrics::describe_histogram!(
METRIC_CONNECTION_IN_TIME,
"Time taken to establish an incoming connection"
);

metrics::describe_counter!(
METRIC_CONNECTIONS_OUT_TOTAL,
"Number of established outgoing connections over time"
);
metrics::describe_counter!(
METRIC_CONNECTIONS_IN_TOTAL,
"Number of established incoming connections over time"
);
metrics::describe_counter!(
METRIC_CONNECTIONS_OUT_FAIL_TOTAL,
"Number of failed outgoing connections over time"
);
metrics::describe_counter!(
METRIC_CONNECTIONS_IN_FAIL_TOTAL,
"Number of failed incoming connections over time"
);

metrics::describe_gauge!(
METRIC_CONNECTIONS_ACTIVE,
"Number of currently active connections"
);
metrics::describe_gauge!(
METRIC_CONNECTIONS_PENDING,
"Number of currently pending connections"
);
metrics::describe_gauge!(
METRIC_CONNECTIONS_PARTIAL,
"Number of currently half-resolved connections"
);
metrics::describe_gauge!(
METRIC_CONNECTIONS_PENDING_DIALS,
"Number of currently pending connectivity checks"
);

metrics::describe_gauge!(METRIC_ACTIVE_PEERS, "Number of currently active peers");
metrics::describe_gauge!(METRIC_KNOWN_PEERS, "Number of currently known peers");
}

#[derive(Debug)]
pub(crate) enum ConnectionManagerRequest {
Connect(Address, PeerId, CallbackTx),
Expand Down
9 changes: 0 additions & 9 deletions network/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ mod peer;
mod request_handler;
mod wire;

// Registered in `NetworkBuilder::build`
fn describe_metrics() {
connection_manager::describe_metrics();
request_handler::describe_metrics();
peer::describe_metrics();
}

pub struct NetworkBuilder<MandatoryFields = (String, [u8; 32])> {
mandatory_fields: MandatoryFields,
optional_fields: BuilderFields,
Expand Down Expand Up @@ -89,8 +82,6 @@ impl NetworkBuilder {
S: Send + Sync + Clone + 'static,
S: Service<ServiceRequest, QueryResponse = Response>,
{
describe_metrics();

let config = self.optional_fields.config.unwrap_or_default();
let quic_config = config.quic.clone().unwrap_or_default();
let (service_name, private_key) = self.mandatory_fields;
Expand Down
Loading

0 comments on commit 52f4410

Please sign in to comment.