-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add Timer, make Seconds a Fixed E9, haddock work
- Loading branch information
1 parent
f6b97fd
commit e6490ea
Showing
5 changed files
with
119 additions
and
94 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,29 @@ | ||
module TimerWheel | ||
( -- * Timer wheel | ||
TimerWheel, | ||
|
||
-- * Timer wheel configuration | ||
Config (..), | ||
Seconds, | ||
|
||
-- ** Constructing a timer wheel | ||
-- * Timer | ||
Timer, | ||
|
||
-- * Constructing a timer wheel | ||
create, | ||
with, | ||
|
||
-- ** Querying a timer wheel | ||
-- * Querying a timer wheel | ||
count, | ||
|
||
-- ** Registering timers in a timer wheel | ||
-- * Registering timers in a timer wheel | ||
register, | ||
register_, | ||
recurring, | ||
recurring_, | ||
|
||
-- * Canceling timers | ||
cancel, | ||
) | ||
where | ||
|
||
|
@@ -35,61 +43,48 @@ import qualified TimerWheel.Internal.Timestamp as Timestamp | |
import TimerWheel.Internal.TimestampMap (TimestampMap) | ||
import qualified TimerWheel.Internal.TimestampMap as TimestampMap | ||
|
||
-- | A timer wheel is a vector-of-collections-of timers to fire. It is configured with a /spoke count/ and /resolution/. | ||
-- Timers may be scheduled arbitrarily far in the future. A timeout thread is spawned to step through the timer wheel | ||
-- and fire expired timers at regular intervals. | ||
-- | ||
-- * The /spoke count/ determines the size of the timer vector. | ||
-- | ||
-- * A __larger spoke count__ will result in __less insert contention__ at each spoke and will require | ||
-- __more memory__ to store the timer wheel. | ||
-- | A timer wheel is a vector-of-collections-of timers to fire. Timers may be one-shot or recurring, and may be | ||
-- scheduled arbitrarily far in the future. | ||
-- | ||
-- * A __smaller spoke count__ will result in __more insert contention__ at each spoke and will require | ||
-- __less memory__ to store the timer wheel. | ||
-- A timer wheel is configured with a /spoke count/ and /resolution/: | ||
-- | ||
-- * The /resolution/ determines both the duration of time that each spoke corresponds to, and how often the timeout | ||
-- thread wakes. For example, with a resolution of __@1s@__, a timer that expires at [email protected]@__ will not fire until | ||
-- the timeout thread wakes at __@3s@__. | ||
-- * The /spoke count/ determines the size of the timer vector. | ||
-- | ||
-- * A __larger resolution__ will result in __more insert contention__ at each spoke, __less accurate__ timers, and | ||
-- will require __fewer wakeups__ by the timeout thread. | ||
-- A __larger spoke count__ will require __more memory__, but will result in __less insert contention__. | ||
-- | ||
-- * A __smaller resolution__ will result in __less insert contention__ at each spoke, __more accurate__ timers, and | ||
-- will require __more wakeups__ by the timeout thread. | ||
-- * The /resolution/ determines the duration of time that each spoke corresponds to, and thus how often timers are | ||
-- checked for expiry. | ||
-- | ||
-- * The timeout thread has some important properties: | ||
-- For example, in a timer wheel with a /resolution/ of __@1 second@__, a timer that is scheduled to fire at | ||
-- [email protected] o'clock@__ will end up firing around [email protected] o'clock@__ instead (that is, on the | ||
-- __@1 second@__-boundary). | ||
-- | ||
-- * There is only one, and it fires expired timers synchronously. If your timer actions execute quicky, 'register' | ||
-- them directly. Otherwise, consider registering an action that enqueues the /real/ action to be performed on a | ||
-- job queue. | ||
-- A __larger resolution__ will result in __more insert contention__ and __less accurate timers__, but will require | ||
-- __fewer wakeups__ by the timeout thread. | ||
-- | ||
-- * Synchronous exceptions thrown by enqueued @IO@ actions will bring the thread down, and the exception will be | ||
-- propagated to the thread that created the timer wheel. If you want to catch exceptions and log them, for | ||
-- example, you will have to bake this into the registered actions yourself. | ||
-- The timeout thread has some important properties: | ||
-- | ||
-- As an example, below is a depiction of a timer wheel with @6@ timers inserted across @8@ spokes, and a resolution of | ||
-- @.1s@. It depicts a cursor at @.3s@, which indicates where the timeout thread currently is. | ||
-- * There is only one, and it fires expired timers synchronously. If your timer actions execute quicky, you can | ||
-- 'register' them directly. Otherwise, consider registering an action that enqueues the real action to be | ||
-- performed on a job queue. | ||
-- | ||
-- @ | ||
-- 0 .1 .2 .3 .4 .5 .6 .7 | ||
-- ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐ | ||
-- │ │ A⁰ │ │ B¹ C⁰ │ D⁰ │ │ │ E² F⁰ │ | ||
-- └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘ | ||
-- ↑ | ||
-- @ | ||
-- * A synchronous exception thrown by a registered timer will bring the timeout thread down, and the exception will | ||
-- be propagated to the thread that created the timer wheel. If you want to log and ignore exceptions, for example, | ||
-- you will have to bake this into the registered actions yourself. | ||
-- | ||
-- After @.1s@, the timeout thread will advance to the next spoke and process all of the timers it passed over. In | ||
-- this case, __C__ will fire, and __B__ will be put back with its count decremented to @0@. This is how the timer wheel | ||
-- can schedule a timer to fire arbitrarily far in the future: its count is simply the number of times its delay wraps | ||
-- the entire duration of the timer wheel. | ||
-- __API summary__ | ||
-- | ||
-- @ | ||
-- 0 .1 .2 .3 .4 .5 .6 .7 | ||
-- ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐ | ||
-- │ │ A⁰ │ │ B⁰ │ D⁰ │ │ │ E² F⁰ │ | ||
-- └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘ | ||
-- ↑ | ||
-- @ | ||
-- +----------+---------+----------------+ | ||
-- | Create | Query | Modify | | ||
-- +==========+=========+================+ | ||
-- | 'create' | 'count' | 'register' | | ||
-- +----------+---------+----------------+ | ||
-- | 'with' | | 'register_' | | ||
-- +----------+ +----------------+ | ||
-- | | | 'recurring' | | ||
-- | | +----------------+ | ||
-- | | | 'recurring_' | | ||
-- +----------+---------+----------------+ | ||
data TimerWheel = TimerWheel | ||
{ buckets :: {-# UNPACK #-} !(MutableArray RealWorld TimerBucket), | ||
resolution :: {-# UNPACK #-} !Nanoseconds, | ||
|
@@ -98,10 +93,18 @@ data TimerWheel = TimerWheel | |
timerIdSupply :: {-# UNPACK #-} !Counter | ||
} | ||
|
||
-- | Timer wheel config. | ||
-- | A timer wheel config. | ||
-- | ||
-- * @spokes@ must be ∈ @[1, maxBound]@, and is set to @1024@ if invalid. | ||
-- * @resolution@ must be ∈ @(0, ∞]@, and is set to @1@ if invalid. | ||
-- | ||
-- __API summary__ | ||
-- | ||
-- +----------+ | ||
-- | Create | | ||
-- +==========+ | ||
-- | 'Config' | | ||
-- +----------+ | ||
data Config = Config | ||
{ -- | Spoke count | ||
spokes :: {-# UNPACK #-} !Int, | ||
|
@@ -141,20 +144,26 @@ with config action = | |
wheel <- create scope config | ||
action wheel | ||
|
||
-- | @register wheel delay action@ registers an action __@action@__ in timer wheel __@wheel@__ to fire after __@delay@__ | ||
-- seconds. | ||
-- | Get the number of timers in a timer wheel. | ||
-- | ||
-- Returns an action that attempts to cancel the timer, and returns whether or not it was successful (@False@ means the | ||
-- timer has already fired, or has already been cancelled). | ||
-- /O(1)/. | ||
count :: TimerWheel -> IO Int | ||
count TimerWheel {numTimers} = | ||
readCounter numTimers | ||
|
||
-- | @register wheel delay action@ registers __@action@__ in __@wheel@__ to fire after __@delay@__ seconds. | ||
-- | ||
-- When canceled, the timer returns whether or not the cancelation was successful; @False@ means the timer had either | ||
-- already fired, or had already been canceled. | ||
register :: | ||
-- | The timer wheel | ||
TimerWheel -> | ||
-- | The delay before the action is fired | ||
Seconds -> | ||
-- | The action to fire | ||
IO () -> | ||
-- | An action that attempts to cancel the timer | ||
IO (IO Bool) | ||
-- | The timer | ||
IO (Timer Bool) | ||
register TimerWheel {buckets, numTimers, resolution, timerIdSupply} delay action = do | ||
now <- Timestamp.now | ||
let timestamp = now `Timestamp.plus` Nanoseconds.fromSeconds delay | ||
|
@@ -163,11 +172,12 @@ register TimerWheel {buckets, numTimers, resolution, timerIdSupply} delay action | |
mask_ do | ||
atomicModifyArray buckets index (timerBucketInsert timestamp (OneShot timerId action)) | ||
incrCounter_ numTimers | ||
pure do | ||
mask_ do | ||
deleted <- atomicMaybeModifyArray buckets index (timerBucketDelete timestamp timerId) | ||
when deleted (decrCounter_ numTimers) | ||
pure deleted | ||
coerce @(IO (IO Bool)) @(IO (Timer Bool)) do | ||
pure do | ||
mask_ do | ||
deleted <- atomicMaybeModifyArray buckets index (timerBucketDelete timestamp timerId) | ||
when deleted (decrCounter_ numTimers) | ||
pure deleted | ||
|
||
-- | Like 'register', but for when you don't intend to cancel the timer. | ||
register_ :: | ||
|
@@ -181,8 +191,7 @@ register_ :: | |
register_ wheel delay action = | ||
void (register wheel delay action) | ||
|
||
-- | @recurring wheel action delay@ registers an action __@action@__ in timer wheel __@wheel@__ to fire every | ||
-- __@delay@__ seconds. | ||
-- | @recurring wheel action delay@ registers __@action@__ in __@wheel@__ to fire every __@delay@__ seconds. | ||
-- | ||
-- Returns an action that cancels the recurring timer. | ||
recurring :: | ||
|
@@ -192,8 +201,8 @@ recurring :: | |
Seconds -> | ||
-- | The action to fire repeatedly | ||
IO () -> | ||
-- | An action that cancels the recurring timer | ||
IO (IO ()) | ||
-- | The timer | ||
IO (Timer ()) | ||
recurring TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanoseconds.fromSeconds -> delay) action = do | ||
now <- Timestamp.now | ||
let timestamp = now `Timestamp.plus` delay | ||
|
@@ -203,12 +212,13 @@ recurring TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanosecond | |
mask_ do | ||
atomicModifyArray buckets index (timerBucketInsert timestamp (Recurring timerId action delay canceledRef)) | ||
incrCounter_ numTimers | ||
pure do | ||
mask_ do | ||
writeIORef canceledRef True | ||
decrCounter_ numTimers | ||
coerce @(IO (IO ())) @(IO (Timer ())) do | ||
pure do | ||
mask_ do | ||
writeIORef canceledRef True | ||
decrCounter_ numTimers | ||
|
||
-- | Like 'recurring', but for when you don't intend to cancel the timer. | ||
-- | Like 'recurring', but for when you don't intend to 'cancel' the timer. | ||
recurring_ :: | ||
TimerWheel -> | ||
-- | The delay before each action is fired | ||
|
@@ -225,12 +235,27 @@ recurring_ TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanosecon | |
atomicModifyArray buckets index (timerBucketInsert timestamp (Recurring_ timerId action delay)) | ||
incrCounter_ numTimers | ||
|
||
-- | Get the number of timers in a timer wheel. | ||
-- | ||
-- /O(1)/. | ||
count :: TimerWheel -> IO Int | ||
count TimerWheel {numTimers} = | ||
readCounter numTimers | ||
-- | A registered timer, parameterized by the result of attempting to cancel it: | ||
-- | ||
-- * A one-shot timer may only be canceled if it has not already fired. | ||
-- * A recurring timer can always be canceled. | ||
-- | ||
-- __API summary__ | ||
-- | ||
-- +-------------+----------+ | ||
-- | Create | Modify | | ||
-- +=============+==========+ | ||
-- | 'register' | 'cancel' | | ||
-- +-------------+----------+ | ||
-- | 'recurring' | | | ||
-- +-------------+----------+ | ||
newtype Timer a | ||
= Timer (IO a) | ||
|
||
-- | Cancel a timer. | ||
cancel :: Timer a -> IO a | ||
cancel = | ||
coerce | ||
|
||
-- `timestampToIndex buckets resolution timestamp` figures out which index `timestamp` corresponds to in `buckets`, | ||
-- where each bucket corresponds to `resolution` nanoseconds. | ||
|
@@ -280,34 +305,34 @@ timerBucketDelete timestamp timerId bucket = | |
_ -> TimersN timers1 | ||
in Just $! TimestampMap.insert timestamp timers2 bucket | ||
|
||
timerBucketInsert :: Timestamp -> Timer -> TimerBucket -> TimerBucket | ||
timerBucketInsert :: Timestamp -> Timer0 -> TimerBucket -> TimerBucket | ||
timerBucketInsert timestamp timer = | ||
TimestampMap.upsert timestamp (Timers1 timer) \case | ||
Timers1 old -> TimersN [timer, old] | ||
TimersN old -> TimersN (timer : old) | ||
|
||
data Timers | ||
= Timers1 !Timer | ||
= Timers1 !Timer0 | ||
| -- 2+ timers, stored in the reverse order that they were enqueued (so the last should fire first) | ||
TimersN ![Timer] | ||
TimersN ![Timer0] | ||
|
||
timersDelete :: TimerId -> [Timer] -> Maybe [Timer] | ||
timersDelete :: TimerId -> [Timer0] -> Maybe [Timer0] | ||
timersDelete timerId = | ||
go | ||
where | ||
go :: [Timer] -> Maybe [Timer] | ||
go :: [Timer0] -> Maybe [Timer0] | ||
go = \case | ||
[] -> Nothing | ||
timer : timers | ||
| timerId == getTimerId timer -> Just timers | ||
| otherwise -> (timer :) <$> go timers | ||
|
||
data Timer | ||
data Timer0 | ||
= OneShot !TimerId !(IO ()) | ||
| Recurring !TimerId !(IO ()) !Nanoseconds !(IORef Bool) | ||
| Recurring_ !TimerId !(IO ()) !Nanoseconds | ||
|
||
getTimerId :: Timer -> TimerId | ||
getTimerId :: Timer0 -> TimerId | ||
getTimerId = \case | ||
OneShot timerId _ -> timerId | ||
Recurring timerId _ _ _ -> timerId | ||
|
@@ -497,16 +522,16 @@ runTimerReaperThread buckets numTimers resolution = do | |
bucket2 <- fireTimers bucket1 timestamp timers1 | ||
fireTimerBucket bucket2 | ||
|
||
fireTimers :: TimerBucket -> Timestamp -> [Timer] -> IO TimerBucket | ||
fireTimers :: TimerBucket -> Timestamp -> [Timer0] -> IO TimerBucket | ||
fireTimers bucket timestamp = | ||
foldr step (pure bucket) | ||
where | ||
step :: Timer -> IO TimerBucket -> IO TimerBucket | ||
step :: Timer0 -> IO TimerBucket -> IO TimerBucket | ||
step timer earlier = do | ||
expired1 <- earlier | ||
fireTimer expired1 timestamp timer | ||
|
||
fireTimer :: TimerBucket -> Timestamp -> Timer -> IO TimerBucket | ||
fireTimer :: TimerBucket -> Timestamp -> Timer0 -> IO TimerBucket | ||
fireTimer bucket timestamp timer = | ||
case timer of | ||
OneShot _ action -> do | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,12 +7,11 @@ category: Data | |
description: | ||
This library provides a timer wheel data structure for | ||
. | ||
* (Almost) /O(1)/ registering @IO@ actions to fire after a given amount of time | ||
* /O(1)/ canceling registered actions | ||
. | ||
It is similar to @TimerManager@ from @GHC.Event@, but supports recurring | ||
actions, and can scale to handle many more registered actions. | ||
* Registering one-shot or recurring @IO@ actions to fire after a given amount of time | ||
* Canceling registered @IO@ actions | ||
. | ||
It is similar to @TimerManager@ from @GHC.Event@, but supports recurring actions, and can scale to handle many more | ||
registered actions. | ||
synopsis: A timer wheel | ||
author: Mitchell Rosen | ||
maintainer: Mitchell Rosen <[email protected]>, Travis Staton <[email protected]> | ||
|