Skip to content

Commit

Permalink
remove lock on BankHashStat in accounts-db store
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi committed Nov 27, 2024
1 parent 81dd34a commit 20f4f8a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 25 deletions.
46 changes: 21 additions & 25 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use {
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_db::stats::{
AccountsStats, BankHashStats, CleanAccountsStats, FlushStats, PurgeStats,
ShrinkAncientStats, ShrinkStats, ShrinkStatsSub, StoreAccountsTiming,
AccountsStats, AtomicBankHashStats, BankHashStats, CleanAccountsStats, FlushStats,
PurgeStats, ShrinkAncientStats, ShrinkStats, ShrinkStatsSub, StoreAccountsTiming,
},
accounts_file::{
AccountsFile, AccountsFileError, AccountsFileProvider, MatchAccountOwnerError,
Expand Down Expand Up @@ -1533,7 +1533,8 @@ pub struct AccountsDb {

pub thread_pool_hash: ThreadPool,

bank_hash_stats: Mutex<HashMap<Slot, BankHashStats>>,
bank_hash_stats: DashMap<Slot, AtomicBankHashStats>,

accounts_delta_hashes: Mutex<HashMap<Slot, AccountsDeltaHash>>,
accounts_hashes: Mutex<HashMap<Slot, (AccountsHash, /*capitalization*/ u64)>>,
incremental_accounts_hashes:
Expand Down Expand Up @@ -1979,7 +1980,8 @@ impl AccountsDb {
Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI,
));

let bank_hash_stats = Mutex::new(HashMap::from([(0, BankHashStats::default())]));
let bank_hash_stats =
DashMap::from_iter(std::iter::once((0, AtomicBankHashStats::default())));

// Increase the stack for foreground threads
// rayon needs a lot of stack
Expand Down Expand Up @@ -4553,12 +4555,11 @@ impl AccountsDb {
dropped_roots: impl Iterator<Item = Slot>,
) {
let mut accounts_delta_hashes = self.accounts_delta_hashes.lock().unwrap();
let mut bank_hash_stats = self.bank_hash_stats.lock().unwrap();

dropped_roots.for_each(|slot| {
self.accounts_index.clean_dead_slot(slot);
accounts_delta_hashes.remove(&slot);
bank_hash_stats.remove(&slot);
self.bank_hash_stats.remove(&slot);
// the storage has been removed from this slot and recycled or dropped
assert!(self.storage.remove(&slot, false).is_none());
debug_assert!(
Expand Down Expand Up @@ -4974,15 +4975,15 @@ impl AccountsDb {
///
/// This fn is called when creating a new bank from parent.
pub fn insert_default_bank_hash_stats(&self, slot: Slot, parent_slot: Slot) {
let mut bank_hash_stats = self.bank_hash_stats.lock().unwrap();
if bank_hash_stats.get(&slot).is_some() {
if self.bank_hash_stats.get(&slot).is_some() {
error!(
"set_hash: already exists; multiple forks with shared slot {slot} as child \
(parent: {parent_slot})!?"
);
return;
}
bank_hash_stats.insert(slot, BankHashStats::default());
self.bank_hash_stats
.insert(slot, AtomicBankHashStats::default());
}

pub fn load(
Expand Down Expand Up @@ -7665,14 +7666,16 @@ impl AccountsDb {
slot: Slot,
stats: BankHashStats,
) -> Option<BankHashStats> {
let mut bank_hash_stats = self.bank_hash_stats.lock().unwrap();
bank_hash_stats.remove(&0);
bank_hash_stats.insert(slot, stats)
self.bank_hash_stats.remove(&0);

self.bank_hash_stats
.insert(slot, AtomicBankHashStats::new(&stats))
.map(|old_stat| old_stat.load())
}

/// Get the bank hash stats for `slot` in the `bank_hash_stats` map
pub fn get_bank_hash_stats(&self, slot: Slot) -> Option<BankHashStats> {
self.bank_hash_stats.lock().unwrap().get(&slot).cloned()
self.bank_hash_stats.get(&slot).map(|stats| stats.load())
}

fn update_index<'a>(
Expand Down Expand Up @@ -7896,13 +7899,11 @@ impl AccountsDb {
);

let mut accounts_delta_hashes = self.accounts_delta_hashes.lock().unwrap();
let mut bank_hash_stats = self.bank_hash_stats.lock().unwrap();
for slot in dead_slots_iter {
accounts_delta_hashes.remove(slot);
bank_hash_stats.remove(slot);
self.bank_hash_stats.remove(slot);
}
drop(accounts_delta_hashes);
drop(bank_hash_stats);

measure.stop();
inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
Expand Down Expand Up @@ -8145,15 +8146,10 @@ impl AccountsDb {
.store_total_data
.fetch_add(total_data as u64, Ordering::Relaxed);

{
// we need to drop the bank_hash_stats lock to prevent deadlocks
self.bank_hash_stats
.lock()
.unwrap()
.entry(accounts.target_slot())
.or_default()
.accumulate(&stats);
}
self.bank_hash_stats
.entry(accounts.target_slot())
.or_default()
.accumulate(&stats);

// we use default hashes for now since the same account may be stored to the cache multiple times
self.store_accounts_unfrozen(
Expand Down
44 changes: 44 additions & 0 deletions accounts-db/src/accounts_db/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,50 @@ impl BankHashStats {
}
}

#[derive(Debug, Default)]
pub struct AtomicBankHashStats {
pub num_updated_accounts: AtomicU64,
pub num_removed_accounts: AtomicU64,
pub num_lamports_stored: AtomicU64,
pub total_data_len: AtomicU64,
pub num_executable_accounts: AtomicU64,
}

impl AtomicBankHashStats {
pub fn new(stat: &BankHashStats) -> Self {
AtomicBankHashStats {
num_updated_accounts: AtomicU64::new(stat.num_updated_accounts),
num_removed_accounts: AtomicU64::new(stat.num_removed_accounts),
num_lamports_stored: AtomicU64::new(stat.num_lamports_stored),
total_data_len: AtomicU64::new(stat.total_data_len),
num_executable_accounts: AtomicU64::new(stat.num_executable_accounts),
}
}

pub fn accumulate(&mut self, other: &BankHashStats) {
self.num_updated_accounts
.fetch_add(other.num_updated_accounts, Ordering::Relaxed);
self.num_removed_accounts
.fetch_add(other.num_removed_accounts, Ordering::Relaxed);
self.total_data_len
.fetch_add(other.total_data_len, Ordering::Relaxed);
self.num_lamports_stored
.fetch_add(other.num_lamports_stored, Ordering::Relaxed);
self.num_executable_accounts
.fetch_add(other.num_executable_accounts, Ordering::Relaxed);
}

pub fn load(&self) -> BankHashStats {
BankHashStats {
num_updated_accounts: self.num_updated_accounts.load(Ordering::Relaxed),
num_removed_accounts: self.num_removed_accounts.load(Ordering::Relaxed),
num_lamports_stored: self.num_lamports_stored.load(Ordering::Relaxed),
total_data_len: self.total_data_len.load(Ordering::Relaxed),
num_executable_accounts: self.num_executable_accounts.load(Ordering::Relaxed),
}
}
}

#[derive(Debug, Default)]
pub struct AccountsStats {
pub delta_hash_scan_time_total_us: AtomicU64,
Expand Down

0 comments on commit 20f4f8a

Please sign in to comment.