Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy bench tools #208

Draft
wants to merge 45 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
fa96d42
make client an ipv6 endpoint
grooviegermanikus Sep 7, 2023
440ab7f
use simplifind client binding
grooviegermanikus Sep 7, 2023
208e8eb
bind client socket dual-stack
grooviegermanikus Sep 7, 2023
7b1ad67
WIP: send transactions to TPU via QUIC instead of RPC
grooviegermanikus Sep 8, 2023
c87d64c
cheap hack putting leaders in a local file
grooviegermanikus Sep 15, 2023
a148ed5
compile
grooviegermanikus Sep 15, 2023
466910b
fix leaders.dat path
grooviegermanikus Sep 15, 2023
5c627ec
reduce log output
grooviegermanikus Sep 15, 2023
a43afff
soft handling of transaction_channel closed
grooviegermanikus Sep 15, 2023
1f9e464
deal with log errors
grooviegermanikus Sep 15, 2023
33acb4b
leaders.dat atomic write
grooviegermanikus Sep 15, 2023
529548e
check for leader.dat age
grooviegermanikus Sep 15, 2023
6d8acf0
temp diable channel drain
grooviegermanikus Sep 15, 2023
0a2df9a
re-enable diable channel drain
grooviegermanikus Sep 15, 2023
70617c7
merge gm patch
godmodegalactus Sep 15, 2023
7a77ae9
merge gm patch
grooviegermanikus Sep 15, 2023
ee100d4
add proxy to CI build
grooviegermanikus Sep 15, 2023
757ef0a
more log info on leaders.data
grooviegermanikus Sep 15, 2023
e8f5a8c
add simple tool to send transaction against proxy
grooviegermanikus Sep 18, 2023
c6fc4fe
Merge branch 'main' into groovie/bench-using-quic-to-tpu
grooviegermanikus Sep 18, 2023
faa3eff
send_test_proxy_forward_requests
grooviegermanikus Sep 18, 2023
e862606
continue on broadcast error
grooviegermanikus Sep 18, 2023
ede7d62
format
grooviegermanikus Sep 18, 2023
a3ed4d3
add bench for direct quic
grooviegermanikus Sep 18, 2023
4e8ded3
add 2nd bench bin
grooviegermanikus Sep 18, 2023
702b908
proxy: integrate prometheus
grooviegermanikus Sep 18, 2023
1b10e14
clippy
grooviegermanikus Sep 18, 2023
6154043
prometheus config
grooviegermanikus Sep 18, 2023
3e98c52
add feature for "leaders.dat"
grooviegermanikus Sep 18, 2023
2bd6046
cleanup
grooviegermanikus Sep 18, 2023
f53511e
add naive timeouts to .send_uni
grooviegermanikus Sep 18, 2023
57c11cd
relax error handling
grooviegermanikus Sep 18, 2023
897a798
comment assumptions for connection manager
grooviegermanikus Sep 19, 2023
1a85527
code fmt
grooviegermanikus Sep 19, 2023
cfaf5f1
add bench-direct-quic to docker build
grooviegermanikus Sep 19, 2023
0ab364a
rename bench to bench_rpc
grooviegermanikus Sep 19, 2023
176cc86
remove unused dependencies
grooviegermanikus Sep 19, 2023
163e649
refactor bench files
grooviegermanikus Sep 19, 2023
7ab5ea0
add rust-toolchain.toml with rust 1.70.0
grooviegermanikus Sep 19, 2023
001f61e
refactor autoconnect
grooviegermanikus Sep 19, 2023
c7cc774
fix minor rust issue
grooviegermanikus Sep 19, 2023
488ab74
disable flaky with_1000_transactions_direct test
grooviegermanikus Sep 19, 2023
679ca6e
Merge remote-tracking branch 'origin/main' into groovie/proxy-bench-t…
grooviegermanikus Sep 21, 2023
b343c1c
add flag ENABLE_LEADERSDAT_FOR_BENCH for leaders.dat
grooviegermanikus Sep 21, 2023
9bdbf14
Merge remote-tracking branch 'origin/main' into groovie/proxy-bench-t…
grooviegermanikus Sep 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
487 changes: 245 additions & 242 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
quinn = "0.9.4"
rustls = { version = "0.20.9", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.3"}
solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
Expand Down
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ FROM base as build
COPY --from=plan /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
RUN cargo build --release --bin lite-rpc --bin solana-lite-rpc-quic-forward-proxy
RUN cargo build --release --bin lite-rpc --bin solana-lite-rpc-quic-forward-proxy --bin bench-rpc --bin bench-direct-quic

FROM debian:bullseye-slim as run
RUN apt-get update && apt-get -y install ca-certificates libc6
COPY --from=build /app/target/release/solana-lite-rpc-quic-forward-proxy /usr/local/bin/
COPY --from=build /app/target/release/lite-rpc /usr/local/bin/
COPY --from=build /app/target/release/bench-rpc /usr/local/bin/
COPY --from=build /app/target/release/bench-direct-quic /usr/local/bin/

CMD lite-rpc --rpc-addr "$RPC_URL" --ws-addr "$WS_URL"
CMD lite-rpc --rpc-addr "$RPC_URL" --ws-addr "$WS_URL"
11 changes: 11 additions & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@ name = "bench"
version = "0.2.3"
edition = "2021"

[[bin]]
name = "bench-rpc"
path = "src/bench_rpc.rs"

[[bin]]
name = "bench-direct-quic"
path = "src/bench_direct_quic.rs"

[dependencies]
solana-lite-rpc-quic-forward-proxy = { path = "../quic-forward-proxy" }
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-streamer = {workspace = true}
log = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
clap = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tracing-subscriber = { workspace = true }
bincode = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
Expand Down
300 changes: 300 additions & 0 deletions bench/src/bench_direct_quic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
/// bench tool that uses the TPU client from quic proxy to submit transactions to TPUs via QUIC
///
/// note: this tool requires a lite-rpc service that is configured to dump the current leader list to a file (leaders.dat)
/// important: need to enable flag ENABLE_LEADERSDAT_FOR_BENCH in tpu_service.rs
mod cli_direct_quic;
mod helpers;
mod metrics;

use anyhow::Context;
use helpers::BenchHelper;
use metrics::{AvgMetric, Metric, TxMetricData};

use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use log::{debug, error, info};
use solana_lite_rpc_quic_forward_proxy::outbound::tx_forward::tx_forwarder;
use solana_lite_rpc_quic_forward_proxy::shared::ForwardPacket;
use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity;

use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::Transaction;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
slot_history::Slot,
};
use std::fs;
use std::fs::read_to_string;
use std::net::{SocketAddr, SocketAddrV4};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::SystemTime;
use tokio::{
sync::{mpsc::UnboundedSender, RwLock},
time::{Duration, Instant},
};

