-
Notifications
You must be signed in to change notification settings - Fork 0
/
Haxl.hs
293 lines (215 loc) · 6.89 KB
/
Haxl.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
--
-- | A toy implementation of Haxl to illustrate the internals. It supports
-- overlapping I/O only, there is no support for:
--
-- * batching
-- * caching
-- * memoization
-- * exceptions
-- * user data
--
{-# OPTIONS_GHC -foptimal-applicative-do #-}
{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GADTs #-}
module Haxl where
import Data.IORef
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Text.Printf
import Unsafe.Coerce
import System.IO
import Data.Coerce
-- -----------------------------------------------------------------------------
-- Types
-- | A synchronisation point: a mutable cell holding a 'SyncContents'.
newtype Sync a = Sync (IORef (SyncContents a))

-- | What a 'Sync' currently holds.
data SyncContents a where
  -- | The value has been delivered.
  SyncFull :: a -> SyncContents a
  -- | No value yet; carries the computations waiting for it.
  SyncEmpty :: [WaitingFor a] -> SyncContents a
-- | A suspended computation that needs an @a@ to continue. Once resumed
-- and finished, its result (of some hidden type @b@) is delivered to the
-- accompanying 'Sync'.
data WaitingFor a = forall b . WaitingFor (a -> Haxl b) (Sync b)
-- | State shared between the scheduler loop in 'runHaxl' and the threads
-- forked by 'overlapIO'.
data SchedState = SchedState
  { completions :: TVar [Complete]
    -- ^ Finished I/O results: 'overlapIO' threads push onto the front,
    -- the scheduler pops from the front.
  }
-- | A 'Sync' paired with the value that should be stored in it. The
-- value's type is hidden so completions of different types can share
-- one list.
data Complete = forall a . Complete (Sync a) a
-- | Outcome of running a 'Haxl' computation one step.
data Result a
  = Done a
    -- ^ Finished with a value.
  | forall b . Blocked (Sync b) (b -> Haxl a)
    -- ^ Cannot continue until the 'Sync' is filled; the function is the
    -- continuation to run with the delivered value.
-- | A computation that, given the scheduler state, runs in 'IO' until it
-- either finishes or blocks (see 'Result').
newtype Haxl a = Haxl
  { unHaxl :: SchedState -> IO (Result a)
  }
-- -----------------------------------------------------------------------------
-- Synchronisation points
-- | Allocate a fresh 'Sync' that is empty and has no waiters.
newSync :: IO (Sync a)
newSync = fmap Sync (newIORef (SyncEmpty []))
-- | Read a 'Sync' from inside 'Haxl'. If it is already full the value is
-- returned immediately; otherwise the computation reports itself as
-- 'Blocked' on that 'Sync', with a continuation that simply returns the
-- delivered value.
getSync :: Sync a -> Haxl a
getSync sync@(Sync ref) = Haxl $ \_ -> do
  contents <- readIORef ref
  return $ case contents of
    SyncFull value -> Done value
    SyncEmpty _    -> Blocked sync return
-- | Register a waiting computation on an empty 'Sync' by prepending it to
-- the waiter list. Calling this on a 'Sync' that is already full is a
-- programming error and raises @error "blockOn"@.
blockOn :: Sync a -> WaitingFor a -> IO ()
blockOn (Sync ref) waitingFor = modifyIORef' ref enqueue
  where
    enqueue (SyncEmpty list) = SyncEmpty (waitingFor : list)
    enqueue _                = error "blockOn"
-- -----------------------------------------------------------------------------
-- Monad / Applicative instances
-- | 'fmap' is expressed through the 'Monad' instance. It is deliberately
-- written with '>>=' rather than @do@: ApplicativeDo is enabled for this
-- module.
instance Functor Haxl where
  fmap f m = m >>= \a -> return (f a)
-- | Sequential composition. If the left-hand side blocks, the bind is
-- pushed into the continuation so that it resumes once the 'Sync' fills.
instance Monad Haxl where
  return x = Haxl (const (return (Done x)))

  m >>= k = Haxl $ \sched ->
    unHaxl m sched >>= \res -> case res of
      Done x            -> unHaxl (k x) sched
      Blocked sync next -> return (Blocked sync (\b -> next b >>= k))

  -- '>>' goes through the Applicative instance, not through '>>='.
  (>>) = (*>)
-- | Applicative composition runs /both/ sides before looking at either
-- result, which is what lets independent I/O overlap.
instance Applicative Haxl where
  pure x = Haxl $ \_ -> return (Done x)

  ff <*> fa = Haxl $ \sched -> do
    -- Start both computations first; only then inspect the outcomes.
    rf <- unHaxl ff sched
    ra <- unHaxl fa sched
    case rf of
      Done f -> case ra of
        Done a ->
          return (Done (f a))
        Blocked sync a_cont ->
          return (Blocked sync (\b -> f <$> a_cont b))
      Blocked f_sync f_cont -> case ra of
        Done a ->
          return (Blocked f_sync (\b -> f_cont b <*> pure a))
        Blocked a_sync a_cont -> do
          -- Both sides are blocked. Park the function side on its own
          -- Sync, delivering into a fresh one, and report the argument
          -- side as what this computation is blocked on.
          sync <- newSync
          blockOn f_sync (WaitingFor f_cont sync)
          let
            -- Kept as a do block on purpose: ApplicativeDo is enabled
            -- for this module, so its shape affects how it desugars.
            cont b = do
              a <- a_cont b
              f <- getSync sync
              return (f a)
          return (Blocked a_sync cont)
-- -----------------------------------------------------------------------------
-- The scheduler
-- | A runnable computation together with the 'Sync' that should receive
-- its result; the result type is hidden.
data Ready where
  Ready :: Haxl a -> Sync a -> Ready
-- | Turn a waiter into a runnable computation by feeding it the value it
-- was waiting for.
unblock :: WaitingFor a -> a -> Ready
unblock waiter value =
  case waiter of
    WaitingFor resume target -> Ready (resume value) target
-- | Run a 'Haxl' computation to completion and return its result.
--
-- A dedicated result 'Sync' is created for the top-level computation.
-- The scheduler then alternates between running ready computations and,
-- when none are left, blocking on the 'completions' list until some
-- 'overlapIO' thread has delivered a value. Filling the result 'Sync'
-- ends the loop.
--
-- Raises @error "double put"@ if a 'Sync' is filled twice.
runHaxl :: forall a . Haxl a -> IO a
runHaxl haxl = do
  Sync result <- newSync -- where to put the result
  let
    -- Run the ready computations one by one; with none left, wait for
    -- the next completion.
    schedule :: SchedState -> [Ready] -> IO a
    schedule sched (Ready (Haxl io) sync : ready) = do
      r <- io sched
      case r of
        -- Pass the remaining ready computations along so that they are
        -- not lost when this one finishes.
        Done a -> putSync sched ready sync a
        Blocked sync1 cont -> do
          blockOn sync1 (WaitingFor cont sync)
          schedule sched ready
    schedule sched [] = do
      Complete sync val <- atomically $ do
        comps <- readTVar (completions sched)
        case comps of
          [] -> retry -- nothing finished yet: block until the TVar changes
          (c:cs) -> do
            writeTVar (completions sched) cs
            return c
      putSync sched [] sync val

    -- Fill a Sync, then either finish (it was the result Sync) or carry
    -- on with the computations it unblocked plus those still pending.
    putSync :: SchedState -> [Ready] -> Sync b -> b -> IO a
    putSync sched ready (Sync ref) val = do
      contents <- readIORef ref
      case contents of
        SyncFull _ -> error "double put"
        SyncEmpty waiting -> do
          writeIORef ref (SyncFull val)
          case sameIORef ref result of
            -- Same reference as the result Sync, so 'val' is the final
            -- answer; 'fn' converts it to the result type.
            Just fn -> return (case fn (SyncFull val) of SyncFull a -> a)
            Nothing -> schedule sched (map (`unblock` val) waiting ++ ready)
  completionsVar <- newTVarIO []
  schedule (SchedState completionsVar) [Ready haxl (Sync result)]
-- | Evidence that @a@ and @b@ are 'Coercible'. Not referenced anywhere
-- else in the visible part of this module.
data Same a b = Coercible a b => Same
-- | Compare two 'IORef's of possibly different element types. When they
-- are equal, return a conversion from the first element type to the
-- second; otherwise return 'Nothing'.
--
-- Both the comparison and the conversion use 'unsafeCoerce'. The
-- conversion is only handed out when the two references compare equal,
-- on the assumption that equal references hold the same element type.
sameIORef :: IORef a -> IORef b -> Maybe (a -> b)
sameIORef ref1 ref2 =
  if ref1 == unsafeCoerce ref2
    then Just unsafeCoerce
    else Nothing
-- -----------------------------------------------------------------------------
-- Perform some I/O
-- | Run an 'IO' action in a forked thread and block the calling 'Haxl'
-- computation on a fresh 'Sync'. When the action finishes, the thread
-- pushes the 'Sync' and its value onto the front of the scheduler's
-- 'completions' list.
overlapIO :: IO a -> Haxl a
overlapIO io = Haxl $ \sched -> do
  sync <- newSync
  _ <- forkIO $ do
    value <- io
    atomically $ do
      finished <- readTVar (completions sched)
      writeTVar (completions sched) (Complete sync value : finished)
  return (Blocked sync return)
-- -----------------------------------------------------------------------------
-- Examples
-- | Example I/O: prints a start line tagged with the character, sleeps
-- for one second, prints an end line, and returns the character.
test :: Char -> Haxl Char
test c = overlapIO $
  printf "%c:start\n" c
    >> threadDelay 1000000
    >> printf "%c:end\n" c
    >> return c
-- | Three independent 'test' calls combined applicatively.
ex1 :: IO [Char]
ex1 = runHaxl (traverse test "abc")
-- | Two two-step chains combined with 'sequence'. Each chain uses '>>='
-- on purpose: in this module '>>' is defined as '*>'.
ex2 :: IO [Char]
ex2 = runHaxl (annotate 2 (sequence [chainAB, chainCD]))
  where
    chainAB = test 'a' >>= \_ -> test 'b'
    chainCD = test 'c' >>= \_ -> test 'd'
-- | ApplicativeDo example. The data dependencies are: @b@ needs @a@,
-- @d@ needs @c@, and @e@ needs both @b@ and @d@. The block returns @d@.
-- The statements are intentionally left in @do@ form, since their shape
-- is what ApplicativeDo analyses.
ex3 :: IO Char
ex3 = runHaxl $ annotate 3 $ do
  a <- test 'a'
  b <- test (const 'b' a)
  c <- test 'c'
  d <- test (const 'd' c)
  e <- test (const 'e' (b,d))
  return d
-- | Example workflow: fetch the latest version and the host list, look
-- up the installed version on every host, then update each host whose
-- installed version is lower than the latest one.
softwareUpdate :: IO ()
softwareUpdate = runHaxl $ annotate 3 $ do
  latest <- getLatestVersion
  hosts <- getHosts
  installed <- mapM getInstalledVersion hosts
  let outdated = map fst (filter (\(_, v) -> v < latest) (zip hosts installed))
  mapM_ (updateTo latest) outdated
-- | Simulated lookup of the latest version: logs start and done around a
-- two-second sleep and always yields 3.
getLatestVersion :: Haxl Int
getLatestVersion = overlapIO $
  putStrLn "getLatestVersion:start"
    >> threadDelay 2000000
    >> putStrLn "getLatestVersion:done"
    >> return 3
-- | Simulated host discovery: logs start and done around a one-second
-- sleep and yields a fixed list of three host names.
getHosts :: Haxl [String]
getHosts = overlapIO $
  putStrLn "getHosts:start"
    >> threadDelay 1000000
    >> putStrLn "getHosts:done"
    >> return ["host1", "host2", "host3"]
-- | Simulated query of the version installed on a host: logs start and
-- done around a one-second sleep. The version is obtained by dropping
-- the first four characters of the host name and 'read'ing the rest, so
-- the result is undefined for names whose remainder is not an integer.
getInstalledVersion :: String -> Haxl Int
getInstalledVersion h = overlapIO $
  putStrLn "getInstalledVersion:start"
    >> threadDelay 1000000
    >> putStrLn "getInstalledVersion:done"
    >> return (read (drop 4 h))
-- | Simulated update of a host to a version: logs start and done around
-- a one-second sleep. Neither argument is used by the body.
updateTo :: Int -> String -> Haxl ()
updateTo _version _host = overlapIO $
  putStrLn "updateTo:start"
    >> threadDelay 1000000
    >> putStrLn "updateTo:done"
    >> return ()
-- | Run a computation alongside a timeline printer. The printer emits
-- @n@ numbered separator lines, sleeping one second before each. The
-- wrapped computation is started after a half-second delay. The two are
-- combined with '*>', and the delay is attached with '>>=' (not '>>',
-- which is '*>' in this module).
annotate :: Int -> Haxl a -> Haxl a
annotate n haxl = separators *> delayed
  where
    delayed = overlapIO (threadDelay 500000) >>= \_ -> haxl

    separators = overlapIO $
      forM_ [1..n] $ \m -> do
        threadDelay 1000000
        printf "\n%d -----\n" m