diff --git a/README.md b/README.md index 81e48a1e..078e5110 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,11 @@ small library, that defines the essential interoperability framework. The rest of the libraries are either sample schedulers (e.g. [`picos.fifos`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_fifos/index.html), +[`picos.lwt`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_lwt/index.html), and [`picos.threaded`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_threaded/index.html)), scheduler agnostic libraries (e.g. +[`picos.structured`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_structured/index.html), [`picos.sync`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_sync/index.html), [`picos.stdio`](https://ocaml-multicore.github.io/picos/doc/picos/Picos_stdio/index.html), and diff --git a/lib/index.mld b/lib/index.mld index 3de95a49..47f73e5b 100644 --- a/lib/index.mld +++ b/lib/index.mld @@ -148,8 +148,8 @@ These are minimalistic, but fully-functioning, schedulers provided as samples. {!modules: Picos_fifos - Picos_threaded Picos_lwt + Picos_threaded } {^ You may find these useful for both understanding the core Picos framework and @@ -160,6 +160,7 @@ These are minimalistic, but fully-functioning, schedulers provided as samples. These are examples of libraries implemented in Picos. {!modules: + Picos_structured Picos_sync Picos_stdio Picos_select diff --git a/lib/picos/bootstrap/picos_bootstrap.ml b/lib/picos/bootstrap/picos_bootstrap.ml index 817f6cff..2b0fbab0 100644 --- a/lib/picos/bootstrap/picos_bootstrap.ml +++ b/lib/picos/bootstrap/picos_bootstrap.ml @@ -1,7 +1,7 @@ module Exn_bt = Picos_exn_bt module Trigger = struct - let[@inline never] awaiting () = invalid_arg "Trigger: already awaiting" + let[@inline never] awaiting () = invalid_arg "already awaiting" type state = | Signaled @@ -47,7 +47,9 @@ end module Computation = struct let[@inline never] negative_or_nan () = - invalid_arg "Computation: seconds must be non-negative" + invalid_arg "seconds must be non-negative" + + let[@inline never] returned () = invalid_arg "already returned" type 'a state = | Canceled of Exn_bt.t @@ -131,6 +133,7 @@ module Computation = struct Trigger.signal trigger; unsafe_unsuspend t Backoff.default |> ignore + (** This cannot be [@@unboxed] because [Atomic.t] is opaque *) type packed = Packed : 'a t -> packed let is_running t = @@ -193,6 +196,14 @@ module Computation = struct | Returned value -> value | Canceled exn_bt -> Exn_bt.raise exn_bt | Continue _ -> get_or block (block t) + + let attach_canceler ~from ~into = + let canceler = canceler ~from ~into in + if try_attach from canceler then canceler + else begin + check from; + returned () + end end module Fiber = struct @@ -202,29 +213,41 @@ module Fiber = struct | Nothing : [> `Nothing ] tdt | Fiber : { mutable forbid : bool; - computation : 'a Computation.t; + mutable packed : Computation.packed; mutable fls : non_float array; } -> [> `Fiber ] tdt type t = [ `Fiber ] tdt - let create ~forbid computation = Fiber { forbid; computation; fls = [||] } + let create_packed ~forbid packed = Fiber { forbid; packed; fls = [||] } + + let create ~forbid computation = + create_packed ~forbid (Computation.Packed computation) + let has_forbidden (Fiber r : t) = r.forbid let is_canceled (Fiber r : t) = - (not r.forbid) && Computation.is_canceled r.computation + (not r.forbid) + && + let (Packed computation) = r.packed in + Computation.is_canceled computation let canceled (Fiber r : t) = - if r.forbid then None else Computation.canceled r.computation + if r.forbid then None + else + let (Packed computation) = r.packed in + Computation.canceled computation + + let get_computation (Fiber r : t) = r.packed + let set_computation (Fiber r : t) packed = r.packed <- packed - let try_attach (Fiber r : t) trigger = - Computation.try_attach r.computation trigger + let check (Fiber r : t) = + if not r.forbid then + let (Packed computation) = r.packed in + Computation.check computation - let detach (Fiber r : t) trigger = Computation.detach r.computation trigger let[@inline] equal t1 t2 = t1 == t2 - let computation (Fiber r : t) = Computation.Packed r.computation - let check (Fiber r : t) = if not r.forbid then Computation.check r.computation let exchange (Fiber r : t) ~forbid = let before = r.forbid in @@ -248,14 +271,15 @@ module Fiber = struct let permit t body = explicitly t body ~forbid:false let try_suspend (Fiber r : t) trigger x y resume = + let (Packed computation) = r.packed in if not r.forbid then begin - if Computation.try_attach r.computation trigger then + if Computation.try_attach computation trigger then Trigger.on_signal trigger x y resume || begin - Computation.detach r.computation trigger; + Computation.detach computation trigger; false end - else if Computation.is_canceled r.computation then begin + else if Computation.is_canceled computation then begin Trigger.dispose trigger; false end @@ -265,7 +289,10 @@ module Fiber = struct let[@inline] unsuspend (Fiber r : t) trigger = assert (Trigger.is_signaled trigger); - r.forbid || Computation.unsafe_unsuspend r.computation Backoff.default + r.forbid + || + let (Packed computation) = r.packed in + Computation.unsafe_unsuspend computation Backoff.default module FLS = struct type 'a key = { index : int; default : non_float; compute : unit -> 'a } diff --git a/lib/picos/dune b/lib/picos/dune index 29679fec..50c4314d 100644 --- a/lib/picos/dune +++ b/lib/picos/dune @@ -18,11 +18,11 @@ (mdx (enabled_if (< %{ocaml_version} 5.0.0)) - (libraries picos picos.exn_bt picos.threaded domain_shims foundation) + (libraries picos picos.exn_bt picos.structured picos.threaded domain_shims) (files picos.mli)) (mdx (enabled_if (>= %{ocaml_version} 5.0.0)) - (libraries picos picos.exn_bt picos.fifos foundation) + (libraries picos picos.exn_bt picos.fifos picos.structured) (files picos.mli)) diff --git a/lib/picos/picos.common.ml b/lib/picos/picos.common.ml index 9202f3ae..f25cb56e 100644 --- a/lib/picos/picos.common.ml +++ b/lib/picos/picos.common.ml @@ -58,6 +58,21 @@ module Fiber = struct | T Nothing -> () | T (Fiber r) -> check (Fiber r) end + + exception Done + + let done_bt = Exn_bt.get_callstack 0 Done + + let sleep ~seconds = + let sleep = Computation.create ~mode:`LIFO () in + Computation.cancel_after ~seconds sleep done_bt; + let trigger = Trigger.create () in + if Computation.try_attach sleep trigger then + match Trigger.await trigger with + | None -> () + | Some exn_bt -> + Computation.finish sleep; + Exn_bt.raise exn_bt end module Handler = struct diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index d733763a..4ecbe085 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -75,11 +75,12 @@ such as condition variables, where it is necessary to forbid cancelation when the associated mutex is reacquired. - Each fiber has an associated {!Computation}. A computation is something - that needs to be completed either by {{!Computation.return} returning} a - value through it or by {{!Computation.cancel} canceling} it with an - exception. To cancel a fiber one cancels the {{!Fiber.computation} - computation} associated with the fiber. + Each fiber has an associated {!Computation} at all times. A computation is + something that needs to be completed either by {{!Computation.return} + returning} a value through it or by {{!Computation.cancel} canceling} it + with an exception. To cancel a fiber one cancels the computation associated + with the fiber or any computation whose {{!Computation.canceler} cancelation + is propagated} to the computation associated with the fiber. Before a computation has been completed, it is also possible to {{!Computation.try_attach} attach} a {!Trigger} to the computation and also @@ -114,7 +115,7 @@ as well as a simple helper for cleaning up resources {[ - open Foundation.Finally + open Picos_structured.Finally ]} and define a simple scheduler on OCaml 4 @@ -458,10 +459,12 @@ module Computation : sig end ]} - In this framework, a fiber is always associated with {{!Fiber.computation} - at least a single computation}. However, {{!Fiber.spawn} it is possible - for multiple fibers to share a single computation} and it is also possible - for a single fiber to perform multiple computations. + In this framework, a fiber is always associated with + {{!Fiber.get_computation} at least a single computation}. However, + {{!Fiber.spawn} it is possible for multiple fibers to share a single + computation} and it is also possible for a single fiber to perform + multiple computations. Furthermore, the computation associated with a + fiber {{!Fiber.set_computation} can be changed} by the fiber. Computations are not hierarchical. In other words, computations do not directly implement structured concurrency. However, it is possible to @@ -640,6 +643,15 @@ module Computation : sig ⚠️ The returned trigger will be in the awaiting state, which means that it is an error to call {!Trigger.await} or {!Trigger.on_signal} on it. *) + val attach_canceler : from:_ t -> into:_ t -> Trigger.t + (** [attach_canceler ~from ~into] tries to attach a {!canceler} to the + computation [from] to propagate cancelation to the computation [into] and + returns the {!canceler} when successful. If the computation [from] has + already been canceled, the exception that [from] was canceled with will be + raised. + + @raise Invalid_argument if the [from] computation has already returned. *) + (** {2 Interface for schedulers} *) include Intf.Computation with type 'a t := 'a t with type exn_bt := Exn_bt.t @@ -675,13 +687,14 @@ module Computation : sig scoping of computations and resource cleanup at completion, which is how the design evolved from a more traditional cancelation context design. - In this framework, {{!Fiber.computation} every fiber has an associated - computation}. Being able to return a value through the computation means - that no separate promise is necessarily required to hold the result of a - fiber. On the other hand, in this framework, {{!Fiber.spawn} multiple - fibers may share a single computation}. This allows multiple fibers to be - canceled efficiently through a single atomic update. In other words, the - design allows various higher level patterns to be implemented efficiently. + In this framework, {{!Fiber.get_computation} every fiber is associated + with a computation}. Being able to return a value through the computation + means that no separate promise is necessarily required to hold the result + of a fiber. On the other hand, in this framework, {{!Fiber.spawn} + multiple fibers may share a single computation}. This allows multiple + fibers to be canceled efficiently through a single atomic update. In + other words, the design allows various higher level patterns to be + implemented efficiently. Instead of directly implementing a hierarchy of computations, the design allows {{!try_attach} attach}ing triggers to computations and @@ -702,8 +715,9 @@ module Fiber : sig A fiber corresponds to an independent thread of execution. Fibers are {!create}d by schedulers in response to {!Spawn} effects. A fiber is associated with a {{!Computation} computation} and either {!forbid}s or - {!permit}s the scheduler from propagating cancelation to it. A fiber also - has an associated {{!FLS} fiber local storage}. + {!permit}s the scheduler from propagating cancelation when the fiber + performs effects. A fiber also has an associated {{!FLS} fiber local + storage}. ⚠️ Many operations on fibers can only be called safely from the fiber itself, because those operations are neither concurrency nor parallelism @@ -723,6 +737,10 @@ module Fiber : sig - on OCaml 4, [yield] will call the [yield] operation of the {{!Handler} current handler}. *) + val sleep : seconds:float -> unit + (** [sleep ~seconds] suspends the current fiber for specified number of + seconds. *) + (** {2 Interface for spawning} *) val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit @@ -764,8 +782,8 @@ module Fiber : sig ⚠️ Extra care should be taken when storing the fiber object in any shared data structure, because, aside from checking whether two fibers are - {!equal}, or from accessing the associated {!computation}, it is generally - unsafe to perform any operations on foreign fibers. + {!equal}, it is generally unsafe to perform any operations on foreign + fibers. ℹ️ The behavior is that @@ -802,7 +820,7 @@ module Fiber : sig ℹ️ [forbid] does not prevent the fiber or the associated {!computation} from being canceled. It only tells the scheduler not to propagate - cancelation to the fiber. + cancelation to the fiber when it performs effects. ⚠️ It is only safe to call [forbid] from the fiber itself. *) @@ -821,7 +839,7 @@ module Fiber : sig {@ocaml skip[ not (Fiber.has_forbidden fiber) && let (Packed computation) = - Fiber.computation fiber + Fiber.get_computation fiber in Computation.is_canceled computation ]} @@ -837,7 +855,7 @@ module Fiber : sig None else let (Packed computation) = - Fiber.computation fiber + Fiber.get_computation fiber in Computation.canceled computation ]} @@ -851,7 +869,7 @@ module Fiber : sig {@ocaml skip[ if not (Fiber.has_forbidden fiber) then let (Packed computation) = - Fiber.computation fiber + Fiber.get_computation fiber in Computation.check computation ]} @@ -927,6 +945,16 @@ module Fiber : sig ⚠️ It is only safe to call [set] from the fiber itself. *) end + (** {2 Interface for structuring} *) + + val get_computation : t -> Computation.packed + (** [get_computation fiber] returns the computation that the fiber is + currently associated with. *) + + val set_computation : t -> Computation.packed -> unit + (** [set_computation fiber packed] associates the fiber with the specified + computation. *) + (** {2 Interface for foreign fiber} *) val equal : t -> t -> bool @@ -937,18 +965,6 @@ module Fiber : sig primitives like mutexes where it makes sense to check that acquire and release operations are performed by the same fiber. *) - val computation : t -> Computation.packed - (** [computation fiber] returns the computation that the fiber has been - {!create}d with. *) - - val try_attach : t -> Trigger.t -> bool - (** [try_attach fiber trigger] is equivalent to - [let Packed c = computation fiber in Computation.try_attach c trigger]. *) - - val detach : t -> Trigger.t -> unit - (** [detach fiber trigger] is equivalent to - [let Packed c = computation fiber in Computation.detach c trigger]. *) - module Maybe : sig (** An unboxed optional {{!Fiber.t} fiber}. *) @@ -1015,8 +1031,12 @@ module Fiber : sig (** {2 Interface for schedulers} *) + val create_packed : forbid:bool -> Computation.packed -> t + (** [create_packed ~forbid packed] creates a new fiber. *) + val create : forbid:bool -> 'a Computation.t -> t - (** [create ~forbid computation] creates a new fiber. *) + (** [create ~forbid computation] is equivalent to + [create_packed ~forbid (Computation.Packed computation)]. *) val try_suspend : t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool diff --git a/lib/picos_fifos/picos_fifos.ml b/lib/picos_fifos/picos_fifos.ml index 36007a8f..a9811190 100644 --- a/lib/picos_fifos/picos_fifos.ml +++ b/lib/picos_fifos/picos_fifos.ml @@ -22,12 +22,12 @@ type t = { retc : unit -> unit; } -let rec spawn t n forbid computation = function +let rec spawn t n forbid packed = function | [] -> Atomic.fetch_and_add t.num_alive_fibers n |> ignore | main :: mains -> - let fiber = Fiber.create ~forbid computation in + let fiber = Fiber.create_packed ~forbid packed in Queue.push t.ready (Spawn (fiber, main)); - spawn t (n + 1) forbid computation mains + spawn t (n + 1) forbid packed mains let continue = Some (fun k -> Effect.Deep.continue k ()) @@ -56,7 +56,7 @@ let rec next t = whole operation or discontinue the fiber. *) if Fiber.is_canceled fiber then discontinue else begin - spawn t 0 r.forbid r.computation r.mains; + spawn t 0 r.forbid (Packed r.computation) r.mains; continue end | Fiber.Yield -> yield diff --git a/lib/picos_lwt/intf.ml b/lib/picos_lwt/intf.ml index da23b00f..819ffe3e 100644 --- a/lib/picos_lwt/intf.ml +++ b/lib/picos_lwt/intf.ml @@ -17,5 +17,5 @@ module type S = sig val await : (unit -> 'a Lwt.t) -> 'a (** [await thunk] awaits for the promise returned by [thunk ()] to resolve and returns the result. This should only be called from inside a fiber - running inside {!start}. *) + started through {!run}. *) end diff --git a/lib/picos_lwt/picos_lwt.ml b/lib/picos_lwt/picos_lwt.ml index a3a8b5cd..797f06fc 100644 --- a/lib/picos_lwt/picos_lwt.ml +++ b/lib/picos_lwt/picos_lwt.ml @@ -20,9 +20,10 @@ module Make (Sleep : Sleep) : S = struct (fun k -> match Fiber.canceled fiber with | None -> + let packed = Computation.Packed r.computation in List.iter (fun main -> - let fiber = Fiber.create ~forbid:r.forbid r.computation in + let fiber = Fiber.create_packed ~forbid:r.forbid packed in Lwt.async @@ fun () -> run fiber (Effect.Shallow.fiber main) (Ok ())) r.mains; diff --git a/lib/picos_mpsc_queue/picos_mpsc_queue.ml b/lib/picos_mpsc_queue/picos_mpsc_queue.ml index 1794e07b..2a77d901 100644 --- a/lib/picos_mpsc_queue/picos_mpsc_queue.ml +++ b/lib/picos_mpsc_queue/picos_mpsc_queue.ml @@ -11,8 +11,7 @@ and 'a tail = T : ('a, [< `Tail | `Snoc ]) tdt -> 'a tail [@@unboxed] exception Empty -let[@inline never] impossible () = - invalid_arg "Picos_mpsc_queue: multiple consumers not allowed" +let[@inline never] impossible () = invalid_arg "multiple consumers not allowed" let create () = let tail = Multicore_magic.copy_as_padded @@ Atomic.make (T Tail) in diff --git a/lib/picos_rc/picos_rc.ml b/lib/picos_rc/picos_rc.ml index 7079a04f..5e17fec8 100644 --- a/lib/picos_rc/picos_rc.ml +++ b/lib/picos_rc/picos_rc.ml @@ -1,10 +1,10 @@ include Intf let[@inline never] created () = - invalid_arg "Picos_rc: resource already previously created" + invalid_arg "resource already previously created" let[@inline never] disposed () = - invalid_arg "Picos_rc: resource already previously disposed" + invalid_arg "resource already previously disposed" let bt = if Printexc.backtrace_status () then None else Some (Printexc.get_callstack 0) diff --git a/lib/picos_select/dune b/lib/picos_select/dune index c4f6af4c..2577db01 100644 --- a/lib/picos_select/dune +++ b/lib/picos_select/dune @@ -21,11 +21,11 @@ (enabled_if (>= %{ocaml_version} 5.1.0)) (libraries - foundation picos picos.exn_bt - picos.stdio picos.fifos picos.select + picos.stdio + picos.structured unix) (files picos_select.mli)) diff --git a/lib/picos_select/picos_select.ml b/lib/picos_select/picos_select.ml index 45b7920f..74059961 100644 --- a/lib/picos_select/picos_select.ml +++ b/lib/picos_select/picos_select.ml @@ -101,7 +101,7 @@ let cleared = let intr_key = Picos_thread.TLS.new_key @@ fun () : [ `Req ] tdt -> - invalid_arg "Picos_select has not been configured" + invalid_arg "has not been configured" let key = Picos_domain.DLS.new_key @@ fun () -> @@ -280,13 +280,12 @@ let is_intr_sig signum = signum = config.intr_sig let rec configure ?(intr_sig = Sys.sigusr2) ?(handle_sigchld = true) () = if not (Picos_thread.is_main_thread ()) then - invalid_arg - "Picos_select must be configured from the main thread on the main domain"; + invalid_arg "must be called from the main thread on the main domain"; assert (Sys.sigabrt = -1 && Sys.sigxfsz < Sys.sigabrt); if intr_sig < Sys.sigxfsz || 0 <= intr_sig || intr_sig = Sys.sigchld then - invalid_arg "Invalid interrupt signal number"; + invalid_arg "invalid interrupt signal number"; if not (try_configure ~intr_sig ~intr_sigs:[ intr_sig ] ~handle_sigchld) then - invalid_arg "Picos_select.configure already configured"; + invalid_arg "already configured"; if not Sys.win32 then begin if config.bits land handle_sigchld_bit <> 0 then begin @@ -336,8 +335,7 @@ let[@inline never] init s = while s.state == `Starting do Thread.yield () done; - if s.state != `Alive then - invalid_arg "Picos_select: domain has been terminated" + if s.state != `Alive then invalid_arg "domain has been terminated" let get () = let s = Picos_domain.DLS.get key in @@ -368,9 +366,7 @@ let rec remove_action _trigger s id = let to_deadline ~seconds = match Mtime.Span.of_float_ns (seconds *. 1_000_000_000.) with - | None -> - invalid_arg - "Picos_select: seconds should be between 0 to pow(2, 53) nanoseconds" + | None -> invalid_arg "seconds should be between 0 to pow(2, 53) nanoseconds" | Some span -> Mtime.Span.add (Mtime_clock.elapsed ()) span let[@alert "-handler"] cancel_after computation ~seconds exn_bt = @@ -457,8 +453,7 @@ module Intr = struct let nothing = R Nothing let[@alert "-handler"] req ~seconds = - if Sys.win32 then - invalid_arg "Picos_select.Intr is not supported on Windows" + if Sys.win32 then invalid_arg "not supported on Windows" else begin let time = to_deadline ~seconds in (* assert (not (Computation.is_running r.computation)); *) diff --git a/lib/picos_select/picos_select.mli b/lib/picos_select/picos_select.mli index da3a18f9..7ebc30ad 100644 --- a/lib/picos_select/picos_select.mli +++ b/lib/picos_select/picos_select.mli @@ -119,11 +119,12 @@ val check_configured : unit -> unit (** {1 Examples} - For convenience, we first open the {!Picos} and {!Picos_stdio} modules: + First we open some modules for convenience: {[ - open Foundation.Finally open Picos + open Picos_structured.Finally + open Picos_structured open Picos_stdio ]} @@ -157,12 +158,9 @@ val check_configured : unit -> unit Unix.set_nonblock syn_inn; Unix.set_nonblock syn_out; - let consumer = Computation.create () in - Fiber.spawn ~forbid:false consumer [ fun () -> - try + Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> while true do - Fiber.check (Fiber.current ()); - let select = Computation.create () in Picos_select.return_on select msg_inn1 `R `Inn1; Picos_select.return_on select msg_inn2 `R `Inn2; @@ -181,18 +179,20 @@ val check_configured : unit -> unit | exception Timeout -> Printf.printf "Timeout\n%!"; assert (1 = Unix.write_substring syn_out "!" 0 1) - | exception Exit -> - Computation.cancel select (Exn_bt.get_callstack 0 Exit) + | exception exn -> + Computation.cancel select (Exn_bt.get_callstack 0 Exit); + raise exn done - with Exit -> () ]; + end; - assert (1 = Unix.write_substring msg_out1 "!" 0 1); - assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); - assert (1 = Unix.write_substring msg_out2 "!" 0 1); - assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); - assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); + assert (1 = Unix.write_substring msg_out1 "!" 0 1); + assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); + assert (1 = Unix.write_substring msg_out2 "!" 0 1); + assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); + assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); - Computation.cancel consumer (Exn_bt.get_callstack 0 Exit) + Bundle.terminate bundle + end Inn1 Inn2 Timeout diff --git a/lib/picos_stdio/dune b/lib/picos_stdio/dune index 2630ab69..f697f0eb 100644 --- a/lib/picos_stdio/dune +++ b/lib/picos_stdio/dune @@ -11,5 +11,5 @@ (mdx (enabled_if (>= %{ocaml_version} 5.1.0)) - (libraries foundation picos picos.exn_bt picos.stdio picos.fifos unix) + (libraries picos picos.exn_bt picos.fifos picos.stdio picos.structured unix) (files picos_stdio.mli)) diff --git a/lib/picos_stdio/picos_stdio.ml b/lib/picos_stdio/picos_stdio.ml index e08d72b3..f1c6cc80 100644 --- a/lib/picos_stdio/picos_stdio.ml +++ b/lib/picos_stdio/picos_stdio.ml @@ -334,9 +334,7 @@ module Unix = struct (* One way to provide a scheduler friendly [waitpid] on Windows would be to use a thread pool to run blocking operations on. PR for a thread pool implemented in Picos would be welcome! *) - invalid_arg - "Picos_stdio.Unix.waitpid is currently not supported on Windows \ - without WNOHANG" + invalid_arg "currently not supported on Windows without WNOHANG" let waitpid flags pid = let bits = Wait_flag.to_bits flags in @@ -364,43 +362,21 @@ module Unix = struct (* One way to provide a scheduler friendly [system] on Windows would be to use a thread pool to run blocking operations on. PR for a thread pool implemented in Picos would be welcome! *) - invalid_arg - "Picos_stdio.Unix.system is currently not supported on Windows" + invalid_arg "currently not supported on Windows" else create_process sh [| sh; "-c"; cmd |] stdin stdout stderr |> waitpid [] |> snd (* *) - exception Done + let sleepf seconds = Fiber.sleep ~seconds + let sleep seconds = Fiber.sleep ~seconds:(Float.of_int seconds) - let done_bt = Exn_bt.get_callstack 0 Done + (* *) - let sleepf seconds = - let sleep = Computation.create ~mode:`LIFO () in - (* We could also use [Computation.cancel_after], but we already depend on - [Picos_select]. *) - Picos_select.cancel_after ~seconds sleep done_bt; - let trigger = Trigger.create () in - if Computation.try_attach sleep trigger then - match Trigger.await trigger with - | None -> - (* This means that the timeout was triggered and [sleep] has been - canceled. *) - () - | Some exn_bt -> - (* This means that the underlying fiber was canceled. - - Note that the [exn_bt] does not come from [sleep]! - - We must finish the [sleep] computation, which signals the scheduler - that the timeout is no longer needed. *) - Computation.finish sleep; - Exn_bt.raise exn_bt - - let sleep seconds = sleepf (Float.of_int seconds) + exception Done - (* *) + let done_bt = Exn_bt.get_callstack 0 Done let[@alert "-handler"] select rds wrs exs seconds = let overall = Computation.create () in diff --git a/lib/picos_stdio/picos_stdio.mli b/lib/picos_stdio/picos_stdio.mli index a3e2319c..f858a2f2 100644 --- a/lib/picos_stdio/picos_stdio.mli +++ b/lib/picos_stdio/picos_stdio.mli @@ -685,11 +685,11 @@ end (** {1 Examples} - For convenience, we first open the {!Picos} and {!Picos_stdio} modules: + First we open some modules for convenience: {[ - open Foundation.Finally - open Picos + open Picos_structured.Finally + open Picos_structured open Picos_stdio ]} @@ -715,9 +715,8 @@ end Unix.set_nonblock syn_inn; Unix.set_nonblock syn_out; - let consumer = Computation.create () in - Fiber.spawn ~forbid:false consumer [ fun () -> - try + Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> let bytes = Bytes.create 100 in while true do let n = Unix.read msg_inn bytes 0 100 in @@ -726,18 +725,19 @@ end assert (1 = Unix.write_substring syn_out "!" 0 1) end done - with Exit -> () ]; + end; - let send_string s = - let n = String.length s in - assert (n = Unix.write_substring msg_out s 0 n); - assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1) - in + let send_string s = + let n = String.length s in + assert (n = Unix.write_substring msg_out s 0 n); + assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1) + in - send_string "Hello, world!"; - send_string "POSIX with OCaml"; + send_string "Hello, world!"; + send_string "POSIX with OCaml"; - Computation.cancel consumer (Exn_bt.get_callstack 0 Exit) + Bundle.terminate bundle + end Hello, world! POSIX with OCaml - : unit = () diff --git a/lib/picos_structured/bundle.ml b/lib/picos_structured/bundle.ml new file mode 100644 index 00000000..f4f8f5ff --- /dev/null +++ b/lib/picos_structured/bundle.ml @@ -0,0 +1,89 @@ +open Picos + +type t = { + num_fibers : int Atomic.t; + bundle : unit Computation.t; + errors : Control.Errors.t; + finished : Trigger.t; +} + +let terminate ?callstack t = + let terminate_bt = Control.terminate_bt ?callstack () in + Computation.cancel t.bundle terminate_bt + +let error t exn_bt = + terminate t; + Control.Errors.push t.errors exn_bt + +let decr t = + let n = Atomic.fetch_and_add t.num_fibers (-1) in + if n = 1 then begin + Computation.finish t.bundle; + Trigger.signal t.finished + end + +let await t fiber packed canceler = + decr t; + Fiber.set_computation fiber packed; + let forbid = Fiber.exchange fiber ~forbid:true in + Trigger.await t.finished |> ignore; + Fiber.set fiber ~forbid; + let (Packed parent) = packed in + Computation.detach parent canceler; + Control.Errors.check t.errors; + Fiber.check fiber + +let join_after fn = + let t = + let num_fibers = Atomic.make 1 in + let bundle = Computation.create () in + let errors = Control.Errors.create () in + let finished = Trigger.create () in + { num_fibers; bundle; errors; finished } + in + let fiber = Fiber.current () in + let (Packed parent as packed) = Fiber.get_computation fiber in + let bundle = Computation.Packed t.bundle in + let canceler = Computation.attach_canceler ~from:parent ~into:t.bundle in + (* TODO: Ideally there should be no poll point betweem [attach_canceler] and + the [match ... with] below. *) + Fiber.set_computation fiber bundle; + match fn t with + | value -> + await t fiber packed canceler; + value + | exception exn -> + let exn_bt = Exn_bt.get exn in + error t exn_bt; + await t fiber packed canceler; + Exn_bt.raise exn_bt + +let[@inline never] completed () = invalid_arg "already completed" + +let rec incr t backoff = + let before = Atomic.get t.num_fibers in + if before = 0 then completed () + else if not (Atomic.compare_and_set t.num_fibers before (before + 1)) then + incr t (Backoff.once backoff) + +let fork t thunk = + incr t Backoff.default; + try + let child = Computation.create () in + let canceler = Computation.attach_canceler ~from:t.bundle ~into:child in + let main () = + begin + match thunk () with + | () -> Computation.finish child + | exception exn -> + let exn_bt = Exn_bt.get exn in + Computation.cancel child exn_bt; + error t exn_bt + end; + Computation.detach t.bundle canceler; + decr t + in + Fiber.spawn ~forbid:false child [ main ] + with canceled_exn -> + decr t; + raise canceled_exn diff --git a/lib/picos_structured/control.ml b/lib/picos_structured/control.ml new file mode 100644 index 00000000..b2f90359 --- /dev/null +++ b/lib/picos_structured/control.ml @@ -0,0 +1,61 @@ +open Picos + +exception Terminate + +let terminate_bt = Exn_bt.get_callstack 0 Terminate + +let terminate_bt ?callstack () = + match callstack with + | None -> terminate_bt + | Some n -> Exn_bt.get_callstack n Terminate + +exception Errors of Exn_bt.t list + +let () = + Printexc.register_printer @@ function + | Errors exn_bts -> + let causes = + List.map (fun exn_bt -> Printexc.to_string exn_bt.Exn_bt.exn) exn_bts + |> String.concat "; " + in + Some (Printf.sprintf "Errors[%s]" causes) + | _ -> None + +module Errors = struct + type t = Exn_bt.t list Atomic.t + + let create () = Atomic.make [] + + let rec check (exn_bts : Exn_bt.t list) exns = + match exn_bts with + | [] -> () + | [ exn_bt ] -> + Printexc.raise_with_backtrace (Errors (exn_bt :: exns)) exn_bt.bt + | exn_bt :: exn_bts -> check exn_bts (exn_bt :: exns) + + let check t = + match Atomic.get t with + | [] -> () + | [ exn_bt ] -> Exn_bt.raise exn_bt + | exn_bts -> check exn_bts [] + + let rec push t exn_bt backoff = + let before = Atomic.get t in + let after = exn_bt :: before in + if not (Atomic.compare_and_set t before after) then + push t exn_bt (Backoff.once backoff) + + let push t (exn_bt : Exn_bt.t) = + if exn_bt.exn != Terminate then push t exn_bt Backoff.default +end + +let raise_if_canceled () = Fiber.check (Fiber.current ()) +let yield = Fiber.yield +let sleep = Fiber.sleep + +let block () = + match Trigger.await (Trigger.create ()) with + | None -> failwith "impossible" + | Some exn_bt -> Exn_bt.raise exn_bt + +let protect thunk = Fiber.forbid (Fiber.current ()) thunk diff --git a/lib/picos_structured/dune b/lib/picos_structured/dune new file mode 100644 index 00000000..2a4464d3 --- /dev/null +++ b/lib/picos_structured/dune @@ -0,0 +1,18 @@ +(library + (name picos_structured) + (public_name picos.structured) + (libraries picos backoff multicore-magic)) + +(mdx + (enabled_if + (and + (>= %{ocaml_version} 5.0.0) + (= %{env:OPAM_REPO_CI=false} false))) + (libraries + picos.exn_bt + picos.fifos + picos.stdio + picos.structured + picos.sync + unix) + (files picos_structured.mli)) diff --git a/lib/picos_structured/finally.ml b/lib/picos_structured/finally.ml new file mode 100644 index 00000000..cdd452c0 --- /dev/null +++ b/lib/picos_structured/finally.ml @@ -0,0 +1,77 @@ +open Picos + +type 'a finally = ('a -> unit) * (unit -> 'a) + +let[@inline] finally release acquire = (release, acquire) + +(** This function is marked [@inline never] to ensure that there are no + allocations between the [acquire ()] and the [match ... with] nor before + [release]. Allocations here would mean that e.g. pressing Ctrl-C, i.e. + [SIGINT], at the right moment could mean that [release] would not be called + after [acquire]. *) +let[@inline never] ( let@ ) (release, acquire) body = + let x = acquire () in + match body x with + | y -> + release x; + y + | exception exn -> + release x; + raise exn + +type ('a, _) tdt = + | Nothing : ('a, [> `Nothing ]) tdt + | Resource : { + mutable resource : 'a; + release : 'a -> unit; + moved : Trigger.t; + } + -> ('a, [> `Resource ]) tdt + +type 'a moveable = ('a, [ `Nothing | `Resource ]) tdt Atomic.t + +let ( let^ ) (release, acquire) body = + let moveable = Atomic.make Nothing in + let acquire () = + let (Resource r as state : (_, [ `Resource ]) tdt) = + Resource { resource = Obj.magic (); release; moved = Trigger.create () } + in + r.resource <- acquire (); + Atomic.set moveable state; + moveable + in + let release moveable = + match Atomic.get moveable with + | Nothing -> () + | Resource r -> begin + match Trigger.await r.moved with + | None -> () + | Some exn_bt -> begin + match Atomic.exchange moveable Nothing with + | Nothing -> () + | Resource r -> + r.release r.resource; + Exn_bt.raise exn_bt + end + end + in + ( let@ ) (release, acquire) body + +let[@inline never] check_no_resource () = + (* In case of cancelation this is not considered an error as the resource was + (likely) released by the parent. *) + Fiber.check (Fiber.current ()); + invalid_arg "no resource to move" + +let move moveable = + match Atomic.get moveable with + | Nothing -> check_no_resource () + | Resource r -> + let acquire () = + match Atomic.exchange moveable Nothing with + | Nothing -> check_no_resource () + | Resource r -> + Trigger.signal r.moved; + r.resource + in + (r.release, acquire) diff --git a/lib/picos_structured/picos_structured.ml b/lib/picos_structured/picos_structured.ml new file mode 100644 index 00000000..a109abdc --- /dev/null +++ b/lib/picos_structured/picos_structured.ml @@ -0,0 +1,4 @@ +module Finally = Finally +module Control = Control +module Bundle = Bundle +module Run = Run diff --git a/lib/picos_structured/picos_structured.mli b/lib/picos_structured/picos_structured.mli new file mode 100644 index 00000000..c63913aa --- /dev/null +++ b/lib/picos_structured/picos_structured.mli @@ -0,0 +1,279 @@ +(** Basic structured concurrency primitives for {!Picos}. + + This library essentially provides one user level interface for structuring + fibers with any Picos compatible scheduler. This library is both meant to + serve as an example of what can be done and to also provide practical means + for programming with fibers. Hopefully there will be many more libraries + implemented in Picos like this providing different approaches, patterns, and + idioms for structuring concurrent programs. *) + +open Picos + +(** {1 Modules} *) + +module Finally : sig + (** Syntax for avoiding resource leaks. *) + + type 'a finally = ('a -> unit) * (unit -> 'a) + (** A pair of release and acquire functions. *) + + val finally : ('a -> unit) -> (unit -> 'a) -> 'a finally + (** [finally release acquire] is equivalent to [(release, acquire)]. *) + + val ( let@ ) : 'a finally -> ('a -> 'b) -> 'b + (** [let@ resource = finally release acquire in scope] calls [acquire ()] to + obtain a [resource], evaluates [scope], and calls [release resource] + whether [scope] returns normally or raises an exception. + + Here is a sketch of a server that recursively forks a fiber to accept and + handle a client: + + {@ocaml skip[ + Bundle.join_after @@ fun bundle -> + let rec accept () = + let@ client = + finally Unix.close @@ fun () -> + Unix.accept ~cloexec:true socket |> fst + in + (* fork to accept other clients *) + Bundle.fork bundle accept; + (* handle this client... omitted *) + in + Bundle.fork bundle accept + ]} + + There is also a way to {!move} resources to allow forking fibers to handle + clients without leaks. *) + + type 'a moveable + (** A [moveable] either contains a resource or is empty as the resource has + been moved. *) + + val ( let^ ) : 'a finally -> ('a moveable -> 'b) -> 'b + (** [let^ moveable = finally release acquire in scope] calls [acquire ()] to + obtain a resource and stores it as a [moveable] resource. Then, at the + end of [scope], awaits that the resource is {{!move} moved} out of the + [moveable] or [release]s the resource in case of cancelation. *) + + val move : 'a moveable -> 'a finally + (** [move moveable] creates {{!type-finally} a pair of release and acquire + functions} where the acquire operation takes the resource from the + [moveable] and the release operation releases the resource. + + Here is a sketch of a server that accepts in a loop and forks fibers to + handle clients: + + {@ocaml skip[ + Bundle.join_after @@ fun bundle -> + while true do + (* loop to accept clients *) + let^ client = + finally Unix.close @@ fun () -> + Unix.accept ~closexec:true socket |> fst + in + (* fork to handle this client *) + Bundle.fork bundle @@ fun () -> + let@ client = move client in + (* handle client... omitted *) + done + ]} + + Another alternative to avoiding leaks is to {{!let@} recursively fork + fibers to accept and handle a client}. + + @raise Invalid_argument if the resource has already been moved (or + released) unless the fiber has been canceled. *) +end + +module Control : sig + (** Basic control operations and exceptions for structured concurrency. *) + + exception Terminate + (** An exception that is used to signal fibers, typically by canceling them, + that they should terminate by letting the exception propagate. This does + not, by itself, indicate an error. *) + + exception Errors of Exn_bt.t list + (** An exception that can be used to collect exceptions, typically indicating + errors, from multiple fibers. *) + + val raise_if_canceled : unit -> unit + (** [raise_if_canceled ()] checks whether the current fiber has been canceled + and if so raises the exception that the fiber was canceled with. *) + + val yield : unit -> unit + (** [yield ()] asks the current fiber to be rescheduled. *) + + val sleep : seconds:float -> unit + (** [sleep ~seconds] suspends the current fiber for specified number of + seconds. *) + + val protect : (unit -> 'a) -> 'a + (** [protect thunk] forbids cancelation for the duration of [thunk ()]. *) + + val block : unit -> 'a + (** [block ()] suspends the current fiber until it is canceled at which point + the cancelation exception will be raised. + + ⚠️ Beware that [protect block] never returns and you don't want that. *) +end + +module Bundle : sig + (** A dynamic bundle of fibers guaranteed to be joined at the end. *) + + type t + (** Represents a bundle of fibers. *) + + val join_after : (t -> 'a) -> 'a + (** [join_after scope] calls [scope] with a {{!t} bundle}. A call of + [join_after] returns or raises only after [scope] has returned or raised + and all {{!fork} forked} fibers have terminated. If [scope] raises an + exception, {!error} will be called. + + ℹ️ When [scope] returns normally, {!terminate} will not be called + implicitly. *) + + val terminate : ?callstack:int -> t -> unit + (** [terminate bundle] cancels all the {{!fork} forked} fibers using the + {{!Control.Terminate} [Terminate]} exception. + + The optional [callstack] argument specifies the number of callstack + entries to capture with the {{!Control.Terminate} [Terminate]} exception. + The default is [0]. *) + + val error : t -> Exn_bt.t -> unit + (** [error bundle exn_bt] first calls {!terminate} and then adds the exception + with backtrace to the list of exceptions to be raised, unless the + exception is the {{!Control.Terminate} [Terminate]} exception, which is + not considered to signal an error by itself. *) + + val fork : t -> (unit -> unit) -> unit + (** [fork bundle action] spawns a new fiber to the [bundle] that will run the + given [action]. If the action raises an exception, {!error} will be + called with that exception. *) +end + +module Run : sig + (** Operations for running fibers in specific patterns. *) + + val all : (unit -> unit) list -> unit + (** [all actions] starts all the actions as separate fibers and waits until + they all complete. *) + + val any : (unit -> unit) list -> unit + (** [any actions] starts all the actions as separate fibers and waits until at + least one of them completes. The rest of the started fibers will then be + canceled. + + ⚠️ Calling [any []] is equivalent to calling + {{!Control.block} [block ()]}. *) +end + +(** {1 Examples} + + First we open some modules for convenience: + + {[ + open Picos_structured.Finally + open Picos_structured + open Picos_stdio + open Picos_sync + ]} + + {2 A simple echo server and clients} + + Here is an example of a simple TCP echo server and two clients: + + {[ + # Picos_fifos.run ~forbid:false @@ fun () -> + let max_size = 100 in + + (* We let the system pick the port *) + let loopback_0 = Unix.(ADDR_INET (inet_addr_loopback, 0)) in + let server_addr = ref loopback_0 in + let mutex = Mutex.create () in + let condition = Condition.create () in + + let server () = + let@ socket = + finally Unix.close @@ fun () -> + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 + in + Unix.set_nonblock socket; + Unix.bind socket loopback_0; + Mutex.protect mutex begin fun () -> + server_addr := Unix.getsockname socket + end; + Condition.signal condition; + Unix.listen socket 8; + + Bundle.join_after begin fun bundle -> + while true do + let^ client = + finally Unix.close @@ fun () -> + Unix.accept ~cloexec:true socket |> fst + in + + (* Fork a new fiber for each client *) + Bundle.fork bundle begin fun () -> + let@ client = move client in + Unix.set_nonblock client; + + let bytes = Bytes.create max_size in + let n = Unix.read client bytes 0 (Bytes.length bytes) in + + Unix.write client bytes 0 n |> ignore + end + done + end + in + + let client () = + let@ socket = + finally Unix.close @@ fun () -> + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 + in + Unix.set_nonblock socket; + Unix.connect socket !server_addr; + + let msg = "Hello!" in + Unix.write_substring socket msg 0 (String.length msg) |> ignore; + + let bytes = Bytes.create max_size in + let n = Unix.read socket bytes 0 (Bytes.length bytes) in + + Printf.printf "Received: %s\n%!" (Bytes.sub_string bytes 0 n) + in + + Bundle.join_after begin fun bundle -> + (* Star server *) + Bundle.fork bundle server; + + (* Wait until server addr has been determined *) + Mutex.protect mutex begin fun () -> + while !server_addr == loopback_0 do + Condition.wait condition mutex + done + end; + + (* Start some clients and wait until they are done *) + Bundle.join_after begin fun bundle -> + for _=1 to 5 do + Bundle.fork bundle client + done + end; + + (* Stop server *) + Bundle.terminate bundle + end + Received: Hello! + Received: Hello! + Received: Hello! + Received: Hello! + Received: Hello! + - : unit = () + ]} + + As an exercise, you might want to refactor the server to avoid + {{!Finally.move} moving} the file descriptors and use a {{!Finally.let@} + recursive} accept loop instead. *) diff --git a/lib/picos_structured/run.ml b/lib/picos_structured/run.ml new file mode 100644 index 00000000..3f3b4a56 --- /dev/null +++ b/lib/picos_structured/run.ml @@ -0,0 +1,12 @@ +let all actions = + Bundle.join_after @@ fun bundle -> List.iter (Bundle.fork bundle) actions + +let any actions = + if actions == [] then Control.block () + else + Bundle.join_after @@ fun bundle -> + actions + |> List.iter @@ fun action -> + Bundle.fork bundle @@ fun () -> + action (); + Bundle.terminate bundle diff --git a/lib/picos_sync/dune b/lib/picos_sync/dune index 863593ea..79ab72f3 100644 --- a/lib/picos_sync/dune +++ b/lib/picos_sync/dune @@ -6,5 +6,5 @@ (mdx (enabled_if (>= %{ocaml_version} 5.0.0)) - (libraries picos picos.exn_bt picos.sync picos.fifos) + (libraries picos.exn_bt picos.fifos picos.structured picos.sync) (files picos_sync.mli)) diff --git a/lib/picos_sync/picos_sync.mli b/lib/picos_sync/picos_sync.mli index 9837f765..da720524 100644 --- a/lib/picos_sync/picos_sync.mli +++ b/lib/picos_sync/picos_sync.mli @@ -159,10 +159,10 @@ end (** {1 Examples} - For convenience, we first open the {!Picos} and {!Picos_sync} modules: + First we open some modules for convenience: {[ - open Picos + open Picos_structured open Picos_sync ]} @@ -230,30 +230,28 @@ end let bq = Bounded_queue.create ~capacity:3 () in - let consumer = Computation.create () in - Fiber.spawn ~forbid:false consumer [ fun () -> - try + Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> while true do Printf.printf "Popped %d\n%!" (Bounded_queue.pop bq) done - with Exit -> () ]; + end; - for i=1 to 5 do - Printf.printf "Pushing %d\n%!" i; - Bounded_queue.push bq i - done; + for i=1 to 5 do + Printf.printf "Pushing %d\n%!" i; + Bounded_queue.push bq i + done; - Printf.printf "All done?\n%!"; + Printf.printf "All done?\n%!"; - Fiber.yield (); + Control.yield (); - Computation.cancel consumer (Exn_bt.get_callstack 0 Exit); + Bundle.terminate bundle + end; Printf.printf "Pushing %d\n%!" 101; Bounded_queue.push bq 101; - Fiber.yield (); - Printf.printf "Popped %d\n%!" (Bounded_queue.pop bq) Pushing 1 Pushing 2 diff --git a/lib/picos_threaded/picos_threaded.ml b/lib/picos_threaded/picos_threaded.ml index fd636dc7..a9fa5b84 100644 --- a/lib/picos_threaded/picos_threaded.ml +++ b/lib/picos_threaded/picos_threaded.ml @@ -2,8 +2,8 @@ open Picos type t = { fiber : Fiber.t; mutex : Mutex.t; condition : Condition.t } -let create ~forbid computation = - let fiber = Fiber.create ~forbid computation in +let create_packed ~forbid packed = + let fiber = Fiber.create_packed ~forbid packed in let mutex = Mutex.create () in let condition = Condition.create () in { fiber; mutex; condition } @@ -70,13 +70,14 @@ and cancel_after : type a. _ -> a Computation.t -> _ = and spawn : type a. _ -> forbid:bool -> a Computation.t -> _ = fun t ~forbid computation mains -> Fiber.check t.fiber; + let packed = Computation.Packed computation in mains |> List.iter @@ fun main -> Thread.create (fun () -> (* We need to (recursively) install the handler on each new thread that we create. *) - Handler.using handler (create ~forbid computation) main) + Handler.using handler (create_packed ~forbid packed) main) () |> ignore @@ -84,4 +85,5 @@ and handler = Handler.{ current; spawn; yield; cancel_after; await } let run ~forbid main = Select.check_configured (); - Handler.using handler (create ~forbid (Computation.create ())) main + let packed = Computation.Packed (Computation.create ()) in + Handler.using handler (create_packed ~forbid packed) main diff --git a/test/dune b/test/dune index d2c2452f..a2182ab5 100644 --- a/test/dune +++ b/test/dune @@ -16,7 +16,7 @@ (modules test_picos) (libraries test_scheduler - foundation + picos.structured alcotest unix threads.posix @@ -57,7 +57,7 @@ (test (name test_server_and_client) (modules test_server_and_client) - (libraries test_scheduler picos.stdio elements foundation)) + (libraries test_scheduler picos.stdio picos.structured picos.sync)) ;; @@ -85,7 +85,12 @@ (test (name test_select) (modules test_select) - (libraries foundation picos.threaded picos.select alcotest domain_shims)) + (libraries + picos.structured + picos.threaded + picos.select + alcotest + domain_shims)) ;; @@ -127,12 +132,13 @@ (mdx (libraries - foundation picos picos.exn_bt picos.htbl picos.stdio + picos.structured + picos.sync picos.threaded test_scheduler unix) - (files test_htbl.md test_scheduler.md test_stdio.md)) + (files test_htbl.md test_scheduler.md test_stdio.md test_structured.md)) diff --git a/test/lib/elements/bundle.ml b/test/lib/elements/bundle.ml deleted file mode 100644 index 51a07bda..00000000 --- a/test/lib/elements/bundle.ml +++ /dev/null @@ -1,62 +0,0 @@ -open Picos - -(* TODO: propagation of all exceptions? *) -(* TODO: cancelation forbidden? *) - -type t = { num_fibers : int Atomic.t; computation : unit Computation.t } - -let decr t = - let n = Atomic.fetch_and_add t.num_fibers (-1) in - if n = 1 then Computation.finish t.computation - -let run fn = - let num_fibers = Atomic.make 1 in - let computation = Computation.create () in - let t = { num_fibers; computation } in - let fiber = Fiber.current () in - let (Packed fiber_computation) = Fiber.computation fiber in - let canceler = - Computation.canceler ~from:fiber_computation ~into:t.computation - in - if Computation.try_attach fiber_computation canceler then begin - match fn t with - | value -> - decr t; - Computation.await t.computation; - Computation.detach fiber_computation canceler; - value - | exception exn -> - let exn_bt = Exn_bt.get exn in - Computation.cancel t.computation exn_bt; - Computation.detach fiber_computation canceler; - Exn_bt.raise exn_bt - end - else - match Computation.canceled fiber_computation with - | None -> failwith "Bundle: fiber already finished" - | Some exn_bt -> Exn_bt.raise exn_bt - -let fork t thunk = - let child_computation = Computation.create () in - let canceler = - Computation.canceler ~from:t.computation ~into:child_computation - in - if Computation.try_attach t.computation canceler then begin - Atomic.incr t.num_fibers; - match - let main () = - Computation.capture child_computation thunk (); - Computation.detach t.computation canceler; - decr t - in - Fiber.spawn ~forbid:false child_computation [ main ] - with - | () -> Promise.of_computation child_computation - | exception canceled_exn -> - decr t; - raise canceled_exn - end - else - match Computation.canceled t.computation with - | None -> invalid_arg "Bundle: bundle already finished" - | Some exn_bt -> Exn_bt.raise exn_bt diff --git a/test/lib/elements/bundle.mli b/test/lib/elements/bundle.mli deleted file mode 100644 index 80be67c7..00000000 --- a/test/lib/elements/bundle.mli +++ /dev/null @@ -1,10 +0,0 @@ -(** An implementation of structured concurrency for Picos *) - -type t -(** *) - -val run : (t -> 'a) -> 'a -(** *) - -val fork : t -> (unit -> 'a) -> 'a Promise.t -(** *) diff --git a/test/lib/foundation/dune b/test/lib/foundation/dune deleted file mode 100644 index 4d21ece7..00000000 --- a/test/lib/foundation/dune +++ /dev/null @@ -1,2 +0,0 @@ -(library - (name foundation)) diff --git a/test/lib/foundation/finally.ml b/test/lib/foundation/finally.ml deleted file mode 100644 index f40cd259..00000000 --- a/test/lib/foundation/finally.ml +++ /dev/null @@ -1,11 +0,0 @@ -let finally release acquire = (release, acquire) - -let[@inline never] ( let@ ) (release, acquire) body = - let x = acquire () in - match body x with - | y -> - release x; - y - | exception exn -> - release x; - raise exn diff --git a/test/test_picos.ml b/test/test_picos.ml index 92e3b941..aa92d7c6 100644 --- a/test/test_picos.ml +++ b/test/test_picos.ml @@ -1,5 +1,5 @@ open Picos -open Foundation.Finally +open Picos_structured.Finally let run_in_fiber main = let computation = Computation.create () in diff --git a/test/test_select.ml b/test/test_select.ml index 837653ab..31ce6689 100644 --- a/test/test_select.ml +++ b/test/test_select.ml @@ -1,4 +1,4 @@ -open Foundation.Finally +open Picos_structured.Finally open Picos let () = Picos_select.configure () diff --git a/test/test_server_and_client.ml b/test/test_server_and_client.ml index c2ef493a..54291fd0 100644 --- a/test/test_server_and_client.ml +++ b/test/test_server_and_client.ml @@ -1,9 +1,11 @@ -open Foundation.Finally -open Elements +open Picos_structured.Finally +open Picos_structured open Picos_stdio +open Picos_sync +let () = Random.self_init () let is_ocaml4 = String.starts_with ~prefix:"4." Sys.ocaml_version -let use_nonblock = Sys.win32 || Random.bool (Random.self_init ()) +let use_nonblock = Sys.win32 || Random.bool () let set_nonblock fd = if use_nonblock then @@ -16,66 +18,108 @@ let is_opam_ci = | exception Not_found -> false let main () = - Bundle.run @@ fun bundle -> let n = 100 in - let server_addr = ref None in - let server = - Bundle.fork bundle @@ fun () -> - Printf.printf " Server running\n%!"; - let@ client = + let loopback_0 = Unix.(ADDR_INET (inet_addr_loopback, 0)) in + let server_addr = ref loopback_0 in + let mutex = Mutex.create () in + let condition = Condition.create () in + + let server_looping () = + Printf.printf " Looping server running\n%!"; + let@ socket = finally Unix.close @@ fun () -> - let@ socket = - finally Unix.close @@ fun () -> - Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 - in - set_nonblock socket; - match Unix.bind socket Unix.(ADDR_INET (inet_addr_loopback, 0)) with - | () -> - server_addr := Some (Unix.getsockname socket); - Unix.listen socket 1; - Printf.printf " Server listening\n%!"; - Unix.accept ~cloexec:true socket |> fst - | exception Unix.Unix_error (EPERM, _, _) when is_opam_ci -> raise Exit + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 in - set_nonblock client; - let bytes = Bytes.create n in - let n = Unix.read client bytes 0 (Bytes.length bytes) in - Printf.printf " Server read %d\n%!" n; - let n = Unix.write client bytes 0 (n / 2) in - Printf.printf " Server wrote %d\n%!" n + set_nonblock socket; + match Unix.bind socket loopback_0 with + | () -> + Mutex.protect mutex (fun () -> server_addr := Unix.getsockname socket); + Condition.signal condition; + Unix.listen socket 8; + Printf.printf " Server listening\n%!"; + Bundle.join_after @@ fun bundle -> + while true do + let^ client = + finally Unix.close @@ fun () -> + Printf.printf " Server accepting\n%!"; + Unix.accept ~cloexec:true socket |> fst + in + Printf.printf " Server accepted client\n%!"; + + Bundle.fork bundle @@ fun () -> + let@ client = move client in + set_nonblock client; + let bytes = Bytes.create n in + let n = Unix.read client bytes 0 (Bytes.length bytes) in + Printf.printf " Server read %d\n%!" n; + let n = Unix.write client bytes 0 (n / 2) in + Printf.printf " Server wrote %d\n%!" n + done + | exception Unix.Unix_error (EPERM, _, _) when is_opam_ci -> raise Exit in - let client = - Bundle.fork bundle @@ fun () -> - Printf.printf " Client running\n%!"; + let server_recursive () = + Printf.printf " Recursive server running\n%!"; let@ socket = finally Unix.close @@ fun () -> Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 in set_nonblock socket; - let server_addr = - let rec loop retries = - match !server_addr with - | None -> - if retries < 0 then - if is_opam_ci then raise Exit else failwith "No server address"; - Unix.sleepf 0.01; - loop (retries - 1) - | Some addr -> addr - in - loop 100 + match Unix.bind socket Unix.(ADDR_INET (inet_addr_loopback, 0)) with + | () -> + Mutex.protect mutex (fun () -> server_addr := Unix.getsockname socket); + Condition.signal condition; + Unix.listen socket 8; + Printf.printf " Server listening\n%!"; + Bundle.join_after @@ fun bundle -> + let rec accept () = + let@ client = + finally Unix.close @@ fun () -> + Printf.printf " Server accepting\n%!"; + Unix.accept ~cloexec:true socket |> fst + in + Printf.printf " Server accepted client\n%!"; + Bundle.fork bundle accept; + set_nonblock client; + let bytes = Bytes.create n in + let n = Unix.read client bytes 0 (Bytes.length bytes) in + Printf.printf " Server read %d\n%!" n; + let n = Unix.write client bytes 0 (n / 2) in + Printf.printf " Server wrote %d\n%!" n + in + Bundle.fork bundle accept + | exception Unix.Unix_error (EPERM, _, _) when is_opam_ci -> raise Exit + in + + let server = if Random.bool () then server_looping else server_recursive in + + let client id () = + Printf.printf " Client %s running\n%!" id; + let@ socket = + finally Unix.close @@ fun () -> + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 in - Unix.connect socket server_addr; - Printf.printf " Client connected\n%!"; + set_nonblock socket; + Unix.connect socket !server_addr; + Printf.printf " Client %s connected\n%!" id; let bytes = Bytes.create n in let n = Unix.write socket bytes 0 (Bytes.length bytes) in - Printf.printf " Client wrote %d\n%!" n; + Printf.printf " Client %s wrote %d\n%!" id n; let n = Unix.read socket bytes 0 (Bytes.length bytes) in - Printf.printf " Client read %d\n%!" n + Printf.printf " Client %s read %d\n%!" id n in - Promise.await (Promise.both server client); + Bundle.join_after @@ fun bundle -> + Bundle.fork bundle server; + begin + Mutex.protect mutex @@ fun () -> + while !server_addr == loopback_0 do + Condition.wait condition mutex + done + end; + Run.all [ client "A"; client "B" ]; + Bundle.terminate bundle; Printf.printf "Server and Client test: OK\n%!" diff --git a/test/test_stdio.md b/test/test_stdio.md index 7a7a624c..fee71000 100644 --- a/test/test_stdio.md +++ b/test/test_stdio.md @@ -1,8 +1,10 @@ # `Picos_stdio` ```ocaml -open Foundation.Finally open Picos +open Picos_structured.Finally +open Picos_structured +open Picos_sync open Picos_stdio ``` @@ -54,18 +56,14 @@ test_stdio.md ```ocaml # Test_scheduler.run @@ fun () -> - let computation = Computation.create () in - let exited = Trigger.create () in - Fiber.spawn ~forbid:false computation [ fun () -> - match Unix.select [] [] [] (-1.0) with - | _ -> Printf.printf "Impossible\n%!" - | exception Exit -> - Trigger.signal exited ]; + Bundle.join_after @@ fun bundle -> + Bundle.fork bundle begin fun () -> + let _ = Unix.select [] [] [] (-1.0) in + Printf.printf "Impossible\n%!" + end; Unix.sleepf 0.01; - assert (Trigger.is_initial exited); - Computation.cancel computation (Exn_bt.get_callstack 0 Exit); - Trigger.await exited -- : Picos_exn_bt.t option = None + Bundle.terminate bundle +- : unit = () ``` ```ocaml @@ -91,29 +89,27 @@ test_stdio.md Unix.set_nonblock syn_inn; Unix.set_nonblock syn_out; - let consumer = Computation.create () in - let finished = Trigger.create () in - Fiber.spawn ~forbid:false consumer [ fun () -> - try - while true do - match Unix.select [ msg_inn1; msg_inn2 ] [] [] 0.1 with - | inns, _, _ -> - if List.exists ((==) msg_inn1) inns then begin - Printf.printf "Inn1\n%!"; - assert (1 = Unix.read msg_inn1 (Bytes.create 1) 0 1); - assert (1 = Unix.write_substring syn_out "!" 0 1) - end; - if List.exists ((==) msg_inn2) inns then begin - Printf.printf "Inn2\n%!"; - assert (1 = Unix.read msg_inn2 (Bytes.create 1) 0 1); - assert (1 = Unix.write_substring syn_out "!" 0 1) - end; - if [] == inns then begin - Printf.printf "Timeout\n%!"; - assert (1 = Unix.write_substring syn_out "!" 0 1) - end - done - with Exit -> Trigger.signal finished ]; + Bundle.join_after @@ fun bundle -> + Bundle.fork bundle begin fun () -> + while true do + match Unix.select [ msg_inn1; msg_inn2 ] [] [] 0.1 with + | inns, _, _ -> + if List.exists ((==) msg_inn1) inns then begin + Printf.printf "Inn1\n%!"; + assert (1 = Unix.read msg_inn1 (Bytes.create 1) 0 1); + assert (1 = Unix.write_substring syn_out "!" 0 1) + end; + if List.exists ((==) msg_inn2) inns then begin + Printf.printf "Inn2\n%!"; + assert (1 = Unix.read msg_inn2 (Bytes.create 1) 0 1); + assert (1 = Unix.write_substring syn_out "!" 0 1) + end; + if [] == inns then begin + Printf.printf "Timeout\n%!"; + assert (1 = Unix.write_substring syn_out "!" 0 1) + end + done + end; assert (1 = Unix.write_substring msg_out1 "!" 0 1); assert (1 = Unix.write_substring msg_out2 "!" 0 1); @@ -122,10 +118,9 @@ test_stdio.md assert (1 = Unix.read syn_inn (Bytes.create 1) 0 1); - Computation.cancel consumer (Exn_bt.get_callstack 0 Exit); - Trigger.await finished + Bundle.terminate bundle Inn1 Inn2 Timeout -- : Picos_exn_bt.t option = None +- : unit = () ``` diff --git a/test/test_structured.md b/test/test_structured.md new file mode 100644 index 00000000..e12ceb99 --- /dev/null +++ b/test/test_structured.md @@ -0,0 +1,156 @@ +# `Picos_structured` + +```ocaml +open Picos_structured.Finally +open Picos_structured +open Picos_sync +``` + +## Helper to check that computation is restored + +```ocaml +let check join_after scope = + let open Picos in + let fiber = Fiber.current () in + let before = Fiber.get_computation fiber in + let finally () = + let after = Fiber.get_computation fiber in + assert (before == after) + in + Fun.protect ~finally @@ fun () -> + join_after @@ fun bundle -> + let during = Fiber.get_computation fiber in + assert (before != during); + scope bundle +``` + +## Fork after terminate raises + +```ocaml +# Test_scheduler.run @@ fun () -> + check Bundle.join_after @@ fun bundle -> + Bundle.terminate bundle; + Bundle.fork bundle (fun () -> Printf.printf "Hello!\n%!") +Exception: Picos_structured__Control.Terminate. +``` + +```ocaml +# Test_scheduler.run @@ fun () -> + let escape = ref (Obj.magic ()) in + check Bundle.join_after begin fun bundle -> + escape := bundle; + end; + Bundle.fork !escape (fun () -> Printf.printf "Hello!\n%!") +Exception: Invalid_argument "already completed". +``` + +## Exception in child terminates bundle + +```ocaml +# Test_scheduler.run @@ fun () -> + let mutex = Mutex.create () in + let condition = Condition.create () in + let blocked = ref false in + check Bundle.join_after @@ fun bundle -> + Bundle.fork bundle begin fun () -> + Mutex.protect mutex begin fun () -> + while not !blocked do + Condition.wait condition mutex + done + end; + raise Exit + end; + Mutex.protect mutex begin fun () -> + blocked := true; + Printf.printf "Blocked\n%!"; + while true do + Condition.wait condition mutex + done + end +Blocked +Exception: Stdlib.Exit. +``` + +## Termination (or cancelation) nests + +```ocaml +# Test_scheduler.run @@ fun () -> + let mutex = Mutex.create () in + let condition = Condition.create () in + let blocked = ref false in + check Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> + check Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> + Mutex.protect mutex begin fun () -> + blocked := true + end; + Printf.printf "Blocked\n%!"; + Condition.signal condition; + while true do + Control.sleep ~seconds:1.0; + done + end + end + end; + + Mutex.protect mutex begin fun () -> + while not !blocked do + Condition.wait condition mutex + done + end; + + Bundle.terminate bundle + end +Blocked +- : unit = () +``` + +## Cancelation also waits for children + +```ocaml +# Test_scheduler.run @@ fun () -> + let blocked = ref false in + let slept = ref false in + check Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> + check Bundle.join_after begin fun bundle -> + Bundle.fork bundle begin fun () -> + try + blocked := true; + Control.block () + with exn -> + Control.protect begin fun () -> + Printf.printf "Sleeping\n%!"; + Control.sleep ~seconds:0.2; + Printf.printf "Woke up\n%!"; + slept := true; + end; + raise exn + end + end; + end; + while not !blocked do + Control.sleep ~seconds:0.01 + done; + Bundle.terminate bundle + end; + !slept +Sleeping +Woke up +- : bool = true +``` + +## Racing + +```ocaml +# Test_scheduler.run @@ fun () -> + let winner = ref 0 in + Run.any [ + (fun () -> Control.sleep ~seconds:0.3; winner := 3 ); + (fun () -> Control.sleep ~seconds:0.2; winner := 2 ); + (fun () -> Control.sleep ~seconds:0.1; winner := 1 ); + ]; + !winner +- : int = 1 +``` diff --git a/test/test_sync.ml b/test/test_sync.ml index 920e7d7f..39e4f36c 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -81,7 +81,7 @@ let test_mutex_and_condition_cancelation () = in let attempt i finished ?checked () = - computations.(i) <- Fiber.computation (Fiber.current ()); + computations.(i) <- Fiber.get_computation (Fiber.current ()); match Atomic.incr step_1; Domain.cpu_relax ();