Skip to content

Commit d3d3d65

Browse files
committed
Put IngredientIndex into SyncTable
1 parent 72b2dc5 commit d3d3d65

File tree

6 files changed

+30
-23
lines changed

6 files changed

+30
-23
lines changed

src/function.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ where
160160
lru: lru::Lru::new(lru),
161161
deleted_entries: Default::default(),
162162
view_caster,
163-
sync_table: Default::default(),
163+
sync_table: SyncTable::new(index),
164164
}
165165
}
166166

@@ -256,10 +256,7 @@ where
256256
/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
257257
fn wait_for(&self, db: &dyn Database, key_index: Id) -> bool {
258258
let zalsa = db.zalsa();
259-
match self
260-
.sync_table
261-
.try_claim(db, zalsa, self.database_key_index(key_index))
262-
{
259+
match self.sync_table.try_claim(db, zalsa, key_index) {
263260
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
264261
ClaimResult::Cycle => false,
265262
}

src/function/fetch.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,11 @@ where
103103
id: Id,
104104
memo_ingredient_index: MemoIngredientIndex,
105105
) -> Option<&'db Memo<C::Output<'db>>> {
106-
let database_key_index = self.database_key_index(id);
107-
108106
// Try to claim this query: if someone else has claimed it already, go back and start again.
109-
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
107+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, id) {
110108
ClaimResult::Retry => return None,
111109
ClaimResult::Cycle => {
110+
let database_key_index = self.database_key_index(id);
112111
// check if there's a provisional value for this query
113112
// Note we don't `validate_may_be_provisional` the memo here as we want to reuse an
114113
// existing provisional memo if it exists
@@ -124,7 +123,7 @@ where
124123
}
125124
// no provisional value; create/insert/return initial provisional value
126125
return self
127-
.initial_value(db, database_key_index.key_index())
126+
.initial_value(db, id)
128127
.map(|initial_value| {
129128
tracing::debug!(
130129
"hit cycle at {database_key_index:#?}, \
@@ -155,7 +154,7 @@ where
155154
};
156155

157156
// Push the query on the stack.
158-
let active_query = db.zalsa_local().push_query(database_key_index);
157+
let active_query = db.zalsa_local().push_query(self.database_key_index(id));
159158

160159
// Now that we've claimed the item, check again to see if there's a "hot" value.
161160
let opt_old_memo = self.get_memo_from_table_for(zalsa, id, memo_ingredient_index);

src/function/maybe_changed_after.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ where
9898
) -> Option<VerifyResult> {
9999
let database_key_index = self.database_key_index(key_index);
100100

101-
let _claim_guard = match self.sync_table.try_claim(db, zalsa, database_key_index) {
101+
let _claim_guard = match self.sync_table.try_claim(db, zalsa, key_index) {
102102
ClaimResult::Retry => return None,
103103
ClaimResult::Cycle => match C::CYCLE_STRATEGY {
104104
CycleRecoveryStrategy::Panic => panic!(

src/function/sync.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ use crate::{
77
key::DatabaseKeyIndex,
88
runtime::{BlockResult, WaitResult},
99
zalsa::Zalsa,
10-
Database, Id,
10+
Database, Id, IngredientIndex,
1111
};
1212

1313
/// Tracks the keys that are currently being processed; used to coordinate between
1414
/// worker threads.
15-
#[derive(Default)]
1615
pub(crate) struct SyncTable {
1716
syncs: Mutex<FxHashMap<Id, SyncState>>,
17+
ingredient: IngredientIndex,
1818
}
1919

2020
pub(crate) enum ClaimResult<'a> {
@@ -32,14 +32,21 @@ struct SyncState {
3232
}
3333

3434
impl SyncTable {
35+
pub(crate) fn new(ingredient: IngredientIndex) -> Self {
36+
Self {
37+
syncs: Default::default(),
38+
ingredient,
39+
}
40+
}
41+
3542
pub(crate) fn try_claim<'me>(
3643
&'me self,
3744
db: &'me (impl ?Sized + Database),
3845
zalsa: &'me Zalsa,
39-
database_key_index: DatabaseKeyIndex,
46+
key_index: Id,
4047
) -> ClaimResult<'me> {
4148
let mut write = self.syncs.lock();
42-
match write.entry(database_key_index.key_index()) {
49+
match write.entry(key_index) {
4350
std::collections::hash_map::Entry::Occupied(occupied_entry) => {
4451
let &mut SyncState {
4552
id,
@@ -52,7 +59,12 @@ impl SyncTable {
5259
// boolean is to decide *whether* to acquire the lock,
5360
// not to gate future atomic reads.
5461
*anyone_waiting = true;
55-
match zalsa.runtime().block_on(db, database_key_index, id, write) {
62+
match zalsa.runtime().block_on(
63+
db,
64+
DatabaseKeyIndex::new(self.ingredient, key_index),
65+
id,
66+
write,
67+
) {
5668
BlockResult::Completed => ClaimResult::Retry,
5769
BlockResult::Cycle => ClaimResult::Cycle,
5870
}
@@ -63,7 +75,7 @@ impl SyncTable {
6375
anyone_waiting: false,
6476
});
6577
ClaimResult::Claimed(ClaimGuard {
66-
database_key_index,
78+
key_index,
6779
zalsa,
6880
sync_table: self,
6981
_padding: false,
@@ -77,7 +89,7 @@ impl SyncTable {
7789
/// released when this value is dropped.
7890
#[must_use]
7991
pub(crate) struct ClaimGuard<'me> {
80-
database_key_index: DatabaseKeyIndex,
92+
key_index: Id,
8193
zalsa: &'me Zalsa,
8294
sync_table: &'me SyncTable,
8395
// Reduce the size of ClaimResult by making more niches available in ClaimGuard; this fits into
@@ -89,14 +101,13 @@ impl ClaimGuard<'_> {
89101
fn remove_from_map_and_unblock_queries(&self) {
90102
let mut syncs = self.sync_table.syncs.lock();
91103

92-
let SyncState { anyone_waiting, .. } =
93-
syncs.remove(&self.database_key_index.key_index()).unwrap();
104+
let SyncState { anyone_waiting, .. } = syncs.remove(&self.key_index).unwrap();
94105

95106
drop(syncs);
96107

97108
if anyone_waiting {
98109
self.zalsa.runtime().unblock_queries_blocked_on(
99-
self.database_key_index,
110+
DatabaseKeyIndex::new(self.sync_table.ingredient, self.key_index),
100111
if std::thread::panicking() {
101112
WaitResult::Panicked
102113
} else {

src/runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl Runtime {
173173
other_id: ThreadId,
174174
query_mutex_guard: QueryMutexGuard,
175175
) -> BlockResult {
176-
let mut dg = self.dependency_graph.lock();
176+
let dg = self.dependency_graph.lock();
177177
let thread_id = std::thread::current().id();
178178

179179
if dg.depends_on(other_id, thread_id) {

src/runtime/dependency_graph.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl DependencyGraph {
2828
/// True if `from_id` depends on `to_id`.
2929
///
3030
/// (i.e., there is a path from `from_id` to `to_id` in the graph.)
31-
pub(super) fn depends_on(&mut self, from_id: ThreadId, to_id: ThreadId) -> bool {
31+
pub(super) fn depends_on(&self, from_id: ThreadId, to_id: ThreadId) -> bool {
3232
let mut p = from_id;
3333
while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) {
3434
if q == to_id {

0 commit comments

Comments
 (0)