Skip to content

Commit fe07f3f

Browse files
committed
Per ingredient sync table
1 parent db9d4f8 commit fe07f3f

File tree

10 files changed

+51
-143
lines changed

10 files changed

+51
-143
lines changed

src/function.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ pub(crate) use maybe_changed_after::VerifyResult;
88
use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
99
use crate::cycle::{CycleHeadKind, CycleRecoveryAction, CycleRecoveryStrategy};
1010
use crate::function::delete::DeletedEntries;
11+
use crate::function::sync::{ClaimResult, SyncTable};
1112
use crate::ingredient::{fmt_index, Ingredient};
1213
use crate::key::DatabaseKeyIndex;
1314
use crate::plumbing::MemoIngredientMap;
1415
use crate::salsa_struct::SalsaStructInDb;
1516
use crate::table::memo::MemoTableTypes;
16-
use crate::table::sync::ClaimResult;
1717
use crate::table::Table;
1818
use crate::views::DatabaseDownCaster;
1919
use crate::zalsa::{IngredientIndex, MemoIngredientIndex, Zalsa};
@@ -31,6 +31,7 @@ mod lru;
3131
mod maybe_changed_after;
3232
mod memo;
3333
mod specify;
34+
mod sync;
3435

3536
pub type Memo<C> = memo::Memo<<C as Configuration>::Output<'static>>;
3637

@@ -120,6 +121,8 @@ pub struct IngredientImpl<C: Configuration> {
120121
/// instances that this downcaster was derived from.
121122
view_caster: DatabaseDownCaster<C::DbView>,
122123

124+
sync_table: SyncTable,
125+
123126
/// When `fetch` and friends executes, they return a reference to the
124127
/// value stored in the memo that is extended to live as long as the `&self`
125128
/// reference we start with. This means that whenever we remove something
@@ -161,6 +164,7 @@ where
161164
lru: lru::Lru::new(lru),
162165
deleted_entries: Default::default(),
163166
view_caster,
167+
sync_table: Default::default(),
164168
}
165169
}
166170

