Skip to content

Commit 5479236

Browse files
committed
Put IngredientIndex into SyncTable
1 parent 2b9d4bd commit 5479236

File tree

6 files changed

+30
-23
lines changed

6 files changed

+30
-23
lines changed

src/function.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ where
157157
lru: lru::Lru::new(lru),
158158
deleted_entries: Default::default(),
159159
view_caster,
160-
sync_table: Default::default(),
160+
sync_table: SyncTable::new(index),
161161
}
162162
}
163163

@@ -258,10 +258,7 @@ where
258258
/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
259259
fn wait_for(&self, db: &dyn Database, key_index: Id) -> bool {
260260
let zalsa = db.zalsa();
261-
match self
262-
.sync_table
263-
.try_claim(db, zalsa, self.database_key_index(key_index))
264-
{
261+
match self.sync_table.try_claim(db, zalsa, key_index) {
265262
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
266263
ClaimResult::Cycle => false,
267264
}

src/function/fetch.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,11 @@ where
9292
id: Id,
9393
memo_ingredient_index: MemoIngredientIndex,
9494
) -> Option<&'db Memo<C::Output<'db>>> {
95-
let database_key_index = self.database_key_index(id);
96-
9795
// Try to claim this query: if someone else has claimed it already, go back and start again.
98-
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
96+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, id) {
9997
ClaimResult::Retry => return None,
10098
ClaimResult::Cycle => {
99+
let database_key_index = self.database_key_index(id);
101100
// check if there's a provisional value for this query
102101
// Note we don't `validate_may_be_provisional` the memo here as we want to reuse an
103102
// existing provisional memo if it exists
@@ -123,7 +122,7 @@ where
123122
}
124123
// no provisional value; create/insert/return initial provisional value
125124
return self
126-
.initial_value(db, database_key_index.key_index())
125+
.initial_value(db, id)
127126
.map(|initial_value| {
128127
tracing::debug!(
129128
"hit cycle at {database_key_index:#?}, \
@@ -157,7 +156,7 @@ where
157156
ClaimResult::Claimed(guard) => guard,
158157
};
159158

160-
let mut active_query = LazyActiveQueryGuard::new(database_key_index);
159+
let mut active_query = LazyActiveQueryGuard::new(self.database_key_index(id));
161160

162161
// Now that we've claimed the item, check again to see if there's a "hot" value.
163162
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);

src/function/maybe_changed_after.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ where
103103
) -> Option<VerifyResult> {
104104
let database_key_index = self.database_key_index(key_index);
105105

106-
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
106+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, key_index) {
107107
ClaimResult::Retry => return None,
108108
ClaimResult::Cycle => match C::CYCLE_STRATEGY {
109109
CycleRecoveryStrategy::Panic => db.zalsa_local().with_query_stack(|stack| {

src/function/sync.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ use crate::{
77
key::DatabaseKeyIndex,
88
runtime::{BlockResult, WaitResult},
99
zalsa::Zalsa,
10-
Database, Id,
10+
Database, Id, IngredientIndex,
1111
};
1212

1313
/// Tracks the keys that are currently being processed; used to coordinate between
1414
/// worker threads.
15-
#[derive(Default)]
1615
pub(crate) struct SyncTable {
1716
syncs: Mutex<FxHashMap<Id, SyncState>>,
17+
ingredient: IngredientIndex,
1818
}
1919

2020
pub(crate) enum ClaimResult<'a> {
@@ -32,14 +32,21 @@ struct SyncState {
3232
}
3333

3434
impl SyncTable {
35+
pub(crate) fn new(ingredient: IngredientIndex) -> Self {
36+
Self {
37+
syncs: Default::default(),
38+
ingredient,
39+
}
40+
}
41+
3542
pub(crate) fn try_claim<'me>(
3643
&'me self,
3744
db: &'me (impl ?Sized + Database),
3845
zalsa: &'me Zalsa,
39-
database_key_index: DatabaseKeyIndex,
46+
key_index: Id,
4047
) -> ClaimResult<'me> {
4148
let mut write = self.syncs.lock();
42-
match write.entry(database_key_index.key_index()) {
49+
match write.entry(key_index) {
4350
std::collections::hash_map::Entry::Occupied(occupied_entry) => {
4451
let &mut SyncState {
4552
id,
@@ -52,7 +59,12 @@ impl SyncTable {
5259
// boolean is to decide *whether* to acquire the lock,
5360
// not to gate future atomic reads.
5461
*anyone_waiting = true;
55-
match zalsa.runtime().block_on(db, database_key_index, id, write) {
62+
match zalsa.runtime().block_on(
63+
db,
64+
DatabaseKeyIndex::new(self.ingredient, key_index),
65+
id,
66+
write,
67+
) {
5668
BlockResult::Completed => ClaimResult::Retry,
5769
BlockResult::Cycle => ClaimResult::Cycle,
5870
}
@@ -63,7 +75,7 @@ impl SyncTable {
6375
anyone_waiting: false,
6476
});
6577
ClaimResult::Claimed(ClaimGuard {
66-
database_key_index,
78+
key_index,
6779
zalsa,
6880
sync_table: self,
6981
_padding: false,
@@ -77,7 +89,7 @@ impl SyncTable {
7789
/// released when this value is dropped.
7890
#[must_use]
7991
pub(crate) struct ClaimGuard<'me> {
80-
database_key_index: DatabaseKeyIndex,
92+
key_index: Id,
8193
zalsa: &'me Zalsa,
8294
sync_table: &'me SyncTable,
8395
// Reduce the size of ClaimResult by making more niches available in ClaimGuard; this fits into
@@ -89,14 +101,13 @@ impl ClaimGuard<'_> {
89101
fn remove_from_map_and_unblock_queries(&self) {
90102
let mut syncs = self.sync_table.syncs.lock();
91103

92-
let SyncState { anyone_waiting, .. } =
93-
syncs.remove(&self.database_key_index.key_index()).unwrap();
104+
let SyncState { anyone_waiting, .. } = syncs.remove(&self.key_index).unwrap();
94105

95106
drop(syncs);
96107

97108
if anyone_waiting {
98109
self.zalsa.runtime().unblock_queries_blocked_on(
99-
self.database_key_index,
110+
DatabaseKeyIndex::new(self.sync_table.ingredient, self.key_index),
100111
if std::thread::panicking() {
101112
WaitResult::Panicked
102113
} else {

src/runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl Runtime {
170170
other_id: ThreadId,
171171
query_mutex_guard: QueryMutexGuard,
172172
) -> BlockResult {
173-
let mut dg = self.dependency_graph.lock();
173+
let dg = self.dependency_graph.lock();
174174
let thread_id = std::thread::current().id();
175175

176176
if dg.depends_on(other_id, thread_id) {

src/runtime/dependency_graph.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl DependencyGraph {
3131
/// True if `from_id` depends on `to_id`.
3232
///
3333
/// (i.e., there is a path from `from_id` to `to_id` in the graph.)
34-
pub(super) fn depends_on(&mut self, from_id: ThreadId, to_id: ThreadId) -> bool {
34+
pub(super) fn depends_on(&self, from_id: ThreadId, to_id: ThreadId) -> bool {
3535
let mut p = from_id;
3636
while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) {
3737
if q == to_id {

0 commit comments

Comments
 (0)