Skip to content

Commit

Permalink
Several caching improvements: (MystenLabs#17930)
Browse files Browse the repository at this point in the history
- Populate the cache with the latest object version when possible.
- Add negative caching for getting object by id.
- Use latest version cache for satisfying dynamic field reads
  • Loading branch information
mystenmark authored and tx-tomcat committed Jul 29, 2024
1 parent 84a6b1e commit 324dd12
Show file tree
Hide file tree
Showing 4 changed files with 492 additions and 27 deletions.
1 change: 0 additions & 1 deletion crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,6 @@ impl AuthorityStore {
objects: &[ObjectRef],
is_force_reset: bool,
) -> SuiResult {
trace!(?objects, "initialize_locks");
AuthorityStore::initialize_live_object_markers(
&self.perpetual_tables.live_owned_object_markers,
write_batch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::default_registry;
use rand::{rngs::StdRng, SeedableRng};
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
collections::BTreeMap,
future::Future,
path::PathBuf,
sync::atomic::Ordering,
sync::{atomic::AtomicU32, Arc},
time::{Duration, Instant},
};
use sui_framework::BuiltInFramework;
use sui_macros::{register_fail_point_async, sim_test};
Expand Down Expand Up @@ -123,6 +124,7 @@ impl Scenario {
println!("running with cache eviction after step {}", i);
let count = Arc::new(AtomicU32::new(0));
let action = Box::new(|s: &mut Scenario| {
println!("evict_caches()");
s.evict_caches();
});
let fut = f(Scenario::new(Some((i, action)), count.clone()).await);
Expand Down Expand Up @@ -197,14 +199,14 @@ impl Scenario {
)
}

fn bump_version(object: Object) -> Object {
fn inc_version_by(object: Object, delta: u64) -> Object {
let version = object.version();
let mut inner = object.into_inner();
inner
.data
.try_as_move_mut()
.unwrap()
.increment_version_to(version.next());
.increment_version_to(SequenceNumber::from_u64(version.value() + delta));
inner.into()
}

Expand Down Expand Up @@ -256,6 +258,10 @@ impl Scenario {
}

pub fn with_mutated(&mut self, short_ids: &[u32]) {
self.with_mutated_version_delta(short_ids, 1);
}

pub fn with_mutated_version_delta(&mut self, short_ids: &[u32], delta: u64) {
// for every id in short_ids, assert than an object with that id exists, and
// mutate it
for short_id in short_ids {
Expand All @@ -264,7 +270,7 @@ impl Scenario {
self.outputs
.locks_to_delete
.push(object.compute_object_reference());
let object = Self::bump_version(object);
let object = Self::inc_version_by(object, delta);
self.objects.insert(*id, object.clone());
self.outputs
.new_locks_to_init
Expand Down Expand Up @@ -737,6 +743,112 @@ async fn test_lt_or_eq() {
.await;
}

#[tokio::test]
async fn test_lt_or_eq_caching() {
telemetry_subscribers::init_for_testing();
Scenario::iterate(|mut s| async move {
// make 3 versions of the object
s.with_created(&[1]);
let tx1 = s.do_tx().await;
s.with_mutated_version_delta(&[1], 2);
let tx2 = s.do_tx().await;
s.with_mutated_version_delta(&[1], 2);
let tx3 = s.do_tx().await;
s.commit(tx1).await.unwrap();
s.commit(tx2).await.unwrap();
s.commit(tx3).await.unwrap();

s.reset_cache();

let check_version = |lookup_version: u64, expected_version: u64| {
let lookup_version = SequenceNumber::from_u64(lookup_version);
let expected_version = SequenceNumber::from_u64(expected_version);
assert_eq!(
s.cache()
.find_object_lt_or_eq_version(s.obj_id(1), lookup_version)
.unwrap()
.unwrap()
.version(),
expected_version
);
};

// latest object not yet cached
assert!(!s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version <= 0 does not exist
assert!(s
.cache()
.find_object_lt_or_eq_version(s.obj_id(1), 0.into())
.unwrap()
.is_none());

// query above populates cache
assert_eq!(
s.cache
.cached
.object_by_id_cache
.get(&s.obj_id(1))
.unwrap()
.lock()
.version()
.unwrap()
.value(),
5
);

// all queries get correct answer with a populated cache
check_version(1, 1);
check_version(2, 1);
check_version(3, 3);
check_version(4, 3);
check_version(5, 5);
check_version(6, 5);
check_version(7, 5);
})
.await;
}

#[tokio::test]
async fn test_lt_or_eq_with_cached_tombstone() {
telemetry_subscribers::init_for_testing();
Scenario::iterate(|mut s| async move {
// make an object, and a tombstone
s.with_created(&[1]);
let tx1 = s.do_tx().await;
s.with_deleted(&[1]);
let tx2 = s.do_tx().await;
s.commit(tx1).await.unwrap();
s.commit(tx2).await.unwrap();

s.reset_cache();

let check_version = |lookup_version: u64, expected_version: Option<u64>| {
let lookup_version = SequenceNumber::from_u64(lookup_version);
assert_eq!(
s.cache()
.find_object_lt_or_eq_version(s.obj_id(1), lookup_version)
.unwrap()
.map(|v| v.version()),
expected_version.map(SequenceNumber::from_u64)
);
};

// latest object not yet cached
assert!(!s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version 2 is deleted
check_version(2, None);

// checking the version pulled the tombstone into the cache
assert!(s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version 1 is still found, tombstone in cache is ignored
check_version(1, Some(1));
})
.await;
}

#[tokio::test]
async fn test_write_transaction_outputs_is_sync() {
telemetry_subscribers::init_for_testing();
Expand Down Expand Up @@ -1032,3 +1144,95 @@ async fn test_concurrent_lockers() {
assert_eq!(r1.is_ok(), r2.is_err());
}
}

#[tokio::test]
async fn latest_object_cache_race_test() {
let authority = TestAuthorityBuilder::new().build().await;

let store = authority.database_for_testing().clone();

static METRICS: once_cell::sync::Lazy<Arc<ExecutionCacheMetrics>> =
once_cell::sync::Lazy::new(|| Arc::new(ExecutionCacheMetrics::new(default_registry())));

let cache = Arc::new(WritebackCache::new(store.clone(), (*METRICS).clone()));

let object_id = ObjectID::random();
let owner = SuiAddress::random_for_testing_only();

// a writer thread that keeps writing new versions
let writer = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
let mut version = OBJECT_START_VERSION;
while start.elapsed() < Duration::from_secs(2) {
let object = Object::with_id_owner_version_for_testing(object_id, version, owner);

cache
.write_object_entry(&object_id, version, object.into())
.now_or_never()
.unwrap();

version = version.next();
}
})
};

// a reader thread that pretends it saw some previous version on the db
let reader = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
let Some(latest_version) = cache
.cached
.object_by_id_cache
.get(&object_id)
.and_then(|e| e.lock().version())
else {
continue;
};

// with probability 0.1, sleep for 1µs, so that we are further out of date.
if rand::thread_rng().gen_bool(0.1) {
std::thread::sleep(Duration::from_micros(1));
}

let object =
Object::with_id_owner_version_for_testing(object_id, latest_version, owner);

cache.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(latest_version, object.into()),
);
}
})
};

// a thread that does nothing but watch to see if the cache goes back in time
let checker = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
let mut latest = OBJECT_START_VERSION;

while start.elapsed() < Duration::from_secs(2) {
let Some(cur) = cache
.cached
.object_by_id_cache
.get(&object_id)
.and_then(|e| e.lock().version())
else {
continue;
};

assert!(cur >= latest);
latest = cur;
}
})
};

writer.join().unwrap();
reader.join().unwrap();
checker.join().unwrap();
}
Loading

0 comments on commit 324dd12

Please sign in to comment.