diff --git a/Cargo.lock b/Cargo.lock index de03e70a2cd0f4..a5807893be877d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6839,6 +6839,7 @@ name = "solana-core" version = "2.2.0" dependencies = [ "agave-banking-stage-ingress-types", + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 6196f713666e19..c0edc0bee1f73d 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -11,7 +11,7 @@ use { solana_core::{ banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage}, banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -290,6 +290,14 @@ fn main() { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()), ) + .arg( + Arg::with_name("transaction_struct") + .long("transaction-structure") + .value_name("STRUCT") + .takes_value(true) + .possible_values(TransactionStructure::cli_names()) + .help(TransactionStructure::cli_message()), + ) .arg( Arg::new("num_banking_threads") .long("num-banking-threads") @@ -320,6 +328,9 @@ fn main() { let block_production_method = matches .value_of_t::("block_production_method") .unwrap_or_default(); + let transaction_struct = matches + .value_of_t::("transaction_struct") + .unwrap_or_default(); let num_banking_threads = matches .value_of_t::("num_banking_threads") .unwrap_or_else(|_| BankingStage::num_threads()); @@ -470,6 +481,7 @@ fn main() { }; let banking_stage = BankingStage::new_num_threads( block_production_method, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/Cargo.toml b/core/Cargo.toml index 3425e63fd8fe1b..0c23cfa9108ed4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,6 +15,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git [dependencies] agave-banking-stage-ingress-types = { workspace = true } +agave-transaction-view = { workspace = true } ahash = { workspace = true } anyhow = { workspace = true } arrayvec = { workspace = true } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index dfad8bc8c227cf..10e121db4210b9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -3,7 +3,10 @@ use { agave_banking_stage_ingress_types::BankingPacketBatch, - solana_core::{banking_trace::Channels, validator::BlockProductionMethod}, + solana_core::{ + banking_trace::Channels, + validator::{BlockProductionMethod, TransactionStructure}, + }, solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction}, }; @@ -193,7 +196,12 @@ enum TransactionType { ProgramsAndVotes, } -fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { +fn bench_banking( + bencher: &mut Bencher, + tx_type: TransactionType, + block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, +) { solana_logger::setup(); let num_threads = BankingStage::num_threads() as usize; // a multiple of packet chunk duplicates to avoid races @@ -297,7 +305,8 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); let _banking_stage = BankingStage::new( - BlockProductionMethod::CentralScheduler, + block_production_method, + transaction_struct, &cluster_info, &poh_recorder, 
non_vote_receiver, @@ -372,22 +381,82 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { #[bench] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::Accounts); + bench_banking( + bencher, + TransactionType::Accounts, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::Programs); + bench_banking( + bencher, + TransactionType::Programs, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_accounts_with_voting(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::AccountsAndVotes); + bench_banking( + bencher, + TransactionType::AccountsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); } #[bench] fn bench_banking_stage_multi_programs_with_voting(bencher: &mut Bencher) { - bench_banking(bencher, TransactionType::ProgramsAndVotes); + bench_banking( + bencher, + TransactionType::ProgramsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::Sdk, + ); +} + +#[bench] +fn bench_banking_stage_multi_accounts_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::Accounts, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_programs_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::Programs, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_accounts_with_voting_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::AccountsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); +} + +#[bench] +fn bench_banking_stage_multi_programs_with_voting_view(bencher: &mut Bencher) { + bench_banking( + bencher, + TransactionType::ProgramsAndVotes, + BlockProductionMethod::CentralScheduler, + TransactionStructure::View, + ); } fn simulate_process_entries( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index e811c9c6df9bd8..69c7112d86c45a 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -8,7 +8,7 @@ use { BankingTracer, ChannelLabel, Channels, TimedTracedEvent, TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME, }, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, agave_banking_stage_ingress_types::BankingPacketBatch, assert_matches::assert_matches, @@ -679,6 +679,7 @@ impl BankingSimulator { bank_forks: Arc>, blockstore: Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, ) -> (SenderLoop, SimulatorLoop, SimulatorThreads) { let parent_slot = self.parent_slot().unwrap(); let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time; @@ -810,6 +811,7 @@ impl BankingSimulator { let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64)); let banking_stage = BankingStage::new_num_threads( block_production_method.clone(), + transaction_struct.clone(), &cluster_info, &poh_recorder, non_vote_receiver, @@ -896,12 +898,14 @@ impl BankingSimulator { bank_forks: Arc>, blockstore: Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, ) -> Result<(), SimulateError> { let 
(sender_loop, simulator_loop, simulator_threads) = self.prepare_simulation( genesis_config, bank_forks, blockstore, block_production_method, + transaction_struct, ); sender_loop.log_starting(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 845e2981d2fe6d..fb77f899a92caf 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -25,7 +25,7 @@ use { scheduler_controller::SchedulerController, scheduler_error::SchedulerError, }, }, - validator::BlockProductionMethod, + validator::{BlockProductionMethod, TransactionStructure}, }, agave_banking_stage_ingress_types::BankingPacketReceiver, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -53,7 +53,9 @@ use { }, transaction_scheduler::{ prio_graph_scheduler::PrioGraphSchedulerConfig, - receive_and_buffer::SanitizedTransactionReceiveAndBuffer, + receive_and_buffer::{ + ReceiveAndBuffer, SanitizedTransactionReceiveAndBuffer, TransactionViewReceiveAndBuffer, + }, transaction_state_container::TransactionStateContainer, }, }; @@ -351,6 +353,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new( block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -366,6 +369,7 @@ impl BankingStage { ) -> Self { Self::new_num_threads( block_production_method, + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, @@ -385,6 +389,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new_num_threads( block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -401,6 +406,7 @@ impl BankingStage { ) -> Self { match block_production_method { BlockProductionMethod::CentralScheduler => Self::new_central_scheduler( + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, @@ -420,6 +426,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new_central_scheduler( + transaction_struct: TransactionStructure, cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -482,6 +489,78 @@ impl BankingStage { )); } + let transaction_struct = + if enable_forwarding && !matches!(transaction_struct, TransactionStructure::Sdk) { + warn!( + "Forwarding only supported for `Sdk` transaction struct. Overriding to use `Sdk`." 
+ ); + TransactionStructure::Sdk + } else { + transaction_struct + }; + + match transaction_struct { + TransactionStructure::Sdk => { + let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( + PacketDeserializer::new(non_vote_receiver), + bank_forks.clone(), + enable_forwarding, + ); + Self::spawn_scheduler_and_workers( + &mut bank_thread_hdls, + receive_and_buffer, + decision_maker, + committer, + cluster_info, + poh_recorder, + num_threads, + log_messages_bytes_limit, + connection_cache, + bank_forks, + enable_forwarding, + data_budget, + ); + } + TransactionStructure::View => { + let receive_and_buffer = TransactionViewReceiveAndBuffer { + receiver: non_vote_receiver, + bank_forks: bank_forks.clone(), + }; + Self::spawn_scheduler_and_workers( + &mut bank_thread_hdls, + receive_and_buffer, + decision_maker, + committer, + cluster_info, + poh_recorder, + num_threads, + log_messages_bytes_limit, + connection_cache, + bank_forks, + enable_forwarding, + data_budget, + ); + } + } + + Self { bank_thread_hdls } + } + + #[allow(clippy::too_many_arguments)] + fn spawn_scheduler_and_workers( + bank_thread_hdls: &mut Vec>, + receive_and_buffer: R, + decision_maker: DecisionMaker, + committer: Committer, + cluster_info: &impl LikeClusterInfo, + poh_recorder: &Arc>, + num_threads: u32, + log_messages_bytes_limit: Option, + connection_cache: Arc, + bank_forks: Arc>, + enable_forwarding: bool, + data_budget: Arc, + ) { // Create channels for communication between scheduler and workers let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS); let (work_senders, work_receivers): (Vec>, Vec>) = @@ -527,39 +606,34 @@ impl BankingStage { }); // Spawn the central scheduler thread - bank_thread_hdls.push({ - let packet_deserializer = PacketDeserializer::new(non_vote_receiver); - let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( - packet_deserializer, - bank_forks.clone(), - forwarder.is_some(), - ); - let scheduler = PrioGraphScheduler::new( - work_senders, - finished_work_receiver, - PrioGraphSchedulerConfig::default(), - ); - let scheduler_controller = SchedulerController::new( - decision_maker.clone(), - receive_and_buffer, - bank_forks, - scheduler, - worker_metrics, - forwarder, - ); + bank_thread_hdls.push( Builder::new() .name("solBnkTxSched".to_string()) - .spawn(move || match scheduler_controller.run() { - Ok(_) => {} - Err(SchedulerError::DisconnectedRecvChannel(_)) => {} - Err(SchedulerError::DisconnectedSendChannel(_)) => { - warn!("Unexpected worker disconnect from scheduler") + .spawn(move || { + let scheduler = PrioGraphScheduler::new( + work_senders, + finished_work_receiver, + PrioGraphSchedulerConfig::default(), + ); + let scheduler_controller = SchedulerController::new( + decision_maker.clone(), + receive_and_buffer, + bank_forks, + scheduler, + worker_metrics, + forwarder, + ); + + match scheduler_controller.run() { + Ok(_) => {} + Err(SchedulerError::DisconnectedRecvChannel(_)) => {} + Err(SchedulerError::DisconnectedSendChannel(_)) => { + warn!("Unexpected worker disconnect from scheduler") + } } }) - .unwrap() - }); - - Self { bank_thread_hdls } + .unwrap(), + ); } fn spawn_thread_local_multi_iterator_thread( @@ -775,6 +849,7 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, + test_case::test_case, }; pub(crate) fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { @@ -793,8 +868,9 @@ mod tests { .collect() } - #[test] - fn test_banking_stage_shutdown1() { + #[test_case(TransactionStructure::Sdk)] + 
#[test_case(TransactionStructure::View)] + fn test_banking_stage_shutdown1(transaction_struct: TransactionStructure) { let genesis_config = create_genesis_config(2).genesis_config; let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let banking_tracer = BankingTracer::new_disabled(); @@ -820,6 +896,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -843,8 +920,9 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_banking_stage_tick() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_tick(transaction_struct: TransactionStructure) { solana_logger::setup(); let GenesisConfigInfo { mut genesis_config, .. @@ -880,6 +958,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -926,7 +1005,10 @@ mod tests { with_vers.into_iter().map(|(b, _)| b).collect() } - fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) { + fn test_banking_stage_entries_only( + block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, + ) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -964,6 +1046,7 @@ mod tests { let banking_stage = BankingStage::new( block_production_method, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -1057,13 +1140,18 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_banking_stage_entries_only_central_scheduler() { - test_banking_stage_entries_only(BlockProductionMethod::CentralScheduler); + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_entries_only_central_scheduler(transaction_struct: TransactionStructure) { + test_banking_stage_entries_only( + BlockProductionMethod::CentralScheduler, + transaction_struct, + ); } - #[test] - fn test_banking_stage_entryfication() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_banking_stage_entryfication(transaction_struct: TransactionStructure) { solana_logger::setup(); // In this attack we'll demonstrate that a verifier can interpret the ledger // differently if either the server doesn't signal the ledger to add an @@ -1134,6 +1222,7 @@ mod tests { let cluster_info = Arc::new(cluster_info); let _banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, @@ -1291,8 +1380,9 @@ mod tests { tick_producer.unwrap() } - #[test] - fn test_unprocessed_transaction_storage_full_send() { + #[test_case(TransactionStructure::Sdk)] + #[test_case(TransactionStructure::View)] + fn test_unprocessed_transaction_storage_full_send(transaction_struct: TransactionStructure) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -1330,6 +1420,7 @@ mod tests { let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, + transaction_struct, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/src/banking_stage/packet_filter.rs b/core/src/banking_stage/packet_filter.rs index b9176c9b8ac91d..d4e68f590d8045 100644 --- a/core/src/banking_stage/packet_filter.rs +++ b/core/src/banking_stage/packet_filter.rs @@ -9,6 +9,8 @@ use { thiserror::Error, }; 
+pub const MAX_ALLOWED_PRECOMPILE_SIGNATURES: u64 = 8; + lazy_static! { // To calculate the static_builtin_cost_sum conservatively, an all-enabled dummy feature_set // is used. It lowers required minimal compute_unit_limit, aligns with future versions. @@ -58,7 +60,6 @@ impl ImmutableDeserializedPacket { } } - const MAX_ALLOWED_PRECOMPILE_SIGNATURES: u64 = 8; if num_precompile_signatures <= MAX_ALLOWED_PRECOMPILE_SIGNATURES { Ok(()) } else { diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index 21afe448d151d3..9284c0f9ed5028 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -1,18 +1,27 @@ use { super::{ scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, - transaction_state_container::StateContainer, + transaction_state::TransactionState, + transaction_state_container::{ + SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer, + }, }, crate::banking_stage::{ decision_maker::BufferedPacketsDecision, immutable_deserialized_packet::ImmutableDeserializedPacket, - packet_deserializer::PacketDeserializer, scheduler_messages::MaxAge, + packet_deserializer::PacketDeserializer, packet_filter::MAX_ALLOWED_PRECOMPILE_SIGNATURES, + scheduler_messages::MaxAge, transaction_scheduler::transaction_state::SanitizedTransactionTTL, TransactionStateContainer, }, + agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, + agave_transaction_view::{ + resolved_transaction_view::ResolvedTransactionView, + transaction_version::TransactionVersion, transaction_view::SanitizedTransactionView, + }, arrayvec::ArrayVec, core::time::Duration, - crossbeam_channel::RecvTimeoutError, + crossbeam_channel::{RecvTimeoutError, TryRecvError}, solana_accounts_db::account_locks::validate_account_locks, solana_cost_model::cost_model::CostModel, solana_measure::measure_us, @@ -26,24 +35,29 @@ use { clock::{Epoch, Slot, MAX_PROCESSING_AGE}, fee::FeeBudgetLimits, saturating_add_assign, - transaction::SanitizedTransaction, + transaction::{MessageHash, SanitizedTransaction}, }, solana_svm::transaction_error_metrics::TransactionErrorMetrics, - std::sync::{Arc, RwLock}, + solana_svm_transaction::svm_message::SVMMessage, + std::{ + sync::{Arc, RwLock}, + time::Instant, + }, }; pub(crate) trait ReceiveAndBuffer { - type Transaction: TransactionWithMeta; - type Container: StateContainer; + type Transaction: TransactionWithMeta + Send + Sync; + type Container: StateContainer + Send + Sync; - /// Returns whether the packet receiver is still connected. + /// Return Err if the receiver is disconnected AND no packets were + /// received. Otherwise return Ok(num_received). 
fn receive_and_buffer_packets( &mut self, container: &mut Self::Container, timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool; + ) -> Result; } pub(crate) struct SanitizedTransactionReceiveAndBuffer { @@ -65,7 +79,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool { + ) -> Result { let remaining_queue_capacity = container.remaining_capacity(); const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); @@ -95,7 +109,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us); }); - match received_packet_results { + let num_received = match received_packet_results { Ok(receive_packet_results) => { let num_received_packets = receive_packet_results.deserialized_packets.len(); @@ -121,12 +135,13 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { ); }); } + num_received_packets } - Err(RecvTimeoutError::Timeout) => {} - Err(RecvTimeoutError::Disconnected) => return false, - } + Err(RecvTimeoutError::Timeout) => 0, + Err(RecvTimeoutError::Disconnected) => return Err(()), + }; - true + Ok(num_received) } } @@ -278,6 +293,254 @@ impl SanitizedTransactionReceiveAndBuffer { } } +pub(crate) struct TransactionViewReceiveAndBuffer { + pub receiver: BankingPacketReceiver, + pub bank_forks: Arc>, +} + +impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { + type Transaction = RuntimeTransaction>; + type Container = TransactionViewStateContainer; + + fn receive_and_buffer_packets( + &mut self, + container: &mut Self::Container, + timing_metrics: &mut SchedulerTimingMetrics, + count_metrics: &mut SchedulerCountMetrics, + decision: &BufferedPacketsDecision, + ) -> Result { + let (root_bank, working_bank) = { + let bank_forks = self.bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + (root_bank, working_bank) + }; + + // Receive packet batches. + const TIMEOUT: Duration = Duration::from_millis(10); + let start = Instant::now(); + let mut num_received = 0; + let mut received_message = false; + + // If not leader/unknown, do a blocking-receive initially. This lets + // the thread sleep until a message is received, or until the timeout. + // Additionally, only sleep if the container is empty. + if container.is_empty() + && matches!( + decision, + BufferedPacketsDecision::Forward | BufferedPacketsDecision::ForwardAndHold + ) + { + // TODO: Is it better to manually sleep instead, avoiding the locking + // overhead for wakers? But then risk not waking up when message + // received - as long as sleep is somewhat short, this should be + // fine. 
+ match self.receiver.recv_timeout(TIMEOUT) { + Ok(packet_batch_message) => { + received_message = true; + num_received += self.handle_packet_batch_message( + container, + timing_metrics, + count_metrics, + decision, + &root_bank, + &working_bank, + packet_batch_message, + ); + } + Err(RecvTimeoutError::Timeout) => return Ok(num_received), + Err(RecvTimeoutError::Disconnected) => { + return received_message.then_some(num_received).ok_or(()); + } + } + } + + while start.elapsed() < TIMEOUT { + match self.receiver.try_recv() { + Ok(packet_batch_message) => { + received_message = true; + num_received += self.handle_packet_batch_message( + container, + timing_metrics, + count_metrics, + decision, + &root_bank, + &working_bank, + packet_batch_message, + ); + } + Err(TryRecvError::Empty) => return Ok(num_received), + Err(TryRecvError::Disconnected) => { + return received_message.then_some(num_received).ok_or(()); + } + } + } + + Ok(num_received) + } +} + +impl TransactionViewReceiveAndBuffer { + /// Return number of received packets. + fn handle_packet_batch_message( + &mut self, + container: &mut TransactionViewStateContainer, + timing_metrics: &mut SchedulerTimingMetrics, + count_metrics: &mut SchedulerCountMetrics, + decision: &BufferedPacketsDecision, + root_bank: &Bank, + working_bank: &Bank, + packet_batch_message: BankingPacketBatch, + ) -> usize { + // Do not support forwarding - only add support for this if we really need it. + if matches!(decision, BufferedPacketsDecision::Forward) { + return 0; + } + + let start = Instant::now(); + // Sanitize packets, generate IDs, and insert into the container. + let alt_resolved_slot = root_bank.slot(); + let sanitized_epoch = root_bank.epoch(); + let transaction_account_lock_limit = working_bank.get_transaction_account_lock_limit(); + + let mut num_received = 0usize; + let mut num_buffered = 0usize; + let mut num_dropped_on_capacity = 0usize; + let mut num_dropped_on_receive = 0usize; + for packet_batch in packet_batch_message.iter() { + for packet in packet_batch.iter() { + let Some(packet_data) = packet.data(..) else { + continue; + }; + + num_received += 1; + + // Reserve free-space to copy packet into, run sanitization checks, and insert. 
+ if container.try_insert_with_data( + packet_data, + |bytes| match Self::try_handle_packet( + bytes, + root_bank, + working_bank, + alt_resolved_slot, + sanitized_epoch, + transaction_account_lock_limit, + ) { + Ok(state) => { + num_buffered += 1; + Ok(state) + } + Err(()) => { + num_dropped_on_receive += 1; + Err(()) + } + }, + ) { + num_dropped_on_capacity += 1; + }; + } + } + + let buffer_time_us = start.elapsed().as_micros() as u64; + timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); + }); + count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_received, num_received); + saturating_add_assign!(count_metrics.num_buffered, num_buffered); + saturating_add_assign!( + count_metrics.num_dropped_on_capacity, + num_dropped_on_capacity + ); + saturating_add_assign!(count_metrics.num_dropped_on_receive, num_dropped_on_receive); + }); + + num_received + } + + fn try_handle_packet( + bytes: SharedBytes, + root_bank: &Bank, + working_bank: &Bank, + alt_resolved_slot: Slot, + sanitized_epoch: Epoch, + transaction_account_lock_limit: usize, + ) -> Result { + // Parsing and basic sanitization checks + let Ok(view) = SanitizedTransactionView::try_new_sanitized(bytes) else { + return Err(()); + }; + + let Ok(view) = RuntimeTransaction::>::try_from( + view, + MessageHash::Compute, + None, + ) else { + return Err(()); + }; + + // Discard non-vote packets if in vote-only mode. + if root_bank.vote_only_bank() && !view.is_simple_vote_transaction() { + return Err(()); + } + + // Check excessive pre-compiles. + let signature_details = view.signature_details(); + let num_precompiles = signature_details.num_ed25519_instruction_signatures() + + signature_details.num_secp256k1_instruction_signatures() + + signature_details.num_secp256r1_instruction_signatures(); + if num_precompiles > MAX_ALLOWED_PRECOMPILE_SIGNATURES { + return Err(()); + } + + // Load addresses for transaction. + let load_addresses_result = match view.version() { + TransactionVersion::Legacy => Ok((None, u64::MAX)), + TransactionVersion::V0 => root_bank + .load_addresses_from_ref(view.address_table_lookup_iter()) + .map(|(loaded_addresses, deactivation_slot)| { + (Some(loaded_addresses), deactivation_slot) + }), + }; + let Ok((loaded_addresses, deactivation_slot)) = load_addresses_result else { + return Err(()); + }; + + let Ok(view) = RuntimeTransaction::>::try_from( + view, + loaded_addresses, + root_bank.get_reserved_account_keys(), + ) else { + return Err(()); + }; + + if validate_account_locks(view.account_keys(), transaction_account_lock_limit).is_err() { + return Err(()); + } + + let Ok(compute_budget_limits) = view + .compute_budget_instruction_details() + .sanitize_and_convert_to_compute_budget_limits(&working_bank.feature_set) + else { + return Err(()); + }; + + let max_age = calculate_max_age(sanitized_epoch, deactivation_slot, alt_resolved_slot); + let fee_budget_limits = FeeBudgetLimits::from(compute_budget_limits); + let (priority, cost) = calculate_priority_and_cost(&view, &fee_budget_limits, working_bank); + + Ok(TransactionState::new( + SanitizedTransactionTTL { + transaction: view, + max_age, + }, + None, + priority, + cost, + )) + } +} + /// Calculate priority and cost for a transaction: /// /// Cost is calculated through the `CostModel`, @@ -297,7 +560,7 @@ impl SanitizedTransactionReceiveAndBuffer { /// Any difference in the prioritization is negligible for /// the current transaction costs. 
fn calculate_priority_and_cost( - transaction: &RuntimeTransaction, + transaction: &impl TransactionWithMeta, fee_budget_limits: &FeeBudgetLimits, bank: &Bank, ) -> (u64, u64) { diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 8cd38b81ab3e67..e4e4388cff9c0d 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -107,7 +107,7 @@ impl SchedulerController { self.process_transactions(&decision)?; self.receive_completed()?; - if !self.receive_and_buffer_packets(&decision) { + if self.receive_and_buffer_packets(&decision).is_err() { break; } // Report metrics only if there is data. @@ -282,13 +282,13 @@ impl SchedulerController { ids_to_add_back.push(*id); // add back to the queue at end let state = self.container.get_mut_transaction_state(id.id).unwrap(); let sanitized_transaction = &state.transaction_ttl().transaction; - let immutable_packet = state.packet().clone(); + let immutable_packet = state.packet().expect("forwarding requires packet"); // If not already forwarded and can be forwarded, add to forwardable packets. if state.should_forward() && forwarder.try_add_packet( sanitized_transaction, - immutable_packet, + immutable_packet.clone(), feature_set, ) { @@ -421,7 +421,10 @@ impl SchedulerController { } /// Returns whether the packet receiver is still connected. - fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { + fn receive_and_buffer_packets( + &mut self, + decision: &BufferedPacketsDecision, + ) -> Result { self.receive_and_buffer.receive_and_buffer_packets( &mut self.container, &mut self.timing_metrics, @@ -444,8 +447,9 @@ mod tests { prio_graph_scheduler::PrioGraphSchedulerConfig, receive_and_buffer::SanitizedTransactionReceiveAndBuffer, }, + TransactionViewReceiveAndBuffer, }, - agave_banking_stage_ingress_types::BankingPacketBatch, + agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, crossbeam_channel::{unbounded, Receiver, Sender}, itertools::Itertools, solana_gossip::cluster_info::ClusterInfo, @@ -456,21 +460,15 @@ mod tests { solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS}, solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, solana_runtime::bank::Bank, - solana_runtime_transaction::runtime_transaction::RuntimeTransaction, + solana_runtime_transaction::transaction_meta::StaticMeta, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, - fee_calculator::FeeRateGovernor, - hash::Hash, - message::Message, - poh_config::PohConfig, - pubkey::Pubkey, - signature::Keypair, - signer::Signer, - system_instruction, system_transaction, - transaction::{SanitizedTransaction, Transaction}, + compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash, + message::Message, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, + signer::Signer, system_instruction, system_transaction, transaction::Transaction, }, std::sync::{atomic::AtomicBool, Arc, RwLock}, tempfile::TempDir, + test_case::test_case, }; fn create_channels(num: usize) -> (Vec>, Vec>) { @@ -479,7 +477,7 @@ mod tests { // Helper struct to create tests that hold channels, files, etc. // such that our tests can be more easily set up and run. 
- struct TestFrame { + struct TestFrame { bank: Arc, mint_keypair: Keypair, _ledger_path: TempDir, @@ -488,18 +486,38 @@ mod tests { poh_recorder: Arc>, banking_packet_sender: Sender>>, - consume_work_receivers: - Vec>>>, - finished_consume_work_sender: - Sender>>, + consume_work_receivers: Vec>>, + finished_consume_work_sender: Sender>, + } + + fn test_create_sanitized_transaction_receive_and_buffer( + receiver: BankingPacketReceiver, + bank_forks: Arc>, + ) -> SanitizedTransactionReceiveAndBuffer { + SanitizedTransactionReceiveAndBuffer::new( + PacketDeserializer::new(receiver), + bank_forks, + false, + ) + } + + fn test_create_transaction_view_receive_and_buffer( + receiver: BankingPacketReceiver, + bank_forks: Arc>, + ) -> TransactionViewReceiveAndBuffer { + TransactionViewReceiveAndBuffer { + receiver, + bank_forks, + } } #[allow(clippy::type_complexity)] - fn create_test_frame( + fn create_test_frame( num_threads: usize, + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, ) -> ( - TestFrame, - SchedulerController, SanitizedTransactionReceiveAndBuffer>, + TestFrame, + SchedulerController, R>, ) { let GenesisConfigInfo { mut genesis_config, @@ -527,7 +545,8 @@ mod tests { let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone()); let (banking_packet_sender, banking_packet_receiver) = unbounded(); - let packet_deserializer = PacketDeserializer::new(banking_packet_receiver); + let receive_and_buffer = + create_receive_and_buffer(banking_packet_receiver, bank_forks.clone()); let (consume_work_senders, consume_work_receivers) = create_channels(num_threads); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); @@ -544,12 +563,6 @@ mod tests { finished_consume_work_sender, }; - let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new( - packet_deserializer, - bank_forks.clone(), - false, - ); - let scheduler = PrioGraphScheduler::new( consume_work_senders, finished_consume_work_receiver, @@ -605,24 +618,33 @@ mod tests { // In the tests, the decision will not become stale, so it is more convenient // to receive first and then schedule. fn test_receive_then_schedule( - scheduler_controller: &mut SchedulerController< - Arc, - SanitizedTransactionReceiveAndBuffer, - >, + scheduler_controller: &mut SchedulerController, impl ReceiveAndBuffer>, ) { let decision = scheduler_controller .decision_maker .make_consume_or_forward_decision(); assert!(matches!(decision, BufferedPacketsDecision::Consume(_))); assert!(scheduler_controller.receive_completed().is_ok()); - assert!(scheduler_controller.receive_and_buffer_packets(&decision)); + + // Time is not a reliable way for deterministic testing. + // Loop here until no more packets are received, this avoids parallel + // tests from inconsistently timing out and not receiving + // from the channel. 
+ while scheduler_controller + .receive_and_buffer_packets(&decision) + .map(|n| n > 0) + .unwrap_or_default() + {} assert!(scheduler_controller.process_transactions(&decision).is_ok()); } - #[test] + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] #[should_panic(expected = "batch id 0 is not being tracked")] - fn test_unexpected_batch_id() { - let (test_frame, scheduler_controller) = create_test_frame(1); + fn test_unexpected_batch_id( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, scheduler_controller) = create_test_frame(1, create_receive_and_buffer); let TestFrame { finished_consume_work_sender, .. @@ -643,9 +665,13 @@ mod tests { scheduler_controller.run().unwrap(); } - #[test] - fn test_schedule_consume_single_threaded_no_conflicts() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_no_conflicts( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -699,9 +725,13 @@ mod tests { assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); } - #[test] - fn test_schedule_consume_single_threaded_conflict() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_conflict( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -758,9 +788,13 @@ mod tests { assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); } - #[test] - fn test_schedule_consume_single_threaded_multi_batch() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_single_threaded_multi_batch( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -822,9 +856,13 @@ mod tests { ); } - #[test] - fn test_schedule_consume_simple_thread_selection() { - let (test_frame, mut scheduler_controller) = create_test_frame(2); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_simple_thread_selection( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(2, create_receive_and_buffer); let TestFrame { bank, mint_keypair, @@ -889,9 +927,13 @@ mod tests { assert_eq!(t1_actual, t1_expected); } - #[test] - fn test_schedule_consume_retryable() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + 
#[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_retryable( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); let TestFrame { bank, mint_keypair, diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index ecea0a71847670..d684a2994c1900 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -2,6 +2,7 @@ use { crate::banking_stage::{ immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::MaxAge, }, + solana_sdk::packet::{self}, std::sync::Arc, }; @@ -35,14 +36,14 @@ pub(crate) enum TransactionState { /// The transaction is available for scheduling. Unprocessed { transaction_ttl: SanitizedTransactionTTL, - packet: Arc, + packet: Option>, priority: u64, cost: u64, should_forward: bool, }, /// The transaction is currently scheduled or being processed. Pending { - packet: Arc, + packet: Option>, priority: u64, cost: u64, should_forward: bool, @@ -55,12 +56,14 @@ impl TransactionState { /// Creates a new `TransactionState` in the `Unprocessed` state. pub(crate) fn new( transaction_ttl: SanitizedTransactionTTL, - packet: Arc, + packet: Option>, priority: u64, cost: u64, ) -> Self { - let should_forward = !packet.original_packet().meta().forwarded() - && packet.original_packet().meta().is_from_staked_node(); + let should_forward = packet + .as_ref() + .map(|packet| should_forward_from_meta(packet.original_packet().meta())) + .unwrap_or_default(); Self::Unprocessed { transaction_ttl, packet, @@ -116,10 +119,10 @@ impl TransactionState { } /// Return the packet of the transaction. - pub(crate) fn packet(&self) -> &Arc { + pub(crate) fn packet(&self) -> Option<&Arc> { match self { - Self::Unprocessed { packet, .. } => packet, - Self::Pending { packet, .. } => packet, + Self::Unprocessed { packet, .. } => packet.as_ref(), + Self::Pending { packet, .. 
} => packet.as_ref(), Self::Transitioning => unreachable!(), } } @@ -206,10 +209,15 @@ impl TransactionState { } } +fn should_forward_from_meta(meta: &packet::Meta) -> bool { + !meta.forwarded() && meta.is_from_staked_node() +} + #[cfg(test)] mod tests { use { super::*, + packet::PacketFlags, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -244,7 +252,7 @@ mod tests { const TEST_TRANSACTION_COST: u64 = 5000; TransactionState::new( transaction_ttl, - packet, + Some(packet), compute_unit_price, TEST_TRANSACTION_COST, ) @@ -365,4 +373,23 @@ mod tests { )); assert_eq!(transaction_ttl.max_age, MaxAge::MAX); } + + #[test] + fn test_initialize_should_forward() { + let meta = packet::Meta::default(); + assert!(!should_forward_from_meta(&meta)); + + let mut meta = packet::Meta::default(); + meta.flags.set(PacketFlags::FORWARDED, true); + assert!(!should_forward_from_meta(&meta)); + + let mut meta = packet::Meta::default(); + meta.set_from_staked_node(true); + assert!(should_forward_from_meta(&meta)); + + let mut meta = packet::Meta::default(); + meta.flags.set(PacketFlags::FORWARDED, true); + meta.set_from_staked_node(true); + assert!(!should_forward_from_meta(&meta)); + } } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index ada23d24219e09..c9c8ddbde751e5 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -7,10 +7,14 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, scheduler_messages::TransactionId, }, + agave_transaction_view::resolved_transaction_view::ResolvedTransactionView, itertools::MinMaxResult, min_max_heap::MinMaxHeap, - slab::Slab, - solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, + slab::{Slab, VacantEntry}, + solana_runtime_transaction::{ + runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, + }, + solana_sdk::packet::PACKET_DATA_SIZE, std::sync::Arc, }; @@ -65,23 +69,20 @@ pub(crate) trait StateContainer { /// Panics if the transaction does not exist. fn get_transaction_ttl(&self, id: TransactionId) -> Option<&SanitizedTransactionTTL>; - /// Insert a new transaction into the container's queues and maps. - /// Returns `true` if a packet was dropped due to capacity limits. - fn insert_new_transaction( - &mut self, - transaction_ttl: SanitizedTransactionTTL, - packet: Arc, - priority: u64, - cost: u64, - ) -> bool; - /// Retries a transaction - inserts transaction back into map (but not packet). /// This transitions the transaction to `Unprocessed` state. fn retry_transaction( &mut self, transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, - ); + ) { + let transaction_state = self + .get_mut_transaction_state(transaction_id) + .expect("transaction must exist"); + let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); + transaction_state.transition_to_unprocessed(transaction_ttl); + self.push_id_into_queue(priority_id); + } /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority /// transaction will be dropped (removed from the queue and map). 
@@ -132,7 +133,29 @@ impl StateContainer for TransactionStateContainer bool { + self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) + } + + fn remove_by_id(&mut self, id: TransactionId) { + self.id_to_transaction_state.remove(id); + } + + fn get_min_max_priority(&self) -> MinMaxResult { + match self.priority_queue.peek_min() { + Some(min) => match self.priority_queue.peek_max() { + Some(max) => MinMaxResult::MinMax(min.priority, max.priority), + None => MinMaxResult::OneElement(min.priority), + }, + None => MinMaxResult::NoElements, + } + } +} + +impl TransactionStateContainer { + /// Insert a new transaction into the container's queues and maps. + /// Returns `true` if a packet was dropped due to capacity limits. + pub(crate) fn insert_new_transaction( &mut self, transaction_ttl: SanitizedTransactionTTL, packet: Arc, @@ -143,11 +166,11 @@ impl StateContainer for TransactionStateContainer StateContainer for TransactionStateContainer, - ) { - let transaction_state = self - .get_mut_transaction_state(transaction_id) - .expect("transaction must exist"); - let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); - transaction_state.transition_to_unprocessed(transaction_ttl); - self.push_id_into_queue(priority_id); - } - - fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { - self.push_id_into_queue_with_remaining_capacity(priority_id, self.remaining_capacity()) - } - - fn remove_by_id(&mut self, id: TransactionId) { - self.id_to_transaction_state.remove(id); - } - - fn get_min_max_priority(&self) -> MinMaxResult { - match self.priority_queue.peek_min() { - Some(min) => match self.priority_queue.peek_max() { - Some(max) => MinMaxResult::MinMax(min.priority, max.priority), - None => MinMaxResult::OneElement(min.priority), - }, - None => MinMaxResult::NoElements, - } - } -} - -impl TransactionStateContainer { fn push_id_into_queue_with_remaining_capacity( &mut self, priority_id: TransactionPriorityId, @@ -204,6 +194,132 @@ impl TransactionStateContainer { false } } + + fn get_vacant_map_entry(&mut self) -> VacantEntry> { + assert!(self.id_to_transaction_state.len() < self.id_to_transaction_state.capacity()); + self.id_to_transaction_state.vacant_entry() + } +} + +pub type SharedBytes = Arc>; +pub(crate) type RuntimeTransactionView = RuntimeTransaction>; +pub(crate) type TransactionViewState = TransactionState; + +/// A wrapper around `TransactionStateContainer` that allows re-uses +/// pre-allocated `Bytes` to copy packet data into and use for serialization. +/// This is used to avoid allocations in parsing transactions. +pub struct TransactionViewStateContainer { + inner: TransactionStateContainer, + bytes_buffer: Box<[SharedBytes]>, +} + +impl TransactionViewStateContainer { + /// Returns true if packet was dropped due to capacity limits. + pub(crate) fn try_insert_with_data( + &mut self, + data: &[u8], + f: impl FnOnce(SharedBytes) -> Result, ()>, + ) -> bool { + // Get remaining capacity before inserting. + let remaining_capacity = self.remaining_capacity(); + + // Get a vacant entry in the slab. + let vacant_entry = self.inner.get_vacant_map_entry(); + let transaction_id = vacant_entry.key(); + + // Get the vacant space in the bytes buffer. + let bytes_entry = &mut self.bytes_buffer[transaction_id]; + // Assert the entry is unique, then copy the packet data. + { + // The strong count must be 1 here. 
These are only cloned into the + // inner container below, wrapped by a `ResolveTransactionView`, + // which does not expose the backing memory (the `Arc`), or + // implement `Clone`. + // This could only fail if there is a bug in the container that the + // entry in the slab was not cleared. However, since we share + // indexing between the slab and our `bytes_buffer`, we know that + // `vacant_entry` is not occupied. + assert_eq!(Arc::strong_count(bytes_entry), 1, "entry must be unique"); + let bytes = Arc::make_mut(bytes_entry); + + // Clear and copy the packet data into the bytes buffer. + bytes.clear(); + bytes.extend_from_slice(data); + } + + // Attempt to insert the transaction. + match f(Arc::clone(bytes_entry)) { + Ok(state) => { + let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); + vacant_entry.insert(state); + + // Push the transaction into the queue. + self.inner + .push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) + } + Err(_) => false, + } + } +} + +impl StateContainer for TransactionViewStateContainer { + fn with_capacity(capacity: usize) -> Self { + let inner = TransactionStateContainer::with_capacity(capacity); + let bytes_buffer = (0..inner.id_to_transaction_state.capacity()) + .map(|_| Arc::new(Vec::with_capacity(PACKET_DATA_SIZE))) + .collect::>() + .into_boxed_slice(); + Self { + inner, + bytes_buffer, + } + } + + #[inline] + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + #[inline] + fn remaining_capacity(&self) -> usize { + self.inner.remaining_capacity() + } + + #[inline] + fn pop(&mut self) -> Option { + self.inner.pop() + } + + #[inline] + fn get_mut_transaction_state( + &mut self, + id: TransactionId, + ) -> Option<&mut TransactionViewState> { + self.inner.get_mut_transaction_state(id) + } + + #[inline] + fn get_transaction_ttl( + &self, + id: TransactionId, + ) -> Option<&SanitizedTransactionTTL> { + self.inner.get_transaction_ttl(id) + } + + #[inline] + fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool { + self.inner.push_id_into_queue(priority_id) + } + + #[inline] + fn remove_by_id(&mut self, id: TransactionId) { + self.inner.remove_by_id(id); + } + + #[inline] + fn get_min_max_priority(&self) -> MinMaxResult { + self.inner.get_min_max_priority() + } } #[cfg(test)] diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d715bb5c7b0534..b5a73415160260 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -15,7 +15,7 @@ use { sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, - validator::{BlockProductionMethod, GeneratorConfig}, + validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure}, }, bytes::Bytes, crossbeam_channel::{unbounded, Receiver}, @@ -118,6 +118,7 @@ impl Tpu { tpu_max_connections_per_ipaddr_per_minute: u64, prioritization_fee_cache: &Arc, block_production_method: BlockProductionMethod, + transaction_struct: TransactionStructure, enable_block_production_forwarding: bool, _generator_config: Option, /* vestigial code for replay invalidator */ ) -> (Self, Vec>) { @@ -269,6 +270,7 @@ impl Tpu { let banking_stage = BankingStage::new( block_production_method, + transaction_struct, cluster_info, poh_recorder, non_vote_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 10f23b4dde15c3..a540cc9c564642 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -207,6 +207,31 @@ impl BlockProductionMethod { } } +#[derive(Clone, EnumString, 
EnumVariantNames, Default, IntoStaticStr, Display)] +#[strum(serialize_all = "kebab-case")] +pub enum TransactionStructure { + #[default] + Sdk, + View, +} + +impl TransactionStructure { + pub const fn cli_names() -> &'static [&'static str] { + Self::VARIANTS + } + + pub fn cli_message() -> &'static str { + lazy_static! { + static ref MESSAGE: String = format!( + "Switch internal transaction structure/representation [default: {}]", + TransactionStructure::default() + ); + }; + + &MESSAGE + } +} + /// Configuration for the block generator invalidator for replay. #[derive(Clone, Debug)] pub struct GeneratorConfig { @@ -273,6 +298,7 @@ pub struct ValidatorConfig { pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, + pub transaction_struct: TransactionStructure, pub enable_block_production_forwarding: bool, pub generator_config: Option, pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, @@ -345,6 +371,7 @@ impl Default for ValidatorConfig { banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), + transaction_struct: TransactionStructure::default(), enable_block_production_forwarding: false, generator_config: None, use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), @@ -869,8 +896,8 @@ impl Validator { config.accounts_db_test_hash_calculation, ); info!( - "Using: block-verification-method: {}, block-production-method: {}", - config.block_verification_method, config.block_production_method + "Using: block-verification-method: {}, block-production-method: {}, transaction-structure: {}", + config.block_verification_method, config.block_production_method, config.transaction_struct ); let (replay_vote_sender, replay_vote_receiver) = unbounded(); @@ -1524,6 +1551,7 @@ impl Validator { tpu_max_connections_per_ipaddr_per_minute, &prioritization_fee_cache, config.block_production_method.clone(), + config.transaction_struct.clone(), config.enable_block_production_forwarding, config.generator_config.clone(), ); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 8e0cdc5a019895..b6be917334dd82 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -33,7 +33,7 @@ use { solana_core::{ banking_simulation::{BankingSimulator, BankingTraceEvents}, system_monitor_service::{SystemMonitorService, SystemMonitorStatsReportConfig}, - validator::{BlockProductionMethod, BlockVerificationMethod}, + validator::{BlockProductionMethod, BlockVerificationMethod, TransactionStructure}, }, solana_cost_model::{cost_model::CostModel, cost_tracker::CostTracker}, solana_feature_set::{self as feature_set, FeatureSet}, @@ -2492,14 +2492,18 @@ fn main() { BlockProductionMethod ) .unwrap_or_default(); + let transaction_struct = + value_t!(arg_matches, "transaction_struct", TransactionStructure) + .unwrap_or_default(); - info!("Using: block-production-method: {block_production_method}"); + info!("Using: block-production-method: {block_production_method} transaction-structure: {transaction_struct}"); match simulator.start( genesis_config, bank_forks, blockstore, block_production_method, + transaction_struct, ) { Ok(()) => println!("Ok"), Err(error) => { diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index e90475aad2a06f..50199be9e7172c 100644 --- a/local-cluster/src/validator_configs.rs +++ 
b/local-cluster/src/validator_configs.rs @@ -61,6 +61,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), + transaction_struct: config.transaction_struct.clone(), enable_block_production_forwarding: config.enable_block_production_forwarding, generator_config: config.generator_config.clone(), use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index d21ee1aaa8b73f..4285be6cac5dc1 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -112,6 +112,9 @@ while [[ -n $1 ]]; do elif [[ $1 == --block-production-method ]]; then args+=("$1" "$2") shift 2 + elif [[ $1 == --transaction-structure ]]; then + args+=("$1" "$2") + shift 2 elif [[ $1 == --wen-restart ]]; then args+=("$1" "$2") shift 2 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index c97812c6cbb910..800b4ce9d136d2 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -182,6 +182,9 @@ while [[ -n $1 ]]; do elif [[ $1 == --block-production-method ]]; then args+=("$1" "$2") shift 2 + elif [[ $1 == --transaction-structure ]]; then + args+=("$1" "$2") + shift 2 elif [[ $1 == --wen-restart ]]; then args+=("$1" "$2") shift 2 diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4f6eb8f1c6fd5f..9c0621fffde1bc 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5486,6 +5486,7 @@ name = "solana-core" version = "2.2.0" dependencies = [ "agave-banking-stage-ingress-types", + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/runtime/src/bank/fee_distribution.rs b/runtime/src/bank/fee_distribution.rs index e0be18d5e609fc..e90444053df51e 100644 --- a/runtime/src/bank/fee_distribution.rs +++ b/runtime/src/bank/fee_distribution.rs @@ -3,6 +3,7 @@ use { crate::bank::CollectorFeeDetails, log::{debug, warn}, solana_feature_set::{remove_rounding_in_fee_calculation, reward_full_priority_fee}, + solana_runtime_transaction::transaction_with_meta::TransactionWithMeta, solana_sdk::{ account::{ReadableAccount, WritableAccount}, fee::FeeBudgetLimits, @@ -10,7 +11,6 @@ use { reward_info::RewardInfo, reward_type::RewardType, system_program, - transaction::SanitizedTransaction, }, solana_svm_rent_collector::svm_rent_collector::SVMRentCollector, solana_vote::vote_account::VoteAccountsHashMap, @@ -73,7 +73,7 @@ impl Bank { pub fn calculate_reward_for_transaction( &self, - transaction: &SanitizedTransaction, + transaction: &impl TransactionWithMeta, fee_budget_limits: &FeeBudgetLimits, ) -> u64 { let fee_details = solana_fee::calculate_fee_details( diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index adf74f49a027c3..a5b494f5b4af1b 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5337,6 +5337,7 @@ name = "solana-core" version = "2.2.0" dependencies = [ "agave-banking-stage-ingress-types", + "agave-transaction-view", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/svm/examples/Cargo.toml b/svm/examples/Cargo.toml index a5df7288e95576..5015d183ba2d13 100644 --- a/svm/examples/Cargo.toml +++ b/svm/examples/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - "json-rpc/client", - "json-rpc/server", - "paytube", -] +members = ["json-rpc/client", 
"json-rpc/server", "paytube"] resolver = "2" diff --git a/transaction-view/src/transaction_data.rs b/transaction-view/src/transaction_data.rs index 2bfe0c85ce0e55..323c085660fa85 100644 --- a/transaction-view/src/transaction_data.rs +++ b/transaction-view/src/transaction_data.rs @@ -10,3 +10,10 @@ impl TransactionData for &[u8] { self } } + +impl TransactionData for std::sync::Arc> { + #[inline] + fn data(&self) -> &[u8] { + self.as_ref() + } +} diff --git a/validator/src/cli.rs b/validator/src/cli.rs index fa8d439b095e3b..a54d8c7735245d 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -21,7 +21,7 @@ use { }, solana_core::{ banking_trace::{DirByteLimit, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, - validator::{BlockProductionMethod, BlockVerificationMethod}, + validator::{BlockProductionMethod, BlockVerificationMethod, TransactionStructure}, }, solana_faucet::faucet::{self, FAUCET_PORT}, solana_ledger::use_snapshot_archives_at_startup, @@ -1599,6 +1599,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()), ) + .arg( + Arg::with_name("transaction_struct") + .long("transaction-structure") + .value_name("STRUCT") + .takes_value(true) + .possible_values(TransactionStructure::cli_names()) + .help(TransactionStructure::cli_message()), + ) .arg( Arg::with_name("unified_scheduler_handler_threads") .long("unified-scheduler-handler-threads") diff --git a/validator/src/main.rs b/validator/src/main.rs index 4af305a39da5c8..2b67cb2f2619bd 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -35,8 +35,9 @@ use { system_monitor_service::SystemMonitorService, tpu::DEFAULT_TPU_COALESCE, validator::{ - is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator, - ValidatorConfig, ValidatorError, ValidatorStartProgress, ValidatorTpuConfig, + is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, + TransactionStructure, Validator, ValidatorConfig, ValidatorError, + ValidatorStartProgress, ValidatorTpuConfig, }, }, solana_gossip::{ @@ -1848,6 +1849,12 @@ pub fn main() { BlockProductionMethod ) .unwrap_or_default(); + validator_config.transaction_struct = value_t!( + matches, // comment to align formatting... + "transaction_struct", + TransactionStructure + ) + .unwrap_or_default(); validator_config.enable_block_production_forwarding = staked_nodes_overrides_path.is_some(); validator_config.unified_scheduler_handler_threads = value_t!(matches, "unified_scheduler_handler_threads", usize).ok();