-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.hs
128 lines (102 loc) · 5.62 KB
/
main.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
module Rx where
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Maybe
-------------------- TYPES ----------------------
newtype Observable a = Observable { onSubscribe :: Observer a -> IO () }
data Observer a = Observer { onNext :: a -> IO (),
onError :: SomeException -> IO (),
onCompleted :: IO (),
addSubscription :: IO () -> IO (),
subscription :: IO Subscription }
type Subscription = IORef (Maybe [IO ()])
---------------- INSTANCE DECLARATIONS -----------------
instance Functor Observable where
fmap f = lift (coMap f)
where
coMap :: (a -> b) -> Observer b -> Observer a
coMap f ob = createObserver (onNext ob . f)
(onError ob)
(onCompleted ob)
instance Applicative Observable where
pure a = Observable (\obr -> onNext obr a >> onCompleted obr)
f <*> a = rxCombineLatestWith ($) f a
instance Monad Observable where
o >>= f = rxMerge $ fmap f o
--------------- GENERAL FUNCTIONS -----------------
createObserver :: (a -> IO ()) -> (SomeException -> IO ()) -> IO () -> Observer a
createObserver onNext onError onCompleted = Observer onNext onError onCompleted addSubscription subscription
where
subscription = newIORef $ Just []
addSubscription action = do s <- subscription
actions <- readIORef s
when (isJust actions) . writeIORef s $ fmap (action :) actions
subscribe :: Observable a -> Observer a -> IO Subscription
subscribe obl obr = onSubscribe obl obr >> subscription obr
unsubscribe :: Subscription -> IO ()
unsubscribe s = do subs <- readIORef s
when (isJust subs) $ sequence_ (fromJust subs) >> writeIORef s Nothing
isUnsubscribed :: Subscription -> IO Bool
isUnsubscribed s = isNothing <$> readIORef s
-- review this code!
-- perhaps just write a (++) for Subscription?
mergeSubscription :: Observer a -> Subscription -> IO ()
mergeSubscription o s = do subs <- readIORef s
forM_ subs $ mapM_ (addSubscription o)
-------------------- OPERATORS --------------------
-- does the subscription get properly propagated here?
lift :: (Observer b -> Observer a) -> Observable a -> Observable b
lift f oa = Observable (onSubscribe oa . f)
-- TODO: `onCompleted obr` is never called now
rxMerge :: Observable (Observable a) -> Observable a
rxMerge oas = Observable (\obr ->
let onnext oa = do innerSub <- subscribe oa $ createObserver (onNext obr) (onError obr) (return ())
mergeSubscription obr innerSub
onerror = onError obr
oncompleted = return ()
in do outerSub <- subscribe oas $ createObserver onnext onerror oncompleted
mergeSubscription obr outerSub)
rxCombineLatestWith :: (a -> b -> c) -> Observable a -> Observable b -> Observable c
rxCombineLatestWith f oa ob = uncurry f <$> rxCombineLatest oa ob
rxCombineLatest :: Observable a -> Observable b -> Observable (a,b)
rxCombineLatest oa ob = Observable (\obr -> do refA <- newIORef Nothing
refB <- newIORef Nothing
doneA <- newIORef False
doneB <- newIORef False
sa <- subscribe oa $ createObserver (onNextA refA refB obr) (onError obr) (handleOnCompleted doneB doneA obr)
mergeSubscription obr sa
sb <- subscribe ob $ createObserver (onNextB refA refB obr) (onError obr) (handleOnCompleted doneA doneB obr)
mergeSubscription obr sb)
where
handleOnCompleted readRef writeRef obr = do done <- readIORef readRef
if done then onCompleted obr else writeIORef writeRef True
combine readRef writeRef val = writeIORef writeRef (Just val) >> readIORef readRef
onNextA refA refB obr a = do maybeB <- combine refB refA a
when (isJust maybeB) $ onNext obr (a, fromJust maybeB)
onNextB refA refB obr b = do maybeA <- combine refA refB b
when (isJust maybeA) $ onNext obr (fromJust maybeA, b)
-- or just use mfilter (MonadPlus)?
rxFilter :: (a -> Bool) -> Observable a -> Observable a
rxFilter p = lift (coFilter p)
where
coFilter :: (a -> Bool) -> Observer a -> Observer a
coFilter p oa = createObserver (\a -> when (p a) $ onNext oa a)
(onError oa)
(onCompleted oa)
------------------------------ TEST -------------------------------
stream :: Observable Integer
stream = Observable (\observer -> do onNext observer 1
onNext observer 2
onNext observer 3
onCompleted observer)
printObserver :: Show a => String -> Observer a
printObserver doneMsg = createObserver print print (print doneMsg)
main :: IO ()
main = do putStrLn "hello friend"
subscribe stream $ printObserver "done 1"
subscribe (fmap (*1337) stream) $ printObserver "done 2"
subscribe (rxFilter (> 1) stream) $ printObserver "done 3"
subscribe (pure (\x -> "Transformed " ++ show x) <*> stream) $ printObserver "done 4"
subscribe (stream >>= (\x -> Observable (\obr -> onNext obr x >> onNext obr x >> onCompleted obr))) $ printObserver "done 5"
return ()