Skip to content

Commit 1963cd8

Browse files
authored
Monotonic timestamps (#2618)
1 parent 1aad021 commit 1963cd8

File tree

6 files changed

+160
-111
lines changed

6 files changed

+160
-111
lines changed

crates/core/src/host/instance_env.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::replica_context::ReplicaContext;
77
use core::mem;
88
use parking_lot::{Mutex, MutexGuard};
99
use smallvec::SmallVec;
10+
use spacetimedb_lib::Timestamp;
1011
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
1112
use spacetimedb_sats::{
1213
bsatn::{self, ToBsatn},
@@ -23,6 +24,8 @@ pub struct InstanceEnv {
2324
pub replica_ctx: Arc<ReplicaContext>,
2425
pub scheduler: Scheduler,
2526
pub tx: TxSlot,
27+
/// The timestamp the current reducer began running.
28+
pub start_time: Timestamp,
2629
}
2730

2831
#[derive(Clone, Default)]
@@ -168,9 +171,15 @@ impl InstanceEnv {
168171
replica_ctx,
169172
scheduler,
170173
tx: TxSlot::default(),
174+
start_time: Timestamp::now(),
171175
}
172176
}
173177

178+
/// Signal to this `InstanceEnv` that a reducer call is beginning.
179+
pub fn start_reducer(&mut self, ts: Timestamp) {
180+
self.start_time = ts;
181+
}
182+
174183
fn get_tx(&self) -> Result<impl DerefMut<Target = MutTxId> + '_, GetTxError> {
175184
self.tx.get()
176185
}
@@ -260,7 +269,14 @@ impl InstanceEnv {
260269
// as we successfully inserted and thus `ret` is verified against the table schema.
261270
.map_err(|e| NodesError::ScheduleError(ScheduleError::DecodingError(e)))?;
262271
self.scheduler
263-
.schedule(table_id, schedule_id, schedule_at, id_column, at_column)
272+
.schedule(
273+
table_id,
274+
schedule_id,
275+
schedule_at,
276+
id_column,
277+
at_column,
278+
self.start_time,
279+
)
264280
.map_err(NodesError::ScheduleError)?;
265281

266282
Ok(())
@@ -490,6 +506,8 @@ impl From<GetTxError> for NodesError {
490506

491507
#[cfg(test)]
492508
mod test {
509+
use super::*;
510+
493511
use std::{ops::Bound, sync::Arc};
494512

495513
use crate::{
@@ -515,8 +533,6 @@ mod test {
515533
use spacetimedb_sats::product;
516534
use tempfile::TempDir;
517535

518-
use super::{ChunkPool, InstanceEnv, TxSlot};
519-
520536
/// An `InstanceEnv` requires a `DatabaseLogger`
521537
fn temp_logger() -> Result<DatabaseLogger> {
522538
let temp = TempDir::new()?;
@@ -559,6 +575,7 @@ mod test {
559575
replica_ctx: Arc::new(replica_ctx(db)?),
560576
scheduler,
561577
tx: TxSlot::default(),
578+
start_time: Timestamp::now(),
562579
})
563580
}
564581

crates/core/src/host/scheduler.rs

Lines changed: 117 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use spacetimedb_primitives::{ColId, TableId};
1212
use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue};
1313
use spacetimedb_table::table::RowRef;
1414
use tokio::sync::mpsc;
15-
use tokio_util::time::delay_queue::Expired;
16-
use tokio_util::time::{delay_queue, DelayQueue};
15+
use tokio::time::Instant;
16+
use tokio_util::time::delay_queue::{self, DelayQueue, Expired};
1717

1818
use crate::db::datastore::locking_tx_datastore::MutTxId;
1919
use crate::db::datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
@@ -51,8 +51,17 @@ enum MsgOrExit<T> {
5151
}
5252

5353
enum SchedulerMessage {
54-
Schedule { id: ScheduledReducerId, at: ScheduleAt },
55-
ScheduleImmediate { reducer_name: String, args: ReducerArgs },
54+
Schedule {
55+
id: ScheduledReducerId,
56+
/// The timestamp we'll tell the reducer it is.
57+
effective_at: Timestamp,
58+
/// The actual instant we're scheduling for.
59+
real_at: Instant,
60+
},
61+
ScheduleImmediate {
62+
reducer_name: String,
63+
args: ReducerArgs,
64+
},
5665
}
5766

5867
pub struct ScheduledReducer {
@@ -102,20 +111,22 @@ impl SchedulerStarter {
102111
.table_scheduled_id_and_at(&tx, table_id)?
103112
.ok_or_else(|| anyhow!("scheduled table {table_id} doesn't have valid columns"))?;
104113

114+
let now_ts = Timestamp::now();
115+
let now_instant = Instant::now();
116+
105117
// Insert each entry (row) in the scheduled table into `queue`.
106118
for scheduled_row in self.db.iter(&tx, table_id)? {
107119
let (schedule_id, schedule_at) = get_schedule_from_row(&scheduled_row, id_column, at_column)?;
108120
// calculate duration left to call the scheduled reducer
109-
let duration = schedule_at.to_duration_from_now();
110-
queue.insert(
111-
QueueItem::Id(ScheduledReducerId {
112-
table_id,
113-
schedule_id,
114-
id_column,
115-
at_column,
116-
}),
117-
duration,
118-
);
121+
let duration = schedule_at.to_duration_from(now_ts);
122+
let at = schedule_at.to_timestamp_from(now_ts);
123+
let id = ScheduledReducerId {
124+
table_id,
125+
schedule_id,
126+
id_column,
127+
at_column,
128+
};
129+
queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
119130
}
120131
}
121132

@@ -158,7 +169,7 @@ impl SchedulerStarter {
158169
/// If `DelayQueue` extends to support a larger range,
159170
/// we may reject some long-delayed schedule calls which could succeed,
160171
/// but we will never permit a schedule attempt which will panic.
161-
const MAX_SCHEDULE_DELAY: std::time::Duration = std::time::Duration::from_millis(
172+
const MAX_SCHEDULE_DELAY: Duration = Duration::from_millis(
162173
// Equal to 64^6 - 1 milliseconds, which is 2.177589 years.
163174
(1 << (6 * 6)) - 1,
164175
);
@@ -173,40 +184,36 @@ pub enum ScheduleError {
173184
}
174185

175186
impl Scheduler {
187+
/// Schedule a reducer to run from a scheduled table.
188+
///
189+
/// `reducer_start` is the timestamp of the start of the current reducer.
176190
pub(super) fn schedule(
177191
&self,
178192
table_id: TableId,
179193
schedule_id: u64,
180194
schedule_at: ScheduleAt,
181195
id_column: ColId,
182196
at_column: ColId,
197+
reducer_start: Timestamp,
183198
) -> Result<(), ScheduleError> {
184-
// Check that `at` is within `tokio_utils::time::DelayQueue`'s accepted time-range.
185-
//
186-
// `DelayQueue` uses a sliding window,
187-
// and there may be some non-zero delay between this check
188-
// and the actual call to `DelayQueue::insert`.
189-
//
190-
// Assuming a monotonic clock,
191-
// this means we may reject some otherwise acceptable schedule calls.
199+
// if `Timestamp::now()` is properly monotonic, use it; otherwise, use
200+
// the start of the reducer run as "now" for purposes of scheduling
201+
let now = reducer_start.max(Timestamp::now());
202+
203+
// Check that `at` is within `tokio_utils::time::DelayQueue`'s
204+
// accepted time-range.
192205
//
193-
// If `Timestamp::now()`, i.e. `std::time::SystemTime::now()`,
194-
// is not monotonic,
195-
// `DelayQueue::insert` may panic.
196-
// This will happen if a module attempts to schedule a reducer
197-
// with a delay just before the two-year limit,
198-
// and the system clock is adjusted backwards
199-
// after the check but before scheduling so that after the adjustment,
200-
// the delay is beyond the two-year limit.
206+
// `DelayQueue` uses a sliding window, and there may be some non-zero
207+
// delay between this check and the actual call to `DelayQueue::insert_at`.
201208
//
202-
// We could avoid this edge case by scheduling in terms of the monotonic `Instant`,
203-
// rather than `SystemTime`,
204-
// but we don't currently have a meaningful way
205-
// to convert a `Timestamp` into an `Instant`.
206-
let delay = schedule_at.to_duration_from_now();
209+
// Assuming a monotonic clock, this means we may reject some otherwise
210+
// acceptable schedule calls.
211+
let delay = schedule_at.to_duration_from(now);
207212
if delay >= MAX_SCHEDULE_DELAY {
208213
return Err(ScheduleError::DelayTooLong(delay));
209214
}
215+
let effective_at = schedule_at.to_timestamp_from(now);
216+
let real_at = Instant::now() + delay;
210217

211218
// if the actor has exited, it's fine to ignore; it means that the host actor calling
212219
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
@@ -217,7 +224,8 @@ impl Scheduler {
217224
id_column,
218225
at_column,
219226
},
220-
at: schedule_at,
227+
effective_at,
228+
real_at,
221229
}));
222230

223231
Ok(())
@@ -247,10 +255,13 @@ struct SchedulerActor {
247255
}
248256

249257
enum QueueItem {
250-
Id(ScheduledReducerId),
258+
Id { id: ScheduledReducerId, at: Timestamp },
251259
VolatileNonatomicImmediate { reducer_name: String, args: ReducerArgs },
252260
}
253261

262+
#[cfg(target_pointer_width = "64")]
263+
spacetimedb_table::static_assert_size!(QueueItem, 64);
264+
254265
impl SchedulerActor {
255266
async fn run(mut self) {
256267
loop {
@@ -270,12 +281,16 @@ impl SchedulerActor {
270281

271282
fn handle_message(&mut self, msg: SchedulerMessage) {
272283
match msg {
273-
SchedulerMessage::Schedule { id, at } => {
284+
SchedulerMessage::Schedule {
285+
id,
286+
effective_at,
287+
real_at,
288+
} => {
274289
// Incase of row update, remove the existing entry from queue first
275290
if let Some(key) = self.key_map.get(&id) {
276291
self.queue.remove(key);
277292
}
278-
let key = self.queue.insert(QueueItem::Id(id), at.to_duration_from_now());
293+
let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at);
279294
self.key_map.insert(id, key);
280295
}
281296
SchedulerMessage::ScheduleImmediate { reducer_name, args } => {
@@ -290,7 +305,7 @@ impl SchedulerActor {
290305
async fn handle_queued(&mut self, id: Expired<QueueItem>) {
291306
let item = id.into_inner();
292307
let id = match item {
293-
QueueItem::Id(id) => Some(id),
308+
QueueItem::Id { id, .. } => Some(id),
294309
QueueItem::VolatileNonatomicImmediate { .. } => None,
295310
};
296311
if let Some(id) = id {
@@ -304,58 +319,60 @@ impl SchedulerActor {
304319
let caller_identity = module_host.info().database_identity;
305320
let module_info = module_host.info.clone();
306321

307-
let call_reducer_params = move |tx: &MutTxId| -> Result<Option<CallReducerParams>, anyhow::Error> {
308-
let id = match item {
309-
QueueItem::Id(id) => id,
310-
QueueItem::VolatileNonatomicImmediate { reducer_name, args } => {
311-
let (reducer_id, reducer_seed) = module_info
312-
.module_def
313-
.reducer_arg_deserialize_seed(&reducer_name[..])
314-
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer_name))?;
315-
let reducer_args = args.into_tuple(reducer_seed)?;
316-
317-
return Ok(Some(CallReducerParams {
318-
timestamp: Timestamp::now(),
319-
caller_identity,
320-
caller_connection_id: ConnectionId::ZERO,
321-
client: None,
322-
request_id: None,
323-
timer: None,
324-
reducer_id,
325-
args: reducer_args,
326-
}));
327-
}
328-
};
329-
330-
let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else {
331-
// if the row is not found, it means the schedule is cancelled by the user
332-
log::debug!(
333-
"table row corresponding to yeild scheduler id not found: tableid {}, schedulerId {}",
334-
id.table_id,
335-
id.schedule_id
336-
);
337-
return Ok(None);
338-
};
339-
340-
let ScheduledReducer { reducer, bsatn_args } = proccess_schedule(tx, &db, id.table_id, &schedule_row)?;
341-
342-
let (reducer_id, reducer_seed) = module_info
343-
.module_def
344-
.reducer_arg_deserialize_seed(&reducer[..])
345-
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer))?;
346-
347-
let reducer_args = ReducerArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?;
348-
349-
Ok(Some(CallReducerParams {
350-
timestamp: Timestamp::now(),
351-
caller_identity,
352-
caller_connection_id: ConnectionId::ZERO,
353-
client: None,
354-
request_id: None,
355-
timer: None,
356-
reducer_id,
357-
args: reducer_args,
358-
}))
322+
let call_reducer_params = move |tx: &MutTxId| match item {
323+
QueueItem::Id { id, at } => {
324+
let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else {
325+
// if the row is not found, it means the schedule is cancelled by the user
326+
log::debug!(
327+
"table row corresponding to yeild scheduler id not found: tableid {}, schedulerId {}",
328+
id.table_id,
329+
id.schedule_id
330+
);
331+
return Ok(None);
332+
};
333+
334+
let ScheduledReducer { reducer, bsatn_args } = proccess_schedule(tx, &db, id.table_id, &schedule_row)?;
335+
336+
let (reducer_id, reducer_seed) = module_info
337+
.module_def
338+
.reducer_arg_deserialize_seed(&reducer[..])
339+
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer))?;
340+
341+
let reducer_args = ReducerArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?;
342+
343+
// the timestamp we tell the reducer it's running at will be
344+
// at least the timestamp it was scheduled to run at.
345+
let timestamp = at.max(Timestamp::now());
346+
347+
Ok(Some(CallReducerParams {
348+
timestamp,
349+
caller_identity,
350+
caller_connection_id: ConnectionId::ZERO,
351+
client: None,
352+
request_id: None,
353+
timer: None,
354+
reducer_id,
355+
args: reducer_args,
356+
}))
357+
}
358+
QueueItem::VolatileNonatomicImmediate { reducer_name, args } => {
359+
let (reducer_id, reducer_seed) = module_info
360+
.module_def
361+
.reducer_arg_deserialize_seed(&reducer_name[..])
362+
.ok_or_else(|| anyhow!("Reducer not found: {}", reducer_name))?;
363+
let reducer_args = args.into_tuple(reducer_seed)?;
364+
365+
Ok(Some(CallReducerParams {
366+
timestamp: Timestamp::now(),
367+
caller_identity,
368+
caller_connection_id: ConnectionId::ZERO,
369+
client: None,
370+
request_id: None,
371+
timer: None,
372+
reducer_id,
373+
args: reducer_args,
374+
}))
375+
}
359376
};
360377

361378
let db = module_host.replica_ctx().relational_db.clone();
@@ -391,9 +408,13 @@ impl SchedulerActor {
391408
let schedule_at = read_schedule_at(schedule_row, id.at_column)?;
392409

393410
if let ScheduleAt::Interval(dur) = schedule_at {
394-
let key = self
395-
.queue
396-
.insert(QueueItem::Id(id), dur.to_duration().unwrap_or(Duration::ZERO));
411+
let key = self.queue.insert(
412+
QueueItem::Id {
413+
id,
414+
at: Timestamp::now() + dur,
415+
},
416+
dur.to_duration().unwrap_or(Duration::ZERO),
417+
);
397418
self.key_map.insert(id, key);
398419
Ok(true)
399420
} else {

0 commit comments

Comments
 (0)