#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
tracing_subscriber::fmt::init();

let cli_direct_quic::Args {
tx_count,
runs,
run_interval_ms,
metrics_file_name,
rpc_addr,
transaction_save_file,
} = cli_direct_quic::Args::parse();

let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));

info!("Using RPC service on {rpc_addr}");

let mut avg_metric = AvgMetric::default();

let mut tasks = vec![];

let funded_payer = BenchHelper::get_payer().await.unwrap();
println!("payer : {}", funded_payer.pubkey());

let rpc_client = Arc::new(RpcClient::new_with_commitment(
rpc_addr.clone(),
CommitmentConfig::confirmed(),
));
let bh = rpc_client.get_latest_blockhash().await.unwrap();
let slot = rpc_client.get_slot().await.unwrap();
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
let current_slot = Arc::new(AtomicU64::new(slot));

{
// block hash updater task
let block_hash = block_hash.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
loop {
let bh = rpc_client.get_latest_blockhash().await;
match bh {
Ok(bh) => {
let mut lock = block_hash.write().await;
*lock = bh;
}
Err(e) => println!("blockhash update error {}", e),
}

let slot = rpc_client.get_slot().await;
match slot {
Ok(slot) => {
current_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
}
Err(e) => println!("slot {}", e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
};

// transaction logger
let (tx_log_sx, mut tx_log_rx) = tokio::sync::mpsc::unbounded_channel::<TxMetricData>();
let log_transactions = !transaction_save_file.is_empty();
if log_transactions {
tokio::spawn(async move {
let mut tx_writer = csv::Writer::from_path(transaction_save_file).unwrap();
while let Some(x) = tx_log_rx.recv().await {
tx_writer.serialize(x).unwrap();
}
});
}

for seed in 0..runs {
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
tasks.push(tokio::spawn(bench(
rpc_client.clone(),
tx_count,
funded_payer,
seed as u64,
block_hash.clone(),
current_slot.clone(),
tx_log_sx.clone(),
log_transactions,
)));
// wait for an interval
run_interval_ms.tick().await;
}

let join_res = join_all(tasks).await;

let mut run_num = 1;

let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
for res in join_res {
match res {
Ok(metric) => {
info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
// update avg metric
avg_metric += &metric;
csv_writer.serialize(metric).unwrap();
}
Err(_) => {
error!("join error for run {}", run_num);
}
}
run_num += 1;
}

let avg_metric = Metric::from(avg_metric);

info!("Avg Metric {avg_metric:?}",);
csv_writer.serialize(avg_metric).unwrap();

csv_writer.flush().unwrap();
}

#[derive(Clone, Debug, Copy)]
struct TxSendData {
sent_duration: Duration,
sent_instant: Instant,
sent_slot: Slot,
}

#[allow(clippy::too_many_arguments)]
async fn bench(
rpc_client: Arc<RpcClient>,
tx_count: usize,
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
current_slot: Arc<AtomicU64>,
tx_metric_sx: UnboundedSender<TxMetricData>,
log_txs: bool,
) -> Metric {
let map_of_txs: Arc<DashMap<Signature, TxSendData>> = Arc::new(DashMap::new());
let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(1000);

{
let validator_identity = ValidatorIdentity::new(None);
let exit_signal = Arc::new(AtomicBool::new(false));
let _jh = tokio::spawn(tx_forwarder(
validator_identity,
forward_receiver,
exit_signal,
));
}
// transaction sender task
{
let map_of_txs = map_of_txs.clone();
let current_slot = current_slot.clone();
// let forwarder_channel = forwarder_channel.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed));
for rand_string in rand_strings {
let blockhash = { *block_hash.read().await };
let tx = BenchHelper::create_memo_tx(&rand_string, &funded_payer, blockhash);

let leader_addrs = read_leaders_from_file("leaders.dat").expect("leaders.dat file");

let start_time = Instant::now();

debug!(
"sent tx {} to {} tpu nodes",
tx.get_signature(),
leader_addrs.len()
);
for tpu_address in &leader_addrs {
let tx_raw = bincode::serialize::<Transaction>(&tx).unwrap();
let packet = ForwardPacket::new(
vec![tx_raw],
SocketAddr::from(*tpu_address),
0xdeadbeef,
);

forwarder_channel.send(packet).await.unwrap();

map_of_txs.insert(
*tx.get_signature(),
TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed),
},
);
}
}
});
}

