Skip to content

Commit

Permalink
Make transaction status service multi-threaded. (#4032)
Browse files Browse the repository at this point in the history
* Make TSS multi-threaded.
* Switch to global rayon thread pool.
* Change recv_timeout to try_recv with a sleep, and increase the sleep time to 50 ms when the TSS receiver is empty.
* Keep track of in-flight work and quiesce it before exiting the service.
  • Loading branch information
fkouteib authored Jan 29, 2025
1 parent 0d68275 commit 323039e
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions rpc/src/transaction_status_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::transaction_notifier_interface::TransactionNotifierArc,
crossbeam_channel::{Receiver, RecvTimeoutError},
crossbeam_channel::{Receiver, TryRecvError},
itertools::izip,
solana_ledger::{
blockstore::{Blockstore, BlockstoreError},
Expand All @@ -12,10 +12,10 @@ use {
},
std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
Expand Down Expand Up @@ -49,38 +49,60 @@ impl TransactionStatusService {
.name("solTxStatusWrtr".to_string())
.spawn(move || {
info!("TransactionStatusService has started");

let outstanding_thread_count = Arc::new(AtomicUsize::new(0));
loop {
if exit.load(Ordering::Relaxed) {
// Wait for the outstanding worker threads to complete before
// joining the main thread and shutting down the service.
while outstanding_thread_count.load(Ordering::SeqCst) > 0 {
sleep(Duration::from_millis(1));
}
break;
}

let message = match transaction_status_receiver_handle
.recv_timeout(Duration::from_secs(1))
{
let message = match transaction_status_receiver_handle.try_recv() {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
Err(TryRecvError::Disconnected) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
Err(TryRecvError::Empty) => {
// TSS is bandwidth sensitive at high TPS, but not necessarily
// latency sensitive. We use a global thread pool to handle
// bursts of work below. This sleep is intended to balance that
// out so other users of the pool can make progress while TSS
// builds up a backlog for the next burst.
sleep(Duration::from_millis(50));
continue;
}
};

match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
let max_complete_transaction_status_slot =
Arc::clone(&max_complete_transaction_status_slot);
let blockstore = Arc::clone(&blockstore);
let transaction_notifier = transaction_notifier.clone();
let exit_clone = Arc::clone(&exit);
let outstanding_thread_count_handle = Arc::clone(&outstanding_thread_count);

outstanding_thread_count.fetch_add(1, Ordering::Relaxed);

rayon::spawn(move || {
match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier,
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService stopping due to error: {err}");
exit_clone.store(true, Ordering::Relaxed);
}
}
}
outstanding_thread_count_handle.fetch_sub(1, Ordering::Relaxed);
});
}
info!("TransactionStatusService has stopped");
})
Expand Down

0 comments on commit 323039e

Please sign in to comment.