Skip to content

Commit

Permalink
Fix task leaks (#2336)
Browse files Browse the repository at this point in the history
  • Loading branch information
elmattic authored Jan 6, 2023
1 parent 9e1f6b4 commit bfd3009
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 85 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,4 @@ lto = "off"
[profile.release]
# https://doc.rust-lang.org/cargo/reference/profiles.html#strip
strip = true
panic = "abort"
8 changes: 4 additions & 4 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
DB: Clone,
{
let (publisher, _) = broadcast::channel(SINK_CAP);
let ts_cache = Arc::new(RwLock::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)));
let ts_cache = Arc::new(Mutex::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)));
let cs = Self {
publisher,
// subscriptions: Default::default(),
Expand Down Expand Up @@ -596,7 +596,7 @@ where
}
}

pub(crate) type TipsetCache = RwLock<LruCache<TipsetKeys, Arc<Tipset>>>;
pub(crate) type TipsetCache = Mutex<LruCache<TipsetKeys, Arc<Tipset>>>;

/// Loads a tipset from memory given the tipset keys and cache.
pub(crate) async fn tipset_from_keys<BS>(
Expand All @@ -607,7 +607,7 @@ pub(crate) async fn tipset_from_keys<BS>(
where
BS: Blockstore,
{
if let Some(ts) = cache.write().await.get(tsk) {
if let Some(ts) = cache.lock().await.get(tsk) {
metrics::LRU_CACHE_HIT
.with_label_values(&[metrics::values::TIPSET])
.inc();
Expand All @@ -627,7 +627,7 @@ where

// construct new Tipset to return
let ts = Arc::new(Tipset::new(block_headers)?);
cache.write().await.put(tsk.clone(), ts.clone());
cache.lock().await.put(tsk.clone(), ts.clone());
metrics::LRU_CACHE_MISS
.with_label_values(&[metrics::values::TIPSET])
.inc();
Expand Down
14 changes: 8 additions & 6 deletions blockchain/chain/src/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use fvm_ipld_blockstore::Blockstore;
use fvm_shared::clock::ChainEpoch;
use log::info;
use lru::LruCache;
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::RwLock;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::Mutex;

const DEFAULT_CHAIN_INDEX_CACHE_SIZE: NonZeroUsize =
forest_utils::const_option!(NonZeroUsize::new(32 << 10));
Expand Down Expand Up @@ -133,7 +134,7 @@ pub(crate) struct LookbackEntry {
/// at the chain to retrieve an old tipset.
pub(crate) struct ChainIndex<BS> {
/// Cache of look-back entries to speed up lookup.
skip_cache: RwLock<LruCache<TipsetKeys, Arc<LookbackEntry>>>,
skip_cache: Mutex<LruCache<TipsetKeys, Arc<LookbackEntry>>>,

/// `Arc` reference tipset cache.
ts_cache: Arc<TipsetCache>,
Expand All @@ -145,7 +146,7 @@ pub(crate) struct ChainIndex<BS> {
impl<BS: Blockstore> ChainIndex<BS> {
pub(crate) fn new(ts_cache: Arc<TipsetCache>, db: BS) -> Self {
Self {
skip_cache: RwLock::new(LruCache::new(DEFAULT_CHAIN_INDEX_CACHE_SIZE)),
skip_cache: Mutex::new(LruCache::new(DEFAULT_CHAIN_INDEX_CACHE_SIZE)),
ts_cache,
db,
}
Expand Down Expand Up @@ -173,10 +174,11 @@ impl<BS: Blockstore> ChainIndex<BS> {
let rounded = self.round_down(from).await?;

let mut cur = rounded.key().clone();

const MAX_COUNT: usize = 100;
let mut counter = 0;
loop {
let entry = self.skip_cache.write().await.get(&cur).cloned();
let entry = self.skip_cache.lock().await.get(&cur).cloned();
let lbe = if let Some(cached) = entry {
metrics::LRU_CACHE_HIT
.with_label_values(&[metrics::values::SKIP])
Expand Down Expand Up @@ -265,7 +267,7 @@ impl<BS: Blockstore> ChainIndex<BS> {
target: skip_target.key().clone(),
});

self.skip_cache.write().await.put(tsk.clone(), lbe.clone());
self.skip_cache.lock().await.put(tsk.clone(), lbe.clone());
Ok(lbe)
}

Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ lazy_static! {
.expect("Defining the invalid_tipset_total metric must succeed"),
);
prometheus::default_registry().register(invalid_tipset_total.clone()).expect(
"Registering the invalid_tispet_total metric with the metrics registry must succeed"
"Registering the invalid_tipset_total metric with the metrics registry must succeed"
);
invalid_tipset_total
};
Expand Down
25 changes: 13 additions & 12 deletions blockchain/chain_sync/src/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,18 +1114,19 @@ async fn sync_messages_check_state<
// Validation loop
while let Ok(full_tipset) = r.recv_async().await {
let current_epoch = full_tipset.epoch();
let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
validate_tipset::<_, C>(
consensus.clone(),
state_manager.clone(),
&chainstore,
bad_block_cache,
full_tipset,
genesis,
invalid_block_strategy,
)
.await?;
timer.observe_duration();
{
let _timer = metrics::TIPSET_PROCESSING_TIME.start_timer();
validate_tipset::<_, C>(
consensus.clone(),
state_manager.clone(),
&chainstore,
bad_block_cache,
full_tipset,
genesis,
invalid_block_strategy,
)
.await?;
}
tracker.write().await.set_epoch(current_epoch);
metrics::LAST_VALIDATED_TIPSET_EPOCH.set(current_epoch as u64);
}
Expand Down
205 changes: 143 additions & 62 deletions blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use tokio::runtime::Handle;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, trace, warn};
use vm_circ_supply::GenesisInfo;
Expand All @@ -54,6 +56,120 @@ const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize =
/// Intermediary for retrieving state objects and updating actor states.
type CidPair = (Cid, Cid);

// Various structures for implementing the tipset state cache

/// Interior state of the tipset state cache: completed results plus the
/// set of in-flight ("pending") computations. Both live behind a single
/// lock (see `TipsetStateCache`) so that lookup and registration of a
/// pending computation happen atomically.
struct TipsetStateCacheInner {
    // Completed tipset -> (state root CID, receipt root CID) results,
    // bounded by an LRU eviction policy.
    values: LruCache<TipsetKeys, CidPair>,
    // One mutex per tipset whose state is currently being computed.
    // Concurrent callers for the same key await this mutex instead of
    // recomputing the state. Linear scan is used for lookup, so this
    // assumes the number of simultaneous computations stays small.
    pending: Vec<(TipsetKeys, Arc<TokioMutex<()>>)>,
}

impl Default for TipsetStateCacheInner {
fn default() -> Self {
Self {
values: LruCache::new(DEFAULT_TIPSET_CACHE_SIZE),
pending: Vec::with_capacity(8),
}
}
}

/// Cache mapping tipsets to their computed state/receipt roots, with
/// de-duplication of concurrent computations for the same tipset.
struct TipsetStateCache {
    // A blocking `std` mutex is used (not an async one) because it is
    // only ever held for short, non-`await`ing critical sections — see
    // `with_inner`.
    // NOTE(review): the `Arc` appears redundant from this chunk (the
    // cache is not visibly cloned) — confirm against the rest of the file.
    cache: Arc<StdMutex<TipsetStateCacheInner>>,
}

/// Outcome of an atomic cache probe in `get_or_else`.
enum Status {
    // The value was already computed; here it is.
    Done(CidPair),
    // Not computed yet: the per-key mutex to await. The probe either
    // found an existing pending entry or registered a fresh one.
    Empty(Arc<TokioMutex<()>>),
}

impl TipsetStateCache {
    /// Creates an empty cache (default LRU capacity, no pending
    /// computations).
    pub fn new() -> Self {
        Self {
            cache: Arc::new(StdMutex::new(TipsetStateCacheInner::default())),
        }
    }

    /// Runs `func` with exclusive access to the cache interior. The
    /// guard is dropped before this returns, so the `std` mutex is never
    /// held across an `.await` point.
    fn with_inner<F, T>(&self, func: F) -> T
    where
        F: FnOnce(&mut TipsetStateCacheInner) -> T,
    {
        // `unwrap`: panics only if a previous holder panicked while
        // holding the lock (poisoned mutex) — treated as unrecoverable.
        let mut lock = self.cache.lock().unwrap();
        func(&mut lock)
    }

    /// Returns the cached `CidPair` for `key`, or computes it via
    /// `compute`, guaranteeing that concurrent callers for the same key
    /// run the computation only once: the first caller registers a
    /// per-key mutex and computes; later callers await that mutex and
    /// then re-check the cache.
    pub async fn get_or_else<F, Fut>(&self, key: &TipsetKeys, compute: F) -> anyhow::Result<CidPair>
    where
        F: Fn() -> Fut,
        Fut: core::future::Future<Output = anyhow::Result<CidPair>>,
    {
        // Atomic probe: either the value is done, or we obtain (possibly
        // registering) the pending mutex for this key — all under one
        // lock acquisition, so no two callers can both decide to compute.
        let status = self.with_inner(|inner| match inner.values.get(key) {
            Some(v) => Status::Done(*v),
            None => {
                // Look for an in-flight computation of the same key.
                let option = inner
                    .pending
                    .iter()
                    .find(|(k, _)| k == key)
                    .map(|(_, mutex)| mutex);
                match option {
                    Some(mutex) => Status::Empty(mutex.clone()),
                    None => {
                        // First caller for this key: register a fresh
                        // pending mutex that followers will await.
                        let mutex = Arc::new(TokioMutex::new(()));
                        inner.pending.push((key.clone(), mutex.clone()));
                        Status::Empty(mutex)
                    }
                }
            }
        });
        match status {
            Status::Done(x) => {
                forest_metrics::metrics::LRU_CACHE_HIT
                    .with_label_values(&[forest_metrics::metrics::values::STATE_MANAGER_TIPSET])
                    .inc();
                Ok(x)
            }
            Status::Empty(mtx) => {
                // Serialize with any concurrent computation of this key.
                // A tokio mutex is used because the guard is held across
                // the `compute().await` below.
                let _guard = mtx.lock().await;
                match self.get(key) {
                    Some(v) => {
                        // While locking someone else computed the pending task
                        forest_metrics::metrics::LRU_CACHE_HIT
                            .with_label_values(&[
                                forest_metrics::metrics::values::STATE_MANAGER_TIPSET,
                            ])
                            .inc();

                        Ok(v)
                    }
                    None => {
                        // Entry does not have state computed yet, compute value and fill the cache
                        forest_metrics::metrics::LRU_CACHE_MISS
                            .with_label_values(&[
                                forest_metrics::metrics::values::STATE_MANAGER_TIPSET,
                            ])
                            .inc();

                        // NOTE(review): if `compute` fails, the pending
                        // entry is left in place; the next caller for this
                        // key reuses its mutex and retries the computation
                        // (the stale entry is only removed on a later
                        // successful `insert`). Confirm this retry-on-error
                        // behavior is intended.
                        let cid_pair = compute().await?;

                        // Write back to cache, release lock and return value
                        self.insert(key.clone(), cid_pair);
                        Ok(cid_pair)
                    }
                }
            }
        }
    }

    /// Non-registering lookup; bumps the key's LRU recency on hit.
    fn get(&self, key: &TipsetKeys) -> Option<CidPair> {
        self.with_inner(|inner| inner.values.get(key).copied())
    }

    /// Publishes a computed value and retires the key's pending entry in
    /// one critical section, so followers re-checking the cache see the
    /// value rather than re-registering a computation.
    fn insert(&self, key: TipsetKeys, value: CidPair) {
        self.with_inner(|inner| {
            inner.pending.retain(|(k, _)| k != &key);
            inner.values.put(key, value);
        });
    }
}

/// Type to represent invocation of state call results.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down Expand Up @@ -84,9 +200,7 @@ pub struct StateManager<DB> {
cs: Arc<ChainStore<DB>>,

/// This is a cache which indexes tipsets to their calculated state.
/// The calculated state is wrapped in a mutex to avoid duplicate computation
/// of the state/receipt root.
cache: RwLock<LruCache<TipsetKeys, Arc<once_cell::sync::OnceCell<CidPair>>>>,
cache: TipsetStateCache,
genesis_info: GenesisInfo,
beacon: Arc<forest_beacon::BeaconSchedule<DrandBeacon>>,
chain_config: Arc<ChainConfig>,
Expand All @@ -112,7 +226,7 @@ where

Ok(Self {
cs,
cache: RwLock::new(LruCache::new(DEFAULT_TIPSET_CACHE_SIZE)),
cache: TipsetStateCache::new(),
genesis_info: GenesisInfo::from_chain_config(&chain_config),
beacon,
chain_config,
Expand Down Expand Up @@ -313,64 +427,31 @@ where
/// not to be computed twice.
#[instrument(skip(self))]
pub async fn tipset_state(self: &Arc<Self>, tipset: &Arc<Tipset>) -> anyhow::Result<CidPair> {
// Get entry in cache, if it exists.
// Arc is cloned here to avoid holding the entire cache lock until function ends.
// (tasks should be able to compute different tipset state's in parallel)
//
// In the case of task `A` computing the same tipset as task `B`, `A` will hold the
// mutex until the value is updated, which task `B` will await.
//
// If two tasks are computing different tipset states, they will only block computation
// when accessing/initializing the entry in cache, not during the whole tipset calc.

// first try reading cache
if let Some(entry) = self.cache.write().await.get(tipset.key()) {
trace!("hit cache for tipset {:?}", tipset.cids());
forest_metrics::metrics::LRU_CACHE_HIT
.with_label_values(&[forest_metrics::metrics::values::STATE_MANAGER_TIPSET])
.inc();
return Ok(*entry.wait());
}

// write an empty `OnceCell` when cache not hit
let cache_entry = Arc::new(once_cell::sync::OnceCell::new());
{
self.cache
.write()
.await
.push(tipset.key().clone(), cache_entry.clone());
}

// Entry does not have state computed yet, this task will fill entry if successful.
debug!("calculating tipset state {:?}", tipset.cids());

let cid_pair = if tipset.epoch() == 0 {
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
let message_receipts = tipset
.blocks()
.first()
.ok_or_else(|| Error::Other("Could not get message receipts".to_string()))?;

(*tipset.parent_state(), *message_receipts.message_receipts())
} else {
// generic constants are not implemented yet this is a lowcost method for now
let no_func = None::<fn(&Cid, &ChainMessage, &ApplyRet) -> Result<(), anyhow::Error>>;
let ts_state = self.compute_tipset_state(tipset, no_func).await?;
debug!("completed tipset state calculation {:?}", tipset.cids());
ts_state
};

// Fill entry with calculated cid pair
if let Err(e) = cache_entry.set(cid_pair) {
error!("Fail to set tipset_state cache: {}, {}", e.0, e.1);
}
forest_metrics::metrics::LRU_CACHE_MISS
.with_label_values(&[forest_metrics::metrics::values::STATE_MANAGER_TIPSET])
.inc();
Ok(cid_pair)
let key = tipset.key();
self.cache
.get_or_else(key, || async move {
let cid_pair = if tipset.epoch() == 0 {
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
let message_receipts = tipset.blocks().first().ok_or_else(|| {
Error::Other("Could not get message receipts".to_string())
})?;

(*tipset.parent_state(), *message_receipts.message_receipts())
} else {
// generic constants are not implemented yet this is a lowcost method for now
let no_func =
None::<fn(&Cid, &ChainMessage, &ApplyRet) -> Result<(), anyhow::Error>>;
let ts_state = self.compute_tipset_state(tipset, no_func).await?;
debug!("Completed tipset state calculation {:?}", tipset.cids());
ts_state
};

Ok(cid_pair)
})
.await
}

#[instrument(skip(self, rand))]
Expand Down

0 comments on commit bfd3009

Please sign in to comment.