From 657811fbdb0a2b4510c4bfcc39911b537e59df23 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Wed, 7 Jun 2023 12:55:36 +0300 Subject: [PATCH] Timeouts --- README.md | 75 ++++++++++++++ dune-project | 1 + kcas.opam | 1 + src/kcas/dune | 2 +- src/kcas/kcas.ml | 250 ++++++++++++++++++++++++++++++++++++---------- src/kcas/kcas.mli | 30 ++++-- test/kcas/dune | 2 +- test/kcas/test.ml | 52 +++++++++- 8 files changed, 353 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index e43a5bc7..02c73319 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ is distributed under the [ISC license](LICENSE.md). - [A transactional lock-free queue](#a-transactional-lock-free-queue) - [Composing transactions](#composing-transactions) - [Blocking transactions](#blocking-transactions) + - [Timeouts](#timeouts) - [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap) - [Programming with transactional data structures](#programming-with-transactional-data-structures) - [The dining philosophers problem](#the-dining-philosophers-problem) @@ -76,6 +77,12 @@ is distributed under the [ISC license](LICENSE.md). To use the library + + ```ocaml # #require "kcas" # open Kcas @@ -562,6 +569,74 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary condition and can function as a fairly expressive communication and synchronization mechanism. +#### Timeouts + +> If you block, will they come? + +That is a good question. Blocking indefinitely is often not acceptable. + +A blocked transaction can be waken up by a write to any shared memory location +that was accessed by the transaction. This means that, given a suitable timeout +mechanism, one could e.g. setup a timeout that writes to a boolean shared memory +location that is accessed by a blocking transaction: + +```ocaml +# let pop_or_raise_if ~xt timeout stack = + (* Check if timeout has expired: *) + if Xt.get ~xt timeout then raise Exit; + pop stack +val pop_or_raise_if : + xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = +``` + +This works, but creating, checking, and canceling timeouts properly can be a lot +of work. Therefore **kcas** also directly supports an optional `timeoutf` +argument for potentially blocking operations. For example, to perform a blocking +pop with a timeout, one can simply explicitly pass the desired timeout in +seconds: + +```ocaml +# let an_empty_stack = stack () in + Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } +Exception: Failure "Domain_local_timeout.set_timeoutf not implemented". +``` + +Oops! What happened above is that the +[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout) +mechanism used by **kcas** was not implemented on the current domain. The idea +is that, in the future, concurrent schedulers provide the mechanism out of the +box, but there is also a default implementation using the Stdlib `Thread` and +`Unix` modules that works on most platforms. However, to avoid direct +dependencies to `Thread` and `Unix`, we need to explicitly tell the library that +it can use those modules: + +```ocaml +# Domain_local_timeout.set_system (module Thread) (module Unix) +- : unit = () +``` + +This initialization, if needed, should be done by application code rather than +by libraries. + +If we now retry the previous example we will get a +[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout) +exception as expected: + +```ocaml +# let an_empty_stack = stack () in + Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } +Exception: Kcas.Timeout.Timeout. +``` + +Besides +[`commit`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Xt/index.html#val-commit), +potentially blocking single location operations such as +[`get_as`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-get_as), +[`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-update), +and +[`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-modify) +support the optional `timeoutf` argument. + #### A transactional lock-free leftist heap Let's implement something a bit more complicated, diff --git a/dune-project b/dune-project index 7b157bf7..30695d86 100644 --- a/dune-project +++ b/dune-project @@ -11,6 +11,7 @@ (depends (ocaml (>= 5.0)) (domain-local-await (>= 0.2.0)) + (domain-local-timeout (>= 0.1.0)) (alcotest (and (>= 1.7.0) :with-test)) (mdx (and (>= 1.10.0) :with-test)))) (package (name kcas_data) diff --git a/kcas.opam b/kcas.opam index c31dd25a..8a5bf796 100644 --- a/kcas.opam +++ b/kcas.opam @@ -11,6 +11,7 @@ depends: [ "dune" {>= "3.3"} "ocaml" {>= "5.0"} "domain-local-await" {>= "0.2.0"} + "domain-local-timeout" {>= "0.1.0"} "alcotest" {>= "1.7.0" & with-test} "mdx" {>= "1.10.0" & with-test} "odoc" {with-doc} diff --git a/src/kcas/dune b/src/kcas/dune index ac7491b9..8c8e969e 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,4 +1,4 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await)) + (libraries domain-local-await domain-local-timeout)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5e9ea5e2..0e0544c4 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -19,6 +19,80 @@ let fenceless_set = Atomic.set module Backoff = Backoff +module Timeout = struct + exception Timeout + + let timeout () = raise Timeout [@@inline never] + + type t = Unset | Elapsed | Call of (unit -> unit) + + let unset = Atomic.make Unset + + (* Fenceless operations are safe here as the timeout state is not not visible + outside of the library and we don't always need the latest value and, when + we do, there is a fence after. *) + + let check state = if fenceless_get state == Elapsed then timeout () [@@inline] + + let set seconds state = + Domain_local_timeout.set_timeoutf seconds @@ fun () -> + match Atomic.exchange state Elapsed with + | Call release_or_cancel -> release_or_cancel () + | Unset | Elapsed -> () + + let alloc_opt = function + | None -> unset + | Some seconds -> + let state = Atomic.make Unset in + let cancel = set seconds state in + fenceless_set state @@ Call cancel; + state + [@@inline never] + + let alloc_opt seconds = if seconds == None then unset else alloc_opt seconds + [@@inline] + + let set_opt state = function + | None -> () + | Some seconds -> + let cancel = set seconds state in + fenceless_set state @@ Call cancel + [@@inline never] + + let set_opt state seconds = if seconds != None then set_opt state seconds + [@@inline] + + let await state release = + match fenceless_get state with + | Call _ as alive -> + if Atomic.compare_and_set state alive (Call release) then alive + else timeout () + | Unset | Elapsed -> timeout () + [@@inline never] + + let await state release = + let alive = fenceless_get state in + if alive == Unset then Unset else await state release + [@@inline] + + let unawait state alive = + match fenceless_get state with + | Call _ as await -> + if not (Atomic.compare_and_set state await alive) then timeout () + | Unset | Elapsed -> timeout () + [@@inline never] + + let unawait state alive = if alive != Unset then unawait state alive + [@@inline] + + let cancel_alive alive = + match alive with Call cancel -> cancel () | Unset | Elapsed -> () + [@@inline never] + + let cancel_alive alive = if alive != Unset then cancel_alive alive [@@inline] + let cancel state = cancel_alive (fenceless_get state) [@@inline] +end + module Id = struct let neg_id = Atomic.make (-1) let neg_ids n = Atomic.fetch_and_add neg_id (-n) [@@inline] @@ -32,8 +106,11 @@ module Action : sig type t val noop : t - val append : (unit -> unit) -> t -> t + val append : (unit -> unit) -> t -> t [@@inline] + val run : t -> 'a -> 'a + [@@inline] + (** Always call this last as user code may raise. *) end = struct type t = unit -> unit @@ -52,14 +129,10 @@ type awaiter = unit -> unit let resume_awaiter awaiter = awaiter () [@@inline] -let resume_awaiters result = function - | [] -> result - | [ awaiter ] -> - resume_awaiter awaiter; - result - | awaiters -> - List.iter resume_awaiter awaiters; - result +let resume_awaiters = function + | [] -> () + | [ awaiter ] -> resume_awaiter awaiter + | awaiters -> List.iter resume_awaiter awaiters [@@inline] type determined = [ `After | `Before ] @@ -127,7 +200,7 @@ let rec release_after casn = function if not (is_cmp casn state) then ( state.before <- state.after; state.casn <- casn_after; - resume_awaiters () awaiters); + resume_awaiters awaiters); release_after casn gt let rec release_before casn = function @@ -137,7 +210,7 @@ let rec release_before casn = function if not (is_cmp casn state) then ( state.after <- state.before; state.casn <- casn_before; - resume_awaiters () awaiters); + resume_awaiters awaiters); release_before casn gt let release casn cass = function @@ -345,52 +418,70 @@ let rec remove_awaiter loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter loc before awaiter -let block loc before = +let block timeout loc before = let t = Domain_local_await.prepare_for_await () in + let alive = Timeout.await timeout t.release in if add_awaiter loc before t.release then ( try t.await () with cancellation_exn -> let backtrace = Printexc.get_raw_backtrace () in remove_awaiter loc before t.release; - Printexc.raise_with_backtrace cancellation_exn backtrace) + Timeout.cancel_alive alive; + Printexc.raise_with_backtrace cancellation_exn backtrace); + Timeout.unawait timeout alive -let rec update_no_alloc backoff loc state f = +let rec update_no_alloc timeout backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> state.after <- after; - if before == after then before + if before == after then ( + Timeout.cancel timeout; + before) else if Atomic.compare_and_set (as_atomic loc) state_old state then ( state.before <- after; - resume_awaiters before state_old.awaiters) - else update_no_alloc (Backoff.once backoff) loc state f + resume_awaiters state_old.awaiters; + Timeout.cancel timeout; + before) + else update_no_alloc timeout (Backoff.once backoff) loc state f | exception Retry.Later -> - block loc before; - update_no_alloc backoff loc state f + block timeout loc before; + update_no_alloc timeout backoff loc state f + | exception exn -> + Timeout.cancel timeout; + raise exn -let update_with_state backoff loc f state_old = +let update_with_state timeout backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then before + if before == after then ( + Timeout.cancel timeout; + before) else let state = new_state after in - if Atomic.compare_and_set (as_atomic loc) state_old state then - resume_awaiters before state_old.awaiters - else update_no_alloc (Backoff.once backoff) loc state f + if Atomic.compare_and_set (as_atomic loc) state_old state then ( + resume_awaiters state_old.awaiters; + Timeout.cancel timeout; + before) + else update_no_alloc timeout (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block loc before; - update_no_alloc backoff loc state f + block timeout loc before; + update_no_alloc timeout backoff loc state f + | exception exn -> + Timeout.cancel timeout; + raise exn let rec exchange_no_alloc backoff loc state = let state_old = Atomic.get (as_atomic loc) in let before = eval state_old in if before == state.after then before - else if Atomic.compare_and_set (as_atomic loc) state_old state then - resume_awaiters before state_old.awaiters + else if Atomic.compare_and_set (as_atomic loc) state_old state then ( + resume_awaiters state_old.awaiters; + before) else exchange_no_alloc (Backoff.once backoff) loc state let is_obstruction_free casn loc = @@ -404,8 +495,9 @@ let rec cas_with_state loc before state state_old = || before == if is_after state_old.casn then after' else before') && (before == state.after || - if Atomic.compare_and_set (as_atomic loc) state_old state then - resume_awaiters true state_old.awaiters + if Atomic.compare_and_set (as_atomic loc) state_old state then ( + resume_awaiters state_old.awaiters; + true) else (* We must retry, because compare is by value rather than by state. @@ -442,13 +534,23 @@ module Loc = struct let get_id loc = loc.id [@@inline] let get loc = eval (Atomic.get (as_atomic loc)) - let rec get_as f loc = - let before = get loc in + let rec get_as timeout f loc state = + let before = eval state in match f before with - | value -> value + | value -> + Timeout.cancel timeout; + value | exception Retry.Later -> - block loc before; - get_as f loc + block timeout loc before; + (* Fenceless is safe as there was already a fence before. *) + get_as timeout f loc (fenceless_get (as_atomic loc)) + | exception exn -> + Timeout.cancel timeout; + raise exn + + let get_as ?timeoutf f loc = + get_as (Timeout.alloc_opt timeoutf) f loc (Atomic.get (as_atomic loc)) + [@@inline] let get_mode loc = if loc.id < 0 then Mode.lock_free else Mode.obstruction_free @@ -459,17 +561,21 @@ module Loc = struct let state_old = Atomic.get (as_atomic loc) in cas_with_state loc before state state_old - let fenceless_update ?(backoff = Backoff.default) loc f = - update_with_state backoff loc f (fenceless_get (as_atomic loc)) + let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = + let timeout = Timeout.alloc_opt timeoutf in + update_with_state timeout backoff loc f (fenceless_get (as_atomic loc)) - let fenceless_modify ?backoff loc f = - fenceless_update ?backoff loc f |> ignore + let fenceless_modify ?timeoutf ?backoff loc f = + fenceless_update ?timeoutf ?backoff loc f |> ignore [@@inline] - let update ?(backoff = Backoff.default) loc f = - update_with_state backoff loc f (Atomic.get (as_atomic loc)) + let update ?timeoutf ?(backoff = Backoff.default) loc f = + let timeout = Timeout.alloc_opt timeoutf in + update_with_state timeout backoff loc f (Atomic.get (as_atomic loc)) - let modify ?backoff loc f = update ?backoff loc f |> ignore [@@inline] + let modify ?timeoutf ?backoff loc f = + update ?timeoutf ?backoff loc f |> ignore + [@@inline] let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff loc (new_state value) @@ -556,13 +662,36 @@ module Op = struct end module Xt = struct + (* NOTE: You can adjust comment blocks below to select whether or not to use + an unsafe cast to avoid a level of indirection due to [Atomic.t]. *) + + (**) + type 'x t = { + mutable _timeout : Timeout.t; + mutable casn : casn; + mutable cass : cass; + mutable validate_counter : int; + mutable post_commit : Action.t; + } + + let timeout_unset () = Timeout.Unset [@@inline] + + external timeout_as_atomic : 'x t -> Timeout.t Atomic.t = "%identity" + (**) + + (* type 'x t = { + mutable _timeout : Timeout.t Atomic.t; mutable casn : casn; mutable cass : cass; mutable validate_counter : int; mutable post_commit : Action.t; } + let timeout_unset () = Atomic.make Timeout.Unset [@@inline] + let timeout_as_atomic r = r._timeout [@@inline] + *) + let validate_one casn loc state = let before = if is_cmp casn state then eval state else state.before in (* Fenceless is safe inside transactions as each log update has a fence. *) @@ -581,7 +710,9 @@ module Xt = struct let c1 = c0 + 1 in xt.validate_counter <- c1; (* Validate whenever counter reaches next power of 2. *) - if c0 land c1 = 0 then validate_all xt.casn xt.cass + if c0 land c1 = 0 then ( + Timeout.check (timeout_as_atomic xt); + validate_all xt.casn xt.cass) [@@inline] let update0 loc f xt lt gt = @@ -788,50 +919,69 @@ module Xt = struct match tx ~xt with | result -> ( match xt.cass with - | NIL -> Action.run xt.post_commit result + | NIL -> + Timeout.cancel (timeout_as_atomic xt); + Action.run xt.post_commit result | CASN { loc; state; lt = NIL; gt = NIL; _ } -> - if is_cmp xt.casn state then Action.run xt.post_commit result + if is_cmp xt.casn state then ( + Timeout.cancel (timeout_as_atomic xt); + Action.run xt.post_commit result) else let before = state.before in state.before <- state.after; state.casn <- casn_after; (* Fenceless is safe inside transactions as each log update has a fence. *) let state_old = fenceless_get (as_atomic loc) in - if cas_with_state loc before state state_old then - Action.run xt.post_commit result + if cas_with_state loc before state state_old then ( + Timeout.cancel (timeout_as_atomic xt); + Action.run xt.post_commit result) else commit (Backoff.once backoff) mode (reset_quick xt) tx | cass -> ( match determine_for_owner xt.casn cass with - | true -> Action.run xt.post_commit result + | true -> + Timeout.cancel (timeout_as_atomic xt); + Action.run xt.post_commit result | false -> commit (Backoff.once backoff) mode (reset mode xt) tx | exception Mode.Interference -> commit (Backoff.once backoff) Mode.lock_free (reset Mode.lock_free xt) tx)) | exception Retry.Invalid -> + Timeout.check (timeout_as_atomic xt); commit (Backoff.once backoff) mode (reset_quick xt) tx | exception Retry.Later -> ( if xt.cass == NIL then invalid_retry (); let t = Domain_local_await.prepare_for_await () in + let alive = Timeout.await (timeout_as_atomic xt) t.release in match add_awaiters t.release xt.casn xt.cass with | NIL -> ( match t.await () with | () -> remove_awaiters t.release xt.casn NIL xt.cass; + Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.reset backoff) mode (reset_quick xt) tx | exception cancellation_exn -> let backtrace = Printexc.get_raw_backtrace () in remove_awaiters t.release xt.casn NIL xt.cass; + Timeout.cancel_alive alive; Printexc.raise_with_backtrace cancellation_exn backtrace) | CASN _ as stop -> remove_awaiters t.release xt.casn stop xt.cass; + Timeout.unawait (timeout_as_atomic xt) alive; commit (Backoff.once backoff) mode (reset_quick xt) tx) + | exception exn -> + Timeout.cancel (timeout_as_atomic xt); + raise exn - let commit ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx = + let commit ?timeoutf ?(backoff = Backoff.default) + ?(mode = Mode.obstruction_free) tx = let casn = Atomic.make (mode :> status) and cass = NIL and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = { casn; cass; validate_counter; post_commit } in + let xt = + { _timeout = timeout_unset (); casn; cass; validate_counter; post_commit } + in + Timeout.set_opt (timeout_as_atomic xt) timeoutf; commit backoff mode xt tx.tx [@@inline] end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 4330d726..48031c57 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -2,6 +2,14 @@ module Backoff : module type of Backoff +(** Timeout support. *) +module Timeout : sig + exception Timeout + (** Exception that may be raised by operations such as {!Loc.get_as}, + {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in + seconds. *) +end + (** Retry support. *) module Retry : sig exception Later @@ -60,7 +68,11 @@ end - [backoff] specifies the configuration for the {!Backoff} mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. *) + might adjust the configuration to improve performance. + + - [timeoutf] specifies a timeout in seconds and, if specified, the + {!Timeout.Timeout} may be raised by the operation to signal that the + timeout expired. *) (** Shared memory locations. *) module Loc : sig @@ -91,7 +103,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ('a -> 'b) -> 'a t -> 'b + val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -103,7 +115,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -111,7 +123,8 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -142,11 +155,13 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : + ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -348,7 +363,8 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : + ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/test/kcas/dune b/test/kcas/dune index 3e0599d0..223e78ff 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -5,7 +5,7 @@ (test (name test) - (libraries kcas barrier unix alcotest) + (libraries kcas barrier threads alcotest) (modules test) (package kcas)) diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 883f3abe..3b1caca0 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -167,6 +167,13 @@ let test_read_casn () = let b = Loc.get a2 in assert (a <= b) done + and getaser () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get_as Fun.id a1 in + let b = Loc.get_as Fun.id a2 in + assert (a <= b) + done and committer () = Barrier.await barrier; while not (Atomic.get test_finished) do @@ -183,7 +190,7 @@ let test_read_casn () = done in - run_domains [ mutator; getter; committer; updater ] + run_domains [ mutator; getter; getaser; committer; updater ] (* *) @@ -564,6 +571,47 @@ let test_call () = (* *) +(** This is a non-deterministic test that might fail occasionally. *) +let test_timeout () = + Domain_local_timeout.set_system (module Thread) (module Unix); + + let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = + let x = Loc.make false in + let finally = + Domain_local_timeout.set_timeoutf 0.3 @@ fun () -> Loc.set x true + in + Fun.protect ~finally @@ fun () -> + (match op ~timeoutf:0.1 x with + | () -> assert false + | exception Timeout.Timeout -> ()); + op ~timeoutf:0.5 x + in + run_domains + [ + check (fun ?timeoutf x -> + Loc.get_as ?timeoutf (fun x -> if x then () else Retry.later ()) x); + check (fun ?timeoutf x -> + Loc.update ?timeoutf x (fun x -> x || Retry.later ()) |> ignore); + check (fun ?timeoutf x -> + Loc.modify ?timeoutf x (fun x -> x || Retry.later ())); + check (fun ?timeoutf x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.later (); + Xt.swap ~xt x y + in + Xt.commit ?timeoutf { tx }); + check (fun ?timeoutf x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.invalid (); + Xt.swap ~xt x y + in + Xt.commit ?timeoutf { tx }); + ] + +(* *) + let test_mode () = assert (Loc.get_mode (Loc.make ~mode:Mode.lock_free 0) == Mode.lock_free); assert ( @@ -618,4 +666,6 @@ let () = ("call", [ Alcotest.test_case "" `Quick test_call ]); ("mode", [ Alcotest.test_case "" `Quick test_mode ]); ("xt", [ Alcotest.test_case "" `Quick test_xt ]); + ( "timeout (non-deterministic)", + [ Alcotest.test_case "" `Quick test_timeout ] ); ]