From 323039ee897319b9a07fd1fdfd284c9d83fade5e Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Tue, 28 Jan 2025 18:26:12 -0800 Subject: [PATCH] Make transaction status service multi-threaded. (#4032) * Make TSS multi-threaded. * Switch to global rayon thread pool. * Change recv_timeout to try_recv with a sleep, and up sleep time to 50 msec if TSS receiver is empty. * Keep track of in-flight work and quiesce it before exiting the service. --- rpc/src/transaction_status_service.rs | 66 ++++++++++++++++++--------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 63dcdd1398461e..3dde0ca4824976 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -1,6 +1,6 @@ use { crate::transaction_notifier_interface::TransactionNotifierArc, - crossbeam_channel::{Receiver, RecvTimeoutError}, + crossbeam_channel::{Receiver, TryRecvError}, itertools::izip, solana_ledger::{ blockstore::{Blockstore, BlockstoreError}, @@ -12,10 +12,10 @@ use { }, std::{ sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, }, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, }; @@ -49,38 +49,60 @@ impl TransactionStatusService { .name("solTxStatusWrtr".to_string()) .spawn(move || { info!("TransactionStatusService has started"); + + let outstanding_thread_count = Arc::new(AtomicUsize::new(0)); loop { if exit.load(Ordering::Relaxed) { + // Wait for the outstanding worker threads to complete before + // joining the main thread and shutting down the service. + while outstanding_thread_count.load(Ordering::SeqCst) > 0 { + sleep(Duration::from_millis(1)); + } break; } - let message = match transaction_status_receiver_handle - .recv_timeout(Duration::from_secs(1)) - { + let message = match transaction_status_receiver_handle.try_recv() { Ok(message) => message, - Err(RecvTimeoutError::Disconnected) => { + Err(TryRecvError::Disconnected) => { break; } - Err(RecvTimeoutError::Timeout) => { + Err(TryRecvError::Empty) => { + // TSS is bandwidth sensitive at high TPS, but not necessarily + // latency sensitive. We use a global thread pool to handle + // bursts of work below. This sleep is intended to balance that + // out so other users of the pool can make progress while TSS + // builds up a backlog for the next burst. + sleep(Duration::from_millis(50)); continue; } }; - match Self::write_transaction_status_batch( - message, - &max_complete_transaction_status_slot, - enable_rpc_transaction_history, - transaction_notifier.clone(), - &blockstore, - enable_extended_tx_metadata_storage, - ) { - Ok(_) => {} - Err(err) => { - error!("TransactionStatusService stopping due to error: {err}"); - exit.store(true, Ordering::Relaxed); - break; + let max_complete_transaction_status_slot = + Arc::clone(&max_complete_transaction_status_slot); + let blockstore = Arc::clone(&blockstore); + let transaction_notifier = transaction_notifier.clone(); + let exit_clone = Arc::clone(&exit); + let outstanding_thread_count_handle = Arc::clone(&outstanding_thread_count); + + outstanding_thread_count.fetch_add(1, Ordering::Relaxed); + + rayon::spawn(move || { + match Self::write_transaction_status_batch( + message, + &max_complete_transaction_status_slot, + enable_rpc_transaction_history, + transaction_notifier, + &blockstore, + enable_extended_tx_metadata_storage, + ) { + Ok(_) => {} + Err(err) => { + error!("TransactionStatusService stopping due to error: {err}"); + exit_clone.store(true, Ordering::Relaxed); + } } - } + outstanding_thread_count_handle.fetch_sub(1, Ordering::Relaxed); + }); } info!("TransactionStatusService has stopped"); })