let mut metric = Metric::default();
let confirmation_time = Instant::now();
let mut confirmed_count = 0;
while confirmation_time.elapsed() < Duration::from_secs(60)
&& !(map_of_txs.is_empty() && confirmed_count == tx_count)
{
let signatures = map_of_txs.iter().map(|x| *x.key()).collect::<Vec<_>>();
if signatures.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
let chunks = signatures.chunks(100).collect::<Vec<_>>();
for chunk in chunks {
if let Ok(res) = rpc_client.get_signature_statuses(chunk).await {
for (i, signature) in chunk.iter().enumerate() {
let tx_status = &res.value[i];
if tx_status.is_some() {
let tx_data = map_of_txs.get(signature).unwrap();
let time_to_confirm = tx_data.sent_instant.elapsed();
metric.add_successful_transaction(tx_data.sent_duration, time_to_confirm);

if log_txs {
let _ = tx_metric_sx.send(TxMetricData {
signature: signature.to_string(),
sent_slot: tx_data.sent_slot,
confirmed_slot: current_slot.load(Ordering::Relaxed),
time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64,
time_to_confirm_in_millis: time_to_confirm.as_millis() as u64,
});
}
drop(tx_data);
map_of_txs.remove(signature);
confirmed_count += 1;
}
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}

for tx in map_of_txs.iter() {
metric.add_unsuccessful_transaction(tx.sent_duration);
}
metric.finalize();
metric
}

// note: this file gets written by tpu_services::dump_leaders_to_file; see docs how to activate
fn read_leaders_from_file(leaders_file: &str) -> anyhow::Result<Vec<SocketAddrV4>> {
let last_modified = fs::metadata("leaders.dat")?.modified().unwrap();
let file_age = SystemTime::now().duration_since(last_modified).unwrap();
assert!(
file_age.as_millis() < 1000,
"leaders.dat is outdated ({:?}) - pls run patched lite-rpc service",
file_age
);
let leader_file = read_to_string(leaders_file)?;
let mut leader_addrs = vec![];
for line in leader_file.lines() {
let socket_addr = SocketAddrV4::from_str(line)
.context(format!("error parsing line: {}", line))
.unwrap();
leader_addrs.push(socket_addr);
}
Ok(leader_addrs)
}
16 changes: 9 additions & 7 deletions bench/src/main.rs → bench/src/bench_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bench::{
cli::Args,
helpers::BenchHelper,
metrics::{AvgMetric, Metric, TxMetricData},
};
mod cli_rpc;
mod helpers;
mod metrics;

use helpers::BenchHelper;
use metrics::{AvgMetric, Metric, TxMetricData};

use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
Expand All @@ -25,14 +27,14 @@ use tokio::{
async fn main() {
tracing_subscriber::fmt::init();

let Args {
let cli_rpc::Args {
tx_count,
runs,
run_interval_ms,
metrics_file_name,
lite_rpc_addr,
transaction_save_file,
} = Args::parse();
} = cli_rpc::Args::parse();

let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));

Expand Down
Loading