Skip to content

Commit

Permalink
#2: add timeouts metric
Browse files Browse the repository at this point in the history
  • Loading branch information
telezhnaya committed May 30, 2024
1 parent 9d79bad commit 9d3d3e2
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 30 deletions.
23 changes: 1 addition & 22 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::Parser;
use futures::try_join;
use near_jsonrpc_client::JsonRpcClient;
use tokio::sync::oneshot;
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
Expand All @@ -15,9 +14,8 @@ async fn main() -> anyhow::Result<()> {
let engine = Engine::with_default_transactions();

match opts.mode {
Mode::Run => run(opts, engine).await,
Mode::List => list(engine).await,
Mode::Test => test(opts, engine).await,
Mode::Run => run(opts, engine).await,
}
}

Expand All @@ -38,25 +36,6 @@ async fn list(engine: Engine) -> anyhow::Result<()> {
Ok(())
}

async fn test(opts: Opts, engine: Engine) -> anyhow::Result<()> {
info!("running selected transactions: {:?}", opts.transaction_kind);
let rpc_client = JsonRpcClient::connect(&opts.rpc_url);
for (kind, tx) in engine.transactions() {
if opts.transaction_kind.contains(kind) {
info!("executing transaction {} for {}", tx.kind(), opts.signer_id);
let outcome = tx.execute(&rpc_client, opts.clone()).await?;
info!(
"completed transaction {} for {}: {:?}",
tx.kind(),
opts.signer_id,
outcome
);
return Ok(());
}
}
Ok(())
}

fn setup_tracing() {
let fmt_layer = fmt::layer().with_target(false);
let filter_layer = EnvFilter::try_from_default_env()
Expand Down
6 changes: 2 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ use std::net::SocketAddr;

#[derive(clap::ValueEnum, Debug, Clone, Subcommand)]
pub enum Mode {
/// Run selected transactions continuously.
Run,
/// Display the available transaction types.
List,
/// Run selected transactions once.
Test,
/// Run selected transactions continuously.
Run,
}

/// Start options
Expand Down
4 changes: 4 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct Metrics {
pub attempted_transactions: Family<Labels, Counter>,
pub successful_transactions: Family<Labels, Counter>,
pub failed_transactions: Family<Labels, Counter>,
pub timeouts: Family<Labels, Counter>,
pub transaction_latency: Family<Labels, Histogram>,
}

Expand Down Expand Up @@ -130,6 +131,8 @@ pub(crate) fn create_registry_and_metrics() -> (Arc<Registry>, Arc<Metrics>) {
"Number of failed transactions",
failed_transactions.clone(),
);
let timeouts = Family::<Labels, Counter>::default();
registry.register("timeouts", "Number of timeouts", timeouts.clone());
let transaction_latency = Family::<Labels, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(2.0, 2.0, 6))
});
Expand All @@ -143,6 +146,7 @@ pub(crate) fn create_registry_and_metrics() -> (Arc<Registry>, Arc<Metrics>) {
attempted_transactions,
successful_transactions,
failed_transactions,
timeouts,
transaction_latency,
};
(Arc::new(registry), Arc::new(metrics))
Expand Down
9 changes: 8 additions & 1 deletion src/transaction/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ async fn run_account_transactions_once(
opts.signer_id
);

match tx_sample.execute(&rpc_client, opts.clone()).await {
match tx_sample
.execute(&rpc_client, opts.clone(), &metrics, &labels)
.await
{
Ok(outcome) => {
info!(
"completed transaction {}#{} for {}: {:?}",
Expand Down Expand Up @@ -216,6 +219,8 @@ mod tests {
&self,
_rpc_client: &JsonRpcClient,
_opts: Opts,
_metrics: &Arc<Metrics>,
_labels: &Labels,
) -> anyhow::Result<Duration> {
self.exec_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Expand Down Expand Up @@ -252,6 +257,8 @@ mod tests {
&self,
_rpc_client: &JsonRpcClient,
_opts: Opts,
_metrics: &Arc<Metrics>,
_labels: &Labels,
) -> anyhow::Result<Duration> {
self.exec_counter.fetch_add(1, Ordering::SeqCst);
Err(anyhow::anyhow!("unknown error".to_string()))
Expand Down
18 changes: 15 additions & 3 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use near_jsonrpc_primitives::types::transactions::{
};
use near_primitives::hash::CryptoHash;
use near_primitives::types::{BlockReference, Nonce};
use std::sync::Arc;
use std::time::Duration;
use strum_macros::Display;
use tokio::time::Instant;
use tracing::{debug, warn};

use crate::config::Opts;
use crate::metrics::{Labels, Metrics};

pub mod engine;

Expand Down Expand Up @@ -46,7 +48,13 @@ pub trait TransactionSample: Send + Sync {
block_hash: CryptoHash,
) -> RpcSendTransactionRequest;

async fn execute(&self, rpc_client: &JsonRpcClient, opts: Opts) -> anyhow::Result<Duration> {
async fn execute(
&self,
rpc_client: &JsonRpcClient,
opts: Opts,
metrics: &Arc<Metrics>,
labels: &Labels,
) -> anyhow::Result<Duration> {
let now = Instant::now();

let signer =
Expand Down Expand Up @@ -85,7 +93,9 @@ pub trait TransactionSample: Send + Sync {
}
Err(err) => {
match err.handler_error() {
Some(RpcTransactionError::TimeoutError) => {}
Some(RpcTransactionError::TimeoutError) => {
metrics.timeouts.get_or_create(labels).inc();
}
_ => {
warn!("failure during {}:\n{}\n", self.get_name(), err);
return Err(anyhow::anyhow!("{} failed: {}", self.get_name(), err));
Expand All @@ -107,7 +117,9 @@ pub trait TransactionSample: Send + Sync {
.await
{
Err(err) => match err.handler_error() {
Some(RpcTransactionError::TimeoutError) => {}
Some(RpcTransactionError::TimeoutError) => {
metrics.timeouts.get_or_create(labels).inc();
}
_ => {
warn!(
"failure during tx status request, {}:\n{}\n",
Expand Down

0 comments on commit 9d3d3e2

Please sign in to comment.