Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New batcher version for paper #13

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions benchmarks/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,36 @@ skiplist_explicit :
$(B) skiplist-explicit --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 6
$(B) skiplist-explicit --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 7
$(B) skiplist-explicit --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 8

btree_batched_validate :
printf "Btree-batched\n"
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 1 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 2 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 3 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 4 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 5 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 6 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 7 -T
$(B) btree-batched --init-count=$(B_PRESET) --count=$(B_OPS) -i $(ITER) -w $(WARMUP) -D 8 -T

skiplist_batched :
printf "Skiplist-Batched\n"
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 1
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 2
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 3
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 4
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 5
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 6
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 7
$(B) skiplist-batched --init-count=$(S_PRESET) --count=$(S_INS) -i $(ITER) --no-searches=$(S_SEARCH) --no-size=$(S_SIZE) -w $(WARMUP) -D 8

counter_batched :
printf "Counter-Batched\n"
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 1
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 2
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 3
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 4
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 5
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 6
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 7
$(B) counter-batched --count=$(C_OPS) --rand -i $(ITER) -w $(WARMUP) -D 8
3 changes: 2 additions & 1 deletion benchmarks/btree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ module Batched = struct
then BatchedIntBtree.apply tree (Insert (test_spec.insert_elements.(i), ()))
else
ignore (BatchedIntBtree.apply tree (Search test_spec.search_elements.(i - Array.length test_spec.insert_elements)))
)
);
BatchedIntBtree.wait_for_batch tree


let cleanup (t: t) (test_spec: test_spec) =
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/counter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ module Batched = struct
else if i < test_spec.increments + test_spec.decrements
then BatchedCounter.apply counter Decr
else BatchedCounter.apply counter Get |> ignore
)
);
BatchedCounter.wait_for_batch counter

let cleanup (_t: t) (_test_spec: test_spec) = ()

Expand Down
6 changes: 4 additions & 2 deletions benchmarks/datalog_bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ module BatchParallel = struct
else ignore (
ignore @@
ConcurrentDatalog.apply t (Search test_spec.search_elements.(i - Array.length test_spec.insert_elements)))
)
);
ConcurrentDatalog.wait_for_batch t

let cleanup _ _ = ()

Expand Down Expand Up @@ -336,7 +337,8 @@ module BatchParallelBasic = struct
else ignore (
ignore @@
ConcurrentBasicDatalog.apply t (Search test_spec.search_elements.(i - Array.length test_spec.insert_elements)))
)
);
ConcurrentBasicDatalog.wait_for_batch t

let cleanup _ _ = ()

Expand Down
3 changes: 2 additions & 1 deletion benchmarks/skiplist.ml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ module Batched = struct
(Member test_spec.search_elements.(i - Array.length test_spec.insert_elements)))
else
ignore (BatchedSkiplist.apply t Size)
)
);
BatchedSkiplist.wait_for_batch t

let cleanup (_t: t) (_test_spec: test_spec) = ()

Expand Down
79 changes: 66 additions & 13 deletions lib/batcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,43 @@ module Make (S : S) = struct
pool : Task.pool;
mutable ds : S.t;
running : bool Atomic.t;
container : S.wrapped_op Ts_container.t
container : S.wrapped_op Ts_container.t;
mutable last_run : Ptime.t
}

let init pool =
{ pool;
ds = S.init ();
running = Atomic.make false;
container = Ts_container.create () }

container = Ts_container.create ();
last_run = Ptime_clock.now ()
}

let rec try_launch t =
if Ts_container.size t.container > 0
&& Atomic.compare_and_set t.running false true
if Ts_container.size t.container <= 0 then ()
else let current_time = Ptime_clock.now () in
if (Ts_container.size t.container < 1000 && Ptime.Span.to_float_s (Ptime.diff current_time t.last_run) < 0.001) then
ignore @@ Task.async t.pool (fun () -> try_launch t)
else if Atomic.compare_and_set t.running false true
then
begin
let batch = Ts_container.get t.container in
t.last_run <- current_time;
S.run t.ds t.pool batch;
Atomic.set t.running false;
try_launch t
end

let try_launch t =
if Ts_container.size t.container > 0
&& Atomic.compare_and_set t.running false true
if Ts_container.size t.container <= 0 then ()
else let current_time = Ptime_clock.now () in
if (Ts_container.size t.container < 1000 && Ptime.Span.to_float_s (Ptime.diff current_time t.last_run) < 0.001) then
ignore @@ Task.async t.pool (fun () -> try_launch t)
else if Atomic.compare_and_set t.running false true
then
begin
let batch = Ts_container.get t.container in
t.last_run <- current_time;
S.run t.ds t.pool batch;
Atomic.set t.running false;
ignore @@ Task.async t.pool (fun () -> try_launch t)
Expand All @@ -72,6 +82,22 @@ module Make (S : S) = struct
try_launch t;
Task.await t.pool pr

let restart_batcher_timer t = t.last_run <- Ptime_clock.now ()

let is_batch_running t = Ts_container.size t.container > 0 || Atomic.get t.running

let rec wait_for_batch t f =
if is_batch_running t then
ignore @@ Task.async t.pool (fun () -> wait_for_batch t f)
else f ()

let wait_for_batch t =
if is_batch_running t then begin
let pr, set = Task.promise () in
wait_for_batch t set;
Task.await t.pool pr
end

let unsafe_get_internal_data t = t.ds
[@@@alert unsafe "For developer use"]

