Skip to content

Commit

Permalink
Add basic structured concurrency library
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Apr 26, 2024
1 parent 534944e commit 98761fd
Show file tree
Hide file tree
Showing 38 changed files with 1,032 additions and 340 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ small library,
that defines the essential interoperability framework. The rest of the libraries
are either sample schedulers (e.g.
[`picos.fifos`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_fifos/index.html),
[`picos.lwt`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_lwt/index.html),
and
[`picos.threaded`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_threaded/index.html)),
scheduler agnostic libraries (e.g.
[`picos.structured`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_structured/index.html),
[`picos.sync`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_sync/index.html),
[`picos.stdio`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_stdio/index.html),
and
Expand Down
3 changes: 2 additions & 1 deletion lib/index.mld
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ These are minimalistic, but fully-functioning, schedulers provided as samples.

{!modules:
Picos_fifos
Picos_threaded
Picos_lwt
Picos_threaded
}

{^ You may find these useful for both understanding the core Picos framework and
Expand All @@ -160,6 +160,7 @@ These are minimalistic, but fully-functioning, schedulers provided as samples.
These are examples of libraries implemented in Picos.

{!modules:
Picos_structured
Picos_sync
Picos_stdio
Picos_select
Expand Down
57 changes: 42 additions & 15 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Exn_bt = Picos_exn_bt

module Trigger = struct
let[@inline never] awaiting () = invalid_arg "Trigger: already awaiting"
let[@inline never] awaiting () = invalid_arg "already awaiting"

type state =
| Signaled
Expand Down Expand Up @@ -47,7 +47,9 @@ end

module Computation = struct
let[@inline never] negative_or_nan () =
invalid_arg "Computation: seconds must be non-negative"
invalid_arg "seconds must be non-negative"

let[@inline never] returned () = invalid_arg "already returned"

type 'a state =
| Canceled of Exn_bt.t
Expand Down Expand Up @@ -131,6 +133,7 @@ module Computation = struct
Trigger.signal trigger;
unsafe_unsuspend t Backoff.default |> ignore

(** This cannot be [@@unboxed] because [Atomic.t] is opaque *)
type packed = Packed : 'a t -> packed

let is_running t =
Expand Down Expand Up @@ -193,6 +196,14 @@ module Computation = struct
| Returned value -> value
| Canceled exn_bt -> Exn_bt.raise exn_bt
| Continue _ -> get_or block (block t)

let attach_canceler ~from ~into =
let canceler = canceler ~from ~into in
if try_attach from canceler then canceler
else begin
check from;
returned ()
end
end

module Fiber = struct
Expand All @@ -202,29 +213,41 @@ module Fiber = struct
| Nothing : [> `Nothing ] tdt
| Fiber : {
mutable forbid : bool;
computation : 'a Computation.t;
mutable packed : Computation.packed;
mutable fls : non_float array;
}
-> [> `Fiber ] tdt

type t = [ `Fiber ] tdt

let create ~forbid computation = Fiber { forbid; computation; fls = [||] }
let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] }

let create ~forbid computation =
create_packed ~forbid (Computation.Packed computation)

let has_forbidden (Fiber r : t) = r.forbid

let is_canceled (Fiber r : t) =
(not r.forbid) && Computation.is_canceled r.computation
(not r.forbid)
&&
let (Packed computation) = r.packed in
Computation.is_canceled computation

let canceled (Fiber r : t) =
if r.forbid then None else Computation.canceled r.computation
if r.forbid then None
else
let (Packed computation) = r.packed in
Computation.canceled computation

let get_computation (Fiber r : t) = r.packed
let set_computation (Fiber r : t) packed = r.packed <- packed

let try_attach (Fiber r : t) trigger =
Computation.try_attach r.computation trigger
let check (Fiber r : t) =
if not r.forbid then
let (Packed computation) = r.packed in
Computation.check computation

let detach (Fiber r : t) trigger = Computation.detach r.computation trigger
let[@inline] equal t1 t2 = t1 == t2
let computation (Fiber r : t) = Computation.Packed r.computation
let check (Fiber r : t) = if not r.forbid then Computation.check r.computation

let exchange (Fiber r : t) ~forbid =
let before = r.forbid in
Expand All @@ -248,14 +271,15 @@ module Fiber = struct
let permit t body = explicitly t body ~forbid:false

let try_suspend (Fiber r : t) trigger x y resume =
let (Packed computation) = r.packed in
if not r.forbid then begin
if Computation.try_attach r.computation trigger then
if Computation.try_attach computation trigger then
Trigger.on_signal trigger x y resume
|| begin
Computation.detach r.computation trigger;
Computation.detach computation trigger;
false
end
else if Computation.is_canceled r.computation then begin
else if Computation.is_canceled computation then begin
Trigger.dispose trigger;
false
end
Expand All @@ -265,7 +289,10 @@ module Fiber = struct

let[@inline] unsuspend (Fiber r : t) trigger =
assert (Trigger.is_signaled trigger);
r.forbid || Computation.unsafe_unsuspend r.computation Backoff.default
r.forbid
||
let (Packed computation) = r.packed in
Computation.unsafe_unsuspend computation Backoff.default

module FLS = struct
type 'a key = { index : int; default : non_float; compute : unit -> 'a }
Expand Down
4 changes: 2 additions & 2 deletions lib/picos/dune
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
(mdx
(enabled_if
(< %{ocaml_version} 5.0.0))
(libraries picos picos.exn_bt picos.threaded domain_shims foundation)
(libraries picos picos.exn_bt picos.structured picos.threaded domain_shims)
(files picos.mli))

(mdx
(enabled_if
(>= %{ocaml_version} 5.0.0))
(libraries picos picos.exn_bt picos.fifos foundation)
(libraries picos picos.exn_bt picos.fifos picos.structured)
(files picos.mli))
15 changes: 15 additions & 0 deletions lib/picos/picos.common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ module Fiber = struct
| T Nothing -> ()
| T (Fiber r) -> check (Fiber r)
end

exception Done

let done_bt = Exn_bt.get_callstack 0 Done

let sleep ~seconds =
let sleep = Computation.create ~mode:`LIFO () in
Computation.cancel_after ~seconds sleep done_bt;
let trigger = Trigger.create () in
if Computation.try_attach sleep trigger then
match Trigger.await trigger with
| None -> ()
| Some exn_bt ->
Computation.finish sleep;
Exn_bt.raise exn_bt
end

module Handler = struct
Expand Down
96 changes: 58 additions & 38 deletions lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@
such as condition variables, where it is necessary to forbid cancelation
when the associated mutex is reacquired.
Each fiber has an associated {!Computation}. A computation is something
that needs to be completed either by {{!Computation.return} returning} a
value through it or by {{!Computation.cancel} canceling} it with an
exception. To cancel a fiber one cancels the {{!Fiber.computation}
computation} associated with the fiber.
Each fiber has an associated {!Computation} at all times. A computation is
something that needs to be completed either by {{!Computation.return}
returning} a value through it or by {{!Computation.cancel} canceling} it
with an exception. To cancel a fiber one cancels the computation associated
with the fiber or any computation whose {{!Computation.canceler} cancelation
is propagated} to the computation associated with the fiber.
Before a computation has been completed, it is also possible to
{{!Computation.try_attach} attach} a {!Trigger} to the computation and also
Expand Down Expand Up @@ -114,7 +115,7 @@
as well as a simple helper for cleaning up resources
{[
open Foundation.Finally
open Picos_structured.Finally
]}
and define a simple scheduler on OCaml 4
Expand Down Expand Up @@ -458,10 +459,12 @@ module Computation : sig
end
]}
In this framework, a fiber is always associated with {{!Fiber.computation}
at least a single computation}. However, {{!Fiber.spawn} it is possible
for multiple fibers to share a single computation} and it is also possible
for a single fiber to perform multiple computations.
In this framework, a fiber is always associated with
{{!Fiber.get_computation} at least a single computation}. However,
{{!Fiber.spawn} it is possible for multiple fibers to share a single
computation} and it is also possible for a single fiber to perform
multiple computations. Furthermore, the computation associated with a
fiber {{!Fiber.set_computation} can be changed} by the fiber.
Computations are not hierarchical. In other words, computations do not
directly implement structured concurrency. However, it is possible to
Expand Down Expand Up @@ -640,6 +643,15 @@ module Computation : sig
⚠️ The returned trigger will be in the awaiting state, which means that it
is an error to call {!Trigger.await} or {!Trigger.on_signal} on it. *)

val attach_canceler : from:_ t -> into:_ t -> Trigger.t
(** [attach_canceler ~from ~into] tries to attach a {!canceler} to the
computation [from] to propagate cancelation to the computation [into] and
returns the {!canceler} when successful. If the computation [from] has
already been canceled, the exception that [from] was canceled with will be
raised.
@raise Invalid_argument if the [from] computation has already returned. *)

(** {2 Interface for schedulers} *)

include Intf.Computation with type 'a t := 'a t with type exn_bt := Exn_bt.t
Expand Down Expand Up @@ -675,13 +687,14 @@ module Computation : sig
scoping of computations and resource cleanup at completion, which is how
the design evolved from a more traditional cancelation context design.
In this framework, {{!Fiber.computation} every fiber has an associated
computation}. Being able to return a value through the computation means
that no separate promise is necessarily required to hold the result of a
fiber. On the other hand, in this framework, {{!Fiber.spawn} multiple
fibers may share a single computation}. This allows multiple fibers to be
canceled efficiently through a single atomic update. In other words, the
design allows various higher level patterns to be implemented efficiently.
In this framework, {{!Fiber.get_computation} every fiber is associated
with a computation}. Being able to return a value through the computation
means that no separate promise is necessarily required to hold the result
of a fiber. On the other hand, in this framework, {{!Fiber.spawn}
multiple fibers may share a single computation}. This allows multiple
fibers to be canceled efficiently through a single atomic update. In
other words, the design allows various higher level patterns to be
implemented efficiently.
Instead of directly implementing a hierarchy of computations, the design
allows {{!try_attach} attach}ing triggers to computations and
Expand All @@ -702,8 +715,9 @@ module Fiber : sig
A fiber corresponds to an independent thread of execution. Fibers are
{!create}d by schedulers in response to {!Spawn} effects. A fiber is
associated with a {{!Computation} computation} and either {!forbid}s or
{!permit}s the scheduler from propagating cancelation to it. A fiber also
has an associated {{!FLS} fiber local storage}.
{!permit}s the scheduler from propagating cancelation when the fiber
performs effects. A fiber also has an associated {{!FLS} fiber local
storage}.
⚠️ Many operations on fibers can only be called safely from the fiber
itself, because those operations are neither concurrency nor parallelism
Expand All @@ -723,6 +737,10 @@ module Fiber : sig
- on OCaml 4, [yield] will call the [yield] operation of the {{!Handler}
current handler}. *)

