From 78f8f016f19e71b0ab60f3db1bae524e1d069f4b Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 09:37:52 -0600 Subject: [PATCH 01/21] ForwardingStage basic skeleton --- core/src/forwarding_stage.rs | 90 ++++++++++++++++++++++++++++++++++++ core/src/lib.rs | 1 + 2 files changed, 91 insertions(+) create mode 100644 core/src/forwarding_stage.rs diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs new file mode 100644 index 00000000000000..cdbe51605c1236 --- /dev/null +++ b/core/src/forwarding_stage.rs @@ -0,0 +1,90 @@ +use { + agave_banking_stage_ingress_types::BankingPacketBatch, + crossbeam_channel::Receiver, + min_max_heap::MinMaxHeap, + slab::Slab, + solana_client::connection_cache::ConnectionCache, + solana_net_utils::bind_to_unspecified, + solana_perf::{data_budget::DataBudget, packet::Packet}, + std::{ + net::UdpSocket, + sync::Arc, + thread::{Builder, JoinHandle}, + }, +}; + +pub struct ForwardingStage { + receiver: Receiver<(BankingPacketBatch, bool)>, + vote_packet_container: PacketContainer, + non_vote_packet_container: PacketContainer, + + connection_cache: Arc, + data_budget: DataBudget, + udp_socket: UdpSocket, +} + +impl ForwardingStage { + pub fn spawn( + receiver: Receiver<(BankingPacketBatch, bool)>, + connection_cache: Arc, + ) -> JoinHandle<()> { + let forwarding_stage = Self::new(receiver, connection_cache); + Builder::new() + .name("solFwdStage".to_string()) + .spawn(move || forwarding_stage.run()) + .unwrap() + } + + fn new( + receiver: Receiver<(BankingPacketBatch, bool)>, + connection_cache: Arc, + ) -> Self { + Self { + receiver, + vote_packet_container: PacketContainer::with_capacity(4096), + non_vote_packet_container: PacketContainer::with_capacity(4 * 4096), + connection_cache, + data_budget: DataBudget::default(), + udp_socket: bind_to_unspecified().unwrap(), + } + } + + fn run(mut self) { + loop { + if !self.receive_and_buffer() { + break; + } + self.forward_buffered_packets(); + } + } + + fn receive_and_buffer(&mut self) -> bool { + todo!() + } + + fn forward_buffered_packets(&mut self) { + todo!() + } +} + +struct PacketContainer { + priority_queue: MinMaxHeap, + packets: Slab, +} + +impl PacketContainer { + fn with_capacity(capacity: usize) -> Self { + Self { + priority_queue: MinMaxHeap::with_capacity(capacity), + packets: Slab::with_capacity(capacity), + } + } +} + +type Index = u16; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +struct PriorityIndex { + priority: u64, + index: u16, +} diff --git a/core/src/lib.rs b/core/src/lib.rs index c40eebdc09c4db..9ce541f18646c6 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -21,6 +21,7 @@ pub mod consensus; pub mod cost_update_service; pub mod drop_bank_service; pub mod fetch_stage; +pub mod forwarding_stage; pub mod gen_keys; pub mod next_leader; pub mod optimistic_confirmation_verifier; From fec3c40dba2d7f9bc22dd2c1c6c7f0958fe7714d Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 09:46:03 -0600 Subject: [PATCH 02/21] receive_and_buffer --- core/src/forwarding_stage.rs | 37 ++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index cdbe51605c1236..89d51a79bcb043 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -1,6 +1,6 @@ use { agave_banking_stage_ingress_types::BankingPacketBatch, - crossbeam_channel::Receiver, + crossbeam_channel::{Receiver, RecvTimeoutError}, min_max_heap::MinMaxHeap, 
slab::Slab, solana_client::connection_cache::ConnectionCache, @@ -10,6 +10,7 @@ use { net::UdpSocket, sync::Arc, thread::{Builder, JoinHandle}, + time::{Duration, Instant}, }, }; @@ -59,9 +60,41 @@ impl ForwardingStage { } fn receive_and_buffer(&mut self) -> bool { - todo!() + let now = Instant::now(); + const TIMEOUT: Duration = Duration::from_millis(10); + match self.receiver.recv_timeout(TIMEOUT) { + Ok((packet_batches, tpu_vote_batch)) => { + self.buffer_packet_batches(packet_batches, tpu_vote_batch); + + // Drain the channel up to timeout + let timed_out = loop { + if now.elapsed() >= TIMEOUT { + break true; + } + match self.receiver.try_recv() { + Ok((packet_batches, tpu_vote_batch)) => { + self.buffer_packet_batches(packet_batches, tpu_vote_batch) + } + Err(_) => break false, + } + }; + + // If timeout waas reached, prevent backup by draining all + // packets in the channel. + if timed_out { + warn!("ForwardingStage is backed up, dropping packets"); + while self.receiver.try_recv().is_ok() {} + } + + true + } + Err(RecvTimeoutError::Timeout) => true, + Err(RecvTimeoutError::Disconnected) => false, + } } + fn buffer_packet_batches(&mut self, packet_batches: BankingPacketBatch, tpu_vote_batch: bool) {} + fn forward_buffered_packets(&mut self) { todo!() } From c263cdc345096dbc9874ca0b76f66eaabf2f5de2 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 15:50:32 -0600 Subject: [PATCH 03/21] buffer_packet_batches --- core/src/forwarding_stage.rs | 157 +++++++++++++++++++++++++++++++++-- 1 file changed, 148 insertions(+), 9 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 89d51a79bcb043..3afdfc5becf4a6 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -1,11 +1,18 @@ use { agave_banking_stage_ingress_types::BankingPacketBatch, + agave_transaction_view::transaction_view::SanitizedTransactionView, crossbeam_channel::{Receiver, RecvTimeoutError}, min_max_heap::MinMaxHeap, slab::Slab, solana_client::connection_cache::ConnectionCache, + solana_cost_model::cost_model::CostModel, solana_net_utils::bind_to_unspecified, solana_perf::{data_budget::DataBudget, packet::Packet}, + solana_runtime::{bank::Bank, root_bank_cache::RootBankCache}, + solana_runtime_transaction::{ + runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta, + }, + solana_sdk::fee::FeeBudgetLimits, std::{ net::UdpSocket, sync::Arc, @@ -19,6 +26,7 @@ pub struct ForwardingStage { vote_packet_container: PacketContainer, non_vote_packet_container: PacketContainer, + root_bank_cache: RootBankCache, connection_cache: Arc, data_budget: DataBudget, udp_socket: UdpSocket, @@ -28,8 +36,9 @@ impl ForwardingStage { pub fn spawn( receiver: Receiver<(BankingPacketBatch, bool)>, connection_cache: Arc, + root_bank_cache: RootBankCache, ) -> JoinHandle<()> { - let forwarding_stage = Self::new(receiver, connection_cache); + let forwarding_stage = Self::new(receiver, connection_cache, root_bank_cache); Builder::new() .name("solFwdStage".to_string()) .spawn(move || forwarding_stage.run()) @@ -39,11 +48,13 @@ impl ForwardingStage { fn new( receiver: Receiver<(BankingPacketBatch, bool)>, connection_cache: Arc, + root_bank_cache: RootBankCache, ) -> Self { Self { receiver, vote_packet_container: PacketContainer::with_capacity(4096), non_vote_packet_container: PacketContainer::with_capacity(4 * 4096), + root_bank_cache, connection_cache, data_budget: DataBudget::default(), udp_socket: bind_to_unspecified().unwrap(), @@ -52,19 +63,20 @@ impl 
ForwardingStage { fn run(mut self) { loop { - if !self.receive_and_buffer() { + let root_bank = self.root_bank_cache.root_bank(); + if !self.receive_and_buffer(&root_bank) { break; } self.forward_buffered_packets(); } } - fn receive_and_buffer(&mut self) -> bool { + fn receive_and_buffer(&mut self, bank: &Bank) -> bool { let now = Instant::now(); const TIMEOUT: Duration = Duration::from_millis(10); match self.receiver.recv_timeout(TIMEOUT) { Ok((packet_batches, tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, tpu_vote_batch); + self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank); // Drain the channel up to timeout let timed_out = loop { @@ -73,7 +85,7 @@ impl ForwardingStage { } match self.receiver.try_recv() { Ok((packet_batches, tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, tpu_vote_batch) + self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank) } Err(_) => break false, } @@ -93,11 +105,87 @@ impl ForwardingStage { } } - fn buffer_packet_batches(&mut self, packet_batches: BankingPacketBatch, tpu_vote_batch: bool) {} + fn buffer_packet_batches( + &mut self, + packet_batches: BankingPacketBatch, + tpu_vote_batch: bool, + bank: &Bank, + ) { + let packet_container = if tpu_vote_batch { + &mut self.vote_packet_container + } else { + &mut self.non_vote_packet_container + }; + + for batch in packet_batches.iter() { + for packet in batch.iter().filter(|p| Self::initial_packet_meta_filter(p)) { + let Some(packet_data) = packet.data(..) else { + // should never occur since we've already checked the + // packet is not marked for discard. + continue; + }; + + // Parse the transaction, make sure it passes basic sanitization checks. + let Ok(transaction) = SanitizedTransactionView::try_new_sanitized(packet_data) + else { + continue; + }; + + // Calculate static metadata for the transaction so that we + // are able to calculate fees for prioritization. + let Ok(transaction) = RuntimeTransaction::>::try_from( + transaction, + solana_sdk::transaction::MessageHash::Compute, + Some(packet.meta().is_simple_vote_tx()), + ) else { + continue; + }; + + // Calculate priority if we can, if this fails we drop. + let Some(priority) = calculate_priority(&transaction, bank) else { + continue; + }; + + // If at capacity, check lowest priority item. + if packet_container.priority_queue.len() + == packet_container.priority_queue.capacity() + { + let min_priority = packet_container + .priority_queue + .peek_min() + .expect("not empty") + .priority; + // If priority of current packet is not higher than the min + // drop the current packet. 
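+                    // Note: `>=` means a packet that merely ties the buffered
+                    // minimum is dropped rather than evicting it.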
+ if min_priority >= priority { + continue; + } + + let dropped_index = packet_container + .priority_queue + .pop_min() + .expect("not empty") + .index; + packet_container.packets.remove(dropped_index); + } + + let entry = packet_container.packets.vacant_entry(); + let index = entry.key(); + entry.insert(packet.clone()); + let priority_index = PriorityIndex { priority, index }; + packet_container.priority_queue.push(priority_index); + } + } + } fn forward_buffered_packets(&mut self) { todo!() } + + fn initial_packet_meta_filter(packet: &Packet) -> bool { + let meta = packet.meta(); + !meta.discard() && !meta.forwarded() && meta.is_from_staked_node() + } } struct PacketContainer { @@ -114,10 +202,61 @@ impl PacketContainer { } } -type Index = u16; - #[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] struct PriorityIndex { priority: u64, - index: u16, + index: usize, +} + +/// Calculate priority for a transaction: +/// +/// The priority is calculated as: +/// P = R / (1 + C) +/// where P is the priority, R is the reward, +/// and C is the cost towards block-limits. +/// +/// Current minimum costs are on the order of several hundred, +/// so the denominator is effectively C, and the +1 is simply +/// to avoid any division by zero due to a bug - these costs +/// are estimate by the cost-model and are not direct +/// from user input. They should never be zero. +/// Any difference in the prioritization is negligible for +/// the current transaction costs. +fn calculate_priority<'a>( + transaction: &RuntimeTransaction>, + bank: &Bank, +) -> Option { + let compute_budget_limits = transaction + .compute_budget_instruction_details() + .sanitize_and_convert_to_compute_budget_limits(&bank.feature_set) + .ok()?; + let fee_budget_limits = FeeBudgetLimits::from(compute_budget_limits); + + // Manually estimate fee here since currently interface doesn't allow a on SVM type. + // Doesn't need to be 100% accurate so long as close and consistent. + let prioritization_fee = fee_budget_limits.prioritization_fee; + let signature_details = transaction.signature_details(); + let signature_fee = signature_details + .total_signatures() + .saturating_mul(bank.fee_structure().lamports_per_signature); + let fee = signature_fee.saturating_add(prioritization_fee); + + let cost = CostModel::estimate_cost( + transaction, + transaction.program_instructions_iter(), + transaction.num_requested_write_locks(), + &bank.feature_set, + ); + + // We need a multiplier here to avoid rounding down too aggressively. + // For many transactions, the cost will be greater than the fees in terms of raw lamports. + // For the purposes of calculating prioritization, we multiply the fees by a large number so that + // the cost is a small fraction. + // An offset of 1 is used in the denominator to explicitly avoid division by zero. 
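+    // e.g. with hypothetical numbers: fee = 5_000 lamports and cost = 999 CUs
+    // gives priority = 1_000_000 * 5_000 / (999 + 1) = 5_000_000.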
+ const MULTIPLIER: u64 = 1_000_000; + Some( + MULTIPLIER + .saturating_mul(fee) + .wrapping_div(cost.sum().saturating_add(1)), + ) } From a152b38a1ed159471d6c6ccd6f3ac209db02a7fd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 15:58:53 -0600 Subject: [PATCH 04/21] first pass, single buffer --- core/src/forwarding_stage.rs | 47 ++++++++++++++---------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 3afdfc5becf4a6..63c72ce5796723 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -23,8 +23,7 @@ use { pub struct ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, - vote_packet_container: PacketContainer, - non_vote_packet_container: PacketContainer, + packet_container: PacketContainer, root_bank_cache: RootBankCache, connection_cache: Arc, @@ -52,8 +51,7 @@ impl ForwardingStage { ) -> Self { Self { receiver, - vote_packet_container: PacketContainer::with_capacity(4096), - non_vote_packet_container: PacketContainer::with_capacity(4 * 4096), + packet_container: PacketContainer::with_capacity(4 * 4096), root_bank_cache, connection_cache, data_budget: DataBudget::default(), @@ -75,8 +73,8 @@ impl ForwardingStage { let now = Instant::now(); const TIMEOUT: Duration = Duration::from_millis(10); match self.receiver.recv_timeout(TIMEOUT) { - Ok((packet_batches, tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank); + Ok((packet_batches, _tpu_vote_batch)) => { + self.buffer_packet_batches(packet_batches, bank); // Drain the channel up to timeout let timed_out = loop { @@ -84,8 +82,8 @@ impl ForwardingStage { break true; } match self.receiver.try_recv() { - Ok((packet_batches, tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank) + Ok((packet_batches, _tpu_vote_batch)) => { + self.buffer_packet_batches(packet_batches, bank) } Err(_) => break false, } @@ -105,18 +103,7 @@ impl ForwardingStage { } } - fn buffer_packet_batches( - &mut self, - packet_batches: BankingPacketBatch, - tpu_vote_batch: bool, - bank: &Bank, - ) { - let packet_container = if tpu_vote_batch { - &mut self.vote_packet_container - } else { - &mut self.non_vote_packet_container - }; - + fn buffer_packet_batches(&mut self, packet_batches: BankingPacketBatch, bank: &Bank) { for batch in packet_batches.iter() { for packet in batch.iter().filter(|p| Self::initial_packet_meta_filter(p)) { let Some(packet_data) = packet.data(..) else { @@ -147,10 +134,11 @@ impl ForwardingStage { }; // If at capacity, check lowest priority item. 
- if packet_container.priority_queue.len() - == packet_container.priority_queue.capacity() + if self.packet_container.priority_queue.len() + == self.packet_container.priority_queue.capacity() { - let min_priority = packet_container + let min_priority = self + .packet_container .priority_queue .peek_min() .expect("not empty") @@ -161,19 +149,20 @@ impl ForwardingStage { continue; } - let dropped_index = packet_container + let dropped_index = self + .packet_container .priority_queue .pop_min() .expect("not empty") .index; - packet_container.packets.remove(dropped_index); + self.packet_container.packets.remove(dropped_index); } - let entry = packet_container.packets.vacant_entry(); + let entry = self.packet_container.packets.vacant_entry(); let index = entry.key(); entry.insert(packet.clone()); let priority_index = PriorityIndex { priority, index }; - packet_container.priority_queue.push(priority_index); + self.packet_container.priority_queue.push(priority_index); } } } @@ -222,8 +211,8 @@ struct PriorityIndex { /// from user input. They should never be zero. /// Any difference in the prioritization is negligible for /// the current transaction costs. -fn calculate_priority<'a>( - transaction: &RuntimeTransaction>, +fn calculate_priority( + transaction: &RuntimeTransaction>, bank: &Bank, ) -> Option { let compute_budget_limits = transaction From 6d6eed38663007728ed292a513019ef65e09e0ef Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 16:05:31 -0600 Subject: [PATCH 05/21] ForwardAddressGetter --- core/src/forwarding_stage.rs | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 63c72ce5796723..b3c84391be27be 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -1,4 +1,8 @@ use { + crate::{ + banking_stage::LikeClusterInfo, + next_leader::{next_leader, next_leader_tpu_vote}, + }, agave_banking_stage_ingress_types::BankingPacketBatch, agave_transaction_view::transaction_view::SanitizedTransactionView, crossbeam_channel::{Receiver, RecvTimeoutError}, @@ -6,21 +10,41 @@ use { slab::Slab, solana_client::connection_cache::ConnectionCache, solana_cost_model::cost_model::CostModel, + solana_gossip::contact_info::Protocol, solana_net_utils::bind_to_unspecified, solana_perf::{data_budget::DataBudget, packet::Packet}, + solana_poh::poh_recorder::PohRecorder, solana_runtime::{bank::Bank, root_bank_cache::RootBankCache}, solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta, }, solana_sdk::fee::FeeBudgetLimits, std::{ - net::UdpSocket, - sync::Arc, + net::{SocketAddr, UdpSocket}, + sync::{Arc, RwLock}, thread::{Builder, JoinHandle}, time::{Duration, Instant}, }, }; +pub struct ForwardingAddresses { + pub tpu: Option, + pub tpu_vote: Option, +} + +pub trait ForwardAddressGetter: Send + Sync + 'static { + fn get_forwarding_addresses(&self, protocol: Protocol) -> ForwardingAddresses; +} + +impl ForwardAddressGetter for (T, Arc>) { + fn get_forwarding_addresses(&self, protocol: Protocol) -> ForwardingAddresses { + ForwardingAddresses { + tpu: next_leader(&self.0, &self.1, |node| node.tpu_forwards(protocol)).map(|(_, s)| s), + tpu_vote: next_leader_tpu_vote(&self.0, &self.1).map(|(_, s)| s), + } + } +} + pub struct ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, packet_container: PacketContainer, From 247143fe819bb2273d2acb7930c3323aaf5b605c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald 
Date: Wed, 15 Jan 2025 16:07:44 -0600 Subject: [PATCH 06/21] add forward_address_getter field --- core/src/forwarding_stage.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index b3c84391be27be..8610b93b902150 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -45,23 +45,30 @@ impl ForwardAddressGetter for (T, Arc>) } } -pub struct ForwardingStage { +pub struct ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, packet_container: PacketContainer, root_bank_cache: RootBankCache, + forward_address_getter: F, connection_cache: Arc, data_budget: DataBudget, udp_socket: UdpSocket, } -impl ForwardingStage { +impl ForwardingStage { pub fn spawn( receiver: Receiver<(BankingPacketBatch, bool)>, connection_cache: Arc, root_bank_cache: RootBankCache, + forward_address_getter: F, ) -> JoinHandle<()> { - let forwarding_stage = Self::new(receiver, connection_cache, root_bank_cache); + let forwarding_stage = Self::new( + receiver, + connection_cache, + root_bank_cache, + forward_address_getter, + ); Builder::new() .name("solFwdStage".to_string()) .spawn(move || forwarding_stage.run()) @@ -72,11 +79,13 @@ impl ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, connection_cache: Arc, root_bank_cache: RootBankCache, + forward_address_getter: F, ) -> Self { Self { receiver, packet_container: PacketContainer::with_capacity(4 * 4096), root_bank_cache, + forward_address_getter, connection_cache, data_budget: DataBudget::default(), udp_socket: bind_to_unspecified().unwrap(), From 2be12d01500f0c884c4bee417562fc63bcf4f543 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 16:31:48 -0600 Subject: [PATCH 07/21] forwarding --- core/src/forwarding_stage.rs | 83 ++++++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 8610b93b902150..2c53f815295197 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -9,6 +9,7 @@ use { min_max_heap::MinMaxHeap, slab::Slab, solana_client::connection_cache::ConnectionCache, + solana_connection_cache::client_connection::ClientConnection, solana_cost_model::cost_model::CostModel, solana_gossip::contact_info::Protocol, solana_net_utils::bind_to_unspecified, @@ -19,6 +20,7 @@ use { runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta, }, solana_sdk::fee::FeeBudgetLimits, + solana_streamer::sendmmsg::batch_send, std::{ net::{SocketAddr, UdpSocket}, sync::{Arc, RwLock}, @@ -27,6 +29,8 @@ use { }, }; +const FORWARD_BATCH_SIZE: usize = 128; + pub struct ForwardingAddresses { pub tpu: Option, pub tpu_vote: Option, @@ -200,14 +204,85 @@ impl ForwardingStage { } } - fn forward_buffered_packets(&mut self) { - todo!() - } - fn initial_packet_meta_filter(packet: &Packet) -> bool { let meta = packet.meta(); !meta.discard() && !meta.forwarded() && meta.is_from_staked_node() } + + fn forward_buffered_packets(&mut self) { + self.refresh_data_budget(); + + // Get forwarding addresses otherwise return now. 
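+        // Both leader addresses must be known; if either is `None`, skip this
+        // forwarding pass and retry next iteration (packets stay buffered).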
+ let ForwardingAddresses { + tpu: Some(tpu), + tpu_vote: Some(tpu_vote), + } = self + .forward_address_getter + .get_forwarding_addresses(self.connection_cache.protocol()) + else { + return; + }; + + let mut vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + let mut non_vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + + // Loop through packets creating batches of packets to forward. + while let Some(priority_index) = self.packet_container.priority_queue.pop_max() { + let packet = self + .packet_container + .packets + .get(priority_index.index) + .expect("packet exists"); + + // If it exceeds our data-budget, drop. + if !self.data_budget.take(packet.meta().size) { + self.packet_container.packets.remove(priority_index.index); + continue; + } + + let packet_data_vec = packet.data(..).expect("packet has data").to_vec(); + + if packet.meta().is_simple_vote_tx() { + vote_batch.push(packet_data_vec); + if vote_batch.len() == vote_batch.capacity() { + self.send_vote_batch(tpu_vote, &mut vote_batch); + } + } else { + non_vote_batch.push((packet_data_vec, tpu)); + if non_vote_batch.len() == non_vote_batch.capacity() { + self.send_non_vote_batch(&mut non_vote_batch); + } + } + } + } + + /// Re-fill the data budget if enough time has passed + fn refresh_data_budget(&self) { + const INTERVAL_MS: u64 = 100; + // 12 MB outbound limit per second + const MAX_BYTES_PER_SECOND: usize = 12_000_000; + const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; + const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; + self.data_budget.update(INTERVAL_MS, |bytes| { + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) + }); + } + + fn send_vote_batch(&self, addr: SocketAddr, vote_batch: &mut Vec>) { + let conn = self.connection_cache.get_connection(&addr); + let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + core::mem::swap(&mut batch, vote_batch); + let _res = conn.send_data_batch_async(batch); + } + + fn send_non_vote_batch(&self, non_vote_batch: &mut Vec<(Vec, SocketAddr)>) { + let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + core::mem::swap(&mut batch, non_vote_batch); + let _res = batch_send(&self.udp_socket, &batch); + } } struct PacketContainer { From bed46907ecf6451ae6c0d5336a1a62d66e77f7fa Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Jan 2025 16:32:49 -0600 Subject: [PATCH 08/21] explicit use instead of qualifying --- core/src/forwarding_stage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 2c53f815295197..43addf14bddeba 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -19,7 +19,7 @@ use { solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta, }, - solana_sdk::fee::FeeBudgetLimits, + solana_sdk::{fee::FeeBudgetLimits, transaction::MessageHash}, solana_streamer::sendmmsg::batch_send, std::{ net::{SocketAddr, UdpSocket}, @@ -159,7 +159,7 @@ impl ForwardingStage { // are able to calculate fees for prioritization. 
let Ok(transaction) = RuntimeTransaction::>::try_from( transaction, - solana_sdk::transaction::MessageHash::Compute, + MessageHash::Compute, Some(packet.meta().is_simple_vote_tx()), ) else { continue; From e439f53175cd27fbd1712fa533ec7076f9884d6a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Jan 2025 09:01:56 -0600 Subject: [PATCH 09/21] basic count metrics --- core/src/forwarding_stage.rs | 170 ++++++++++++++++++++++++++++++----- 1 file changed, 146 insertions(+), 24 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 43addf14bddeba..a038ce06458dbe 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -58,6 +58,8 @@ pub struct ForwardingStage { connection_cache: Arc, data_budget: DataBudget, udp_socket: UdpSocket, + + metrics: ForwardingStageMetrics, } impl ForwardingStage { @@ -93,6 +95,7 @@ impl ForwardingStage { connection_cache, data_budget: DataBudget::default(), udp_socket: bind_to_unspecified().unwrap(), + metrics: ForwardingStageMetrics::default(), } } @@ -103,6 +106,7 @@ impl ForwardingStage { break; } self.forward_buffered_packets(); + self.metrics.maybe_report(); } } @@ -110,8 +114,9 @@ impl ForwardingStage { let now = Instant::now(); const TIMEOUT: Duration = Duration::from_millis(10); match self.receiver.recv_timeout(TIMEOUT) { - Ok((packet_batches, _tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, bank); + Ok((packet_batches, tpu_vote_batch)) => { + self.metrics.did_something = true; + self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank); // Drain the channel up to timeout let timed_out = loop { @@ -119,8 +124,8 @@ impl ForwardingStage { break true; } match self.receiver.try_recv() { - Ok((packet_batches, _tpu_vote_batch)) => { - self.buffer_packet_batches(packet_batches, bank) + Ok((packet_batches, tpu_vote_batch)) => { + self.buffer_packet_batches(packet_batches, tpu_vote_batch, bank) } Err(_) => break false, } @@ -130,7 +135,10 @@ impl ForwardingStage { // packets in the channel. if timed_out { warn!("ForwardingStage is backed up, dropping packets"); - while self.receiver.try_recv().is_ok() {} + while let Ok((packet_batch, _)) = self.receiver.try_recv() { + self.metrics.dropped_on_timeout += + packet_batch.iter().map(|b| b.len()).sum::(); + } } true @@ -140,7 +148,12 @@ impl ForwardingStage { } } - fn buffer_packet_batches(&mut self, packet_batches: BankingPacketBatch, bank: &Bank) { + fn buffer_packet_batches( + &mut self, + packet_batches: BankingPacketBatch, + is_tpu_vote_batch: bool, + bank: &Bank, + ) { for batch in packet_batches.iter() { for packet in batch.iter().filter(|p| Self::initial_packet_meta_filter(p)) { let Some(packet_data) = packet.data(..) else { @@ -149,24 +162,29 @@ impl ForwardingStage { continue; }; - // Parse the transaction, make sure it passes basic sanitization checks. - let Ok(transaction) = SanitizedTransactionView::try_new_sanitized(packet_data) + let vote_count = usize::from(is_tpu_vote_batch); + let non_vote_count = usize::from(!is_tpu_vote_batch); + + self.metrics.votes_received += vote_count; + self.metrics.non_votes_received += non_vote_count; + + // Perform basic sanitization checks and calculate priority. + // If any steps fail, drop the packet. 
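+                // Pipeline: parse a `SanitizedTransactionView`, attach static
+                // metadata as a `RuntimeTransaction`, then compute its priority;
+                // any failure short-circuits to `None` and the packet is dropped.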
+ let Some(priority) = SanitizedTransactionView::try_new_sanitized(packet_data) + .map_err(|_| ()) + .and_then(|transaction| { + RuntimeTransaction::>::try_from( + transaction, + MessageHash::Compute, + Some(packet.meta().is_simple_vote_tx()), + ) + .map_err(|_| ()) + }) + .ok() + .and_then(|transaction| calculate_priority(&transaction, bank)) else { - continue; - }; - - // Calculate static metadata for the transaction so that we - // are able to calculate fees for prioritization. - let Ok(transaction) = RuntimeTransaction::>::try_from( - transaction, - MessageHash::Compute, - Some(packet.meta().is_simple_vote_tx()), - ) else { - continue; - }; - - // Calculate priority if we can, if this fails we drop. - let Some(priority) = calculate_priority(&transaction, bank) else { + self.metrics.votes_dropped_on_receive += vote_count; + self.metrics.non_votes_dropped_on_receive += non_vote_count; continue; }; @@ -183,6 +201,8 @@ impl ForwardingStage { // If priority of current packet is not higher than the min // drop the current packet. if min_priority >= priority { + self.metrics.votes_dropped_on_capacity += vote_count; + self.metrics.non_votes_dropped_on_capacity += non_vote_count; continue; } @@ -192,7 +212,11 @@ impl ForwardingStage { .pop_min() .expect("not empty") .index; - self.packet_container.packets.remove(dropped_index); + let dropped_packet = self.packet_container.packets.remove(dropped_index); + self.metrics.votes_dropped_on_capacity += + usize::from(dropped_packet.meta().is_simple_vote_tx()); + self.metrics.non_votes_dropped_on_capacity += + usize::from(!dropped_packet.meta().is_simple_vote_tx()); } let entry = self.packet_container.packets.vacant_entry(); @@ -210,6 +234,7 @@ impl ForwardingStage { } fn forward_buffered_packets(&mut self) { + self.metrics.did_something |= !self.packet_container.priority_queue.is_empty(); self.refresh_data_budget(); // Get forwarding addresses otherwise return now. @@ -236,6 +261,10 @@ impl ForwardingStage { // If it exceeds our data-budget, drop. if !self.data_budget.take(packet.meta().size) { + self.metrics.votes_dropped_on_data_budget += + usize::from(packet.meta().is_simple_vote_tx()); + self.metrics.non_votes_dropped_on_data_budget += + usize::from(!packet.meta().is_simple_vote_tx()); self.packet_container.packets.remove(priority_index.index); continue; } @@ -254,6 +283,16 @@ impl ForwardingStage { } } } + + // Send out remaining packets + if !vote_batch.is_empty() { + self.metrics.votes_forwarded += vote_batch.len(); + self.send_vote_batch(tpu_vote, &mut vote_batch); + } + if !non_vote_batch.is_empty() { + self.metrics.non_votes_forwarded += non_vote_batch.len(); + self.send_non_vote_batch(&mut non_vote_batch); + } } /// Re-fill the data budget if enough time has passed @@ -357,3 +396,86 @@ fn calculate_priority( .wrapping_div(cost.sum().saturating_add(1)), ) } + +struct ForwardingStageMetrics { + last_reported: Instant, + did_something: bool, + + votes_received: usize, + votes_dropped_on_receive: usize, + votes_dropped_on_capacity: usize, + votes_dropped_on_data_budget: usize, + votes_forwarded: usize, + + non_votes_received: usize, + non_votes_dropped_on_receive: usize, + non_votes_dropped_on_capacity: usize, + non_votes_dropped_on_data_budget: usize, + non_votes_forwarded: usize, + + dropped_on_timeout: usize, +} + +impl ForwardingStageMetrics { + fn maybe_report(&mut self) { + const REPORTING_INTERVAL: Duration = Duration::from_secs(1); + + if self.last_reported.elapsed() > REPORTING_INTERVAL { + // Reset time and all counts. 
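+            // `core::mem::take` swaps in `Self::default()`, whose manual impl
+            // below resets `last_reported` to `Instant::now()` and zeroes the counters.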
+ let metrics = core::mem::take(self); + + // Only report if something happened. + if !metrics.did_something { + return; + } + + datapoint_info!( + "forwarding_stage", + ("votes_received", metrics.votes_received, i64), + ( + "votes_dropped_on_receive", + metrics.votes_dropped_on_receive, + i64 + ), + ( + "votes_dropped_on_data_budget", + metrics.votes_dropped_on_data_budget, + i64 + ), + ("votes_forwarded", metrics.votes_forwarded, i64), + ("non_votes_received", metrics.non_votes_received, i64), + ( + "votes_dropped_on_receive", + metrics.votes_dropped_on_receive, + i64 + ), + ( + "votes_dropped_on_data_budget", + metrics.votes_dropped_on_data_budget, + i64 + ), + ("votes_forwarded", metrics.votes_forwarded, i64), + ); + } + } +} + +impl Default for ForwardingStageMetrics { + fn default() -> Self { + Self { + last_reported: Instant::now(), + did_something: false, + votes_received: 0, + votes_dropped_on_receive: 0, + votes_dropped_on_capacity: 0, + votes_dropped_on_data_budget: 0, + votes_forwarded: 0, + non_votes_received: 0, + non_votes_dropped_on_receive: 0, + non_votes_dropped_on_capacity: 0, + non_votes_dropped_on_data_budget: 0, + non_votes_forwarded: 0, + dropped_on_timeout: 0, + } + } +} From 2679358dbc65cca34dbd46350c9087d0aef6be15 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Jan 2025 15:53:53 -0600 Subject: [PATCH 10/21] test_initial_packet_meta_filter --- core/src/forwarding_stage.rs | 46 ++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index a038ce06458dbe..15e82eadb7d4b1 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -19,7 +19,7 @@ use { solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta, }, - solana_sdk::{fee::FeeBudgetLimits, transaction::MessageHash}, + solana_sdk::{fee::FeeBudgetLimits, packet, transaction::MessageHash}, solana_streamer::sendmmsg::batch_send, std::{ net::{SocketAddr, UdpSocket}, @@ -155,7 +155,10 @@ impl ForwardingStage { bank: &Bank, ) { for batch in packet_batches.iter() { - for packet in batch.iter().filter(|p| Self::initial_packet_meta_filter(p)) { + for packet in batch + .iter() + .filter(|p| initial_packet_meta_filter(p.meta())) + { let Some(packet_data) = packet.data(..) else { // should never occur since we've already checked the // packet is not marked for discard. 
@@ -228,11 +231,6 @@ impl ForwardingStage { } } - fn initial_packet_meta_filter(packet: &Packet) -> bool { - let meta = packet.meta(); - !meta.discard() && !meta.forwarded() && meta.is_from_staked_node() - } - fn forward_buffered_packets(&mut self) { self.metrics.did_something |= !self.packet_container.priority_queue.is_empty(); self.refresh_data_budget(); @@ -479,3 +477,37 @@ impl Default for ForwardingStageMetrics { } } } + +fn initial_packet_meta_filter(meta: &packet::Meta) -> bool { + !meta.discard() && !meta.forwarded() && meta.is_from_staked_node() +} + +#[cfg(test)] +mod tests { + use {super::*, packet::PacketFlags}; + + #[test] + fn test_initial_packet_meta_filter() { + fn meta_with_flags(packet_flags: PacketFlags) -> packet::Meta { + let mut meta = packet::Meta::default(); + meta.flags = packet_flags; + meta + } + + assert!(!initial_packet_meta_filter(&meta_with_flags( + PacketFlags::empty() + ))); + assert!(initial_packet_meta_filter(&meta_with_flags( + PacketFlags::FROM_STAKED_NODE + ))); + assert!(!initial_packet_meta_filter(&meta_with_flags( + PacketFlags::DISCARD + ))); + assert!(!initial_packet_meta_filter(&meta_with_flags( + PacketFlags::FORWARDED + ))); + assert!(!initial_packet_meta_filter(&meta_with_flags( + PacketFlags::FROM_STAKED_NODE | PacketFlags::DISCARD + ))); + } +} From 3dce54ae32b6a5a8487b06c3b863729ccc1db14c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Jan 2025 16:51:31 -0600 Subject: [PATCH 11/21] test_forwarding --- core/src/forwarding_stage.rs | 118 ++++++++++++++++++++++++++++++++--- 1 file changed, 111 insertions(+), 7 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 15e82eadb7d4b1..67bb3d21ab2350 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -31,6 +31,7 @@ use { const FORWARD_BATCH_SIZE: usize = 128; +#[derive(Clone)] pub struct ForwardingAddresses { pub tpu: Option, pub tpu_vote: Option, @@ -484,16 +485,42 @@ fn initial_packet_meta_filter(meta: &packet::Meta) -> bool { #[cfg(test)] mod tests { - use {super::*, packet::PacketFlags}; + use { + super::*, + crossbeam_channel::unbounded, + packet::PacketFlags, + solana_perf::packet::PacketBatch, + solana_pubkey::Pubkey, + solana_runtime::genesis_utils::create_genesis_config, + solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + }; + + impl ForwardAddressGetter for ForwardingAddresses { + fn get_forwarding_addresses(&self, _protocol: Protocol) -> ForwardingAddresses { + self.clone() + } + } + + fn meta_with_flags(packet_flags: PacketFlags) -> packet::Meta { + let mut meta = packet::Meta::default(); + meta.flags = packet_flags; + meta + } + + fn simple_transfer_with_flags(packet_flags: PacketFlags) -> Packet { + let transaction = system_transaction::transfer( + &Keypair::new(), + &Pubkey::new_unique(), + 1, + Hash::default(), + ); + let mut packet = Packet::from_data(None, &transaction).unwrap(); + packet.meta_mut().flags = packet_flags; + packet + } #[test] fn test_initial_packet_meta_filter() { - fn meta_with_flags(packet_flags: PacketFlags) -> packet::Meta { - let mut meta = packet::Meta::default(); - meta.flags = packet_flags; - meta - } - assert!(!initial_packet_meta_filter(&meta_with_flags( PacketFlags::empty() ))); @@ -510,4 +537,81 @@ mod tests { PacketFlags::FROM_STAKED_NODE | PacketFlags::DISCARD ))); } + + #[test] + fn test_forwarding() { + let vote_socket = bind_to_unspecified().unwrap(); + vote_socket + .set_read_timeout(Some(Duration::from_millis(10))) + .unwrap(); + let non_vote_socket 
= bind_to_unspecified().unwrap(); + non_vote_socket + .set_read_timeout(Some(Duration::from_millis(10))) + .unwrap(); + + let forwarding_addresses = ForwardingAddresses { + tpu_vote: Some(vote_socket.local_addr().unwrap()), + tpu: Some(non_vote_socket.local_addr().unwrap()), + }; + + let (packet_batch_sender, packet_batch_receiver) = unbounded(); + let connection_cache = Arc::new(ConnectionCache::with_udp("connection_cache_test", 1)); + let (_bank, bank_forks) = + Bank::new_with_bank_forks_for_tests(&create_genesis_config(1).genesis_config); + let root_bank_cache = RootBankCache::new(bank_forks); + let mut forwarding_stage = ForwardingStage::new( + packet_batch_receiver, + connection_cache, + root_bank_cache, + forwarding_addresses, + ); + + // Send packet batches. + let non_vote_packets = BankingPacketBatch::new(vec![PacketBatch::new(vec![ + simple_transfer_with_flags(PacketFlags::FROM_STAKED_NODE), + simple_transfer_with_flags(PacketFlags::FROM_STAKED_NODE | PacketFlags::DISCARD), + simple_transfer_with_flags(PacketFlags::FROM_STAKED_NODE | PacketFlags::FORWARDED), + ])]); + let vote_packets = BankingPacketBatch::new(vec![PacketBatch::new(vec![ + simple_transfer_with_flags(PacketFlags::SIMPLE_VOTE_TX | PacketFlags::FROM_STAKED_NODE), + simple_transfer_with_flags( + PacketFlags::SIMPLE_VOTE_TX | PacketFlags::FROM_STAKED_NODE | PacketFlags::DISCARD, + ), + simple_transfer_with_flags( + PacketFlags::SIMPLE_VOTE_TX + | PacketFlags::FROM_STAKED_NODE + | PacketFlags::FORWARDED, + ), + ])]); + + packet_batch_sender + .send((non_vote_packets.clone(), false)) + .unwrap(); + packet_batch_sender + .send((vote_packets.clone(), true)) + .unwrap(); + + let bank = forwarding_stage.root_bank_cache.root_bank(); + forwarding_stage.receive_and_buffer(&bank); + if !packet_batch_sender.is_empty() { + forwarding_stage.receive_and_buffer(&bank); + } + assert_eq!(forwarding_stage.packet_container.priority_queue.len(), 2); // only 2 valid packets + forwarding_stage.forward_buffered_packets(); + + let recv_buffer = &mut [0; 1024]; + let (vote_packet_bytes, _) = vote_socket.recv_from(recv_buffer).unwrap(); + assert_eq!( + &recv_buffer[..vote_packet_bytes], + vote_packets[0][0].data(..).unwrap() + ); + assert!(vote_socket.recv_from(recv_buffer).is_err()); + + let (non_vote_packet_bytes, _) = non_vote_socket.recv_from(recv_buffer).unwrap(); + assert_eq!( + &recv_buffer[..non_vote_packet_bytes], + non_vote_packets[0][0].data(..).unwrap() + ); + assert!(non_vote_socket.recv_from(recv_buffer).is_err()); + } } From a694e0085d0c195057e88b19a6811e76db2f1cbc Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Jan 2025 17:12:38 -0600 Subject: [PATCH 12/21] clippy --- core/src/forwarding_stage.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 67bb3d21ab2350..c8541106944ee9 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -502,9 +502,10 @@ mod tests { } fn meta_with_flags(packet_flags: PacketFlags) -> packet::Meta { - let mut meta = packet::Meta::default(); - meta.flags = packet_flags; - meta + packet::Meta { + flags: packet_flags, + ..packet::Meta::default() + } } fn simple_transfer_with_flags(packet_flags: PacketFlags) -> Packet { From d4799c84b4988a0a9c87c1c3cb4299a1e64353ef Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Jan 2025 09:31:25 -0600 Subject: [PATCH 13/21] documenting comments --- core/src/forwarding_stage.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 
insertions(+), 1 deletion(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index c8541106944ee9..11dd6875e9dd78 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -1,3 +1,6 @@ +//! `ForwardingStage` is a stage parallel to `BankingStage` that forwards +//! packets to a node that is or will be leader soon. + use { crate::{ banking_stage::LikeClusterInfo, @@ -29,11 +32,17 @@ use { }, }; +/// Value chosen because it was used historically, at some point +/// was found to be optimal. If we need to improve performance +/// this should be evaluated with new stage. const FORWARD_BATCH_SIZE: usize = 128; +/// Addresses to use for forwarding. #[derive(Clone)] pub struct ForwardingAddresses { + /// Forwarding address for TPU (non-votes) pub tpu: Option, + /// Forwarding address for votes pub tpu_vote: Option, } @@ -50,6 +59,7 @@ impl ForwardAddressGetter for (T, Arc>) } } +/// Forwards packets to current/next leader. pub struct ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, packet_container: PacketContainer, @@ -100,6 +110,7 @@ impl ForwardingStage { } } + /// Runs `ForwardingStage`'s main loop, to receive, order, and forward packets. fn run(mut self) { loop { let root_bank = self.root_bank_cache.root_bank(); @@ -111,9 +122,13 @@ impl ForwardingStage { } } + /// Receive packets from previous stage and insert them into the buffer. fn receive_and_buffer(&mut self, bank: &Bank) -> bool { - let now = Instant::now(); + // Timeout is long enough to receive packets but not too long that we + // forward infrequently. const TIMEOUT: Duration = Duration::from_millis(10); + + let now = Instant::now(); match self.receiver.recv_timeout(TIMEOUT) { Ok((packet_batches, tpu_vote_batch)) => { self.metrics.did_something = true; @@ -149,6 +164,7 @@ impl ForwardingStage { } } + /// Insert received packets into the packet container. fn buffer_packet_batches( &mut self, packet_batches: BankingPacketBatch, @@ -232,6 +248,9 @@ impl ForwardingStage { } } + /// Forwards packets that have been buffered. This will loop through all + /// packets. If the data budget is exceeded then remaining packets are + /// dropped. fn forward_buffered_packets(&mut self) { self.metrics.did_something |= !self.packet_container.priority_queue.is_empty(); self.refresh_data_budget(); @@ -323,6 +342,9 @@ impl ForwardingStage { } } +/// Container for storing packets. +/// Packet IDs are stored with priority in a priority queue and the actual +/// `Packet` are stored in a map. 
struct PacketContainer { priority_queue: MinMaxHeap, packets: Slab, From 39717abab09f564f1a5632d16e9a57681e6671c1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 27 Jan 2025 08:46:33 -0600 Subject: [PATCH 14/21] pass DataBudget --- core/src/forwarding_stage.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 11dd6875e9dd78..9df54828cb35b3 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -79,12 +79,14 @@ impl ForwardingStage { connection_cache: Arc, root_bank_cache: RootBankCache, forward_address_getter: F, + data_budget: DataBudget, ) -> JoinHandle<()> { let forwarding_stage = Self::new( receiver, connection_cache, root_bank_cache, forward_address_getter, + data_budget, ); Builder::new() .name("solFwdStage".to_string()) @@ -97,6 +99,7 @@ impl ForwardingStage { connection_cache: Arc, root_bank_cache: RootBankCache, forward_address_getter: F, + data_budget: DataBudget, ) -> Self { Self { receiver, @@ -104,7 +107,7 @@ impl ForwardingStage { root_bank_cache, forward_address_getter, connection_cache, - data_budget: DataBudget::default(), + data_budget, udp_socket: bind_to_unspecified().unwrap(), metrics: ForwardingStageMetrics::default(), } @@ -587,6 +590,7 @@ mod tests { connection_cache, root_bank_cache, forwarding_addresses, + DataBudget::default(), ); // Send packet batches. From 08b536178e6d07b098024521745e715ec0473677 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 27 Jan 2025 08:51:57 -0600 Subject: [PATCH 15/21] bugfix: vote on udp --- core/src/forwarding_stage.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 9df54828cb35b3..1149f6513181af 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -293,14 +293,14 @@ impl ForwardingStage { let packet_data_vec = packet.data(..).expect("packet has data").to_vec(); if packet.meta().is_simple_vote_tx() { - vote_batch.push(packet_data_vec); + vote_batch.push((packet_data_vec, tpu_vote)); if vote_batch.len() == vote_batch.capacity() { - self.send_vote_batch(tpu_vote, &mut vote_batch); + self.send_vote_batch(&mut vote_batch); } } else { - non_vote_batch.push((packet_data_vec, tpu)); + non_vote_batch.push(packet_data_vec); if non_vote_batch.len() == non_vote_batch.capacity() { - self.send_non_vote_batch(&mut non_vote_batch); + self.send_non_vote_batch(tpu, &mut non_vote_batch); } } } @@ -308,11 +308,11 @@ impl ForwardingStage { // Send out remaining packets if !vote_batch.is_empty() { self.metrics.votes_forwarded += vote_batch.len(); - self.send_vote_batch(tpu_vote, &mut vote_batch); + self.send_vote_batch(&mut vote_batch); } if !non_vote_batch.is_empty() { self.metrics.non_votes_forwarded += non_vote_batch.len(); - self.send_non_vote_batch(&mut non_vote_batch); + self.send_non_vote_batch(tpu, &mut non_vote_batch); } } @@ -331,17 +331,17 @@ impl ForwardingStage { }); } - fn send_vote_batch(&self, addr: SocketAddr, vote_batch: &mut Vec>) { - let conn = self.connection_cache.get_connection(&addr); + fn send_vote_batch(&self, vote_batch: &mut Vec<(Vec, SocketAddr)>) { let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); core::mem::swap(&mut batch, vote_batch); - let _res = conn.send_data_batch_async(batch); + let _res = batch_send(&self.udp_socket, &batch); } - fn send_non_vote_batch(&self, non_vote_batch: &mut Vec<(Vec, SocketAddr)>) { + fn send_non_vote_batch(&self, addr: 
SocketAddr, non_vote_batch: &mut Vec>) { + let conn = self.connection_cache.get_connection(&addr); let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); core::mem::swap(&mut batch, non_vote_batch); - let _res = batch_send(&self.udp_socket, &batch); + let _res = conn.send_data_batch_async(batch); } } From fb06eef5f0d2e0762c0cea0cd1ec8c6432409694 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 27 Jan 2025 09:11:59 -0600 Subject: [PATCH 16/21] packet_container module --- core/src/forwarding_stage.rs | 70 ++++--------------- core/src/forwarding_stage/packet_container.rs | 56 +++++++++++++++ 2 files changed, 69 insertions(+), 57 deletions(-) create mode 100644 core/src/forwarding_stage/packet_container.rs diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 1149f6513181af..823c287bad1ac3 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -9,14 +9,13 @@ use { agave_banking_stage_ingress_types::BankingPacketBatch, agave_transaction_view::transaction_view::SanitizedTransactionView, crossbeam_channel::{Receiver, RecvTimeoutError}, - min_max_heap::MinMaxHeap, - slab::Slab, + packet_container::PacketContainer, solana_client::connection_cache::ConnectionCache, solana_connection_cache::client_connection::ClientConnection, solana_cost_model::cost_model::CostModel, solana_gossip::contact_info::Protocol, solana_net_utils::bind_to_unspecified, - solana_perf::{data_budget::DataBudget, packet::Packet}, + solana_perf::data_budget::DataBudget, solana_poh::poh_recorder::PohRecorder, solana_runtime::{bank::Bank, root_bank_cache::RootBankCache}, solana_runtime_transaction::{ @@ -32,6 +31,8 @@ use { }, }; +mod packet_container; + /// Value chosen because it was used historically, at some point /// was found to be optimal. If we need to improve performance /// this should be evaluated with new stage. @@ -212,15 +213,8 @@ impl ForwardingStage { }; // If at capacity, check lowest priority item. - if self.packet_container.priority_queue.len() - == self.packet_container.priority_queue.capacity() - { - let min_priority = self - .packet_container - .priority_queue - .peek_min() - .expect("not empty") - .priority; + if self.packet_container.is_full() { + let min_priority = self.packet_container.min_priority().expect("not empty"); // If priority of current packet is not higher than the min // drop the current packet. if min_priority >= priority { @@ -229,24 +223,17 @@ impl ForwardingStage { continue; } - let dropped_index = self + let dropped_packet = self .packet_container - .priority_queue - .pop_min() - .expect("not empty") - .index; - let dropped_packet = self.packet_container.packets.remove(dropped_index); + .pop_and_remove_min() + .expect("not empty"); self.metrics.votes_dropped_on_capacity += usize::from(dropped_packet.meta().is_simple_vote_tx()); self.metrics.non_votes_dropped_on_capacity += usize::from(!dropped_packet.meta().is_simple_vote_tx()); } - let entry = self.packet_container.packets.vacant_entry(); - let index = entry.key(); - entry.insert(packet.clone()); - let priority_index = PriorityIndex { priority, index }; - self.packet_container.priority_queue.push(priority_index); + self.packet_container.insert(packet.clone(), priority); } } } @@ -255,7 +242,7 @@ impl ForwardingStage { /// packets. If the data budget is exceeded then remaining packets are /// dropped. 
fn forward_buffered_packets(&mut self) { - self.metrics.did_something |= !self.packet_container.priority_queue.is_empty(); + self.metrics.did_something |= !self.packet_container.is_empty(); self.refresh_data_budget(); // Get forwarding addresses otherwise return now. @@ -273,20 +260,13 @@ impl ForwardingStage { let mut non_vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); // Loop through packets creating batches of packets to forward. - while let Some(priority_index) = self.packet_container.priority_queue.pop_max() { - let packet = self - .packet_container - .packets - .get(priority_index.index) - .expect("packet exists"); - + while let Some(packet) = self.packet_container.pop_and_remove_max() { // If it exceeds our data-budget, drop. if !self.data_budget.take(packet.meta().size) { self.metrics.votes_dropped_on_data_budget += usize::from(packet.meta().is_simple_vote_tx()); self.metrics.non_votes_dropped_on_data_budget += usize::from(!packet.meta().is_simple_vote_tx()); - self.packet_container.packets.remove(priority_index.index); continue; } @@ -345,29 +325,6 @@ impl ForwardingStage { } } -/// Container for storing packets. -/// Packet IDs are stored with priority in a priority queue and the actual -/// `Packet` are stored in a map. -struct PacketContainer { - priority_queue: MinMaxHeap, - packets: Slab, -} - -impl PacketContainer { - fn with_capacity(capacity: usize) -> Self { - Self { - priority_queue: MinMaxHeap::with_capacity(capacity), - packets: Slab::with_capacity(capacity), - } - } -} - -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -struct PriorityIndex { - priority: u64, - index: usize, -} - /// Calculate priority for a transaction: /// /// The priority is calculated as: @@ -514,7 +471,7 @@ mod tests { super::*, crossbeam_channel::unbounded, packet::PacketFlags, - solana_perf::packet::PacketBatch, + solana_perf::packet::{Packet, PacketBatch}, solana_pubkey::Pubkey, solana_runtime::genesis_utils::create_genesis_config, solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, @@ -623,7 +580,6 @@ mod tests { if !packet_batch_sender.is_empty() { forwarding_stage.receive_and_buffer(&bank); } - assert_eq!(forwarding_stage.packet_container.priority_queue.len(), 2); // only 2 valid packets forwarding_stage.forward_buffered_packets(); let recv_buffer = &mut [0; 1024]; diff --git a/core/src/forwarding_stage/packet_container.rs b/core/src/forwarding_stage/packet_container.rs new file mode 100644 index 00000000000000..4ac171562dc41d --- /dev/null +++ b/core/src/forwarding_stage/packet_container.rs @@ -0,0 +1,56 @@ +use {min_max_heap::MinMaxHeap, slab::Slab, solana_perf::packet::Packet}; + +/// Container for storing packets. +/// Packet IDs are stored with priority in a priority queue and the actual +/// `Packet` are stored in a map. 
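+/// Capacity is fixed at construction: both the heap and the slab are
+/// pre-allocated, and `is_full` lets callers evict the minimum before inserting.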
+pub struct PacketContainer { + priority_queue: MinMaxHeap, + packets: Slab, +} + +impl PacketContainer { + pub fn with_capacity(capacity: usize) -> Self { + Self { + priority_queue: MinMaxHeap::with_capacity(capacity), + packets: Slab::with_capacity(capacity), + } + } + + pub fn is_empty(&self) -> bool { + self.priority_queue.is_empty() + } + + pub fn is_full(&self) -> bool { + self.priority_queue.len() == self.priority_queue.capacity() + } + + pub fn min_priority(&self) -> Option { + self.priority_queue.peek_min().map(|min| min.priority) + } + + pub fn pop_and_remove_max(&mut self) -> Option { + self.priority_queue + .pop_max() + .map(|max| self.packets.remove(max.index)) + } + + pub fn pop_and_remove_min(&mut self) -> Option { + self.priority_queue + .pop_min() + .map(|min| self.packets.remove(min.index)) + } + + pub fn insert(&mut self, packet: Packet, priority: u64) { + let entry = self.packets.vacant_entry(); + let index = entry.key(); + entry.insert(packet.clone()); + let priority_index = PriorityIndex { priority, index }; + self.priority_queue.push(priority_index); + } +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +struct PriorityIndex { + priority: u64, + index: usize, +} From 24630c8d58ae96785a718ea782d6698c13f162d6 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 27 Jan 2025 09:21:55 -0600 Subject: [PATCH 17/21] packet_container tests --- core/src/forwarding_stage/packet_container.rs | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/core/src/forwarding_stage/packet_container.rs b/core/src/forwarding_stage/packet_container.rs index 4ac171562dc41d..f299ddf749cb61 100644 --- a/core/src/forwarding_stage/packet_container.rs +++ b/core/src/forwarding_stage/packet_container.rs @@ -54,3 +54,77 @@ struct PriorityIndex { priority: u64, index: usize, } + +#[cfg(test)] +mod tests { + use {super::*, solana_sdk::packet::PacketFlags}; + + fn simple_packet_with_flags(packet_flags: PacketFlags) -> Packet { + let mut packet = Packet::default(); + packet.meta_mut().flags = packet_flags; + packet + } + + #[test] + fn test_packet_container_status() { + let mut container = PacketContainer::with_capacity(2); + assert!(container.is_empty()); + assert!(!container.is_full()); + container.insert(simple_packet_with_flags(PacketFlags::empty()), 1); + assert!(!container.is_empty()); + assert!(!container.is_full()); + container.insert(simple_packet_with_flags(PacketFlags::all()), 2); + assert!(!container.is_empty()); + assert!(container.is_full()); + } + + #[test] + fn test_packet_container_pop_and_remove_min() { + let mut container = PacketContainer::with_capacity(2); + assert!(container.pop_and_remove_min().is_none()); + container.insert(simple_packet_with_flags(PacketFlags::empty()), 1); + container.insert(simple_packet_with_flags(PacketFlags::all()), 2); + assert_eq!( + container + .pop_and_remove_min() + .expect("not empty") + .meta() + .flags, + PacketFlags::empty() + ); + assert_eq!( + container + .pop_and_remove_min() + .expect("not empty") + .meta() + .flags, + PacketFlags::all() + ); + assert!(container.pop_and_remove_min().is_none()); + } + + #[test] + fn test_packet_container_pop_and_remove_max() { + let mut container = PacketContainer::with_capacity(2); + assert!(container.pop_and_remove_max().is_none()); + container.insert(simple_packet_with_flags(PacketFlags::empty()), 1); + container.insert(simple_packet_with_flags(PacketFlags::all()), 2); + assert_eq!( + container + .pop_and_remove_max() + .expect("not empty") + .meta() + .flags, + 
PacketFlags::all() + ); + assert_eq!( + container + .pop_and_remove_max() + .expect("not empty") + .meta() + .flags, + PacketFlags::empty() + ); + assert!(container.pop_and_remove_max().is_none()); + } +} From 52186515a326af81f54ec18b38ff8b4a93cde3fa Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 27 Jan 2025 15:40:08 +0100 Subject: [PATCH 18/21] add vote client --- core/src/forwarding_stage.rs | 38 +++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 823c287bad1ac3..66d9fddb330a70 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -60,6 +60,24 @@ impl ForwardAddressGetter for (T, Arc>) } } +struct VoteClient { + udp_socket: UdpSocket, +} + +impl VoteClient { + fn new() -> Self { + Self { + udp_socket: bind_to_unspecified().unwrap(), + } + } + + fn send_batch(&self, input_batch: &mut Vec<(Vec, SocketAddr)>) { + let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + core::mem::swap(&mut batch, input_batch); // why do we swap? + let _res = batch_send(&self.udp_socket, &batch); + } +} + /// Forwards packets to current/next leader. pub struct ForwardingStage { receiver: Receiver<(BankingPacketBatch, bool)>, @@ -68,8 +86,8 @@ pub struct ForwardingStage { root_bank_cache: RootBankCache, forward_address_getter: F, connection_cache: Arc, + vote_client: VoteClient, data_budget: DataBudget, - udp_socket: UdpSocket, metrics: ForwardingStageMetrics, } @@ -108,8 +126,8 @@ impl ForwardingStage { root_bank_cache, forward_address_getter, connection_cache, + vote_client: VoteClient::new(), data_budget, - udp_socket: bind_to_unspecified().unwrap(), metrics: ForwardingStageMetrics::default(), } } @@ -256,8 +274,8 @@ impl ForwardingStage { return; }; - let mut vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); let mut non_vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); + let mut vote_batch = Vec::with_capacity(FORWARD_BATCH_SIZE); // Loop through packets creating batches of packets to forward. 
while let Some(packet) = self.packet_container.pop_and_remove_max() { @@ -275,7 +293,7 @@ impl ForwardingStage { if packet.meta().is_simple_vote_tx() { vote_batch.push((packet_data_vec, tpu_vote)); if vote_batch.len() == vote_batch.capacity() { - self.send_vote_batch(&mut vote_batch); + self.vote_client.send_batch(&mut vote_batch); } } else { non_vote_batch.push(packet_data_vec); @@ -288,7 +306,7 @@ impl ForwardingStage { // Send out remaining packets if !vote_batch.is_empty() { self.metrics.votes_forwarded += vote_batch.len(); - self.send_vote_batch(&mut vote_batch); + self.vote_client.send_batch(&mut vote_batch); } if !non_vote_batch.is_empty() { self.metrics.non_votes_forwarded += non_vote_batch.len(); @@ -311,12 +329,6 @@ impl ForwardingStage { }); } - fn send_vote_batch(&self, vote_batch: &mut Vec<(Vec, SocketAddr)>) { - let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); - core::mem::swap(&mut batch, vote_batch); - let _res = batch_send(&self.udp_socket, &batch); - } - fn send_non_vote_batch(&self, addr: SocketAddr, non_vote_batch: &mut Vec>) { let conn = self.connection_cache.get_connection(&addr); let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); @@ -471,6 +483,7 @@ mod tests { super::*, crossbeam_channel::unbounded, packet::PacketFlags, + solana_net_utils::bind_to_unspecified, solana_perf::packet::{Packet, PacketBatch}, solana_pubkey::Pubkey, solana_runtime::genesis_utils::create_genesis_config, @@ -582,6 +595,9 @@ mod tests { } forwarding_stage.forward_buffered_packets(); + assert_eq!(forwarding_stage.metrics.non_votes_forwarded, 1); + assert_eq!(forwarding_stage.metrics.votes_forwarded, 1); + let recv_buffer = &mut [0; 1024]; let (vote_packet_bytes, _) = vote_socket.recv_from(recv_buffer).unwrap(); assert_eq!( From 1892554facbbc2eae851e62711383520c248b987 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 29 Jan 2025 14:01:49 -0600 Subject: [PATCH 19/21] typo Co-authored-by: kirill lykov --- core/src/forwarding_stage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index 66d9fddb330a70..be17f2b0aa6f43 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -169,7 +169,7 @@ impl ForwardingStage { } }; - // If timeout waas reached, prevent backup by draining all + // If timeout was reached, prevent backup by draining all // packets in the channel. if timed_out { warn!("ForwardingStage is backed up, dropping packets"); From b494274d04a554a270a307c565c0efa0473713ae Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 29 Jan 2025 15:19:00 -0600 Subject: [PATCH 20/21] remove swap comment --- core/src/forwarding_stage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs index be17f2b0aa6f43..5acc64059c3d85 100644 --- a/core/src/forwarding_stage.rs +++ b/core/src/forwarding_stage.rs @@ -73,7 +73,7 @@ impl VoteClient { fn send_batch(&self, input_batch: &mut Vec<(Vec, SocketAddr)>) { let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE); - core::mem::swap(&mut batch, input_batch); // why do we swap? 
+        core::mem::swap(&mut batch, input_batch);
         let _res = batch_send(&self.udp_socket, &batch);
     }
 }

From 05c5216bc84a7e0496ab781043ffcbdf1856c1ac Mon Sep 17 00:00:00 2001
From: Andrew Fitzgerald
Date: Wed, 29 Jan 2025 15:22:18 -0600
Subject: [PATCH 21/21] remove swap

---
 core/src/forwarding_stage.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs
index 5acc64059c3d85..12d08af1f8dd98 100644
--- a/core/src/forwarding_stage.rs
+++ b/core/src/forwarding_stage.rs
@@ -71,10 +71,9 @@ impl VoteClient {
         }
     }
 
-    fn send_batch(&self, input_batch: &mut Vec<(Vec<u8>, SocketAddr)>) {
-        let mut batch = Vec::with_capacity(FORWARD_BATCH_SIZE);
-        core::mem::swap(&mut batch, input_batch);
-        let _res = batch_send(&self.udp_socket, &batch);
+    fn send_batch(&self, batch: &mut Vec<(Vec<u8>, SocketAddr)>) {
+        let _res = batch_send(&self.udp_socket, batch);
+        batch.clear();
     }
 }
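
A minimal wiring sketch for the finished stage, assuming the caller already owns a
`connection_cache: Arc<ConnectionCache>`, `bank_forks`, a `cluster_info` implementing
`LikeClusterInfo`, a `poh_recorder: Arc<RwLock<PohRecorder>>`, and a
`banking_packet_batch` to send (these bindings are illustrative; only the `spawn`
signature comes from the patches above):

    use {
        crossbeam_channel::unbounded,
        solana_perf::data_budget::DataBudget,
        solana_runtime::root_bank_cache::RootBankCache,
    };

    let (packet_sender, packet_receiver) = unbounded();
    let handle = ForwardingStage::spawn(
        packet_receiver,
        connection_cache,                // Arc<ConnectionCache>
        RootBankCache::new(bank_forks),
        (cluster_info, poh_recorder),    // impl ForwardAddressGetter
        DataBudget::default(),
    );

    // Upstream stages send `(BankingPacketBatch, is_tpu_vote_batch)` tuples.
    packet_sender.send((banking_packet_batch, false)).unwrap();

    // Dropping every sender disconnects the channel, which ends the stage's loop.
    drop(packet_sender);
    handle.join().unwrap();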