Skip to content

Commit

Permalink
chore(*): activate more miri test
Browse files Browse the repository at this point in the history
  • Loading branch information
wvwwvwwv committed Aug 12, 2024
1 parent 5151bd7 commit c4a072d
Showing 1 changed file with 133 additions and 78 deletions.
211 changes: 133 additions & 78 deletions src/tests/correctness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ mod hashmap_test {
use std::panic::UnwindSafe;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::{Arc, Barrier};
use std::thread;
use std::sync::Arc;
use tokio::sync::Barrier as AsyncBarrier;

static_assertions::assert_impl_all!(HashMap<String, String>: Send, Sync, UnwindSafe);
Expand Down Expand Up @@ -618,65 +617,91 @@ mod hashmap_test {
}

#[cfg_attr(miri, ignore)]
#[test]
fn iterator() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn iterator() {
let data_size = 4096;
for _ in 0..16 {
let hashmap: Arc<HashMap<u64, u64>> = Arc::new(HashMap::default());
let hashmap_copied = hashmap.clone();
let barrier = Arc::new(Barrier::new(2));
let barrier_copied = barrier.clone();
let barrier = Arc::new(AsyncBarrier::new(2));
let barrier_clone = barrier.clone();
let inserted = Arc::new(AtomicU64::new(0));
let inserted_copied = inserted.clone();
let inserted_clone = inserted.clone();
let removed = Arc::new(AtomicU64::new(data_size));
let removed_copied = removed.clone();
let thread_handle = thread::spawn(move || {
let removed_clone = removed.clone();
let task_handle = tokio::task::spawn(async move {
// test insert
for _ in 0..2 {
barrier_copied.wait();
let mut scanned = 0;
let mut checker = BTreeSet::new();
let max = inserted_copied.load(Acquire);
hashmap_copied.retain(|k, _| {
barrier_clone.wait().await;
let mut scanned = 0;
let mut checker = BTreeSet::new();
let mut max = inserted_clone.load(Acquire);
hashmap_copied.retain(|k, _| {
scanned += 1;
checker.insert(*k);
true
});
for key in 0..max {
assert!(checker.contains(&key));
}

barrier_clone.wait().await;
scanned = 0;
checker = BTreeSet::new();
max = inserted_clone.load(Acquire);
hashmap_copied
.retain_async(|k, _| {
scanned += 1;
checker.insert(*k);
true
});
for key in 0..max {
assert!(checker.contains(&key));
}
})
.await;
for key in 0..max {
assert!(checker.contains(&key));
}

// test remove
for _ in 0..2 {
barrier_copied.wait();
let mut scanned = 0;
let max = removed_copied.load(Acquire);
hashmap_copied.retain(|k, _| {
barrier_clone.wait().await;
scanned = 0;
max = removed_clone.load(Acquire);
hashmap_copied.retain(|k, _| {
scanned += 1;
assert!(*k < max);
true
});

barrier_clone.wait().await;
scanned = 0;
max = removed_clone.load(Acquire);
hashmap_copied
.retain_async(|k, _| {
scanned += 1;
assert!(*k < max);
true
});
}
})
.await;
});

// insert
barrier.wait();
barrier.wait().await;
for i in 0..data_size {
if i == data_size / 2 {
barrier.wait();
barrier.wait().await;
}
assert!(hashmap.insert(i, i).is_ok());
inserted.store(i, Release);
}

// remove
barrier.wait();
barrier.wait().await;
for i in (0..data_size).rev() {
if i == data_size / 2 {
barrier.wait();
barrier.wait().await;
}
assert!(hashmap.remove(&i).is_some());
removed.store(i, Release);
}
thread_handle.join().unwrap();

assert!(task_handle.await.is_ok());
}
}

Expand Down Expand Up @@ -787,7 +812,7 @@ mod hashindex_test {
use std::panic::UnwindSafe;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{fence, AtomicU64, AtomicUsize};
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use std::thread;
use tokio::sync::Barrier as AsyncBarrier;

Expand Down Expand Up @@ -1177,60 +1202,91 @@ mod hashindex_test {
}

#[cfg_attr(miri, ignore)]
#[test]
fn iterator() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn iterator() {
let data_size = 4096;
for _ in 0..64 {
for _ in 0..16 {
let hashindex: Arc<HashIndex<u64, u64>> = Arc::new(HashIndex::default());
let hashindex_copied = hashindex.clone();
let barrier = Arc::new(Barrier::new(2));
let barrier_copied = barrier.clone();
let hashindex_clone = hashindex.clone();
let barrier = Arc::new(AsyncBarrier::new(2));
let barrier_clone = barrier.clone();
let inserted = Arc::new(AtomicU64::new(0));
let inserted_copied = inserted.clone();
let inserted_clone = inserted.clone();
let removed = Arc::new(AtomicU64::new(data_size));
let removed_copied = removed.clone();
let thread_handle = thread::spawn(move || {
let removed_clone = removed.clone();
let task_handle = tokio::task::spawn(async move {
// test insert
for _ in 0..2 {
barrier_copied.wait();
let mut checker = BTreeSet::new();
let max = inserted_copied.load(Acquire);
for iter in hashindex_copied.iter(&Guard::new()) {
checker.insert(*iter.0);
}
for key in 0..max {
assert!(checker.contains(&key));
}
barrier_clone.wait().await;
let mut scanned = 0;
let mut checker = BTreeSet::new();
let mut max = inserted_clone.load(Acquire);
hashindex_clone.retain(|k, _| {
scanned += 1;
checker.insert(*k);
true
});
for key in 0..max {
assert!(checker.contains(&key));
}
// test remove
for _ in 0..2 {
barrier_copied.wait();
let max = removed_copied.load(Acquire);
for iter in hashindex_copied.iter(&Guard::new()) {
assert!(*iter.0 < max);
}

barrier_clone.wait().await;
scanned = 0;
checker = BTreeSet::new();
max = inserted_clone.load(Acquire);
hashindex_clone
.retain_async(|k, _| {
scanned += 1;
checker.insert(*k);
true
})
.await;
for key in 0..max {
assert!(checker.contains(&key));
}

// test remove
barrier_clone.wait().await;
scanned = 0;
max = removed_clone.load(Acquire);
hashindex_clone.retain(|k, _| {
scanned += 1;
assert!(*k < max);
true
});

barrier_clone.wait().await;
scanned = 0;
max = removed_clone.load(Acquire);
hashindex_clone
.retain_async(|k, _| {
scanned += 1;
assert!(*k < max);
true
})
.await;
});

// insert
barrier.wait();
barrier.wait().await;
for i in 0..data_size {
if i == data_size / 2 {
barrier.wait();
barrier.wait().await;
}
assert!(hashindex.insert(i, i).is_ok());
inserted.store(i, Release);
}

// remove
barrier.wait();
barrier.wait().await;
for i in (0..data_size).rev() {
if i == data_size / 2 {
barrier.wait();
barrier.wait().await;
}
assert!(hashindex.remove(&i));
assert!(hashindex.peek_with(&i, |_, _| ()).is_none());
removed.store(i, Release);
}
thread_handle.join().unwrap();

assert!(task_handle.await.is_ok());
}
}

Expand Down Expand Up @@ -2119,10 +2175,10 @@ mod treeindex_test {
let mut thread_handles = Vec::with_capacity(num_threads);
for thread_id in 0..num_threads {
let tree_copied = tree.clone();
let barrier_copied = barrier.clone();
let barrier_clone = barrier.clone();
thread_handles.push(thread::spawn(move || {
let first_key = thread_id * range;
barrier_copied.wait();
barrier_clone.wait();
for key in first_key..(first_key + range / 2) {
assert!(tree_copied.insert(key, key).is_ok());
}
Expand Down Expand Up @@ -2207,10 +2263,10 @@ mod treeindex_test {
for thread_id in 0..num_threads {
let tree_copied = tree.clone();
let stopped_copied = stopped.clone();
let barrier_copied = barrier.clone();
let barrier_clone = barrier.clone();
thread_handles.push(thread::spawn(move || {
let first_key = thread_id * range;
barrier_copied.wait();
barrier_clone.wait();
while !stopped_copied.load(Relaxed) {
for key in (first_key + 1)..(first_key + range) {
assert!(tree_copied.insert(key, key).is_ok());
Expand Down Expand Up @@ -2298,9 +2354,9 @@ mod treeindex_test {
let mut thread_handles = Vec::with_capacity(num_threads);
for thread_id in 0..num_threads {
let tree_copied = tree.clone();
let barrier_copied = barrier.clone();
let barrier_clone = barrier.clone();
thread_handles.push(thread::spawn(move || {
barrier_copied.wait();
barrier_clone.wait();
let data_size = if cfg!(miri) { 32 } else { 4096 };
for _ in 0..data_size {
let range = 0..32;
Expand Down Expand Up @@ -2380,14 +2436,14 @@ mod treeindex_test {
let mut thread_handles = Vec::new();
for _ in 0..2 {
let tree_copied = tree.clone();
let barrier_copied = barrier.clone();
let inserted_copied = inserted.clone();
let removed_copied = removed.clone();
let barrier_clone = barrier.clone();
let inserted_clone = inserted.clone();
let removed_clone = removed.clone();
let thread_handle = thread::spawn(move || {
// test insert
for _ in 0..2 {
barrier_copied.wait();
let max = inserted_copied.load(Acquire);
barrier_clone.wait();
let max = inserted_clone.load(Acquire);
let mut prev = 0;
let mut iterated = 0;
let guard = Guard::new();
Expand All @@ -2404,9 +2460,9 @@ mod treeindex_test {
}
// test remove
for _ in 0..2 {
barrier_copied.wait();
barrier_clone.wait();
let mut prev = 0;
let max = removed_copied.load(Acquire);
let max = removed_clone.load(Acquire);
let guard = Guard::new();
for iter in tree_copied.iter(&guard) {
let current = *iter.0;
Expand Down Expand Up @@ -2801,7 +2857,6 @@ mod queue_test {
println!("{cnt}");
}

#[cfg_attr(miri, ignore)]
#[test]
fn iter_push_pop() {
const NUM_TASKS: usize = 4;
Expand Down

0 comments on commit c4a072d

Please sign in to comment.