
Commit 82111cf

Implement compute_if_present (#32)
1 parent 208cfcf commit 82111cf

2 files changed: +295 −0


src/map.rs

Lines changed: 215 additions & 0 deletions
@@ -542,6 +542,221 @@ where
        }
    }

    /// If the value for the specified `key` is present, attempts to
    /// compute a new mapping given the key and its current mapped value.
    ///
    /// The new mapping is computed by the `remapping_function`, which may
    /// return `None` to signal that the mapping should be removed.
    /// The entire method invocation is performed atomically.
    /// The supplied function is invoked exactly once per invocation of
    /// this method if the key is present, and not at all otherwise. Some
    /// attempted update operations on this map by other threads may be
    /// blocked while the computation is in progress, so the computation
    /// should be short and simple.
    ///
    /// Returns the new value associated with the specified `key`, or `None`
    /// if no value for the specified `key` is present.
    pub fn compute_if_present<'g, Q, F>(
        &'g self,
        key: &Q,
        remapping_function: F,
        guard: &'g Guard,
    ) -> Option<&'g V>
    where
        K: Borrow<Q>,
        Q: ?Sized + Hash + Eq,
        F: FnOnce(&K, &V) -> Option<V>,
    {
        let h = self.hash(&key);

        let mut table = self.table.load(Ordering::SeqCst, guard);

        loop {
            // safety: see argument below for !is_null case
            if table.is_null() || unsafe { table.deref() }.len() == 0 {
                table = self.init_table(guard);
                continue;
            }

            // safety: table is a valid pointer.
            //
            // we are in one of three cases:
            //
            // 1. if table is the one we read before the loop, then we read it while holding the
            //    guard, so it won't be dropped until after we drop that guard b/c the drop logic
            //    only queues a drop for the next epoch after removing the table.
            //
            // 2. if table is read by init_table, then either we did a load, and the argument is
            //    as for point 1. or, we allocated a table, in which case the earliest it can be
            //    deallocated is in the next epoch. we are holding up the epoch by holding the
            //    guard, so this deref is safe.
            //
            // 3. if table is set by a Moved node (below) through help_transfer, it will _either_
            //    keep using `table` (which is fine by 1. and 2.), or use the `next_table` raw
            //    pointer from inside the Moved. to see that if a Moved(t) is _read_, then t must
            //    still be valid, see the safety comment on BinEntry::Moved.
            let t = unsafe { table.deref() };

            let bini = t.bini(h);
            let bin = t.bin(bini, guard);
            if bin.is_null() {
                // fast path -- bin is empty so key is not present
                return None;
            }

            // slow path -- bin is non-empty
            // safety: bin is a valid pointer.
            //
            // there are two cases when a bin pointer is invalidated:
            //
            // 1. if the table was resized, bin is a move entry, and the resize has completed. in
            //    that case, the table (and all its heads) will be dropped in the next epoch
            //    following that.
            // 2. if the table is being resized, bin may be swapped with a move entry. the old bin
            //    will then be dropped in the following epoch after that happens.
            //
            // in both cases, we held the guard when we got the reference to the bin. if any such
            // swap happened, it must have happened _after_ we read. since we did the read while
            // pinning the epoch, the drop must happen in the _next_ epoch (i.e., the one that we
            // are holding up by holding on to our guard).
            match *unsafe { bin.deref() } {
                BinEntry::Moved(next_table) => {
                    table = self.help_transfer(table, next_table, guard);
                }
                BinEntry::Node(ref head) => {
                    // bin is non-empty, so we must take the bin lock before operating on it
                    let head_lock = head.lock.lock();

                    // need to check that this is _still_ the head
                    let current_head = t.bin(bini, guard);
                    if current_head != bin {
                        // nope -- try again from the start
                        continue;
                    }

                    // yes, it is still the head, so we can now "own" the bin
                    // note that there can still be readers in the bin!

                    // TODO: TreeBin & ReservationNode

                    let mut removed_node = false;
                    let mut bin_count = 1;
                    let mut p = bin;
                    let mut pred: Shared<'_, BinEntry<K, V>> = Shared::null();

                    let new_val = loop {
                        // safety: we read the bin while pinning the epoch. a bin will never be
                        // dropped until the next epoch after it is removed. since it wasn't
                        // removed, and the epoch was pinned, that cannot be until after we drop
                        // our guard.
                        let n = unsafe { p.deref() }.as_node().unwrap();
                        // TODO: This Ordering can probably be relaxed due to the Mutex
                        let next = n.next.load(Ordering::SeqCst, guard);
                        if n.hash == h && n.key.borrow() == key {
                            // the key already exists in the map!
                            let current_value = n.value.load(Ordering::SeqCst, guard);

                            // safety: since the value is present now, and we've held a guard from
                            // the beginning of the search, the value cannot be dropped until the
                            // next epoch, which won't arrive until after we drop our guard.
                            let new_value =
                                remapping_function(&n.key, unsafe { current_value.deref() });

                            if let Some(value) = new_value {
                                let now_garbage =
                                    n.value.swap(Owned::new(value), Ordering::SeqCst, guard);
                                // NOTE: now_garbage == current_value

                                // safety: we need to guarantee that now_garbage is no longer
                                // reachable. more specifically, no thread that executes _after_
                                // this line can ever get a reference to now_garbage.
                                //
                                // here are the possible cases:
                                //
                                // - another thread already has a reference to now_garbage.
                                //   they must have read it before the call to swap.
                                //   because of this, that thread must be pinned to an epoch <=
                                //   the epoch of our guard. since the garbage is placed in our
                                //   epoch, it won't be freed until the _next_ epoch, at which
                                //   point, that thread must have dropped its guard, and with it,
                                //   any reference to the value.
                                // - another thread is about to get a reference to this value.
                                //   they execute _after_ the swap, and therefore do _not_ get a
                                //   reference to now_garbage (they get value instead). there are
                                //   no other ways to get to a value except through its Node's
                                //   `value` field (which is what we swapped), so freeing
                                //   now_garbage is fine.
                                unsafe { guard.defer_destroy(now_garbage) };

                                // safety: since the value is present now, and we've held a guard from
                                // the beginning of the search, the value cannot be dropped until the
                                // next epoch, which won't arrive until after we drop our guard.
                                break Some(unsafe {
                                    n.value.load(Ordering::SeqCst, guard).deref()
                                });
                            } else {
                                removed_node = true;
                                // remove the BinEntry containing the removed key-value pair from the bucket
                                if !pred.is_null() {
                                    // either by changing the pointer of the previous BinEntry, if present
                                    // safety: see remove
                                    unsafe { pred.deref() }
                                        .as_node()
                                        .unwrap()
                                        .next
                                        .store(next, Ordering::SeqCst);
                                } else {
                                    // or by setting the next node as the first BinEntry if there is no previous entry
                                    t.store_bin(bini, next);
                                }

                                // in either case, mark the BinEntry as garbage, since it was just removed
                                // safety: we need to guarantee that the old value is no longer
                                // reachable. more specifically, no thread that executes _after_
                                // this line can ever get a reference to it.
                                //
                                // here are the possible cases:
                                //
                                // - another thread already has a reference to the old value.
                                //   they must have read it before the call to store_bin.
                                //   because of this, that thread must be pinned to an epoch <=
                                //   the epoch of our guard. since the garbage is placed in our
                                //   epoch, it won't be freed until the _next_ epoch, at which
                                //   point, that thread must have dropped its guard, and with it,
                                //   any reference to the value.
                                // - another thread is about to get a reference to this value.
                                //   they execute _after_ the store_bin, and therefore do _not_ get a
                                //   reference to the old value. there are no other ways to get to a
                                //   value except through its Node's `value` field (which is now gone
                                //   together with the node), so freeing the old value is fine.
                                unsafe { guard.defer_destroy(p) };
                                unsafe { guard.defer_destroy(current_value) };
                                break None;
                            }
                        }

                        pred = p;
                        if next.is_null() {
                            // we're at the end of the bin
                            break None;
                        }
                        p = next;

                        bin_count += 1;
                    };
                    drop(head_lock);

                    if removed_node {
                        // decrement the map's entry count
                        self.add_count(-1, Some(bin_count), guard);
                    }
                    guard.flush();
                    return new_val;
                }
            }
        }
    }

    fn help_transfer<'g>(
        &'g self,
        table: Shared<'g, Table<K, V>>,
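
The two long safety comments in this method both reduce to crossbeam-epoch's deferred-reclamation rule: a value swapped out while guards may still reference it cannot be freed before a later epoch. As a standalone sketch of that pattern (illustration only, not part of this commit; it uses only the public crossbeam-epoch API, independent of the map internals):

use crossbeam_epoch::{self as epoch, Atomic, Owned};
use std::sync::atomic::Ordering;

fn defer_destroy_sketch() {
    let slot = Atomic::new(1_u64);
    let guard = epoch::pin(); // pin the current epoch

    // swap in a new value; the old allocation is unreachable for any
    // reader that starts after this point...
    let old = slot.swap(Owned::new(2_u64), Ordering::SeqCst, &guard);

    // ...but threads pinned to an earlier or equal epoch may still hold a
    // reference to it, so destruction is deferred to a later epoch rather
    // than performed immediately.
    unsafe { guard.defer_destroy(old) };

    // (sketch only: the value remaining in `slot` is leaked when `slot`
    // is dropped, since Atomic does not run destructors on drop)
}

This is exactly the argument made above for both `defer_destroy(now_garbage)` and `defer_destroy(p)`: any thread that could still observe the old allocation must have pinned an epoch no later than ours, so the deferred drop cannot run until that thread unpins.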

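For reference, a minimal usage sketch of the new method (hypothetical example; the `flurry` crate path and the key/value types here are assumptions for illustration, while the signature is the one added in src/map.rs above):

use crossbeam_epoch as epoch;
use flurry::HashMap; // crate path is an assumption for this sketch

fn usage_sketch() {
    let map = HashMap::<String, u64>::new();
    let guard = epoch::pin();
    map.insert("hits".to_owned(), 1, &guard);

    // bump the value if the key is present; returning None instead
    // would remove the mapping entirely
    let new = map.compute_if_present("hits", |_key, v| Some(v + 1), &guard);
    assert_eq!(new, Some(&2));

    // absent keys are left untouched and the closure is never invoked
    assert!(map
        .compute_if_present("misses", |_, v| Some(v + 1), &guard)
        .is_none());
}

The tests below exercise the same three behaviors: remapping an existing key, leaving absent keys untouched, and removing a mapping by returning `None`.
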
tests/basic.rs

Lines changed: 80 additions & 0 deletions
@@ -164,6 +164,48 @@ fn update() {
    }
}

#[test]
fn compute_if_present() {
    let map = HashMap::<usize, usize>::new();

    let guard = epoch::pin();
    map.insert(42, 0, &guard);
    let new = map.compute_if_present(&42, |_, v| Some(v + 1), &guard);
    assert_eq!(new, Some(&1));
    {
        let guard = epoch::pin();
        let e = map.get(&42, &guard).unwrap();
        assert_eq!(e, &1);
    }
}

#[test]
fn compute_if_present_empty() {
    let map = HashMap::<usize, usize>::new();

    let guard = epoch::pin();
    let new = map.compute_if_present(&42, |_, v| Some(v + 1), &guard);
    assert!(new.is_none());
    {
        let guard = epoch::pin();
        assert!(map.get(&42, &guard).is_none());
    }
}

#[test]
fn compute_if_present_remove() {
    let map = HashMap::<usize, usize>::new();

    let guard = epoch::pin();
    map.insert(42, 0, &guard);
    let new = map.compute_if_present(&42, |_, _| None, &guard);
    assert!(new.is_none());
    {
        let guard = epoch::pin();
        assert!(map.get(&42, &guard).is_none());
    }
}

#[test]
fn concurrent_insert() {
    let map = Arc::new(HashMap::<usize, usize>::new());
@@ -234,6 +276,44 @@ fn concurrent_remove() {
    }
}

#[test]
fn concurrent_compute_if_present() {
    let map = Arc::new(HashMap::<usize, usize>::new());

    {
        let guard = epoch::pin();
        for i in 0..64 {
            map.insert(i, i, &guard);
        }
    }

    let map1 = map.clone();
    let t1 = std::thread::spawn(move || {
        let guard = epoch::pin();
        for i in 0..64 {
            let new = map1.compute_if_present(&i, |_, _| None, &guard);
            assert!(new.is_none());
        }
    });
    let map2 = map.clone();
    let t2 = std::thread::spawn(move || {
        let guard = epoch::pin();
        for i in 0..64 {
            let new = map2.compute_if_present(&i, |_, _| None, &guard);
            assert!(new.is_none());
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();

    // after joining the threads, the map should be empty
    let guard = epoch::pin();
    for i in 0..64 {
        assert!(map.get(&i, &guard).is_none());
    }
}

#[test]
fn current_kv_dropped() {
    let dropped1 = Arc::new(0);
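
Because the doc comment guarantees that each invocation runs atomically (the bin lock is held across the read-modify-write), concurrent remappings of the same key cannot lose updates. A sketch of that property, under the same assumed crate paths as the usage sketch above (thread and iteration counts are arbitrary):

use crossbeam_epoch as epoch;
use flurry::HashMap; // crate path is an assumption for this sketch
use std::sync::Arc;

fn lost_update_check() {
    let map = Arc::new(HashMap::<usize, usize>::new());
    map.insert(0, 0, &epoch::pin());

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let map = Arc::clone(&map);
            std::thread::spawn(move || {
                let guard = epoch::pin();
                for _ in 0..1000 {
                    // the read-modify-write happens under the bin lock, so
                    // no increment can race with another and be overwritten
                    map.compute_if_present(&0, |_, v| Some(v + 1), &guard);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // every one of the 4 * 1000 increments is observed
    assert_eq!(map.get(&0, &epoch::pin()), Some(&4000));
}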
