Commit ac663e2
Clean up code in retransmit_stage.rs (#4569)
* clean up code in retransmit_stage.rs by eliminating a function that simply passed its arguments through

---------

Co-authored-by: Alex Pyattaev <[email protected]>
alexpyattaev and Alex Pyattaev authored Jan 24, 2025
1 parent a1d26ad commit ac663e2
Showing 2 changed files with 69 additions and 84 deletions.
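The eliminated function was a pure pass-through: it only forwarded its arguments into a thread spawn, so it is folded into the constructor of the struct that owns the resulting JoinHandle. A minimal standalone sketch of the before/after shape (simplified names and std channels only; not the actual code):

use std::{sync::mpsc::Receiver, thread};

// Before: a free function whose only job was to pass arguments through
// to the thread spawn; RetransmitStage::new() merely called it.
#[allow(dead_code)]
fn retransmitter(receiver: Receiver<Vec<u8>>) -> thread::JoinHandle<()> {
    thread::spawn(move || while receiver.recv().is_ok() {})
}

pub struct RetransmitStage {
    retransmit_thread_handle: thread::JoinHandle<()>,
}

impl RetransmitStage {
    // After: the spawn lives directly in new() and retransmitter() is deleted.
    pub fn new(receiver: Receiver<Vec<u8>>) -> Self {
        let retransmit_thread_handle =
            thread::spawn(move || while receiver.recv().is_ok() {});
        Self { retransmit_thread_handle }
    }

    pub fn join(self) -> thread::Result<()> {
        self.retransmit_thread_handle.join()
    }
}

fn main() {
    let (sender, receiver) = std::sync::mpsc::channel();
    let stage = RetransmitStage::new(receiver);
    sender.send(vec![1u8]).unwrap();
    drop(sender); // the worker exits once the channel disconnects
    stage.join().unwrap();
}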
12 changes: 6 additions & 6 deletions turbine/benches/retransmit_stage.rs
@@ -28,7 +28,7 @@ use {
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::retransmit_stage::retransmitter,
solana_turbine::retransmit_stage::RetransmitStage,
std::{
iter::repeat_with,
net::Ipv4Addr,
@@ -120,14 +120,14 @@ fn bench_retransmitter(bencher: &mut Bencher) {

let num_packets = data_shreds.len();

let retransmitter_handles = retransmitter(
Arc::new(sockets),
quic_endpoint_sender,
let retransmit_stage = RetransmitStage::new(
bank_forks,
leader_schedule_cache,
cluster_info,
Arc::new(sockets),
quic_endpoint_sender,
shreds_receiver,
Arc::default(), // solana_rpc::max_slots::MaxSlots
Arc::new(solana_rpc::max_slots::MaxSlots::default()),
None,
None,
);
@@ -183,5 +183,5 @@ fn bench_retransmitter(bencher: &mut Bencher) {
total.store(0, Ordering::Relaxed);
});

retransmitter_handles.join().unwrap();
retransmit_stage.join().unwrap();
}
141 changes: 63 additions & 78 deletions turbine/src/retransmit_stage.rs
@@ -1,5 +1,4 @@
//! The `retransmit_stage` retransmits shreds between validators
#![allow(clippy::rc_buffer)]
use {
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
@@ -178,14 +177,16 @@ impl<const K: usize> ShredDeduper<K> {
}
}

// pull shreds from the retransmit_receiver until it is empty, then retransmit them.
// uses a thread_pool to parallelize the work if there are enough shreds to justify that
#[allow(clippy::too_many_arguments)]
fn retransmit(
thread_pool: &ThreadPool,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
cluster_info: &ClusterInfo,
shreds_receiver: &Receiver<Vec</*shred:*/ Vec<u8>>>,
sockets: &[UdpSocket],
retransmit_receiver: &Receiver<Vec</*shred:*/ Vec<u8>>>,
retransmit_sockets: &[UdpSocket],
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
@@ -195,11 +196,11 @@ fn retransmit(
slot_status_notifier: Option<&SlotStatusNotifier>,
) -> Result<(), RecvError> {
// wait for something on the channel
let mut shreds = shreds_receiver.recv()?;
let mut shreds = retransmit_receiver.recv()?;
// now the batch has started
let mut timer_start = Measure::start("retransmit");
// drain the channel until it is empty to form a batch
shreds.extend(shreds_receiver.try_iter().flatten());
shreds.extend(retransmit_receiver.try_iter().flatten());
stats.num_shreds += shreds.len();
stats.total_batches += 1;

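The batching idiom above — block on recv() for the first delivery, then drain whatever is already queued via try_iter() — works standalone. A minimal sketch assuming the crossbeam-channel crate for the Receiver and RecvError types:

use crossbeam_channel::{unbounded, Receiver, RecvError};

fn recv_batch(receiver: &Receiver<Vec<Vec<u8>>>) -> Result<Vec<Vec<u8>>, RecvError> {
    // Block until at least one batch of shreds arrives...
    let mut shreds = receiver.recv()?;
    // ...then drain the channel without blocking, so that a single
    // retransmit pass covers everything already queued.
    shreds.extend(receiver.try_iter().flatten());
    Ok(shreds)
}

fn main() -> Result<(), RecvError> {
    let (sender, receiver) = unbounded();
    sender.send(vec![vec![1u8], vec![2u8]]).unwrap();
    sender.send(vec![vec![3u8]]).unwrap();
    assert_eq!(recv_batch(&receiver)?.len(), 3);
    Ok(())
}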
@@ -262,7 +263,7 @@ fn retransmit(
shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
&retransmit_sockets[index % retransmit_sockets.len()],
quic_endpoint_sender,
stats,
)
@@ -280,7 +281,7 @@
shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
&retransmit_sockets[index % retransmit_sockets.len()],
quic_endpoint_sender,
stats,
)
@@ -301,6 +302,7 @@ fn retransmit(
Ok(())
}

// Retransmit a single shred to all downstream nodes
fn retransmit_shred(
shred: Vec<u8>,
root_bank: &Bank,
@@ -313,7 +315,7 @@ fn retransmit_shred(
) -> Option<(
Slot, // Shred slot.
usize, // This node's distance from the turbine root.
usize, // Number of nodes the shred is retransmitted to.
usize, // Number of nodes the shred was retransmitted to.
)> {
let key = shred::layout::get_shred_id(&shred)?;
let (slot_leader, cluster_nodes) = cache.get(&key.slot())?;
@@ -369,70 +371,21 @@ fn retransmit_shred(
Some((key.slot(), root_distance, num_nodes))
}

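retransmit_shred asks the ClusterNodes cache for this node's children in the Turbine fanout tree and for its distance from the tree root. As a rough mental model only — the real ClusterNodes does a stake-weighted shuffle per shred — the tree is a flat array of node indices read with a fixed fanout:

// Hypothetical simplification: node i feeds nodes fanout*i + 1 ..= fanout*i + fanout.
fn children(index: usize, fanout: usize, num_nodes: usize) -> impl Iterator<Item = usize> {
    let first = fanout * index + 1;
    (first..first + fanout).take_while(move |&child| child < num_nodes)
}

// Hops from the tree root down to the given index.
fn root_distance(mut index: usize, fanout: usize) -> usize {
    let mut distance = 0;
    while index != 0 {
        index = (index - 1) / fanout;
        distance += 1;
    }
    distance
}

fn main() {
    const FANOUT: usize = 200; // assumed fanout for illustration
    assert_eq!(children(0, FANOUT, 1_000).count(), FANOUT);
    assert_eq!(root_distance(0, FANOUT), 0); // the root itself
    assert_eq!(root_distance(150, FANOUT), 1); // first layer
    assert_eq!(root_distance(500, FANOUT), 2); // second layer
}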
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
/// See `cluster_info` for network layer definitions.
/// # Arguments
/// * `sockets` - Sockets to read from.
/// * `bank_forks` - The BankForks structure
/// * `leader_schedule_cache` - The leader schedule to verify shreds
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
shreds_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
slot_status_notifier: Option<SlotStatusNotifier>,
) -> JoinHandle<()> {
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);
let mut rng = rand::thread_rng();
let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS);

let mut stats = RetransmitStats::new(Instant::now());

#[allow(clippy::manual_clamp)]
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("solRetransmit{i:02}"))
.build()
.unwrap();
Builder::new()
.name("solRetransmittr".to_string())
.spawn(move || {
while retransmit(
&thread_pool,
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&shreds_receiver,
&sockets,
&quic_endpoint_sender,
&mut stats,
&cluster_nodes_cache,
&mut shred_deduper,
&max_slots,
rpc_subscriptions.as_deref(),
slot_status_notifier.as_ref(),
)
.is_ok()
{}
})
.unwrap()
}

/// Service to retransmit messages received from other peers in turbine.
pub struct RetransmitStage {
retransmit_thread_handle: JoinHandle<()>,
}

impl RetransmitStage {
/// Construct the RetransmitStage.
///
/// Key arguments:
/// * `retransmit_sockets` - Sockets to use for transmission of shreds
/// * `max_slots` - Structure to keep track of the Turbine progress
/// * `bank_forks` - Reference to the BankForks structure
/// * `leader_schedule_cache` - The leader schedule to verify shreds
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
/// * `retransmit_receiver` - Receive channel for batches of shreds to be retransmitted.
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -444,18 +397,50 @@ impl RetransmitStage {
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
slot_status_notifier: Option<SlotStatusNotifier>,
) -> Self {
let retransmit_thread_handle = retransmitter(
retransmit_sockets,
quic_endpoint_sender,
bank_forks,
leader_schedule_cache,
cluster_info,
retransmit_receiver,
max_slots,
rpc_subscriptions,
slot_status_notifier,
let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);

let mut stats = RetransmitStats::new(Instant::now());

let mut rng = rand::thread_rng();
let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS);

let thread_pool = {
// Using clamp would panic if more than 8 sockets are provided
#[allow(clippy::manual_clamp)]
let num_threads = get_thread_count().min(8).max(retransmit_sockets.len());
ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("solRetransmit{i:02}"))
.build()
.unwrap()
};
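The #[allow(clippy::manual_clamp)] above is load-bearing: clippy would rewrite min(8).max(n) into clamp(n, 8), but Ord::clamp asserts that the lower bound does not exceed the upper one, so the clamp form panics once more than 8 sockets are configured, while the min/max chain simply floors the thread count at one thread per socket. A small standalone illustration:

fn num_threads(thread_count: usize, num_sockets: usize) -> usize {
    // Cap at 8 threads, but never go below one thread per socket.
    thread_count.min(8).max(num_sockets)
}

fn main() {
    assert_eq!(num_threads(16, 4), 8); // capped at 8
    assert_eq!(num_threads(2, 4), 4); // floored at one per socket
    assert_eq!(num_threads(16, 12), 12); // still fine with 12 sockets, whereas
                                         // 16usize.clamp(12, 8) would panic:
                                         // clamp requires min <= max.
}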

let retransmit_thread_handle = Builder::new()
.name("solRetransmittr".to_string())
.spawn(move || {
while retransmit(
&thread_pool,
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&retransmit_receiver,
&retransmit_sockets,
&quic_endpoint_sender,
&mut stats,
&cluster_nodes_cache,
&mut shred_deduper,
&max_slots,
rpc_subscriptions.as_deref(),
slot_status_notifier.as_ref(),
)
.is_ok()
{}
})
.unwrap();

Self {
retransmit_thread_handle,
}
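The while retransmit(...).is_ok() {} loop is also the shutdown path: retransmit() returns Err(RecvError) only once every sender handle for the channel has been dropped, so the thread exits on its own when upstream stages shut down, and join() then completes. A minimal sketch of that lifecycle, again assuming crossbeam-channel:

use crossbeam_channel::{unbounded, Receiver, RecvError};
use std::thread;

fn process(receiver: &Receiver<u64>) -> Result<(), RecvError> {
    // Err(RecvError) is returned once all senders have been dropped.
    let shred = receiver.recv()?;
    println!("retransmitting {shred}");
    Ok(())
}

fn main() {
    let (sender, receiver) = unbounded();
    let handle = thread::spawn(move || while process(&receiver).is_ok() {});
    sender.send(42u64).unwrap();
    drop(sender); // disconnects the channel; the worker loop ends
    handle.join().unwrap(); // returns promptly after the disconnect
}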
@@ -505,7 +490,7 @@ impl RetransmitStats {
epoch_cache_update: 0u64,
retransmit_total: AtomicU64::default(),
compute_turbine_peers_total: AtomicU64::default(),
// Cache capacity is manually enforced.
// Cache capacity is manually enforced by `SLOT_STATS_CACHE_CAPACITY`
slot_stats: LruCache::<Slot, RetransmitSlotStats>::unbounded(),
unknown_shred_slot_leader: 0usize,
}
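LruCache::unbounded() leaves eviction to the caller. A sketch of what manually enforced capacity can look like, assuming the lru crate's API; the capacity value here is illustrative, not the real SLOT_STATS_CACHE_CAPACITY:

use lru::LruCache;

const SLOT_STATS_CACHE_CAPACITY: usize = 750; // hypothetical value for illustration

fn record_shred(cache: &mut LruCache<u64, u64>, slot: u64) {
    // Bump the per-slot shred count, inserting an entry on first sight.
    let count = cache.get_or_insert_mut(slot, || 0);
    *count += 1;
    // Manually enforce the capacity: evict least-recently-used slots.
    while cache.len() > SLOT_STATS_CACHE_CAPACITY {
        cache.pop_lru();
    }
}

fn main() {
    let mut cache = LruCache::unbounded();
    for slot in 0..1_000u64 {
        record_shred(&mut cache, slot);
    }
    assert!(cache.len() <= SLOT_STATS_CACHE_CAPACITY);
}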
