diff --git a/README.md b/README.md index e43a5bc7..2db151d9 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ is distributed under the [ISC license](LICENSE.md). - [A transactional lock-free queue](#a-transactional-lock-free-queue) - [Composing transactions](#composing-transactions) - [Blocking transactions](#blocking-transactions) + - [Timeouts](#timeouts) - [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap) - [Programming with transactional data structures](#programming-with-transactional-data-structures) - [The dining philosophers problem](#the-dining-philosophers-problem) @@ -76,6 +77,12 @@ is distributed under the [ISC license](LICENSE.md). To use the library + + ```ocaml # #require "kcas" # open Kcas @@ -562,6 +569,74 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary condition and can function as a fairly expressive communication and synchronization mechanism. +#### Timeouts + +> If you block, will they come? + +That is a good question. Blocking indefinitely is often not acceptable. + +A blocked transaction can be waken up by a write to any shared memory location +that was accessed by the transaction. This means that given a suitable timeout +mechanism one could e.g. setup a timeout that writes to a boolean shared memory +location that is accessed by a blocking transaction: + +```ocaml +# let pop_or_raise_if ~xt timeout stack = + (* Check if timeout has expired: *) + if Xt.get ~xt timeout then raise Exit; + pop stack +val pop_or_raise_if : + xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = +``` + +This works, but creating, checking, and canceling timeouts properly can be a lot +of work. Therefore **kcas** also directly supports an optional `timeoutf` +argument for potentially blocking operations. For example, to perform a blocking +pop with a timeout, one can simply explicitly pass the desired timeout in +seconds: + +```ocaml +# let an_empty_stack = stack () in + Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } +Exception: Failure "Domain_local_timeout.set_timeoutf not implemented". +``` + +Oops! What happened above is that the +[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout) +mechanism used by **kcas** was not implemented on the current domain. The idea +is that, in the future, concurrent schedulers provide the mechanism out of the +box, but there is also a default implementation using the Stdlib `Thread` and +`Unix` modules that works on most platforms. However, to avoid direct +dependencies to `Thread` and `Unix`, we need to explicitly tell the library that +it can use those modules: + +```ocaml +# Domain_local_timeout.set_system (module Thread) (module Unix) +- : unit = () +``` + +This initialization, if needed, should be done by application code rather than +by libraries. + +If we now retry the previous example we will get a +[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout) +exception as expected: + +```ocaml +# let an_empty_stack = stack () in + Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } +Exception: Kcas.Timeout.Timeout. +``` + +Besides +[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-commit), +potentially blocking single location operations such as +[`get_as`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-get_as), +[`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-update), +and +[`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-modify) +support the optional `timeoutf` argument. + #### A transactional lock-free leftist heap Let's implement something a bit more complicated, diff --git a/dune-project b/dune-project index 7b157bf7..30695d86 100644 --- a/dune-project +++ b/dune-project @@ -11,6 +11,7 @@ (depends (ocaml (>= 5.0)) (domain-local-await (>= 0.2.0)) + (domain-local-timeout (>= 0.1.0)) (alcotest (and (>= 1.7.0) :with-test)) (mdx (and (>= 1.10.0) :with-test)))) (package (name kcas_data) diff --git a/kcas.opam b/kcas.opam index c31dd25a..8a5bf796 100644 --- a/kcas.opam +++ b/kcas.opam @@ -11,6 +11,7 @@ depends: [ "dune" {>= "3.3"} "ocaml" {>= "5.0"} "domain-local-await" {>= "0.2.0"} + "domain-local-timeout" {>= "0.1.0"} "alcotest" {>= "1.7.0" & with-test} "mdx" {>= "1.10.0" & with-test} "odoc" {with-doc} diff --git a/src/kcas/dune b/src/kcas/dune index ac7491b9..8c8e969e 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,4 +1,4 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await)) + (libraries domain-local-await domain-local-timeout)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5e9ea5e2..19c94fcc 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -19,6 +19,78 @@ let fenceless_set = Atomic.set module Backoff = Backoff +module Timeout = struct + exception Timeout + + let timeout () = raise Timeout [@@inline never] + + type t = Unset | Elapsed | Call of (unit -> unit) + + let unset = Atomic.make Unset + let check state = if fenceless_get state == Elapsed then timeout () [@@inline] + + let alloc_opt = function + | None -> unset + | Some seconds -> + let state = Atomic.make Unset in + let cancel = + Domain_local_timeout.set_timeoutf seconds @@ fun () -> + match Atomic.exchange state Elapsed with + | Call release_or_cancel -> release_or_cancel () + | Unset | Elapsed -> () + in + fenceless_set state @@ Call cancel; + state + [@@inline never] + + let alloc_opt seconds = if seconds == None then unset else alloc_opt seconds + [@@inline] + + let set_opt state = function + | None -> () + | Some seconds -> + let cancel = + Domain_local_timeout.set_timeoutf seconds @@ fun () -> + match Atomic.exchange state Elapsed with + | Call release_or_cancel -> release_or_cancel () + | Unset | Elapsed -> () + in + fenceless_set state @@ Call cancel + [@@inline never] + + let set_opt state seconds = if seconds != None then set_opt state seconds + [@@inline] + + let await state release = + match fenceless_get state with + | Call _ as alive -> + if Atomic.compare_and_set state alive (Call release) then alive + else timeout () + | Unset | Elapsed -> timeout () + [@@inline never] + + let await state release = + let alive = fenceless_get state in + if alive == Unset then Unset else await state release + [@@inline] + + let unawait state alive = + match fenceless_get state with + | Call _ as await -> + if not (Atomic.compare_and_set state await alive) then timeout () + | Unset | Elapsed -> timeout () + [@@inline never] + + let unawait state alive = if alive != Unset then unawait state alive + [@@inline] + + let cancel alive = + match alive with Call cancel -> cancel () | Unset | Elapsed -> () + [@@inline never] + + let cancel alive = if alive != Unset then cancel alive [@@inline] +end + module Id = struct let neg_id = Atomic.make (-1) let neg_ids n = Atomic.fetch_and_add neg_id (-n) [@@inline] @@ -33,7 +105,7 @@ module Action : sig val noop : t val append : (unit -> unit) -> t -> t - val run : t -> 'a -> 'a + val run : t -> unit end = struct type t = unit -> unit @@ -42,10 +114,7 @@ end = struct let append action t = if t == noop then action else fun x -> action (t x) [@@inline] - let run t x = - t (); - x - [@@inline] + let run t = t () [@@inline] end type awaiter = unit -> unit @@ -345,45 +414,60 @@ let rec remove_awaiter loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter loc before awaiter -let block loc before = +let block timeout loc before = let t = Domain_local_await.prepare_for_await () in + let alive = Timeout.await timeout t.release in if add_awaiter loc before t.release then ( try t.await () with cancellation_exn -> let backtrace = Printexc.get_raw_backtrace () in remove_awaiter loc before t.release; - Printexc.raise_with_backtrace cancellation_exn backtrace) + Timeout.cancel alive; + Printexc.raise_with_backtrace cancellation_exn backtrace); + Timeout.unawait timeout alive -let rec update_no_alloc backoff loc state f = +let rec update_no_alloc timeout backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> state.after <- after; - if before == after then before + if before == after then ( + Timeout.cancel (fenceless_get timeout); + before) else if Atomic.compare_and_set (as_atomic loc) state_old state then ( + Timeout.cancel (fenceless_get timeout); state.before <- after; resume_awaiters before state_old.awaiters) - else update_no_alloc (Backoff.once backoff) loc state f + else update_no_alloc timeout (Backoff.once backoff) loc state f | exception Retry.Later -> - block loc before; - update_no_alloc backoff loc state f + block timeout loc before; + update_no_alloc timeout backoff loc state f + | exception exn -> + Timeout.cancel (fenceless_get timeout); + raise exn -let update_with_state backoff loc f state_old = +let update_with_state timeout backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then before + if before == after then ( + Timeout.cancel (fenceless_get timeout); + before) else let state = new_state after in - if Atomic.compare_and_set (as_atomic loc) state_old state then - resume_awaiters before state_old.awaiters - else update_no_alloc (Backoff.once backoff) loc state f + if Atomic.compare_and_set (as_atomic loc) state_old state then ( + Timeout.cancel (fenceless_get timeout); + resume_awaiters before state_old.awaiters) + else update_no_alloc timeout (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block loc before; - update_no_alloc backoff loc state f + block timeout loc before; + update_no_alloc timeout backoff loc state f + | exception exn -> + Timeout.cancel (fenceless_get timeout); + raise exn let rec exchange_no_alloc backoff loc state = let state_old = Atomic.get (as_atomic loc) in @@ -442,13 +526,22 @@ module Loc = struct let get_id loc = loc.id [@@inline] let get loc = eval (Atomic.get (as_atomic loc)) - let rec get_as f loc = - let before = get loc in + let rec get_as timeout f loc state = + let before = eval state in match f before with - | value -> value + | value -> + Timeout.cancel (fenceless_get timeout); + value | exception Retry.Later -> - block loc before; - get_as f loc + block timeout loc before; + get_as timeout f loc (fenceless_get (as_atomic loc)) + | exception exn -> + Timeout.cancel (fenceless_get timeout); + raise exn + + let get_as ?timeoutf f loc = + get_as (Timeout.alloc_opt timeoutf) f loc (Atomic.get (as_atomic loc)) + [@@inline] let get_mode loc = if loc.id < 0 then Mode.lock_free else Mode.obstruction_free @@ -459,17 +552,21 @@ module Loc = struct let state_old = Atomic.get (as_atomic loc) in cas_with_state loc before state state_old - let fenceless_update ?(backoff = Backoff.default) loc f = - update_with_state backoff loc f (fenceless_get (as_atomic loc)) + let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = + let timeout = Timeout.alloc_opt timeoutf in + update_with_state timeout backoff loc f (fenceless_get (as_atomic loc)) - let fenceless_modify ?backoff loc f = - fenceless_update ?backoff loc f |> ignore + let fenceless_modify ?timeoutf ?backoff loc f = + fenceless_update ?timeoutf ?backoff loc f |> ignore [@@inline] - let update ?(backoff = Backoff.default) loc f = - update_with_state backoff loc f (Atomic.get (as_atomic loc)) + let update ?timeoutf ?(backoff = Backoff.default) loc f = + let timeout = Timeout.alloc_opt timeoutf in + update_with_state timeout backoff loc f (Atomic.get (as_atomic loc)) - let modify ?backoff loc f = update ?backoff loc f |> ignore [@@inline] + let modify ?timeoutf ?backoff loc f = + update ?timeoutf ?backoff loc f |> ignore + [@@inline] let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff loc (new_state value) @@ -557,12 +654,15 @@ end module Xt = struct type 'x t = { + mutable _timeout : Timeout.t; mutable casn : casn; mutable cass : cass; mutable validate_counter : int; mutable post_commit : Action.t; } + external timeout_as_atomic : 'x t -> Timeout.t Atomic.t = "%identity" + let validate_one casn loc state = let before = if is_cmp casn state then eval state else state.before in (* Fenceless is safe inside transactions as each log update has a fence. *) @@ -581,7 +681,9 @@ module Xt = struct let c1 = c0 + 1 in xt.validate_counter <- c1; (* Validate whenever counter reaches next power of 2. *) - if c0 land c1 = 0 then validate_all xt.casn xt.cass + if c0 land c1 = 0 then ( + Timeout.check (timeout_as_atomic xt); + validate_all xt.casn xt.cass) [@@inline] let update0 loc f xt lt gt = @@ -788,50 +890,73 @@ module Xt = struct match tx ~xt with | result -> ( match xt.cass with - | NIL -> Action.run xt.post_commit result + | NIL -> + Timeout.cancel (fenceless_get (timeout_as_atomic xt)); + Action.run xt.post_commit; + result | CASN { loc; state; lt = NIL; gt = NIL; _ } -> - if is_cmp xt.casn state then Action.run xt.post_commit result + if is_cmp xt.casn state then ( + Timeout.cancel (fenceless_get (timeout_as_atomic xt)); + Action.run xt.post_commit; + result) else let before = state.before in state.before <- state.after; state.casn <- casn_after; (* Fenceless is safe inside transactions as each log update has a fence. *) let state_old = fenceless_get (as_atomic loc) in - if cas_with_state loc before state state_old then - Action.run xt.post_commit result + if cas_with_state loc before state state_old then ( + Timeout.cancel (fenceless_get (timeout_as_atomic xt)); + Action.run xt.post_commit; + result) else commit (Backoff.once backoff) mode (reset_quick xt) tx | cass -> ( match determine_for_owner xt.casn cass with - | true -> Action.run xt.post_commit result + | true -> + Timeout.cancel (fenceless_get (timeout_as_atomic xt)); + Action.run xt.post_commit; + result | false -> commit (Backoff.once backoff) mode (reset mode xt) tx | exception Mode.Interference -> commit (Backoff.once backoff) Mode.lock_free (reset Mode.lock_free xt) tx)) | exception Retry.Invalid -> + Timeout.check (timeout_as_atomic xt); commit (Backoff.once backoff) mode (reset_quick xt) tx | exception Retry.Later -> ( if xt.cass == NIL then invalid_retry (); let t = Domain_local_await.prepare_for_await () in + let alive = Timeout.await (timeout_as_atomic xt) t.release in match add_awaiters t.release xt.casn xt.cass with | NIL -> ( match t.await () with | () -> remove_awaiters t.release xt.casn NIL xt.cass; + Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.reset backoff) mode (reset_quick xt) tx | exception cancellation_exn -> let backtrace = Printexc.get_raw_backtrace () in remove_awaiters t.release xt.casn NIL xt.cass; + Timeout.cancel alive; Printexc.raise_with_backtrace cancellation_exn backtrace) | CASN _ as stop -> remove_awaiters t.release xt.casn stop xt.cass; + Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.once backoff) mode (reset_quick xt) tx) + | exception exn -> + Timeout.cancel (fenceless_get (timeout_as_atomic xt)); + raise exn - let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx = + let commit ?timeoutf ?(backoff = Backoff.default) + ?(mode = Mode.obstruction_free) tx = let casn = Atomic.make (mode :> status) and cass = NIL and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = { casn; cass; validate_counter; post_commit } in + let xt = + { _timeout = Timeout.Unset; casn; cass; validate_counter; post_commit } + in + Timeout.set_opt (timeout_as_atomic xt) timeoutf; commit backoff mode xt tx.tx [@@inline] end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 4330d726..6bbf3bad 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -2,6 +2,14 @@ module Backoff : module type of Backoff +(** Timeout support. *) +module Timeout : sig + exception Timeout + (** Exception that may be raised by operations such as {!Loc.get_as}, + {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in + seconds and timeout elapses. *) +end + (** Retry support. *) module Retry : sig exception Later @@ -60,7 +68,10 @@ end - [backoff] specifies the configuration for the {!Backoff} mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. *) + might adjust the configuration to improve performance. + + - [timeoutf] specifies a timeout in seconds and, if specified, the + {!Timeout.Timeout} may be raised by the operation to signal timeout. *) (** Shared memory locations. *) module Loc : sig @@ -91,7 +102,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ('a -> 'b) -> 'a t -> 'b + val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -103,7 +114,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -111,7 +122,8 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -142,11 +154,13 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -348,7 +362,8 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : + ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 883f3abe..eab3391b 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -167,6 +167,13 @@ let test_read_casn () = let b = Loc.get a2 in assert (a <= b) done + and getaser () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get_as Fun.id a1 in + let b = Loc.get_as Fun.id a2 in + assert (a <= b) + done and committer () = Barrier.await barrier; while not (Atomic.get test_finished) do @@ -183,7 +190,7 @@ let test_read_casn () = done in - run_domains [ mutator; getter; committer; updater ] + run_domains [ mutator; getter; getaser; committer; updater ] (* *)