Skip to content

Commit

Permalink
v2.1: Refactors get_snapshot_storages() (backport of #3760) (#3785)
Browse files Browse the repository at this point in the history
Refactors get_snapshot_storages() (#3760)

(cherry picked from commit 8c7ae80)

Co-authored-by: Brooks <[email protected]>
  • Loading branch information
2 people authored and KirillLykov committed Dec 9, 2024
1 parent 7f4ccd5 commit 719c3d3
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 37 deletions.
77 changes: 76 additions & 1 deletion accounts-db/src/account_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,29 @@ impl AccountStorage {
pub(crate) fn len(&self) -> usize {
self.map.len()
}

/// Returns the (slot, storage) tuples where `predicate` returns `true`
///
/// This function is useful when not all storages are desired,
/// as storages are only Arc::cloned if they pass the predicate.
///
/// # Panics
///
/// Panics if `shrink` is in progress.
pub fn get_if(
&self,
predicate: impl Fn(&Slot, &AccountStorageEntry) -> bool,
) -> Box<[(Slot, Arc<AccountStorageEntry>)]> {
assert!(self.no_shrink_in_progress());
self.map
.iter()
.filter_map(|entry| {
let slot = entry.key();
let storage = &entry.value().storage;
predicate(slot, storage).then(|| (*slot, Arc::clone(storage)))
})
.collect()
}
}

/// iterate contents of AccountStorage without exposing internals
Expand Down Expand Up @@ -291,7 +314,11 @@ impl Default for AccountStorageStatus {

#[cfg(test)]
pub(crate) mod tests {
use {super::*, crate::accounts_file::AccountsFileProvider, std::path::Path};
use {
super::*,
crate::accounts_file::AccountsFileProvider,
std::{iter, path::Path},
};

#[test]
fn test_shrink_in_progress() {
Expand Down Expand Up @@ -568,4 +595,52 @@ pub(crate) mod tests {
.is_none());
assert!(storage.get_account_storage_entry(slot, id).is_some());
}

#[test]
fn test_get_if() {
let storage = AccountStorage::default();
assert!(storage.get_if(|_, _| true).is_empty());

// add some entries
let ids = [123, 456, 789];
for id in ids {
let slot = id as Slot;
let entry = AccountStorageEntry::new(
Path::new(""),
slot,
id,
5000,
AccountsFileProvider::AppendVec,
);
storage.map.insert(
slot,
AccountStorageReference {
id,
storage: entry.into(),
},
);
}

// look 'em up
for id in ids {
let found = storage.get_if(|slot, _| *slot == id as Slot);
assert!(found
.iter()
.map(|(slot, _)| *slot)
.eq(iter::once(id as Slot)));
}

assert!(storage.get_if(|_, _| false).is_empty());
assert_eq!(storage.get_if(|_, _| true).len(), ids.len());
}

#[test]
#[should_panic(expected = "self.no_shrink_in_progress()")]
fn test_get_if_fail() {
let storage = AccountStorage::default();
storage
.shrink_in_progress_map
.insert(0, storage.get_test_storage());
storage.get_if(|_, _| true);
}
}
55 changes: 19 additions & 36 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8281,44 +8281,27 @@ impl AccountsDb {
&self,
requested_slots: impl RangeBounds<Slot> + Sync,
) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
let mut m = Measure::start("get slots");
let mut slots_and_storages = self
let start = Instant::now();
let max_alive_root_exclusive = self
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.max_exclusive();
let (slots, storages) = self
.storage
.iter()
.filter_map(|(slot, store)| {
requested_slots
.contains(&slot)
.then_some((slot, Some(store)))
.get_if(|slot, storage| {
(*slot < max_alive_root_exclusive)
&& requested_slots.contains(slot)
&& storage.has_accounts()
})
.collect::<Vec<_>>();
m.stop();
let mut m2 = Measure::start("filter");
let chunk_size = 5_000;
let (result, slots): (Vec<_>, Vec<_>) = self.thread_pool_clean.install(|| {
slots_and_storages
.par_chunks_mut(chunk_size)
.map(|slots_and_storages| {
slots_and_storages
.iter_mut()
.filter(|(slot, _)| self.accounts_index.is_alive_root(*slot))
.filter_map(|(slot, store)| {
let store = std::mem::take(store).unwrap();
store.has_accounts().then_some((store, *slot))
})
.collect::<Vec<(Arc<AccountStorageEntry>, Slot)>>()
})
.flatten()
.unzip()
});

m2.stop();

debug!(
"hash_total: get slots: {}, filter: {}",
m.as_us(),
m2.as_us(),
);
(result, slots)
.into_vec()
.into_iter()
.unzip();
let duration = start.elapsed();
debug!("get_snapshot_storages: {duration:?}");
(storages, slots)
}

/// Returns the latest full snapshot slot
Expand Down

0 comments on commit 719c3d3

Please sign in to comment.