Skip to content

Commit

Permalink
make QUIC tpu QOS parameters configurable (#4170)
Browse files Browse the repository at this point in the history
* make QUIC tpu QOS parameters configurable

* Use max_connections_per_ipaddr_per_min

* set max_unstaked_connections to 0 for tpu-fwd and vote in testing

* fixed some clippy complaint

* missing max-streams-per-ms

* missing tpu_max_streams_per_ms

* vote does not accept unstaked connections

* Addressed some feedback from Alessandro

* re-export some constants moved/renamed and mark them deprecated

* removed duplicated code definition, use 'use'
  • Loading branch information
lijunwangs authored Jan 15, 2025
1 parent 1a18c26 commit 950d496
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 151 deletions.
2 changes: 1 addition & 1 deletion bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr},
Expand Down
45 changes: 13 additions & 32 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
//! multi-stage transaction processing pipeline in software.
pub use solana_sdk::net::DEFAULT_TPU_COALESCE;
// allow multiple connections for NAT and any open/close overlap
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER instead"
)]
pub use solana_streamer::quic::DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER as MAX_QUIC_CONNECTIONS_PER_PEER;
use {
crate::{
banking_stage::BankingStage,
Expand Down Expand Up @@ -37,10 +43,7 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
quic::{
spawn_server_multi, QuicServerParams, SpawnServerResult, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
},
quic::{spawn_server_multi, QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand All @@ -54,9 +57,6 @@ use {
tokio::sync::mpsc::Sender as AsyncSender,
};

// allow multiple connections for NAT and any open/close overlap
pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;

pub struct TpuSockets {
pub transactions: Vec<UdpSocket>,
pub transaction_forwards: Vec<UdpSocket>,
Expand Down Expand Up @@ -115,7 +115,9 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
tpu_quic_server_config: QuicServerParams,
tpu_fwd_quic_server_config: QuicServerParams,
vote_quic_server_config: QuicServerParams,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
transaction_struct: TransactionStructure,
Expand Down Expand Up @@ -179,15 +181,7 @@ impl Tpu {
vote_packet_sender.clone(),
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: 1,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0,
..QuicServerParams::default()
},
vote_quic_server_config,
)
.unwrap();

Expand All @@ -204,12 +198,7 @@ impl Tpu {
packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_quic_server_config,
)
.unwrap();

Expand All @@ -226,15 +215,7 @@ impl Tpu {
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
QuicServerParams {
max_connections_per_peer: MAX_QUIC_CONNECTIONS_PER_PEER,
max_staked_connections: MAX_STAKED_CONNECTIONS
.saturating_add(MAX_UNSTAKED_CONNECTIONS),
max_unstaked_connections: 0, // Prevent unstaked nodes from forwarding transactions
max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute,
coalesce: tpu_coalesce,
..QuicServerParams::default()
},
tpu_fwd_quic_server_config,
)
.unwrap();

Expand Down
72 changes: 49 additions & 23 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ use {
timing::timestamp,
},
solana_send_transaction_service::send_transaction_service,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::{quic::QuicServerParams, socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC,
},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::vote_state,
Expand Down Expand Up @@ -510,8 +513,42 @@ pub struct ValidatorTpuConfig {
pub tpu_connection_pool_size: usize,
/// Controls if to enable UDP for TPU tansactions.
pub tpu_enable_udp: bool,
/// Controls the new maximum connections per IpAddr per minute
pub tpu_max_connections_per_ipaddr_per_minute: u64,
/// QUIC server config for regular TPU
pub tpu_quic_server_config: QuicServerParams,
/// QUIC server config for TPU forward
pub tpu_fwd_quic_server_config: QuicServerParams,
/// QUIC server config for Vote
pub vote_quic_server_config: QuicServerParams,
}

impl ValidatorTpuConfig {
/// A convenient function to build a ValidatorTpuConfig for testing with good
/// default.
pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
let tpu_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
..Default::default()
};

let tpu_fwd_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
max_unstaked_connections: 0,
..Default::default()
};

// vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
let vote_quic_server_config = tpu_fwd_quic_server_config.clone();

ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
}
}
}

pub struct Validator {
Expand Down Expand Up @@ -573,7 +610,9 @@ impl Validator {
vote_use_quic,
tpu_connection_pool_size,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
} = tpu_config;

let start_time = Instant::now();
Expand Down Expand Up @@ -1548,7 +1587,9 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
tpu_quic_server_config,
tpu_fwd_quic_server_config,
vote_quic_server_config,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.transaction_struct.clone(),
Expand Down Expand Up @@ -2751,10 +2792,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
DEFAULT_VOTE_USE_QUIC,
},
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
std::{fs::remove_dir_all, thread, time::Duration},
};

Expand Down Expand Up @@ -2792,13 +2830,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
start_progress.clone(),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -3014,13 +3046,7 @@ mod tests {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
28 changes: 5 additions & 23 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,15 +351,9 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
tpu_enable_udp: true,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute
},
// We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests
// to use the same QUIC ports due to SO_REUSEPORT.
ValidatorTpuConfig::new_for_tests(true),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -568,13 +562,7 @@ impl LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per mintute
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -1100,13 +1088,7 @@ impl Cluster for LocalCluster {
None, // rpc_to_plugin_manager_receiver
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
ValidatorTpuConfig {
use_quic: DEFAULT_TPU_USE_QUIC,
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute, use higher value because of tests
},
ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
15 changes: 11 additions & 4 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

/// The new connections per minute from a particular IP address.
/// Heuristically set to the default maximum concurrent connections
/// per IP address. Might be adjusted later.
pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE"
)]
pub use crate::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE;
/// Limit to 250K PPS
#[deprecated(
since = "2.2.0",
note = "Use solana_streamer::quic::DEFAULT_MAX_STREAMS_PER_MS"
)]
pub use crate::quic::DEFAULT_MAX_STREAMS_PER_MS;

/// Total new connection counts per second. Heuristically taken from
/// the default staked and unstaked connection limits. Might be adjusted
Expand Down
16 changes: 7 additions & 9 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS},
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS},
},
std::{
sync::{atomic::Ordering, Arc},
Expand All @@ -251,7 +249,7 @@ pub mod test {
fn test_max_streams_for_unstaked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
Expand All @@ -268,7 +266,7 @@ pub mod test {
fn test_max_streams_for_staked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

Expand Down Expand Up @@ -448,7 +446,7 @@ pub mod test {
fn test_update_ema() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down Expand Up @@ -477,7 +475,7 @@ pub mod test {
fn test_update_ema_missing_interval() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand All @@ -497,7 +495,7 @@ pub mod test {
fn test_update_ema_if_needed() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
Expand Down
10 changes: 5 additions & 5 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
use {
super::quic::{
spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{
QuicServerParams, StreamerStats, DEFAULT_TPU_COALESCE, MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
QuicServerParams, StreamerStats, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_UNSTAKED_CONNECTIONS, DEFAULT_TPU_COALESCE,
},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -64,8 +64,8 @@ impl Default for TestServerConfig {
fn default() -> Self {
Self {
max_connections_per_peer: 1,
max_staked_connections: MAX_STAKED_CONNECTIONS,
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
}
Expand Down
Loading

0 comments on commit 950d496

Please sign in to comment.