diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index b521749a..d348ce87 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -34,7 +34,7 @@ use std::time::{Duration, Instant}; use tokio::runtime::{Builder, Runtime}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::SendError; -use tokio::task::JoinHandle; +use tokio::task::{JoinHandle, yield_now}; use tokio::time::{interval, sleep}; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{filter::LevelFilter, fmt}; @@ -126,7 +126,7 @@ pub fn bench_proxy() { wireup_and_send_txs_via_channel(TestCaseParams { // sample_tx_count: 1000, // this is the goal -- ATM test runs too long - sample_tx_count: 200, + sample_tx_count: 1000, stake_connection: true, proxy_mode: true, }); @@ -143,6 +143,17 @@ pub fn with_10000_transactions() { }); } +#[test] +pub fn with_10000_transactions_proxy() { + configure_logging(false); + + wireup_and_send_txs_via_channel(TestCaseParams { + sample_tx_count: 10000, + stake_connection: true, + proxy_mode: true, + }); +} + #[ignore] #[test] pub fn too_many_transactions() { @@ -151,7 +162,7 @@ pub fn too_many_transactions() { wireup_and_send_txs_via_channel(TestCaseParams { sample_tx_count: 100000, stake_connection: false, - proxy_mode: false, + proxy_mode: true, }); } @@ -250,7 +261,7 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) { CountMap::with_capacity(test_case_params.sample_tx_count as usize); let warmup_tx_count: u32 = test_case_params.sample_tx_count / 2; while packet_count < test_case_params.sample_tx_count { - if latest_tx.elapsed() > Duration::from_secs(5) { + if latest_tx.elapsed() > Duration::from_secs(25) { warn!("abort after timeout waiting for packet from quic streamer"); break; } @@ -336,7 
+347,7 @@ fn configure_logging(verbose: bool) { let env_filter = if verbose { "debug,rustls=info,quinn=info,quinn_proto=debug,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=trace" } else { - "debug,rustls=info,quinn=info,quinn_proto=info,solana_streamer=debug,solana_lite_rpc_quic_forward_proxy=debug" + "info,rustls=info,quinn=info,quinn_proto=info,solana_streamer=info,solana_lite_rpc_quic_forward_proxy=info" }; let span_mode = if verbose { FmtSpan::CLOSE @@ -426,6 +437,9 @@ async fn start_literpc_client( for i in 0..test_case_params.sample_tx_count { let raw_sample_tx = build_raw_sample_tx(i); broadcast_sender.send(raw_sample_tx)?; + if (i+1) % 1000 == 0 { + yield_now().await; + } } // we need that to keep the tokio runtime dedicated to lite-rpc up long enough @@ -549,7 +563,7 @@ async fn start_literpc_client_direct_mode( for i in 0..test_case_params.sample_tx_count { let raw_sample_tx = build_raw_sample_tx(i); - debug!( + trace!( "broadcast transaction {} to {} receivers: {}", raw_sample_tx.0, broadcast_sender.receiver_count(), @@ -642,7 +656,7 @@ async fn start_literpc_client_proxy_mode( for i in 0..test_case_params.sample_tx_count { let raw_sample_tx = build_raw_sample_tx(i); - debug!( + trace!( "broadcast transaction {} to {} receivers: {}", raw_sample_tx.0, broadcast_sender.receiver_count(), @@ -650,6 +664,9 @@ async fn start_literpc_client_proxy_mode( ); broadcast_sender.send(raw_sample_tx)?; + if (i+1) % 1000 == 0 { + yield_now().await; + } } sleep(Duration::from_secs(30)).await; diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index ed4e4570..ae8b9194 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -10,7 +10,7 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use futures::FutureExt; use itertools::Itertools; -use log::{debug, error, info, warn}; +use log::{debug, error, 
info, trace, warn}; use quinn::{ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt}; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; @@ -40,6 +40,8 @@ pub struct QuicProxyConnectionManager { current_tpu_nodes: Arc>> } +const PARALLEL_STREAMS_TO_PROXY: usize = 4; + impl QuicProxyConnectionManager { pub async fn new( certificate: rustls::Certificate, @@ -201,7 +203,7 @@ impl QuicProxyConnectionManager { let tpu_fanout_nodes = current_tpu_nodes.read().await.clone(); - info!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy", + trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy", txs.len(), tpu_fanout_nodes.len()); for target_tpu_node in tpu_fanout_nodes { @@ -222,9 +224,6 @@ impl QuicProxyConnectionManager { proxy_address: SocketAddr, tpu_target_address: SocketAddr, target_tpu_identity: Pubkey) -> anyhow::Result<()> { - info!("sending vecvec {} to quic proxy for TPU node {}", - raw_tx_batch.iter().map(|tx| tx.len()).into_iter().join(","), tpu_target_address); - // TODO add timeout // let mut send_stream = timeout(Duration::from_millis(500), connection.open_uni()).await??; @@ -242,25 +241,31 @@ impl QuicProxyConnectionManager { txs.push(tx); } - let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, txs); - debug!("forwarding_request: {}", forwarding_request); - let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions"); + for chunk in txs.chunks(PARALLEL_STREAMS_TO_PROXY) { - let send_result = auto_connection.send(proxy_request_raw).await; + let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into()); + debug!("forwarding_request: {}", forwarding_request); - // let send_result = - // timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw)) - // 
.await.context("Timeout sending data to quic proxy")?; + let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions"); - match send_result { - Ok(()) => { - info!("Successfully sent data to quic proxy"); - } - Err(e) => { - bail!("Failed to send data to quic proxy: {:?}", e); + let send_result = auto_connection.send(proxy_request_raw).await; + + // let send_result = + // timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw)) + // .await.context("Timeout sending data to quic proxy")?; + + match send_result { + Ok(()) => { + debug!("Successfully sent {} txs to quic proxy", chunk.len()); + } + Err(e) => { + bail!("Failed to send data to quic proxy: {:?}", e); + } } - } + + } // -- one chunk + Ok(()) } diff --git a/services/src/tpu_utils/quinn_auto_reconnect.rs b/services/src/tpu_utils/quinn_auto_reconnect.rs index 4d639fff..56ab85c9 100644 --- a/services/src/tpu_utils/quinn_auto_reconnect.rs +++ b/services/src/tpu_utils/quinn_auto_reconnect.rs @@ -40,8 +40,7 @@ impl AutoReconnect { // TOOD do smart error handling + reconnect let mut send_stream = self.refresh().await.open_uni().await?; send_stream.write_all(payload.as_slice()).await?; - send_stream.finish().await?; - + let _ = send_stream.finish().await; Ok(()) }