Skip to content

Commit

Permalink
reduce logging; add parallel streams on client side
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jul 28, 2023
1 parent d5df190 commit e8f2bef
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::time::{Duration, Instant};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError;
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, yield_now};
use tokio::time::{interval, sleep};
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{filter::LevelFilter, fmt};
Expand Down Expand Up @@ -126,7 +126,7 @@ pub fn bench_proxy() {

wireup_and_send_txs_via_channel(TestCaseParams {
// sample_tx_count: 1000, // this is the goal -- ATM test runs too long
sample_tx_count: 200,
sample_tx_count: 1000,
stake_connection: true,
proxy_mode: true,
});
Expand All @@ -143,6 +143,17 @@ pub fn with_10000_transactions() {
});
}

#[test]
pub fn with_10000_transactions_proxy() {
configure_logging(false);

wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 10000,
stake_connection: true,
proxy_mode: true,
});
}

#[ignore]
#[test]
pub fn too_many_transactions() {
Expand All @@ -151,7 +162,7 @@ pub fn too_many_transactions() {
wireup_and_send_txs_via_channel(TestCaseParams {
sample_tx_count: 100000,
stake_connection: false,
proxy_mode: false,
proxy_mode: true,
});
}

Expand Down Expand Up @@ -250,7 +261,7 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
CountMap::with_capacity(test_case_params.sample_tx_count as usize);
let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2;
while packet_count < test_case_params.sample_tx_count {
if latest_tx.elapsed() > Duration::from_secs(5) {
if latest_tx.elapsed() > Duration::from_secs(25) {
warn!("abort after timeout waiting for packet from quic streamer");
break;
}
Expand Down Expand Up @@ -336,7 +347,7 @@ fn configure_logging(verbose: bool) {
let env_filter = if verbose {
"debug,rustls=info,quinn=info,quinn_proto=debug,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=trace"
} else {
"debug,rustls=info,quinn=info,quinn_proto=info,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=debug"
"info,rustls=info,quinn=info,quinn_proto=info,solana_streamer=info,solana_lite_rpc_quic_forward_proxy=info"
};
let span_mode = if verbose {
FmtSpan::CLOSE
Expand Down Expand Up @@ -426,6 +437,9 @@ async fn start_literpc_client(
for i in 0..test_case_params.sample_tx_count {
let raw_sample_tx = build_raw_sample_tx(i);
broadcast_sender.send(raw_sample_tx)?;
if (i+1) % 1000 == 0 {
yield_now().await;
}
}

// we need that to keep the tokio runtime dedicated to lite-rpc up long enough
Expand Down Expand Up @@ -549,7 +563,7 @@ async fn start_literpc_client_direct_mode(

for i in 0..test_case_params.sample_tx_count {
let raw_sample_tx = build_raw_sample_tx(i);
debug!(
trace!(
"broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0,
broadcast_sender.receiver_count(),
Expand Down Expand Up @@ -642,14 +656,17 @@ async fn start_literpc_client_proxy_mode(

for i in 0..test_case_params.sample_tx_count {
let raw_sample_tx = build_raw_sample_tx(i);
debug!(
trace!(
"broadcast transaction {} to {} receivers: {}",
raw_sample_tx.0,
broadcast_sender.receiver_count(),
format!("hi {}", i)
);

broadcast_sender.send(raw_sample_tx)?;
if (i+1) % 1000 == 0 {
yield_now().await;
}
}

sleep(Duration::from_secs(30)).await;
Expand Down
43 changes: 24 additions & 19 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::FutureExt;
use itertools::Itertools;
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};
use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
Expand Down Expand Up @@ -40,6 +40,8 @@ pub struct QuicProxyConnectionManager {
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>
}

const PARALLEL_STREAMS_TO_PROXY: usize = 4;

impl QuicProxyConnectionManager {
pub async fn new(
certificate: rustls::Certificate,
Expand Down Expand Up @@ -201,7 +203,7 @@ impl QuicProxyConnectionManager {

let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();

info!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());

for target_tpu_node in tpu_fanout_nodes {
Expand All @@ -222,9 +224,6 @@ impl QuicProxyConnectionManager {
proxy_address: SocketAddr, tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey) -> anyhow::Result<()> {

info!("sending vecvec {} to quic proxy for TPU node {}",
raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","), tpu_target_address);

// TODO add timeout
// let mut send_stream = timeout(Duration::from_millis(500), connection.open_uni()).await??;

Expand All @@ -242,25 +241,31 @@ impl QuicProxyConnectionManager {
txs.push(tx);
}

let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, txs);
debug!("forwarding_request: {}", forwarding_request);

let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
for chunk in txs.chunks(PARALLEL_STREAMS_TO_PROXY) {

let send_result = auto_connection.send(proxy_request_raw).await;
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into());
debug!("forwarding_request: {}", forwarding_request);

// let send_result =
// timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw))
// .await.context("Timeout sending data to quic proxy")?;
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");

match send_result {
Ok(()) => {
info!("Successfully sent data to quic proxy");
}
Err(e) => {
bail!("Failed to send data to quic proxy: {:?}", e);
let send_result = auto_connection.send(proxy_request_raw).await;

// let send_result =
// timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw))
// .await.context("Timeout sending data to quic proxy")?;

match send_result {
Ok(()) => {
debug!("Successfully sent {} txs to quic proxy", txs.len());
}
Err(e) => {
bail!("Failed to send data to quic proxy: {:?}", e);
}
}
}

} // -- one chunk


Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions services/src/tpu_utils/quinn_auto_reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ impl AutoReconnect {
// TODO do smart error handling + reconnect
let mut send_stream = self.refresh().await.open_uni().await?;
send_stream.write_all(payload.as_slice()).await?;
send_stream.finish().await?;

let _ = send_stream.finish().await;

Ok(())
}
Expand Down

0 comments on commit e8f2bef

Please sign in to comment.