val sleep : seconds:float -> unit
(** [sleep ~seconds] suspends the current fiber for specified number of
seconds. *)

(** {2 Interface for spawning} *)

val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit
Expand Down Expand Up @@ -764,8 +782,8 @@ module Fiber : sig
⚠️ Extra care should be taken when storing the fiber object in any shared
data structure, because, aside from checking whether two fibers are
{!equal}, or from accessing the associated {!computation}, it is generally
unsafe to perform any operations on foreign fibers.
{!equal}, it is generally unsafe to perform any operations on foreign
fibers.
ℹ️ The behavior is that
Expand Down Expand Up @@ -802,7 +820,7 @@ module Fiber : sig
ℹ️ [forbid] does not prevent the fiber or the associated {!computation}
from being canceled. It only tells the scheduler not to propagate
cancelation to the fiber.
cancelation to the fiber when it performs effects.
⚠️ It is only safe to call [forbid] from the fiber itself. *)

Expand All @@ -821,7 +839,7 @@ module Fiber : sig
{@ocaml skip[
not (Fiber.has_forbidden fiber) &&
let (Packed computation) =
Fiber.computation fiber
Fiber.get_computation fiber
in
Computation.is_canceled computation
]}
Expand All @@ -837,7 +855,7 @@ module Fiber : sig
None
else
let (Packed computation) =
Fiber.computation fiber
Fiber.get_computation fiber
in
Computation.canceled computation
]}
Expand All @@ -851,7 +869,7 @@ module Fiber : sig
{@ocaml skip[
if not (Fiber.has_forbidden fiber) then
let (Packed computation) =
Fiber.computation fiber
Fiber.get_computation fiber
in
Computation.check computation
]}
Expand Down Expand Up @@ -927,6 +945,16 @@ module Fiber : sig
⚠️ It is only safe to call [set] from the fiber itself. *)
end

