-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathpromises.ml
164 lines (141 loc) · 3.78 KB
/
promises.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
open Effect
open Effect.Deep
(* Signature of an applicative functor: a parameterised type ['a t] together
   with a way to inject a plain value and a way to apply a wrapped function
   to a wrapped argument. *)
module type Applicative = sig
type 'a t
(* [pure v] wraps the plain value [v]. *)
val pure : 'a -> 'a t
(* [f <*> x] applies the function carried by [f] to the value carried by
   [x] and wraps the result. *)
val ( <*> ) : ('a -> 'b) t -> 'a t -> 'b t
end
(* Interface of the promise library: an applicative whose values stand for
   the eventual outcome of a computation.  In the implementation in this
   file, [fork] and any operation that has to wait perform effects that are
   handled only inside [run]. *)
module type Promise = sig
include Applicative
(* [fork f] asks the scheduler to run [f] as a new task and returns the
   promise of its result. *)
val fork : (unit -> 'a) -> 'a t
(* [get p] waits until [p] is resolved, then returns [Ok v] if it holds the
   value [v], or [Error e] if it holds the exception [e]. *)
val get : 'a t -> ('a, exn) result
(* [get_val p] is like [get p] but returns the value directly and re-raises
   the exception held by a cancelled promise. *)
val get_val : 'a t -> 'a
(* [run main] executes [main] under the scheduler and returns its outcome;
   an exception escaping [main] is reported as [Error]. *)
val run : (unit -> 'a) -> ('a, exn) result
end
(* Cooperative promises implemented with effect handlers.

   A promise is a mutable cell that is either still [Waiting] (carrying the
   tasks blocked on it), [Done] with a value, or [Cancelled] with the
   exception raised by the computation that was meant to fill it.  [fork]
   and every blocking operation perform effects whose only handler is
   installed by [run]. *)
module Promise : Promise = struct
  (* A suspended task that expects to be resumed with [()].  The answer type
     of the continuation is hidden so that tasks of different types can sit
     in the same waiter list. *)
  type cont = Cont : (unit, 'b) continuation -> cont

  (* Slot holding a blocked continuation.  The slot is emptied at the moment
     the continuation is scheduled, so it cannot be resumed twice. *)
  type tvar = cont option ref

  let mk_tvar k = ref (Some (Cont k))

  type 'a status = Done of 'a | Cancelled of exn | Waiting of tvar list
  type 'a t = 'a status ref

  type _ eff +=
    | Fork : (unit -> 'a) -> 'a t eff
    | Wait : 'a t -> unit eff

  (* [fork f] hands [f] to the scheduler and returns the promise of its
     result. *)
  let fork f = perform (Fork f)

  (* Schedule the continuation [k] to be resumed later with [v]. *)
  let enqueue run_q k v = Queue.push (fun () -> ignore (continue k v)) run_q

  (* Run the next scheduled task, if there is one. *)
  let dequeue run_q =
    match Queue.take_opt run_q with
    | None -> ()
    | Some task -> task ()

  let mk_status () = ref (Waiting [])

  (* Shared implementation of [finish] and [abort]: store [outcome] in the
     still-pending promise [sr] and schedule every task blocked on it.
     Raises [Failure impossible] if [sr] has already been settled. *)
  let settle ~impossible run_q sr outcome =
    match !sr with
    | Waiting waiters ->
        sr := outcome;
        List.iter
          (fun slot ->
            match !slot with
            | None -> ()
            | Some (Cont k) ->
                (* Empty the slot before scheduling so that the continuation
                   is resumed at most once. *)
                slot := None;
                enqueue run_q k ())
          waiters
    | Done _ | Cancelled _ -> failwith impossible

  (* Resolve [sr] with the value [v] and wake its waiters. *)
  let finish run_q sr v =
    settle ~impossible:"Impossible: finish" run_q sr (Done v)

  (* Cancel [sr] with the exception [e] and wake its waiters. *)
  let abort run_q sr e =
    settle ~impossible:"Impossible: abort" run_q sr (Cancelled e)

  (* Register the continuation [k] as blocked on the pending promise [sr].
     Raises [Failure] if [sr] is already settled. *)
  let wait sr k =
    match !sr with
    | Waiting waiters -> sr := Waiting (mk_tvar k :: waiters)
    | Done _ | Cancelled _ -> failwith "Impossible: wait"

  (* [get sr] blocks until [sr] is settled and returns its outcome as a
     [result]. *)
  let rec get sr =
    match !sr with
    | Done v -> Ok v
    | Cancelled e -> Error e
    | Waiting _ ->
        perform (Wait sr);
        get sr

  (* [get_val sr] blocks until [sr] is settled, then returns its value or
     re-raises the exception it was cancelled with. *)
  let rec get_val sr =
    match !sr with
    | Done v -> v
    | Cancelled e -> raise e
    | Waiting _ ->
        perform (Wait sr);
        get_val sr

  let pure v = ref (Done v)

  (* [f <*> g] applies the function promised by [f] to the value promised by
     [g].  A cancellation that is already visible on either side is
     propagated without waiting; otherwise [f] is awaited before [g]. *)
  let rec ( <*> ) f g =
    match (!f, !g) with
    | Cancelled e, _ -> ref (Cancelled e)
    | _, Cancelled e -> ref (Cancelled e)
    | Waiting _, _ -> (
        (* [get] itself performs the wait while [f] is pending. *)
        match get f with
        | Ok fn -> pure fn <*> g
        | Error e -> ref (Cancelled e))
    | Done fn, Done x -> ref (Done (fn x))
    | Done fn, Waiting _ -> (
        match get g with
        | Ok x -> ref (Done (fn x))
        | Error e -> ref (Cancelled e))

  (* [run main] executes [main] under a FIFO scheduler and returns its
     outcome.  A forked task starts running immediately while the task that
     forked it is put on the run queue. *)
  let run main =
    let run_q = Queue.create () in
    let rec spawn : 'a. 'a status ref -> (unit -> 'a) -> unit =
     fun sr f ->
      match f () with
      | v ->
          finish run_q sr v;
          dequeue run_q
      | exception e ->
          abort run_q sr e;
          dequeue run_q
      | effect (Wait target), k ->
          (* Park the current task on [target] and run something else. *)
          wait target k;
          dequeue run_q
      | effect (Fork child), k ->
          let child_sr = mk_status () in
          (* The parent gets the child's promise when it is resumed; the
             child runs first. *)
          enqueue run_q k child_sr;
          spawn child_sr child
    in
    let main_sr = mk_status () in
    spawn main_sr main;
    get main_sr
end
open Promise
open Printf
(* Adds two already-resolved promises through the applicative interface and
   returns the resulting value. *)
let test1 () =
  let sum = pure ( + ) <*> pure 10 <*> pure 20 in
  get_val sum
(* Runs [test1] under the scheduler and prints its outcome.  The error branch
   is labelled "test1" so that a failure is attributed to the right test. *)
let () =
  match run test1 with
  | Ok v -> Printf.printf "test1: %d\n" v
  | Error e -> Printf.printf "test1: error: %s\n" @@ Printexc.to_string e
(* Forks three tasks, the second of which raises [Exit], and combines their
   promises with a three-argument function.  Because one operand is
   cancelled, the combined promise is cancelled too and [get_val] re-raises
   the exception. *)
let test2 () =
  let px =
    fork (fun () ->
        printf "test2: x\n%!";
        10)
  in
  let py =
    fork (fun () ->
        printf "test2: y\n%!";
        raise Exit)
  in
  let pz =
    fork (fun () ->
        printf "test2: z\n%!";
        20)
  in
  let add3 a b c =
    printf "test2: add %d %d %d\n" a b c;
    a + b + c
  in
  get_val (pure add3 <*> px <*> py <*> pz)
(* Runs [test2] under the scheduler and prints its outcome. *)
let () =
  match run test2 with
  | Error e -> Printf.printf "test2: error: %s\n" (Printexc.to_string e)
  | Ok v -> Printf.printf "test2: %d\n" v

(* Final marker printed once both tests have been run. *)
let () = print_endline "SUCCESS"