Expand All @@ -87,32 +113,43 @@ module Make1 (S : S1) = struct
pool : Task.pool;
mutable ds : 'a S.t;
running : bool Atomic.t;
container : 'a S.wrapped_op Ts_container.t
container : 'a S.wrapped_op Ts_container.t;
mutable last_run : Ptime.t
}

let init pool =
{ pool;
ds = S.init ();
running = Atomic.make false;
container = Ts_container.create () }
container = Ts_container.create ();
last_run = Ptime_clock.now ()
}

let rec try_launch t =
if Ts_container.size t.container > 0
&& Atomic.compare_and_set t.running false true
if Ts_container.size t.container <= 0 then ()
else let current_time = Ptime_clock.now () in
if (Ts_container.size t.container < 1000 && Ptime.Span.to_float_s (Ptime.diff current_time t.last_run) < 0.001) then
ignore @@ Task.async t.pool (fun () -> try_launch t)
else if Atomic.compare_and_set t.running false true
then
begin
let batch = Ts_container.get t.container in
t.last_run <- current_time;
S.run t.ds t.pool batch;
Atomic.set t.running false;
try_launch t
end

let try_launch t =
if Ts_container.size t.container > 0
&& Atomic.compare_and_set t.running false true
if Ts_container.size t.container <= 0 then ()
else let current_time = Ptime_clock.now () in
if (Ts_container.size t.container < 1000 && Ptime.Span.to_float_s (Ptime.diff current_time t.last_run) < 0.001) then
ignore @@ Task.async t.pool (fun () -> try_launch t)
else if Atomic.compare_and_set t.running false true
then
begin
let batch = Ts_container.get t.container in
t.last_run <- current_time;
S.run t.ds t.pool batch;
Atomic.set t.running false;
ignore @@ Task.async t.pool (fun () -> try_launch t)
Expand All @@ -125,6 +162,22 @@ module Make1 (S : S1) = struct
try_launch t;
Task.await t.pool pr

let restart_batcher_timer t = t.last_run <- Ptime_clock.now ()

let is_batch_running t = Ts_container.size t.container > 0 || Atomic.get t.running

let rec wait_for_batch t f =
if is_batch_running t then
ignore @@ Task.async t.pool (fun () -> wait_for_batch t f)
else f ()

let wait_for_batch t =
if is_batch_running t then begin
let pr, set = Task.promise () in
wait_for_batch t set;
Task.await t.pool pr
end

let unsafe_get_internal_data t = t.ds
[@@@alert unsafe "For developer use"]

Expand Down
22 changes: 22 additions & 0 deletions lib/batcher.mli
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ module Make : functor (S : S) -> sig
val apply : t -> 'a op -> 'a
(** [apply t op] applies the operation [op] to [t]. *)

val restart_batcher_timer : t -> unit

val is_batch_running : t -> bool
(** [is_batch_running t] returns [true] if there is are active or
pending operations on the batched data structure. *)

val wait_for_batch : t -> unit
(** [wait_for_batch t] waits until all active and pending operations
on the batched data structure have completed. Allows domain to
yield while waiting like [await]. *)

val unsafe_get_internal_data : t -> S.t
[@@@alert unsafe "For developer use"]

Expand All @@ -82,6 +93,17 @@ module Make1 : functor (S : S1) -> sig
val apply : 'a t -> ('a, 'b) op -> 'b
(** [apply t op] applies the operation [op] to [t]. *)

val restart_batcher_timer : 'a t -> unit

val is_batch_running : 'a t -> bool
(** [is_batch_running t] returns [true] if there is are active or
pending operations on the batched data structure. *)

val wait_for_batch : 'a t -> unit
(** [wait_for_batch t] waits until all active and pending operations
on the batched data structure have completed. Allows domain to
yield while waiting like Domainslib's [await]. *)

val unsafe_get_internal_data : 'a t -> 'a S.t
[@@@alert unsafe "For developer use"]

Expand Down
2 changes: 1 addition & 1 deletion lib/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name domainslib)
(public_name domainslib)
(libraries lockfree))
(libraries lockfree ptime ptime.clock.os))
49 changes: 47 additions & 2 deletions lib/ts_container.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,49 @@
module ChanBased = struct
(* Simple, unbounded and unordered task queue *)
module SimpleTreiber = struct
type 'a t = 'a list Atomic.t

let empty () = Atomic.make []

let rec push t v =
let cur = Atomic.get t in
if not (Atomic.compare_and_set t cur (v :: cur)) then
push t v

let pop_all t = Array.of_list (Atomic.exchange t [])
end

module StackBased = struct
type 'a t = {
stack : 'a SimpleTreiber.t;
size : int Atomic.t;
batch_limit : int
}

(* Change batch limit to (N)processes *)
let create ?(batch_limit=max_int) () =
{
stack = SimpleTreiber.empty ();
size = Atomic.make 0;
batch_limit;
}

let add t elt =
ignore @@ Atomic.fetch_and_add t.size 1;
SimpleTreiber.push t.stack elt

let get t =
let batch_size = Atomic.exchange t.size 0 in
let arr = SimpleTreiber.pop_all t.stack in
let topup = batch_size - Array.length arr in
let _ = Atomic.fetch_and_add t.size topup in
arr

let size t = Atomic.get t.size
end

include StackBased

(* module ChanBased = struct
type 'a t = {
chan : 'a Chan.t;
size : int Atomic.t;
Expand All @@ -24,7 +69,7 @@ module ChanBased = struct
let size t = Atomic.get t.size
end

include ChanBased
include ChanBased *)

(* type container = ELT.t option array
type t = {
Expand Down