Skip to content

Commit

Permalink
Fixes for caching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed May 30, 2024
1 parent dfaa739 commit 8927d9e
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl Scenario {
println!("running with cache eviction after step {}", i);
let count = Arc::new(AtomicU32::new(0));
let action = Box::new(|s: &mut Scenario| {
println!("evict_caches()");
s.evict_caches();
});
let fut = f(Scenario::new(Some((i, action)), count.clone()).await);
Expand Down Expand Up @@ -790,7 +791,8 @@ async fn test_lt_or_eq_caching() {
.get(&s.obj_id(1))
.unwrap()
.lock()
.0
.version()
.unwrap()
.value(),
5
);
Expand All @@ -807,6 +809,46 @@ async fn test_lt_or_eq_caching() {
.await;
}

#[tokio::test]
async fn test_lt_or_eq_with_cached_tombstone() {
    telemetry_subscribers::init_for_testing();
    Scenario::iterate(|mut s| async move {
        // make an object, and a tombstone
        // tx1 creates object 1; tx2 deletes it, leaving a tombstone at the
        // next version. Both are committed so they reach the backing store.
        s.with_created(&[1]);
        let tx1 = s.do_tx().await;
        s.with_deleted(&[1]);
        let tx2 = s.do_tx().await;
        s.commit(tx1).await.unwrap();
        s.commit(tx2).await.unwrap();

        // Clear the cache so the lookups below must go through the
        // cache-miss path and re-populate object_by_id_cache from the store.
        s.reset_cache();

        // Helper: assert that find_object_lt_or_eq_version(obj 1, lookup_version)
        // resolves to expected_version (None = no live object at or below the bound).
        let check_version = |lookup_version: u64, expected_version: Option<u64>| {
            let lookup_version = SequenceNumber::from_u64(lookup_version);
            assert_eq!(
                s.cache()
                    .find_object_lt_or_eq_version(s.obj_id(1), lookup_version)
                    .unwrap()
                    .map(|v| v.version()),
                expected_version.map(SequenceNumber::from_u64)
            );
        };

        // latest object not yet cached
        assert!(!s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

        // version 2 is deleted
        check_version(2, None);

        // checking the version pulled the tombstone into the cache
        assert!(s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

        // version 1 is still found, tombstone in cache is ignored
        // (i.e. a cached tombstone above the bound must not mask an
        // older live version that satisfies the bound)
        check_version(1, Some(1));
    })
    .await;
}

#[tokio::test]
async fn test_write_transaction_outputs_is_sync() {
telemetry_subscribers::init_for_testing();
Expand Down Expand Up @@ -1146,7 +1188,7 @@ async fn latest_object_cache_race_test() {
.cached
.object_by_id_cache
.get(&object_id)
.map(|e| e.lock().0)
.and_then(|e| e.lock().version())
else {
continue;
};
Expand All @@ -1159,7 +1201,10 @@ async fn latest_object_cache_race_test() {
let object =
Object::with_id_owner_version_for_testing(object_id, latest_version, owner);

cache.cache_latest_object_by_id(&object_id, latest_version, object.into());
cache.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(latest_version, object.into()),
);
}
})
};
Expand All @@ -1176,7 +1221,7 @@ async fn latest_object_cache_race_test() {
.cached
.object_by_id_cache
.get(&object_id)
.map(|e| e.lock().0)
.and_then(|e| e.lock().version())
else {
continue;
};
Expand Down
170 changes: 118 additions & 52 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@ enum ObjectEntry {
Wrapped,
}

impl ObjectEntry {
    // True only for the Object variant; Deleted and Wrapped tombstone
    // entries are not live.
    fn is_live(&self) -> bool {
        matches!(self, ObjectEntry::Object(_))
    }
}

#[cfg(test)]
impl ObjectEntry {
fn unwrap_object(&self) -> &Object {
Expand Down Expand Up @@ -131,6 +125,49 @@ impl From<Object> for ObjectEntry {
}
}

impl From<ObjectOrTombstone> for ObjectEntry {
    /// Converts a store-level `ObjectOrTombstone` into a cache `ObjectEntry`.
    /// A tombstone maps to `Deleted` or `Wrapped` according to its digest;
    /// any other tombstone digest is a broken invariant and panics.
    fn from(object: ObjectOrTombstone) -> Self {
        let obj_ref = match object {
            ObjectOrTombstone::Object(o) => return o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => obj_ref,
        };
        let digest = obj_ref.2;
        if digest.is_deleted() {
            ObjectEntry::Deleted
        } else if digest.is_wrapped() {
            ObjectEntry::Wrapped
        } else {
            panic!("tombstone digest must either be deleted or wrapped");
        }
    }
}

// Cached knowledge about the latest version of an object ID. Caching the
// NonExistent case as well (a negative cache entry) lets repeated lookups of
// objects that were never created skip the backing store.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    // Latest known version, together with its entry — which may be a live
    // object or a Deleted/Wrapped tombstone.
    Object(SequenceNumber, ObjectEntry),
    // No version of this object exists at all.
    NonExistent,
}

impl LatestObjectCacheEntry {
    /// Returns true if `self` should supersede `other` in the cache:
    /// any concrete version beats `NonExistent`, and a higher version beats
    /// a lower one. `NonExistent` never supersedes anything. Used to resolve
    /// racing inserts so the cache only ever moves forward.
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match self {
            LatestObjectCacheEntry::NonExistent => false,
            LatestObjectCacheEntry::Object(self_version, _) => match other {
                LatestObjectCacheEntry::Object(other_version, _) => self_version > other_version,
                LatestObjectCacheEntry::NonExistent => true,
            },
        }
    }

    /// Version held by this entry, or `None` for `NonExistent`. Test-only.
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        if let LatestObjectCacheEntry::Object(version, _) = self {
            Some(*version)
        } else {
            None
        }
    }
}