@@ -269,12 +273,10 @@ where
269273
/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
270274
fn wait_for(&self, db: &dyn Database, key_index: Id) -> bool {
271275
let zalsa = db.zalsa();
272-
match zalsa.sync_table_for(key_index).claim(
273-
db,
274-
zalsa,
275-
self.database_key_index(key_index),
276-
self.memo_ingredient_index(zalsa, key_index),
277-
) {
276+
match self
277+
.sync_table
278+
.try_claim(db, zalsa, self.database_key_index(key_index))
279+
{
278280
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
279281
ClaimResult::Cycle => false,
280282
}

src/function/fetch.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::cycle::{CycleHeads, CycleRecoveryStrategy};
22
use crate::function::memo::Memo;
3+
use crate::function::sync::ClaimResult;
34
use crate::function::{Configuration, IngredientImpl, VerifyResult};
4-
use crate::table::sync::ClaimResult;
55
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
66
use crate::zalsa_local::QueryRevisions;
77
use crate::Id;
@@ -99,15 +99,8 @@ where
9999
let database_key_index = self.database_key_index(id);
100100

101101
// Try to claim this query: if someone else has claimed it already, go back and start again.
102-
let _claim_guard = match zalsa.sync_table_for(id).claim(
103-
db,
104-
zalsa,
105-
database_key_index,
106-
memo_ingredient_index,
107-
) {
108-
ClaimResult::Retry => {
109-
return None;
110-
}
102+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
103+
ClaimResult::Retry => return None,
111104
ClaimResult::Cycle => {
112105
// check if there's a provisional value for this query
113106
// Note we don't `validate_may_be_provisional` the memo here as we want to reuse an

src/function/maybe_changed_after.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use std::sync::atomic::Ordering;
33
use crate::accumulator::accumulated_map::InputAccumulatedValues;
44
use crate::cycle::{CycleHeadKind, CycleHeads, CycleRecoveryStrategy};
55
use crate::function::memo::Memo;
6+
use crate::function::sync::ClaimResult;
67
use crate::function::{Configuration, IngredientImpl};
78
use crate::key::DatabaseKeyIndex;
8-
use crate::table::sync::ClaimResult;
99
use crate::zalsa::{MemoIngredientIndex, Zalsa, ZalsaDatabase};
1010
use crate::zalsa_local::{QueryEdge, QueryOrigin};
1111
use crate::{AsDynDatabase as _, Id, Revision};
@@ -102,12 +102,7 @@ where
102102
) -> Option<VerifyResult> {
103103
let database_key_index = self.database_key_index(key_index);
104104

105-
let _claim_guard = match zalsa.sync_table_for(key_index).claim(
106-
db,
107-
zalsa,
108-
database_key_index,
109-
memo_ingredient_index,
110-
) {
105+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
111106
ClaimResult::Retry => return None,
112107
ClaimResult::Cycle => match C::CYCLE_STRATEGY {
113108
CycleRecoveryStrategy::Panic => db.zalsa_local().with_query_stack(|stack| {

src/table/sync.rs renamed to src/function/sync.rs

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
use std::thread::ThreadId;
22

33
use parking_lot::Mutex;
4+
use rustc_hash::FxHashMap;
45

5-
use crate::key::DatabaseKeyIndex;
6-
use crate::runtime::{BlockResult, WaitResult};
7-
use crate::table::util;
8-
use crate::zalsa::{MemoIngredientIndex, Zalsa};
9-
use crate::Database;
6+
use crate::{
7+
key::DatabaseKeyIndex,
8+
runtime::{BlockResult, WaitResult},
9+
zalsa::Zalsa,
10+
Database, Id,
11+
};
1012

1113
/// Tracks the keys that are currently being processed; used to coordinate between
1214
/// worker threads.
1315
#[derive(Default)]
1416
pub(crate) struct SyncTable {
15-
syncs: Mutex<Vec<Option<SyncState>>>,
17+
syncs: Mutex<FxHashMap<Id, SyncState>>,
18+
}
19+
20+
pub(crate) enum ClaimResult<'a> {
21+
Retry,
22+
Cycle,
23+
Claimed(ClaimGuard<'a>),
1624
}
1725

1826
struct SyncState {
@@ -23,59 +31,44 @@ struct SyncState {
2331
anyone_waiting: bool,
2432
}
2533

26-
pub(crate) enum ClaimResult<'a> {
27-
Retry,
28-
Cycle,
29-
Claimed(ClaimGuard<'a>),
30-
}
31-
3234
impl SyncTable {
33-
#[inline]
34-
pub(crate) fn claim<'me>(
35+
pub(crate) fn try_claim<'me>(
3536
&'me self,
3637
db: &'me (impl ?Sized + Database),
3738
zalsa: &'me Zalsa,
3839
database_key_index: DatabaseKeyIndex,
39-
memo_ingredient_index: MemoIngredientIndex,
4040
) -> ClaimResult<'me> {
41-
let mut syncs = self.syncs.lock();
42-
let thread_id = std::thread::current().id();
43-
44-
util::ensure_vec_len(&mut syncs, memo_ingredient_index.as_usize() + 1);
45-
46-
match &mut syncs[memo_ingredient_index.as_usize()] {
47-
None => {
48-
syncs[memo_ingredient_index.as_usize()] = Some(SyncState {
49-
id: thread_id,
50-
anyone_waiting: false,
51-
});
52-
ClaimResult::Claimed(ClaimGuard {
53-
database_key_index,
54-
memo_ingredient_index,
55-
zalsa,
56-
sync_table: self,
57-
_padding: false,
58-
})
59-
}
60-
Some(SyncState {
61-
id: other_id,
62-
anyone_waiting,
63-
}) => {
41+
let mut write = self.syncs.lock();
42+
match write.entry(database_key_index.key_index()) {
43+
std::collections::hash_map::Entry::Occupied(occupied_entry) => {
44+
let &mut SyncState {
45+
id,
46+
ref mut anyone_waiting,
47+
} = occupied_entry.into_mut();
6448
// NB: `Ordering::Relaxed` is sufficient here,
6549
// as there are no loads that are "gated" on this
6650
// value. Everything that is written is also protected
6751
// by a lock that must be acquired. The role of this
6852
// boolean is to decide *whether* to acquire the lock,
6953
// not to gate future atomic reads.
7054
*anyone_waiting = true;
71-
match zalsa
72-
.runtime()
73-
.block_on(db, database_key_index, *other_id, syncs)
74-
{
55+
match zalsa.runtime().block_on(db, database_key_index, id, write) {
7556
BlockResult::Completed => ClaimResult::Retry,
7657
BlockResult::Cycle => ClaimResult::Cycle,
7758
}
7859
}
60+
std::collections::hash_map::Entry::Vacant(vacant_entry) => {
61+
vacant_entry.insert(SyncState {
62+
id: std::thread::current().id(),
63+
anyone_waiting: false,
64+
});
65+
ClaimResult::Claimed(ClaimGuard {
66+
database_key_index,
67+
zalsa,
68+
sync_table: self,
69+
_padding: false,
70+
})
71+
}
7972
}
8073
}
8174
}
@@ -85,7 +78,6 @@ impl SyncTable {
8578
#[must_use]
8679
pub(crate) struct ClaimGuard<'me> {
8780
database_key_index: DatabaseKeyIndex,
88-
memo_ingredient_index: MemoIngredientIndex,
8981
zalsa: &'me Zalsa,
9082
sync_table: &'me SyncTable,
9183
// Reduce the size of ClaimResult by making more niches available in ClaimGuard; this fits into
@@ -98,7 +90,7 @@ impl ClaimGuard<'_> {
9890
let mut syncs = self.sync_table.syncs.lock();
9991

10092
let SyncState { anyone_waiting, .. } =
101-
syncs[self.memo_ingredient_index.as_usize()].take().unwrap();
93+
syncs.remove(&self.database_key_index.key_index()).unwrap();
10294

10395
if anyone_waiting {
10496
self.zalsa.runtime().unblock_queries_blocked_on(

src/input.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use crate::input::singleton::{Singleton, SingletonChoice};
1616
use crate::key::DatabaseKeyIndex;
1717
use crate::plumbing::{Jar, Stamp};
1818
use crate::table::memo::{MemoTable, MemoTableTypes};
19-
use crate::table::sync::SyncTable;
2019
use crate::table::{Slot, Table};
2120
use crate::zalsa::{IngredientIndex, Zalsa};
2221
use crate::{Database, Durability, Id, Revision, Runtime};
@@ -107,7 +106,6 @@ impl<C: Configuration> IngredientImpl<C> {
107106
fields,
108107
stamps,
109108
memos: Default::default(),
110-
syncs: Default::default(),
111109
})
112110
});
113111

@@ -252,9 +250,6 @@ where
252250

253251
/// Memos
254252
memos: MemoTable,
255-
256-
/// Syncs
257-
syncs: SyncTable,
258253
}
259254

260255
impl<C> Value<C>
@@ -288,9 +283,4 @@ where
288283
fn memos_mut(&mut self) -> &mut crate::table::memo::MemoTable {
289284
&mut self.memos
290285
}
291-
292-
#[inline(always)]
293-
unsafe fn syncs(&self, _current_revision: Revision) -> &SyncTable {
294-
&self.syncs
295-
}
296286
}

src/interned.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use crate::ingredient::{fmt_index, Ingredient};
1818
use crate::plumbing::{IngredientIndices, Jar};
1919
use crate::revision::AtomicRevision;
2020
use crate::table::memo::{MemoTable, MemoTableTypes};
21-
use crate::table::sync::SyncTable;
2221
use crate::table::Slot;
2322
use crate::zalsa::{IngredientIndex, Zalsa};
2423
use crate::{Database, DatabaseKeyIndex, Event, EventKind, Id, Revision};
@@ -78,7 +77,6 @@ where
7877
{
7978
fields: C::Fields<'static>,
8079
memos: MemoTable,
81-
syncs: SyncTable,
8280

8381
/// The revision the value was first interned in.
8482
first_interned_at: Revision,
@@ -311,7 +309,6 @@ where
311309
let id = zalsa_local.allocate(zalsa, self.ingredient_index, |id| Value::<C> {
312310
fields: unsafe { self.to_internal_data(assemble(id, key)) },
313311
memos: Default::default(),
314-
syncs: Default::default(),
315312
durability: AtomicU8::new(durability.as_u8()),
316313
// Record the revision we are interning in.
317314
first_interned_at: current_revision,
@@ -463,11 +460,6 @@ where
463460
fn memos_mut(&mut self) -> &mut MemoTable {
464461
&mut self.memos
465462
}
466-
467-
#[inline(always)]
468-
unsafe fn syncs(&self, _current_revision: Revision) -> &crate::table::sync::SyncTable {
469-
&self.syncs
470-
}
471463
}
472464

473465
/// A trait for types that hash and compare like `O`.

src/table.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,11 @@ use std::sync::Arc;
1111
use memo::MemoTable;
1212
use parking_lot::Mutex;
1313
use rustc_hash::FxHashMap;
14-
use sync::SyncTable;
1514

1615
use crate::table::memo::{MemoTableTypes, MemoTableWithTypes, MemoTableWithTypesMut};
1716
use crate::{Id, IngredientIndex, Revision};
1817

1918
pub(crate) mod memo;
20-
pub(crate) mod sync;
21-
mod util;
2219

2320
const PAGE_LEN_BITS: usize = 10;
2421
const PAGE_LEN_MASK: usize = PAGE_LEN - 1;
@@ -44,13 +41,6 @@ pub(crate) trait Slot: Any + Send + Sync {
4441

4542
/// Mutably access the [`MemoTable`] for this slot.
4643
fn memos_mut(&mut self) -> &mut MemoTable;
47-
48-
/// Access the [`SyncTable`][] for this slot.
49-
///
50-
/// # Safety condition
51-
///
52-
/// The current revision MUST be the current revision of the database containing this slot.
53-
unsafe fn syncs(&self, current_revision: Revision) -> &SyncTable;
5444
}
5545

5646
/// [Slot::memos]
@@ -61,17 +51,12 @@ type SlotMemosFn<T> = unsafe fn(&T, current_revision: Revision) -> &MemoTable;
6151
type SlotMemosMutFnRaw = unsafe fn(*mut ()) -> *mut MemoTable;
6252
/// [Slot::memos_mut]
6353
type SlotMemosMutFn<T> = unsafe fn(&mut T) -> &mut MemoTable;
64-
/// [Slot::syncs]
65-
type SlotSyncsFnRaw = unsafe fn(*const (), current_revision: Revision) -> *const SyncTable;
66-
/// [Slot::syncs]
67-
type SlotSyncsFn<T> = unsafe fn(&T, current_revision: Revision) -> &SyncTable;
6854

6955
struct SlotVTable {
7056
layout: Layout,
7157
/// [`Slot`] methods
7258
memos: SlotMemosFnRaw,
7359
memos_mut: SlotMemosMutFnRaw,
74-
syncs: SlotSyncsFnRaw,
7560
/// A drop impl to call when the own page drops
7661
/// SAFETY: The caller is required to supply a correct data pointer to a `Box<PageDataEntry<T>>` and initialized length,
7762
/// and correct memo types.
@@ -99,8 +84,6 @@ impl SlotVTable {
9984
memos_mut: unsafe {
10085
mem::transmute::<SlotMemosMutFn<T>, SlotMemosMutFnRaw>(T::memos_mut)
10186
},
102-
// SAFETY: The signatures are compatible
103-
syncs: unsafe { mem::transmute::<SlotSyncsFn<T>, SlotSyncsFnRaw>(T::syncs) },
10487
}
10588
}
10689
}
@@ -267,19 +250,6 @@ impl Table {
267250
unsafe { page.memo_types.attach_memos_mut(memos) }
268251
}
269252

270-
/// Get the sync table associated with `id`
271-
///
272-
/// # Safety condition
273-
///
274-
/// The parameter `current_revision` MUST be the current revision
275-
/// of the owner of database owning this table.
276-
pub(crate) unsafe fn syncs(&self, id: Id, current_revision: Revision) -> &SyncTable {
277-
let (page, slot) = split_id(id);
278-
let page = &self.pages[page.0];
279-
// SAFETY: We supply a proper slot pointer and the caller is required to pass the `current_revision`.
280-
unsafe { &*(page.slot_vtable.syncs)(page.get(slot), current_revision) }
281-
}
282-
283253
pub(crate) fn slots_of<T: Slot>(&self) -> impl Iterator<Item = &T> + '_ {
284254
self.pages
285255
.iter()

src/table/util.rs

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)