Skip to content

Commit 617c988

Browse files
committed
Mark grow_hashtable as safe. Extract linked list handling
1 parent d6e110d commit 617c988

File tree

1 file changed

+49
-37
lines changed

1 file changed

+49
-37
lines changed

core/src/parking_lot.rs

+49-37
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,7 @@ impl ThreadData {
153153
// Keep track of the total number of live ThreadData objects and resize
154154
// the hash table accordingly.
155155
let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
156-
unsafe {
157-
grow_hashtable(num_threads);
158-
}
156+
grow_hashtable(num_threads);
159157

160158
ThreadData {
161159
parker: ThreadParker::new(),
@@ -239,57 +237,44 @@ fn create_hashtable() -> &'static HashTable {
239237
// Grow the hash table so that it is big enough for the given number of threads.
240238
// This isn't performance-critical since it is only done when a ThreadData is
241239
// created, which only happens once per thread.
242-
unsafe fn grow_hashtable(num_threads: usize) {
243-
let mut old_table = get_hashtable();
244-
loop {
240+
fn grow_hashtable(num_threads: usize) {
241+
// Lock all buckets in the existing table and get a reference to it
242+
let old_table = loop {
243+
let table = get_hashtable();
244+
245245
// Check if we need to resize the existing table
246-
if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
246+
if table.entries.len() >= LOAD_FACTOR * num_threads {
247247
return;
248248
}
249249

250250
// Lock all buckets in the old table
251-
for b in &(*old_table).entries[..] {
252-
b.mutex.lock();
251+
for bucket in &table.entries[..] {
252+
bucket.mutex.lock();
253253
}
254254

255255
// Now check if our table is still the latest one. Another thread could
256256
// have grown the hash table between us reading HASHTABLE and locking
257257
// the buckets.
258-
if HASHTABLE.load(Ordering::Relaxed) == old_table as *const _ as *mut _ {
259-
break;
258+
if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
259+
break table;
260260
}
261261

262262
// Unlock buckets and try again
263-
for b in &(*old_table).entries[..] {
263+
for bucket in &table.entries[..] {
264264
// SAFETY: We hold the lock here, as required
265-
b.mutex.unlock();
265+
unsafe { bucket.mutex.unlock() };
266266
}
267-
268-
// SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
269-
// Here we know `get_hashtable` above must have initialized `HASHTABLE`
270-
old_table = &*HASHTABLE.load(Ordering::Acquire);
271-
}
267+
};
272268

273269
// Create the new table
274-
let new_table = HashTable::new(num_threads, old_table);
270+
let mut new_table = HashTable::new(num_threads, old_table);
275271

276272
// Move the entries from the old table to the new one
277-
for b in &(*old_table).entries[..] {
278-
let mut current = b.queue_head.get();
279-
while !current.is_null() {
280-
let next = (*current).next_in_queue.get();
281-
let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
282-
if new_table.entries[hash].queue_tail.get().is_null() {
283-
new_table.entries[hash].queue_head.set(current);
284-
} else {
285-
(*new_table.entries[hash].queue_tail.get())
286-
.next_in_queue
287-
.set(current);
288-
}
289-
new_table.entries[hash].queue_tail.set(current);
290-
(*current).next_in_queue.set(ptr::null());
291-
current = next;
292-
}
273+
for bucket in &old_table.entries[..] {
274+
// SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
275+
// lists. All `ThreadData` instances in these lists will remain valid as long as they are
276+
// present in the lists, meaning as long as their threads are parked.
277+
unsafe { rehash_bucket_into(bucket, &mut new_table) };
293278
}
294279

295280
// Publish the new table. No races are possible at this point because
@@ -298,9 +283,36 @@ unsafe fn grow_hashtable(num_threads: usize) {
298283
HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
299284

300285
// Unlock all buckets in the old table
301-
for b in &(*old_table).entries[..] {
286+
for bucket in &old_table.entries[..] {
302287
// SAFETY: We hold the lock here, as required
303-
b.mutex.unlock();
288+
unsafe { bucket.mutex.unlock() };
289+
}
290+
}
291+
292+
/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
293+
/// in the bucket their key correspond to for this table.
294+
///
295+
/// # Safety
296+
///
297+
/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
298+
/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
299+
///
300+
/// The given `table` must only contain buckets with correctly constructed linked lists.
301+
unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
302+
let mut current: *const ThreadData = bucket.queue_head.get();
303+
while !current.is_null() {
304+
let next = (*current).next_in_queue.get();
305+
let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
306+
if table.entries[hash].queue_tail.get().is_null() {
307+
table.entries[hash].queue_head.set(current);
308+
} else {
309+
(*table.entries[hash].queue_tail.get())
310+
.next_in_queue
311+
.set(current);
312+
}
313+
table.entries[hash].queue_tail.set(current);
314+
(*current).next_in_queue.set(ptr::null());
315+
current = next;
304316
}
305317
}
306318

0 commit comments

Comments
 (0)