type MarkerKey = (EpochId, ObjectID);

enum CacheResult<T> {
Expand Down Expand Up @@ -230,7 +267,7 @@ struct CachedCommittedData {
// We cannot simply insert objects that we read off the disk into `object_cache`,
// since that may violate the no-missing-versions property.
// `object_by_id_cache` is also written to on writes so that it is always coherent.
object_by_id_cache: MokaCache<ObjectID, Arc<Mutex<(SequenceNumber, ObjectEntry)>>>,
object_by_id_cache: MokaCache<ObjectID, Arc<Mutex<LatestObjectCacheEntry>>>,

// See module level comment for an explanation of caching strategy.
marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
Expand Down Expand Up @@ -419,9 +456,10 @@ impl WritebackCache {
.entry(*object_id)
.or_default()
.insert(version, object.clone());
self.cached
.object_by_id_cache
.insert(*object_id, Arc::new(Mutex::new((version, object))));
self.cached.object_by_id_cache.insert(
*object_id,
Arc::new(Mutex::new(LatestObjectCacheEntry::Object(version, object))),
);
}

async fn write_marker_value(
Expand Down Expand Up @@ -510,12 +548,11 @@ impl WritebackCache {
) -> CacheResult<(SequenceNumber, ObjectEntry)> {
if let Some(entry) = self.cached.object_by_id_cache.get(object_id) {
let entry = entry.lock();
let (latest_version, latest_object) = &*entry;
if latest_object.is_live() {
return CacheResult::Hit((*latest_version, latest_object.clone()));
} else {
assert_eq!(*latest_version, SequenceNumber::from_u64(0));
return CacheResult::NegativeHit;
match &*entry {
LatestObjectCacheEntry::Object(latest_version, latest_object) => {
return CacheResult::Hit((*latest_version, latest_object.clone()))
}
LatestObjectCacheEntry::NonExistent => return CacheResult::NegativeHit,
}
}

Expand Down Expand Up @@ -904,19 +941,15 @@ impl WritebackCache {
// If there are racing calls to this function, it is guaranteed that after a call
// has returned, reads from that thread will not observe a lower version than the
// one they inserted
fn cache_latest_object_by_id(
&self,
object_id: &ObjectID,
version: SequenceNumber,
object: ObjectEntry,
) {
fn cache_latest_object_by_id(&self, object_id: &ObjectID, object: LatestObjectCacheEntry) {
trace!("caching object by id: {:?} {:?}", object_id, object);
// Warning: tricky code!
let entry = self
.cached
.object_by_id_cache
.entry(*object_id)
// only one racing insert will call the closure
.or_insert_with(|| Arc::new(Mutex::new((version, object.clone()))));
.or_insert_with(|| Arc::new(Mutex::new(object.clone())));

// We may be racing with another thread that observed an older version of the object
if !entry.is_fresh() {
Expand All @@ -935,21 +968,18 @@ impl WritebackCache {

// this point because there should have been a cache hit.)
let mut entry = entry.value().lock();

// Ensure only the latest version is inserted.
if version > entry.0 {
*entry = (version, object);
if object.is_newer_than(&entry) {
*entry = object;
}
}
}

fn cache_object_not_found(&self, object_id: &ObjectID) {
// when caching non-existence, we use version 0. Since that is less than OBJECT_START_VERSION
// it is guaranteed to lose the race with any reader than finds an extant version.
self.cache_latest_object_by_id(
object_id,
SequenceNumber::from_u64(0),
ObjectEntry::Deleted,
);
self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent);
}

fn clear_state_end_of_epoch_impl(&self, _execution_guard: &ExecutionLockWriteGuard<'_>) {
Expand Down Expand Up @@ -985,6 +1015,7 @@ impl WritebackCache {
info!("removing non-finalized package from cache: {:?}", object_id);
self.packages.invalidate(object_id);
}
self.cached.object_by_id_cache.invalidate(object_id);
}

// Note: individual object entries are removed when clear_state_end_of_epoch_impl is called
Expand Down Expand Up @@ -1064,19 +1095,19 @@ impl ObjectCacheRead for WritebackCache {
}

// get_object and variants.
//
// TODO: We don't insert objects into the cache after misses because they are usually only
// read once. We might want to cache immutable reads (RO shared objects and immutable objects)
// If we do this, we must be VERY CAREFUL not to break the contiguous version property
// of the cache.

fn get_object(&self, id: &ObjectID) -> SuiResult<Option<Object>> {
match self.get_object_by_id_cache_only(id) {
CacheResult::Hit((_, object)) => Ok(Some(object)),
CacheResult::NegativeHit => Ok(None),
CacheResult::Miss => {
let obj = self.store.get_object(id)?;
if obj.is_none() {
if let Some(obj) = &obj {
self.cache_latest_object_by_id(
id,
LatestObjectCacheEntry::Object(obj.version(), obj.clone().into()),
);
} else {
self.cache_object_not_found(id);
}
Ok(obj)
Expand Down Expand Up @@ -1216,18 +1247,22 @@ impl ObjectCacheRead for WritebackCache {
// if we have the latest version cached, and it is within the bound, we are done
if let Some(latest) = self.cached.object_by_id_cache.get(&object_id) {
let latest = latest.lock();
let (latest_version, entry) = &*latest;
if entry.is_live() {
if *latest_version <= version_bound {
if let ObjectEntry::Object(object) = entry {
return Ok(Some(object.clone()));
} else {
return Ok(None);
match &*latest {
LatestObjectCacheEntry::Object(latest_version, object) => {
if *latest_version <= version_bound {
if let ObjectEntry::Object(object) = object {
return Ok(Some(object.clone()));
} else {
// object is a tombstone, but is still within the version bound
return Ok(None);
}
}
// latest object is not within the version bound. fall through.
}
// No object by this ID exists at all
LatestObjectCacheEntry::NonExistent => {
return Ok(None);
}
} else {
assert_eq!(*latest_version, SequenceNumber::from_u64(0));
return Ok(None);
}
}

Expand All @@ -1240,19 +1275,50 @@ impl ObjectCacheRead for WritebackCache {
check_cache_entry!(cached_entry);

// Much of the time, the query will be for the very latest object version, so
// try that first.
if let Some(obj) = self.store.get_object(&object_id)? {
let obj_version = obj.version();
// we can always cache the latest object, even if it is not within the bound
self.cache_latest_object_by_id(&object_id, obj_version, obj.clone().into());
// try that first. But we have to be careful:
// 1. We must load the tombstone if it is present, because its version may exceed
// the version_bound, in which case we must do a scan.
// 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
// But we cannot, because there may be a more recent version in the dirty set, which
// we skipped over in check_cache_entry! because of the version bound. If we skipped
// it above, we will skip it here as well, but it is more important to optimize the
// case where we miss the cache entirely. Put another way, we expect that most of the
// the time, if we reach this point, there will be nothing in the dirty set, and we
// will go straight to the db - but the dirty set check is still necessary for
// correctness.
let latest: Option<(SequenceNumber, ObjectEntry)> =
if let Some(dirty_set) = dirty_entry {
dirty_set.get_highest().cloned()
} else {
self.store.get_latest_object_or_tombstone(object_id)?.map(
|(ObjectKey(_, version), obj_or_tombstone)| {
(version, ObjectEntry::from(obj_or_tombstone))
},
)
};

if let Some((obj_version, obj_entry)) = latest {
// we can always cache the latest object (or tombstone), even if it is not within the
// version_bound. This is done in order to warm the cache in the case where a sequence
// of transactions all read the same child object without writing to it.
self.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
);

if obj_version <= version_bound {
Ok(Some(obj))
match obj_entry {
ObjectEntry::Object(object) => Ok(Some(object)),
ObjectEntry::Deleted | ObjectEntry::Wrapped => Ok(None),
}
} else {
// the latest object exceeded the bound, so now we have to do a scan
self.store
.find_object_lt_or_eq_version(object_id, version_bound)
}
} else {
// no object found in dirty set or db, object does not exist
assert!(cached_entry.is_none());
self.cache_object_not_found(&object_id);
Ok(None)
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-types/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ impl From<&ObjectRef> for ObjectKey {
}
}

#[derive(Clone)]
pub enum ObjectOrTombstone {
Object(Object),
Tombstone(ObjectRef),
Expand Down

0 comments on commit 8927d9e

Please sign in to comment.