(** {2 Interface for structuring} *)

val get_computation : t -> Computation.packed
(** [get_computation fiber] returns the computation that the fiber is
currently associated with. *)

val set_computation : t -> Computation.packed -> unit
(** [set_computation fiber packed] associates the fiber with the specified
computation. *)

(** {2 Interface for foreign fiber} *)

val equal : t -> t -> bool
Expand All @@ -937,18 +965,6 @@ module Fiber : sig
primitives like mutexes where it makes sense to check that acquire and
release operations are performed by the same fiber. *)

val computation : t -> Computation.packed
(** [computation fiber] returns the computation that the fiber has been
{!create}d with. *)

val try_attach : t -> Trigger.t -> bool
(** [try_attach fiber trigger] is equivalent to
[let Packed c = computation fiber in Computation.try_attach c trigger]. *)

val detach : t -> Trigger.t -> unit
(** [detach fiber trigger] is equivalent to
[let Packed c = computation fiber in Computation.detach c trigger]. *)

module Maybe : sig
(** An unboxed optional {{!Fiber.t} fiber}. *)

Expand Down Expand Up @@ -1015,8 +1031,12 @@ module Fiber : sig

(** {2 Interface for schedulers} *)

val create_packed : forbid:bool -> Computation.packed -> t
(** [create_packed ~forbid packed] creates a new fiber. *)

val create : forbid:bool -> 'a Computation.t -> t
(** [create ~forbid computation] creates a new fiber. *)
(** [create ~forbid computation] is equivalent to
[create_packed ~forbid (Computation.Packed computation)]. *)

val try_suspend :
t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool
Expand Down
Loading

0 comments on commit 98761fd

Please sign in to comment.