Skip to content

Commit

Permalink
Support concurrent threads locking the same transaction (#19526) (#19530
Browse files Browse the repository at this point in the history
)

Fix crash when two threads concurrently lock the same transaction.

The new test case fails reliably if the fix is not present.
  • Loading branch information
mystenmark authored Sep 24, 2024
1 parent f1bba34 commit 0d3bec8
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 33 deletions.
62 changes: 29 additions & 33 deletions crates/sui-core/src/execution_cache/object_locks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use sui_types::error::{SuiError, SuiResult, UserInputError};
use sui_types::object::Object;
use sui_types::storage::ObjectStore;
use sui_types::transaction::VerifiedSignedTransaction;
use tracing::{debug, info, instrument, trace};
use tracing::{debug, error, info, instrument, trace};

use super::writeback_cache::WritebackCache;

type RefCount = usize;

pub(super) struct ObjectLocks {
// When acquire transaction locks, lock entries are briefly inserted into this map. The map
// exists to provide atomic test-and-set operations on the locks. After all locks have been inserted
Expand All @@ -23,7 +25,7 @@ pub(super) struct ObjectLocks {
// those objects. Therefore we do a db read for each object we are locking.
//
// TODO: find a strategy to allow us to avoid db reads for each object.
locked_transactions: DashMap<ObjectRef, LockDetails>,
locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}

impl ObjectLocks {
Expand All @@ -38,29 +40,10 @@ impl ObjectLocks {
obj_ref: &ObjectRef,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<Option<LockDetails>> {
match self.locked_transactions.entry(*obj_ref) {
DashMapEntry::Vacant(vacant) => {
let tables = epoch_store.tables()?;
let lock = tables.get_locked_transaction(obj_ref)?;
if let Some(lock_details) = lock {
vacant.insert(lock_details);
}
Ok(lock)
}
DashMapEntry::Occupied(occupied) => {
if cfg!(debug_assertions) {
if let Some(lock_details) = epoch_store
.tables()
.unwrap()
.get_locked_transaction(obj_ref)
.unwrap()
{
assert_eq!(*occupied.get(), lock_details);
}
}
Ok(Some(*occupied.get()))
}
}
// We don't consult the in-memory state here. We are only interested in state that
// has been committed to the db. This is because in memory state is reverted
// if the transaction is not successfully locked.
epoch_store.tables()?.get_locked_transaction(obj_ref)
}

/// Attempts to atomically test-and-set a transaction lock on an object.
Expand Down Expand Up @@ -96,15 +79,18 @@ impl ObjectLocks {
let tables = epoch_store.tables()?;
if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
trace!("read lock from db: {:?}", lock_details);
vacant.insert(lock_details);
vacant.insert((1, lock_details));
lock_details
} else {
trace!("set lock: {:?}", new_lock);
vacant.insert(new_lock);
vacant.insert((1, new_lock));
new_lock
}
}
DashMapEntry::Occupied(occupied) => *occupied.get(),
DashMapEntry::Occupied(mut occupied) => {
occupied.get_mut().0 += 1;
occupied.get().1
}
};

if prev_lock != new_lock {
Expand Down Expand Up @@ -156,14 +142,24 @@ impl ObjectLocks {
fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
for (obj_ref, lock) in locks {
let entry = self.locked_transactions.entry(*obj_ref);
let occupied = match entry {
DashMapEntry::Vacant(_) => panic!("lock must exist"),
let mut occupied = match entry {
DashMapEntry::Vacant(_) => {
if cfg!(debug_assertions) {
panic!("lock must exist");
} else {
error!(?obj_ref, "lock should exist");
}
continue;
}
DashMapEntry::Occupied(occupied) => occupied,
};

if occupied.get() == lock {
trace!("clearing lock: {:?}", lock);
occupied.remove();
if occupied.get().1 == *lock {
occupied.get_mut().0 -= 1;
if occupied.get().0 == 0 {
trace!("clearing lock: {:?}", lock);
occupied.remove();
}
} else {
// this is impossible because the only case in which we overwrite a
// lock is when the lock is from a previous epoch. but we are holding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,79 @@ async fn test_concurrent_lockers() {
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_concurrent_lockers_same_tx() {
telemetry_subscribers::init_for_testing();

let mut s = Scenario::new(None, Arc::new(AtomicU32::new(0))).await;
let cache = s.cache.clone();
let mut txns = Vec::new();

for i in 0..1000 {
let a = i * 4;
let b = i * 4 + 1;
s.with_created(&[a, b]);
s.do_tx().await;

let a_ref = s.obj_ref(a);
let b_ref = s.obj_ref(b);

let tx1 = s.take_outputs();

let tx1 = s.make_signed_transaction(&tx1.transaction);

txns.push((tx1, a_ref, b_ref));
}

let barrier = Arc::new(tokio::sync::Barrier::new(2));

let t1 = {
let txns = txns.clone();
let cache = cache.clone();
let barrier = barrier.clone();
let epoch_store = s.epoch_store.clone();
tokio::task::spawn(async move {
let mut results = Vec::new();
for (tx1, a_ref, b_ref) in txns {
results.push(
cache
.acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1)
.await,
);
barrier.wait().await;
}
results
})
};

let t2 = {
let txns = txns.clone();
let cache = cache.clone();
let barrier = barrier.clone();
let epoch_store = s.epoch_store.clone();
tokio::task::spawn(async move {
let mut results = Vec::new();
for (tx1, a_ref, b_ref) in txns {
results.push(
cache
.acquire_transaction_locks(&epoch_store, &[a_ref, b_ref], tx1)
.await,
);
barrier.wait().await;
}
results
})
};

let results1 = t1.await.unwrap();
let results2 = t2.await.unwrap();

for (r1, r2) in results1.into_iter().zip(results2) {
assert!(r1.is_ok());
assert!(r2.is_ok());
}
}

#[tokio::test]
async fn latest_object_cache_race_test() {
let authority = TestAuthorityBuilder::new().build().await;
Expand Down

0 comments on commit 0d3bec8

Please sign in to comment.