Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
telezhnaya committed May 27, 2024
1 parent 56f5120 commit ac9aeff
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 281 deletions.
2 changes: 1 addition & 1 deletion src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn test(opts: Opts, engine: Engine) -> anyhow::Result<()> {
info!("executing transaction {} for {}", tx.kind(), opts.signer_id);
let outcome = tx.execute(&rpc_client, opts.clone()).await?;
info!(
"completed transaction {} for {}: {}",
"completed transaction {} for {}: {:?}",
tx.kind(),
opts.signer_id,
outcome
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub mod metrics;
pub use metrics::MetricServer;

pub mod transaction;
pub use transaction::{engine::Engine, TransactionKind, TransactionOutcome, TransactionSample};
pub use transaction::{engine::Engine, TransactionKind, TransactionSample};
48 changes: 38 additions & 10 deletions src/transaction/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn run_account_transactions_once(
match tx_sample.execute(&rpc_client, opts.clone()).await {
Ok(outcome) => {
info!(
"completed transaction {}#{} for {}: {}",
"completed transaction {}#{} for {}: {:?}",
tx_sample.kind(),
i,
opts.signer_id,
Expand All @@ -139,7 +139,7 @@ async fn run_account_transactions_once(
metrics
.transaction_latency
.get_or_create(&labels)
.observe(outcome.latency.as_secs_f64());
.observe(outcome.as_secs_f64());
}
Err(err) => {
warn!(
Expand Down Expand Up @@ -167,14 +167,14 @@ mod tests {

use async_trait::async_trait;
use more_asserts::assert_ge;
use near_crypto::{KeyType, SecretKey};
use near_crypto::{InMemorySigner, KeyType, SecretKey};
use near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest;
use near_primitives::hash::CryptoHash;
use near_primitives::types::Nonce;
use tokio::{sync::oneshot, time::sleep};

use crate::config::Mode;
use crate::{
metrics::{create_registry_and_metrics, Labels},
TransactionOutcome,
};
use crate::metrics::{create_registry_and_metrics, Labels};

use super::*;

Expand All @@ -193,14 +193,28 @@ mod tests {
TransactionKind::TokenTransferDefault
}

fn get_name(&self) -> &str {
unimplemented!();
}

fn get_transaction_request(
&self,
_: &InMemorySigner,
_: Opts,
_: Nonce,
_: CryptoHash,
) -> RpcSendTransactionRequest {
unimplemented!();
}

async fn execute(
&self,
_rpc_client: &JsonRpcClient,
_opts: Opts,
) -> anyhow::Result<TransactionOutcome> {
) -> anyhow::Result<Duration> {
self.exec_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(TransactionOutcome::new(std::time::Duration::from_millis(1)))
Ok(std::time::Duration::from_millis(1))
}
}

Expand All @@ -215,11 +229,25 @@ mod tests {
TransactionKind::FungibleTokenTransfer
}

fn get_name(&self) -> &str {
unimplemented!();
}

fn get_transaction_request(
&self,
_: &InMemorySigner,
_: Opts,
_: Nonce,
_: CryptoHash,
) -> RpcSendTransactionRequest {
unimplemented!();
}

async fn execute(
&self,
_rpc_client: &JsonRpcClient,
_opts: Opts,
) -> anyhow::Result<TransactionOutcome> {
) -> anyhow::Result<Duration> {
self.exec_counter.fetch_add(1, Ordering::SeqCst);
Err(anyhow::anyhow!("unknown error".to_string()))
}
Expand Down
68 changes: 18 additions & 50 deletions src/transaction/fungible_token_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::config::Opts;
use crate::{TransactionOutcome, TransactionSample};
use crate::TransactionSample;
use async_trait::async_trait;
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_crypto::InMemorySigner;
use near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest;
use near_primitives::action::FunctionCallAction;
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{Action, Transaction};
use near_primitives::types::BlockReference;
use tokio::time::Instant;
use tracing::{debug, warn};
use near_primitives::types::Nonce;

use super::TransactionKind;

Expand All @@ -19,37 +18,23 @@ impl TransactionSample for FungibleTokenTransfer {
TransactionKind::FungibleTokenTransfer
}

async fn execute(
fn get_name(&self) -> &str {
"USDT FT transfer"
}

fn get_transaction_request(
&self,
rpc_client: &JsonRpcClient,
signer: &InMemorySigner,
opts: Opts,
) -> anyhow::Result<TransactionOutcome> {
let signer = near_crypto::InMemorySigner::from_secret_key(
opts.signer_id.clone(),
opts.signer_key.clone(),
);

let access_key_response = rpc_client
.call(methods::query::RpcQueryRequest {
block_reference: BlockReference::latest(),
request: near_primitives::views::QueryRequest::ViewAccessKey {
account_id: signer.account_id.clone(),
public_key: signer.public_key.clone(),
},
})
.await?;

let current_nonce = match access_key_response.kind {
QueryResponseKind::AccessKey(access_key) => access_key.nonce,
_ => return Err(anyhow::anyhow!("Unreachable code")),
};

nonce: Nonce,
block_hash: CryptoHash,
) -> RpcSendTransactionRequest {
let transaction = Transaction {
signer_id: signer.account_id.clone(),
public_key: signer.public_key.clone(),
nonce: current_nonce + 1,
nonce: nonce + 1,
receiver_id: "usdt.tether-token.near".parse().unwrap(),
block_hash: access_key_response.block_hash,
block_hash,
actions: vec![Action::FunctionCall(Box::new(FunctionCallAction {
method_name: "ft_transfer".to_string(),
args: serde_json::json!({
Expand All @@ -62,26 +47,9 @@ impl TransactionSample for FungibleTokenTransfer {
deposit: 1,
}))],
};
let request = methods::send_tx::RpcSendTransactionRequest {
signed_transaction: transaction.sign(&signer),
RpcSendTransactionRequest {
signed_transaction: transaction.sign(signer),
wait_until: Default::default(),
};

let now = Instant::now();

match rpc_client.call(request).await {
Ok(response) => {
debug!(
"successful USDT FT transfer, status: {:?}\n",
response.final_execution_status,
);
let elapsed = now.elapsed();
Ok(TransactionOutcome::new(elapsed))
}
Err(err) => {
warn!("failure during USDT FT transfer:\n{}\n", err);
Err(anyhow::anyhow!("USDT FT transfer failed: {}", err))
}
}
}
}
77 changes: 60 additions & 17 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use core::fmt;
use std::time::Duration;

use async_trait::async_trait;
use derive_more::{Constructor, Display, From};
use humantime::format_duration;
use near_jsonrpc_client::JsonRpcClient;
use derive_more::{Display, From};
use near_crypto::InMemorySigner;
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest;
use near_primitives::hash::CryptoHash;
use near_primitives::types::{BlockReference, Nonce};
use std::time::Duration;
use tokio::time::Instant;
use tracing::{debug, warn};

use crate::config::Opts;

Expand All @@ -29,20 +33,59 @@ pub enum TransactionKind {
pub trait TransactionSample: Send + Sync {
fn kind(&self) -> TransactionKind;

async fn execute(
fn get_name(&self) -> &str;

fn get_transaction_request(
&self,
rpc_client: &JsonRpcClient,
signer: &InMemorySigner,
opts: Opts,
) -> anyhow::Result<TransactionOutcome>;
}
nonce: Nonce,
block_hash: CryptoHash,
) -> RpcSendTransactionRequest;

#[derive(Debug, Constructor)]
pub struct TransactionOutcome {
pub latency: Duration,
}
async fn execute(&self, rpc_client: &JsonRpcClient, opts: Opts) -> anyhow::Result<Duration> {
let now = Instant::now();

let signer = near_crypto::InMemorySigner::from_secret_key(
opts.signer_id.clone(),
opts.signer_key.clone(),
);

let access_key_response = rpc_client
.call(methods::query::RpcQueryRequest {
block_reference: BlockReference::latest(),
request: near_primitives::views::QueryRequest::ViewAccessKey {
account_id: signer.account_id.clone(),
public_key: signer.public_key.clone(),
},
})
.await?;

let current_nonce = match access_key_response.kind {
QueryResponseKind::AccessKey(access_key) => access_key.nonce,
_ => return Err(anyhow::anyhow!("Unreachable code")),
};

let request = self.get_transaction_request(
&signer,
opts,
current_nonce,
access_key_response.block_hash,
);

impl fmt::Display for TransactionOutcome {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "latency = {}", format_duration(self.latency))
match rpc_client.call(request).await {
Ok(response) => {
debug!(
"successful {}, status: {:?}\n",
self.get_name(),
response.final_execution_status,
);
Ok(now.elapsed())
}
Err(err) => {
warn!("failure during {}:\n{}\n", self.get_name(), err);
Err(anyhow::anyhow!("{} failed: {}", self.get_name(), err))
}
}
}
}
70 changes: 19 additions & 51 deletions src/transaction/swap.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::config::Opts;
use crate::{TransactionOutcome, TransactionSample};
use crate::TransactionSample;
use async_trait::async_trait;
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_jsonrpc_primitives::types::query::QueryResponseKind;
use near_crypto::InMemorySigner;
use near_jsonrpc_primitives::types::transactions::RpcSendTransactionRequest;
use near_primitives::action::FunctionCallAction;
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{Action, Transaction};
use near_primitives::types::BlockReference;
use tokio::time::Instant;
use tracing::{debug, warn};
use near_primitives::types::Nonce;

use super::TransactionKind;

Expand All @@ -19,37 +18,23 @@ impl TransactionSample for Swap {
TransactionKind::Swap
}

async fn execute(
&self,
rpc_client: &JsonRpcClient,
opts: Opts,
) -> anyhow::Result<TransactionOutcome> {
let signer = near_crypto::InMemorySigner::from_secret_key(
opts.signer_id.clone(),
opts.signer_key.clone(),
);

let access_key_response = rpc_client
.call(methods::query::RpcQueryRequest {
block_reference: BlockReference::latest(),
request: near_primitives::views::QueryRequest::ViewAccessKey {
account_id: signer.account_id.clone(),
public_key: signer.public_key.clone(),
},
})
.await?;

let current_nonce = match access_key_response.kind {
QueryResponseKind::AccessKey(access_key) => access_key.nonce,
_ => return Err(anyhow::anyhow!("Unreachable code")),
};
fn get_name(&self) -> &str {
"swap from NEAR to USDT"
}

fn get_transaction_request(
&self,
signer: &InMemorySigner,
_opts: Opts,
nonce: Nonce,
block_hash: CryptoHash,
) -> RpcSendTransactionRequest {
let transaction = Transaction {
signer_id: signer.account_id.clone(),
public_key: signer.public_key.clone(),
nonce: current_nonce + 1,
nonce: nonce + 1,
receiver_id: "wrap.near".parse().unwrap(),
block_hash: access_key_response.block_hash,
block_hash,
actions: vec![Action::FunctionCall(Box::new(FunctionCallAction {
method_name: "near_deposit".to_string(),
args: serde_json::json!({})
Expand All @@ -68,26 +53,9 @@ impl TransactionSample for Swap {
})),
],
};
let request = methods::send_tx::RpcSendTransactionRequest {
signed_transaction: transaction.sign(&signer),
RpcSendTransactionRequest {
signed_transaction: transaction.sign(signer),
wait_until: Default::default(),
};

let now = Instant::now();

match rpc_client.call(request).await {
Ok(response) => {
debug!(
"successful swap from NEAR to USDT, status: {:?}\n",
response.final_execution_status,
);
let elapsed = now.elapsed();
Ok(TransactionOutcome::new(elapsed))
}
Err(err) => {
warn!("failure during swap from NEAR to USDT:\n{}\n", err);
Err(anyhow::anyhow!("swap from NEAR to USDT failed: {}", err))
}
}
}
}
Loading

0 comments on commit ac9aeff

Please sign in to comment.