Several caching improvements: #17930

Merged
merged 10 commits on May 30, 2024
1 change: 0 additions & 1 deletion crates/sui-core/src/authority/authority_store.rs
@@ -1316,7 +1316,6 @@ impl AuthorityStore {
objects: &[ObjectRef],
is_force_reset: bool,
) -> SuiResult {
-trace!(?objects, "initialize_locks");
AuthorityStore::initialize_live_object_markers(
&self.perpetual_tables.live_owned_object_markers,
write_batch,
@@ -2,13 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::default_registry;
-use rand::{rngs::StdRng, SeedableRng};
+use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
collections::BTreeMap,
future::Future,
path::PathBuf,
sync::atomic::Ordering,
sync::{atomic::AtomicU32, Arc},
time::{Duration, Instant},
};
use sui_framework::BuiltInFramework;
use sui_macros::{register_fail_point_async, sim_test};
@@ -123,6 +124,7 @@ impl Scenario {
println!("running with cache eviction after step {}", i);
let count = Arc::new(AtomicU32::new(0));
let action = Box::new(|s: &mut Scenario| {
println!("evict_caches()");
s.evict_caches();
});
let fut = f(Scenario::new(Some((i, action)), count.clone()).await);
@@ -197,14 +199,14 @@ impl Scenario {
)
}

-fn bump_version(object: Object) -> Object {
+fn inc_version_by(object: Object, delta: u64) -> Object {
let version = object.version();
let mut inner = object.into_inner();
inner
.data
.try_as_move_mut()
.unwrap()
-.increment_version_to(version.next());
+.increment_version_to(SequenceNumber::from_u64(version.value() + delta));
inner.into()
}
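
To make the helper change concrete: inc_version_by jumps straight to version + delta, so a delta of 1 reproduces the old bump_version behavior, while larger deltas leave gaps in the version history (the gaps are what test_lt_or_eq_caching below exercises). A minimal standalone sketch of that arithmetic, with plain u64 standing in for SequenceNumber:

// Standalone model of the helper above; u64 stands in for SequenceNumber.
fn inc_version_by(version: u64, delta: u64) -> u64 {
    version + delta
}

fn main() {
    // delta == 1 matches the old bump_version (version.next()) behavior
    assert_eq!(inc_version_by(1, 1), 2);
    // delta == 2 skips a version, producing the 1 -> 3 -> 5 history used below
    assert_eq!(inc_version_by(1, 2), 3);
    assert_eq!(inc_version_by(3, 2), 5);
}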

@@ -256,6 +258,10 @@ impl Scenario {
}

pub fn with_mutated(&mut self, short_ids: &[u32]) {
self.with_mutated_version_delta(short_ids, 1);
}

pub fn with_mutated_version_delta(&mut self, short_ids: &[u32], delta: u64) {
// for every id in short_ids, assert that an object with that id exists, and
// mutate it
for short_id in short_ids {
@@ -264,7 +270,7 @@
self.outputs
.locks_to_delete
.push(object.compute_object_reference());
-let object = Self::bump_version(object);
+let object = Self::inc_version_by(object, delta);
self.objects.insert(*id, object.clone());
self.outputs
.new_locks_to_init
@@ -737,6 +743,112 @@ async fn test_lt_or_eq() {
.await;
}

#[tokio::test]
async fn test_lt_or_eq_caching() {
telemetry_subscribers::init_for_testing();
Scenario::iterate(|mut s| async move {
// make 3 versions of the object (versions 1, 3 and 5; each mutation bumps the version by 2)
s.with_created(&[1]);
let tx1 = s.do_tx().await;
s.with_mutated_version_delta(&[1], 2);
let tx2 = s.do_tx().await;
s.with_mutated_version_delta(&[1], 2);
let tx3 = s.do_tx().await;
s.commit(tx1).await.unwrap();
s.commit(tx2).await.unwrap();
s.commit(tx3).await.unwrap();

s.reset_cache();

let check_version = |lookup_version: u64, expected_version: u64| {
let lookup_version = SequenceNumber::from_u64(lookup_version);
let expected_version = SequenceNumber::from_u64(expected_version);
assert_eq!(
s.cache()
.find_object_lt_or_eq_version(s.obj_id(1), lookup_version)
.unwrap()
.unwrap()
.version(),
expected_version
);
};

// latest object not yet cached
assert!(!s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version <= 0 does not exist
assert!(s
.cache()
.find_object_lt_or_eq_version(s.obj_id(1), 0.into())
.unwrap()
.is_none());

// query above populates cache
assert_eq!(
s.cache
.cached
.object_by_id_cache
.get(&s.obj_id(1))
.unwrap()
.lock()
.version()
.unwrap()
.value(),
5
);

// all queries get correct answer with a populated cache
check_version(1, 1);
check_version(2, 1);
check_version(3, 3);
check_version(4, 3);
check_version(5, 5);
check_version(6, 5);
check_version(7, 5);
})
.await;
}
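
The rounding behavior asserted above can be modeled with an ordinary BTreeMap keyed by version. This is only a sketch of the lt-or-eq semantics (the find_lt_or_eq helper below is hypothetical, not the cache's actual lookup path): the query returns the greatest stored version that does not exceed the bound, and nothing when the bound is below the first version.

use std::collections::BTreeMap;

// Hypothetical model of find_object_lt_or_eq_version for an object whose
// history contains versions 1, 3 and 5, mirroring the assertions above.
fn find_lt_or_eq(history: &BTreeMap<u64, &str>, bound: u64) -> Option<u64> {
    // range(..=bound) iterates keys <= bound in order; the last one is the answer.
    history.range(..=bound).next_back().map(|(version, _)| *version)
}

fn main() {
    let history: BTreeMap<u64, &str> = [(1, "v1"), (3, "v3"), (5, "v5")].into_iter().collect();

    assert_eq!(find_lt_or_eq(&history, 0), None);    // below the first version
    assert_eq!(find_lt_or_eq(&history, 2), Some(1)); // rounds down to 1
    assert_eq!(find_lt_or_eq(&history, 4), Some(3)); // rounds down to 3
    assert_eq!(find_lt_or_eq(&history, 7), Some(5)); // anything above 5 finds 5
}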

#[tokio::test]
async fn test_lt_or_eq_with_cached_tombstone() {
telemetry_subscribers::init_for_testing();
Scenario::iterate(|mut s| async move {
// make an object, and a tombstone
s.with_created(&[1]);
let tx1 = s.do_tx().await;
s.with_deleted(&[1]);
let tx2 = s.do_tx().await;
s.commit(tx1).await.unwrap();
s.commit(tx2).await.unwrap();

s.reset_cache();

let check_version = |lookup_version: u64, expected_version: Option<u64>| {
let lookup_version = SequenceNumber::from_u64(lookup_version);
assert_eq!(
s.cache()
.find_object_lt_or_eq_version(s.obj_id(1), lookup_version)
.unwrap()
.map(|v| v.version()),
expected_version.map(SequenceNumber::from_u64)
);
};

// latest object not yet cached
assert!(!s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version 2 is deleted
check_version(2, None);

// checking the version pulled the tombstone into the cache
assert!(s.cache.cached.object_by_id_cache.contains_key(&s.obj_id(1)));

// version 1 is still found, tombstone in cache is ignored
check_version(1, Some(1));
})
.await;
}
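
Extending that sketch with a tombstone entry (again a hypothetical model of the behavior the assertions pin down, not the real cache internals): the lookup still rounds down, but reports nothing when the best match is the tombstone itself.

use std::collections::BTreeMap;

// Hypothetical model: Some(_) is a live version, None is a deletion tombstone.
fn find_lt_or_eq(history: &BTreeMap<u64, Option<&str>>, bound: u64) -> Option<u64> {
    match history.range(..=bound).next_back() {
        Some((_, None)) => None,                    // best match is a tombstone
        Some((version, Some(_))) => Some(*version), // best match is live
        None => None,                               // nothing at or below the bound
    }
}

fn main() {
    let history: BTreeMap<u64, Option<&str>> =
        [(1, Some("v1")), (2, None)].into_iter().collect();

    assert_eq!(find_lt_or_eq(&history, 2), None);    // version 2 is deleted
    assert_eq!(find_lt_or_eq(&history, 1), Some(1)); // version 1 is still found
}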

#[tokio::test]
async fn test_write_transaction_outputs_is_sync() {
telemetry_subscribers::init_for_testing();
@@ -1032,3 +1144,95 @@ async fn test_concurrent_lockers() {
assert_eq!(r1.is_ok(), r2.is_err());
}
}

#[tokio::test]
async fn latest_object_cache_race_test() {
let authority = TestAuthorityBuilder::new().build().await;

let store = authority.database_for_testing().clone();

static METRICS: once_cell::sync::Lazy<Arc<ExecutionCacheMetrics>> =
once_cell::sync::Lazy::new(|| Arc::new(ExecutionCacheMetrics::new(default_registry())));

let cache = Arc::new(WritebackCache::new(store.clone(), (*METRICS).clone()));

let object_id = ObjectID::random();
let owner = SuiAddress::random_for_testing_only();

// a writer thread that keeps writing new versions
let writer = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
let mut version = OBJECT_START_VERSION;
while start.elapsed() < Duration::from_secs(2) {
let object = Object::with_id_owner_version_for_testing(object_id, version, owner);

cache
.write_object_entry(&object_id, version, object.into())
.now_or_never()
.unwrap();

version = version.next();
}
})
};

// a reader thread that pretends it saw some previous version on the db
let reader = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
let Some(latest_version) = cache
.cached
.object_by_id_cache
.get(&object_id)
.and_then(|e| e.lock().version())
else {
continue;
};

// with probability 0.1, sleep for 1µs, so that we are further out of date.
if rand::thread_rng().gen_bool(0.1) {
std::thread::sleep(Duration::from_micros(1));
}

let object =
Object::with_id_owner_version_for_testing(object_id, latest_version, owner);

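// push the possibly stale version back into the cache; the checker thread below
// verifies that this never makes the cached version go backwards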
cache.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(latest_version, object.into()),
);
}
})
};

// a thread that does nothing but watch to see if the cache goes back in time
let checker = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
let mut latest = OBJECT_START_VERSION;

while start.elapsed() < Duration::from_secs(2) {
let Some(cur) = cache
.cached
.object_by_id_cache
.get(&object_id)
.and_then(|e| e.lock().version())
else {
continue;
};

assert!(cur >= latest);
latest = cur;
}
})
};

writer.join().unwrap();
reader.join().unwrap();
checker.join().unwrap();
}
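
The invariant the checker thread asserts (the cached latest version never decreases, even while a lagging reader re-publishes stale data) can be illustrated in isolation. A minimal sketch assuming a compare-before-write guard; the MonotonicEntry type below is hypothetical, not the real LatestObjectCacheEntry:

use std::sync::Mutex;

// Hypothetical cache entry that refuses to move backwards in version.
struct MonotonicEntry {
    latest: Mutex<Option<u64>>,
}

impl MonotonicEntry {
    fn new() -> Self {
        Self { latest: Mutex::new(None) }
    }

    // Record `version` only if it is newer than what is already stored;
    // returns true if the entry was updated.
    fn insert_if_newer(&self, version: u64) -> bool {
        let mut guard = self.latest.lock().unwrap();
        match *guard {
            Some(current) if current >= version => false, // stale write: ignored
            _ => {
                *guard = Some(version);
                true
            }
        }
    }
}

fn main() {
    let entry = MonotonicEntry::new();
    assert!(entry.insert_if_newer(3));  // first write wins
    assert!(!entry.insert_if_newer(2)); // a lagging reader cannot roll it back
    assert!(entry.insert_if_newer(5));  // newer versions still go through
}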