From ac663e21413ac8c10eddc99d67ad8d13b6fea93a Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Sat, 25 Jan 2025 00:30:01 +0200 Subject: [PATCH] Clean up code in retransmit_stage.rs (#4569) * clean up code in retransmit_stage.rs by eliminating a function that simply passed args through --------- Co-authored-by: Alex Pyattaev --- turbine/benches/retransmit_stage.rs | 12 +-- turbine/src/retransmit_stage.rs | 141 +++++++++++++--------------- 2 files changed, 69 insertions(+), 84 deletions(-) diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index dcb6ee8ba1b2ef..ff7b703360e2e0 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -28,7 +28,7 @@ use { timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, - solana_turbine::retransmit_stage::retransmitter, + solana_turbine::retransmit_stage::RetransmitStage, std::{ iter::repeat_with, net::Ipv4Addr, @@ -120,14 +120,14 @@ fn bench_retransmitter(bencher: &mut Bencher) { let num_packets = data_shreds.len(); - let retransmitter_handles = retransmitter( - Arc::new(sockets), - quic_endpoint_sender, + let retransmit_stage = RetransmitStage::new( bank_forks, leader_schedule_cache, cluster_info, + Arc::new(sockets), + quic_endpoint_sender, shreds_receiver, - Arc::default(), // solana_rpc::max_slots::MaxSlots + Arc::new(solana_rpc::max_slots::MaxSlots::default()), None, None, ); @@ -183,5 +183,5 @@ fn bench_retransmitter(bencher: &mut Bencher) { total.store(0, Ordering::Relaxed); }); - retransmitter_handles.join().unwrap(); + retransmit_stage.join().unwrap(); } diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index 3fc2f19319b355..f58e2a0114bc4d 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -1,5 +1,4 @@ //! 
The `retransmit_stage` retransmits shreds between validators -#![allow(clippy::rc_buffer)] use { crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS}, @@ -178,14 +177,16 @@ impl ShredDeduper { } } +// Pull the shreds from `retransmit_receiver` until it is empty, then retransmit them. +// Uses `thread_pool` to parallelize the work when there are enough shreds to justify it. #[allow(clippy::too_many_arguments)] fn retransmit( thread_pool: &ThreadPool, bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - shreds_receiver: &Receiver>>, - sockets: &[UdpSocket], + retransmit_receiver: &Receiver>>, + retransmit_sockets: &[UdpSocket], quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>, stats: &mut RetransmitStats, cluster_nodes_cache: &ClusterNodesCache, @@ -195,11 +196,11 @@ fn retransmit( slot_status_notifier: Option<&SlotStatusNotifier>, ) -> Result<(), RecvError> { // wait for something on the channel - let mut shreds = shreds_receiver.recv()?; + let mut shreds = retransmit_receiver.recv()?; // now the batch has started let mut timer_start = Measure::start("retransmit"); // drain the channel until it is empty to form a batch - shreds.extend(shreds_receiver.try_iter().flatten()); + shreds.extend(retransmit_receiver.try_iter().flatten()); stats.num_shreds += shreds.len(); stats.total_batches += 1; @@ -262,7 +263,7 @@ fn retransmit( shred_deduper, &cache, socket_addr_space, - &sockets[index % sockets.len()], + &retransmit_sockets[index % retransmit_sockets.len()], quic_endpoint_sender, stats, ) @@ -280,7 +281,7 @@ fn retransmit( shred_deduper, &cache, socket_addr_space, - &sockets[index % sockets.len()], + &retransmit_sockets[index % retransmit_sockets.len()], quic_endpoint_sender, stats, ) @@ -301,6 +302,7 @@ fn retransmit( Ok(()) } +// Retransmit a single shred to all downstream nodes fn retransmit_shred( shred: Vec, root_bank: &Bank, @@ -313,7 +315,7 @@ fn retransmit_shred( ) -> Option<( Slot, // 
Shred slot. usize, // This node's distance from the turbine root. - usize, // Number of nodes the shred is retransmitted to. + usize, // Number of nodes the shred was retransmitted to. )> { let key = shred::layout::get_shred_id(&shred)?; let (slot_leader, cluster_nodes) = cache.get(&key.slot())?; @@ -369,70 +371,21 @@ fn retransmit_shred( Some((key.slot(), root_distance, num_nodes)) } -/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes. -/// See `cluster_info` for network layer definitions. -/// # Arguments -/// * `sockets` - Sockets to read from. -/// * `bank_forks` - The BankForks structure -/// * `leader_schedule_cache` - The leader schedule to verify shreds -/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip. -/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes. -pub fn retransmitter( - sockets: Arc>, - quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>, - bank_forks: Arc>, - leader_schedule_cache: Arc, - cluster_info: Arc, - shreds_receiver: Receiver>>, - max_slots: Arc, - rpc_subscriptions: Option>, - slot_status_notifier: Option, -) -> JoinHandle<()> { - let cluster_nodes_cache = ClusterNodesCache::::new( - CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, - CLUSTER_NODES_CACHE_TTL, - ); - let mut rng = rand::thread_rng(); - let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS); - - let mut stats = RetransmitStats::new(Instant::now()); - - #[allow(clippy::manual_clamp)] - let num_threads = get_thread_count().min(8).max(sockets.len()); - let thread_pool = ThreadPoolBuilder::new() - .num_threads(num_threads) - .thread_name(|i| format!("solRetransmit{i:02}")) - .build() - .unwrap(); - Builder::new() - .name("solRetransmittr".to_string()) - .spawn(move || { - while retransmit( - &thread_pool, - &bank_forks, - &leader_schedule_cache, - &cluster_info, - &shreds_receiver, - &sockets, - &quic_endpoint_sender, - &mut stats, - &cluster_nodes_cache, - 
&mut shred_deduper, - &max_slots, - rpc_subscriptions.as_deref(), - slot_status_notifier.as_ref(), - ) - .is_ok() - {} - }) - .unwrap() -} - +/// Service to retransmit messages received from other peers in turbine. pub struct RetransmitStage { retransmit_thread_handle: JoinHandle<()>, } impl RetransmitStage { + /// Construct the RetransmitStage. + /// + /// Key arguments: + /// * `retransmit_sockets` - Sockets to use for transmission of shreds + /// * `max_slots` - Structure to keep track of the Turbine progress + /// * `bank_forks` - Reference to the BankForks structure + /// * `leader_schedule_cache` - The leader schedule to verify shreds + /// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip. + /// * `retransmit_receiver` - Receive channel for batches of shreds to be retransmitted. pub fn new( bank_forks: Arc>, leader_schedule_cache: Arc, @@ -444,18 +397,50 @@ impl RetransmitStage { rpc_subscriptions: Option>, slot_status_notifier: Option, ) -> Self { - let retransmit_thread_handle = retransmitter( - retransmit_sockets, - quic_endpoint_sender, - bank_forks, - leader_schedule_cache, - cluster_info, - retransmit_receiver, - max_slots, - rpc_subscriptions, - slot_status_notifier, + let cluster_nodes_cache = ClusterNodesCache::::new( + CLUSTER_NODES_CACHE_NUM_EPOCH_CAP, + CLUSTER_NODES_CACHE_TTL, ); + let mut stats = RetransmitStats::new(Instant::now()); + + let mut rng = rand::thread_rng(); + let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS); + + let thread_pool = { + // `clamp(retransmit_sockets.len(), 8)` would panic if more than 8 sockets are provided + #[allow(clippy::manual_clamp)] + let num_threads = get_thread_count().min(8).max(retransmit_sockets.len()); + ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|i| format!("solRetransmit{i:02}")) + .build() + .unwrap() + }; + + let retransmit_thread_handle = Builder::new() + .name("solRetransmittr".to_string()) + .spawn(move || { + while retransmit( + 
&thread_pool, + &bank_forks, + &leader_schedule_cache, + &cluster_info, + &retransmit_receiver, + &retransmit_sockets, + &quic_endpoint_sender, + &mut stats, + &cluster_nodes_cache, + &mut shred_deduper, + &max_slots, + rpc_subscriptions.as_deref(), + slot_status_notifier.as_ref(), + ) + .is_ok() + {} + }) + .unwrap(); + Self { retransmit_thread_handle, } @@ -505,7 +490,7 @@ impl RetransmitStats { epoch_cache_update: 0u64, retransmit_total: AtomicU64::default(), compute_turbine_peers_total: AtomicU64::default(), - // Cache capacity is manually enforced. + // Cache capacity is manually enforced by `SLOT_STATS_CACHE_CAPACITY` slot_stats: LruCache::::unbounded(), unknown_shred_slot_leader: 0usize, }