Skip to content

Commit

Permalink
Better benchmark stability
Browse files Browse the repository at this point in the history
This patch does multiple things to try to improve stability of running
times:

 - Introduce cpu affinity support in benchpress. This is configured with
   on the command-line with the new `--cpus` option, which replaces `-j`
   and allows to specify a list of cpus or cpu ranges in taskset format,
   such as `0-3,7-12,15` (strides are not supported).

   When the `--cpus` option is provided, provers will be run in parallel
   on the provided cpus, with at most one prover running on a given cpu
   at once.

   Note that the `--cpus` setting *only* concerns the prover runs (and
   some glue code around one specific prover run): it does *not*
   otherwise restrict the cpus used by benchpress itself. It is
   recommended to use an external method such as the `taskset` utility
   to assign to the toplevel benchpress process a CPU affinity that does
   not overlap with the prover CPUs provided in `--cpus`.

   When using the `--cpus` setting, users should be aware that *there
   may still be other processes on the system using these cpus*
   (obviously)! It is thus recommended to use one of the existing
   techniques to isolate these CPUs from the rest of the system. I know
   of two ways to do this: the `isolcpus` kernel parameter, which is
   deprecated but is slightly easier to use, and cpusets, which are not
   deprecated but harder to use.

   To use `isolcpus`, simply set the cpus to use for benchmarking as the
   isolated CPUs on the kernel cmdline and reboot (OK, maybe not that
   simple for one-shot benchmarking, but fairly easy for a machine that
   is mostly used for benchmarking). There is no need to set the CPU
   affinity for the `benchpress` binary: it will never be scheduled on
   the isolated CPUs, and neither will any other processes (unless
   manually required to).

   To use cpusets (which is the solution I employed on our benchmark
   machine at OCamlPro), you should create a `system` cpuset that
   contains only the CPUs that will *NOT* be used by benchpress, and
   move all processes to that cpuset (this can be done on a running
   system, consult the cpuset documentation). Then, create another
   cpuset that contains the CPUs to use for benchpress, including the
   CPUs to use for the `benchpress` binary (in practice I use the root
   cpuset that contains all the CPUs), and run `benchpress` in that
   cpuset. You must not forget to use `taskset` to prevent the
   `benchpress` binary from using the CPUs destined for the provers. In
   practice, I move the shell that I use to run benchpress to that
   second cpuset.

 - Input files are copied into RAM (through `/dev/shm`) before running
   the provers, and the input to the prover is the copy in RAM.  This
   ensures we don't benchmark the time it takes to load the problem from
   disk, and hels minimise noise in situations of high disk contention
   (such as when tens of prover instances are trying to read their input
   at the same time). We are still limited by the RAM bandwidth but…
   what can you do.

 - Standard output and standard error of the prover are written to
   temporary files in RAM instead of using pipes. This ensures that the
   prover never gets stuck waiting for benchpress to read from the pipes
   (although this shouldn't be an issue due to huge default pipe buffer
   sizes on Linux), but more importantly minimises context switches
   between the prover and benchpress.

With these changes we observe much less noise in benchmark results
internally at OCamlPro, and this all but fixes sneeuwballen#58 (that said, it would
still be nice to add support for getting execution times through
getrusage).
  • Loading branch information
bclement-ocp committed Jun 7, 2023
1 parent 837db0c commit 19a6a1e
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 68 deletions.
1 change: 1 addition & 0 deletions benchpress.opam
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ depends: [
"uuidm"
"base64"
"ptime"
"processor"
"pp_loc" { >= "2.0" & < "3.0" }
"gnuplot" { >= "0.6" & < "0.8" }
"sqlite3"
Expand Down
8 changes: 4 additions & 4 deletions src/bin/Run_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ module T = Test
module Log = (val Logs.src_log (Logs.Src.create "benchpress.run-main"))

(* run provers on the given dirs, return a list [prover, dir, results] *)
let execute_run_prover_action ?j ?timestamp ?pp_results ?dyn ?limits ?proof_dir
let execute_run_prover_action ?j ?cpus ?timestamp ?pp_results ?dyn ?limits ?proof_dir
?output ~notify ~uuid ~save ~wal_mode ~update (defs : Definitions.t)
(r : Action.run_provers) : _ * Test_compact_result.t =
Error.guard
(Error.wrapf "run prover action@ `@[%a@]`" Action.pp_run_provers r)
@@ fun () ->
let interrupted = CCLock.create false in
let r =
Exec_action.Exec_run_provers.expand ?dyn ?j ?proof_dir ?limits defs r.limits
Exec_action.Exec_run_provers.expand ?dyn ?j ?cpus ?proof_dir ?limits defs r.limits
r.j r.pattern r.dirs r.provers
in
let len = List.length r.problems in
Expand Down Expand Up @@ -68,7 +68,7 @@ type top_task =
| TT_run_slurm_submission of
Action.run_provers_slurm_submission * Definitions.t

let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_
let main ?j ?cpus ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_
?summary ?task ?(dir_files = []) ?proof_dir ?output ?(save = true)
?(wal_mode = false) ~desktop_notification ~no_failure ~update
?(sbatch = false) ?partition ?nodes ?addr ?port ?ntasks
Expand Down Expand Up @@ -188,7 +188,7 @@ let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_

let top_res, (results : Test_compact_result.t) =
execute_run_prover_action ~uuid ?pp_results ?proof_dir ?dyn:progress
~limits ?j ?output ~notify ~timestamp ~save ~wal_mode ~update defs
~limits ?j ?cpus ?output ~notify ~timestamp ~save ~wal_mode ~update defs
run_provers_action
in
if CCOpt.is_some csv then (
Expand Down
47 changes: 44 additions & 3 deletions src/bin/benchpress_bin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Run = struct
(* sub-command for running tests *)
let cmd =
let open Cmdliner in
let aux j pp_results dyn paths dir_files proof_dir defs task timeout memory
let aux j cpus pp_results dyn paths dir_files proof_dir defs task timeout memory
meta provers csv summary no_color output save wal_mode
desktop_notification no_failure update =
catch_err @@ fun () ->
Expand All @@ -30,7 +30,7 @@ module Run = struct
else
None
in
Run_main.main ~pp_results ?dyn ~j ?timeout ?memory ?csv ~provers ~meta
Run_main.main ~pp_results ?dyn ~j ?cpus ?timeout ?memory ?csv ~provers ~meta
?task ?summary ~dir_files ?proof_dir ?output ~save ~wal_mode
~desktop_notification ~no_failure ~update defs paths ()
in
Expand Down Expand Up @@ -67,6 +67,47 @@ module Run = struct
& opt (some int) None
& info [ "t"; "timeout" ] ~doc:"timeout (in s)")
and j = Arg.(value & opt int 1 & info [ "j" ] ~doc:"level of parallelism")
and cpus =
let doc = "Limit the specific CPUs or cores to use. When provided, the
[-j] flag is ignored, and each prover gets allocated its own CPU core from
this list. A comma-separated list or hyphen-separated ranges are allowed."
in
let parser s =
match String.split_on_char '-' s with
| [] -> assert false (* [split_on_char] invariant *)
| [ n ] -> Result.map (fun x -> (x, x)) Arg.(conv_parser int n)
| [ n; m ] ->
Result.bind Arg.(conv_parser int n) @@ fun n ->
Result.bind Arg.(conv_parser int m) @@ fun m ->
if m < n then
Error (`Msg (Format.asprintf "invalid range: %d-%d" n m) )
else
Ok ((n, m))
| _ ->
Error (`Msg (Format.asprintf "invalid cpuset: %s" s))
in
let printer ppf (n, m) =
if n = m then
Format.pp_print_int ppf n
else
Format.fprintf ppf "%d-%d" n m
in
let cpuspec = Arg.conv ~docv:"MASK" (parser, printer) in
let parse xs =
let cpus =
CCList.flat_map
(fun (n, m) -> List.init (m + 1 - n) (fun i -> i + n))
xs
|> List.sort_uniq Int.compare
in
match cpus with
| [] -> None
| _ -> Some cpus
in
Term.(
const parse $
Arg.(value & opt (list cpuspec) [] & info [ "cpus" ] ~doc)
)
and memory =
Arg.(
value
Expand Down Expand Up @@ -123,7 +164,7 @@ module Run = struct
in
Cmd.v (Cmd.info ~doc "run")
Term.(
const aux $ j $ pp_results $ dyn $ paths $ dir_files $ proof_dir $ defs
const aux $ j $ cpus $ pp_results $ dyn $ paths $ dir_files $ proof_dir $ defs
$ task $ timeout $ memory $ meta $ provers $ csv $ summary $ no_color
$ output $ save $ wal_mode $ desktop_notification $ no_failure $ update)
end
Expand Down
63 changes: 47 additions & 16 deletions src/core/Exec_action.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ module Log = (val Logs.src_log (Logs.Src.create "benchpress.runexec-action"))
module Exec_run_provers : sig
type t = Action.run_provers

type jobs =
| Bounded of int
| Cpus of int list

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand All @@ -18,6 +22,7 @@ module Exec_run_provers : sig
val expand :
?slurm:bool ->
?j:int ->
?cpus:int list ->
?dyn:bool ->
?limits:Limit.All.t ->
?proof_dir:string ->
Expand Down Expand Up @@ -74,6 +79,10 @@ module Exec_run_provers : sig
end = struct
type t = Action.run_provers

type jobs =
| Bounded of int
| Cpus of int list

let ( >? ) a b =
match a with
| None -> b
Expand All @@ -85,7 +94,7 @@ end = struct
| Some _ as x -> x

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand Down Expand Up @@ -136,7 +145,7 @@ end = struct
| exn -> Error.(raise @@ of_exn exn)
(* Expand options into concrete choices *)
let expand ?(slurm = false) ?j ?(dyn = false) ?limits ?proof_dir ?interrupted
let expand ?(slurm = false) ?j ?cpus ?(dyn = false) ?limits ?proof_dir ?interrupted
(defs : Definitions.t) s_limits s_j s_pattern s_dirs s_provers : expanded
=
let limits =
Expand All @@ -145,12 +154,17 @@ end = struct
| Some l -> Limit.All.with_defaults l ~defaults:s_limits
in
let j =
j >?? s_j
>?
if slurm then
0
else
Misc.guess_cpu_count ()
match cpus with
| None ->
let j =
j >?? s_j
>?
if slurm then
0
else
Misc.guess_cpu_count ()
in Bounded j
| Some cpus -> Cpus cpus
in
let problems =
CCList.flat_map
Expand Down Expand Up @@ -245,7 +259,7 @@ end = struct
CCOpt.iter Misc.mkdir_rec self.proof_dir;
let run_prover_pb ~prover ~pb ~db () : _ list =
let run_prover_pb ?cpu ~prover ~pb ~db () : _ list =
(* Also runs the proof checker, if the prover is proof producing
and returns "UNSAT". *)
if interrupted () then Error.fail "run.interrupted";
Expand Down Expand Up @@ -277,7 +291,7 @@ end = struct
(* continue but ensure we cleanup the proof file *)
with_proof_file_opt ~proof_file ~keep @@ fun () ->
let result =
Run_prover_problem.run ~limits:self.limits ~proof_file prover pb
Run_prover_problem.run ?cpu ~limits:self.limits ~proof_file prover pb
in
(* insert into DB here *)
let ev_prover = Run_event.mk_prover result in
Expand Down Expand Up @@ -331,10 +345,20 @@ end = struct
(* run provers *)
let res_l =
let db = CCLock.create db in
Misc.Par_map.map_p ~j:self.j
(fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ())
jobs
|> CCList.flatten
match self.j with
| Bounded j ->
Misc.Par_map.map_p ~j
(fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ())
jobs
|> CCList.flatten
| Cpus cpus ->
List.iter (fun cpu ->
try Sys.mkdir ("/dev/shm/" ^ string_of_int cpu) 0o750 with Sys_error _ -> ())
cpus;
Misc.Par_map.map_with_resource ~resources:cpus (fun cpu (prover, pb) ->
Log.debug (fun m -> m "Running on cpu %d" cpu);
Misc.with_affinity cpu (run_prover_pb ~cpu ~prover ~pb ~db)) jobs
|> CCList.flatten
in
if interrupted () then Error.fail "run.interrupted";
Expand Down Expand Up @@ -375,6 +399,13 @@ end = struct
~port ~ntasks ?output ?(update = false) ~uuid ~save ~wal_mode
(self : expanded) : _ * _ =
ignore on_start_proof_check;
let j =
match self.j with
| Bounded j -> j
| Cpus cpus ->
Log.warn (fun m -> m "cpu affinity ignored in slurm mode");
List.length cpus
in
let start = Misc.now_s () in
let db =
prepare_db ~wal_mode ?output ~update timestamp uuid save self.provers
Expand Down Expand Up @@ -556,7 +587,7 @@ end = struct
(Unix.string_of_inet_addr addr)
used_port);
let sbatch_cmds =
Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir self.j addr used_port
Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir j addr used_port
partition config_file nodes
in
let job_ids =
Expand Down
9 changes: 8 additions & 1 deletion src/core/Exec_action.mli
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ type cb_progress =
module Exec_run_provers : sig
type t = Action.run_provers

type jobs =
| Bounded of int
(* [Bounded j] is at most [j] parallel jobs *)
| Cpus of int list
(* [Cpus cpus] assigns an exclusive cpu from [cpus] to each job *)

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand All @@ -17,6 +23,7 @@ module Exec_run_provers : sig
val expand :
?slurm:bool ->
?j:int ->
?cpus:int list ->
?dyn:bool ->
?limits:Limit.All.t ->
?proof_dir:string ->
Expand Down
96 changes: 96 additions & 0 deletions src/core/Misc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,33 @@ module Par_map = struct
Logs.debug (fun k -> k "par-map: stop pool");
P.stop ();
res

(* Map on the list [l] with each call to [f] being associated one of the
resources from [resources] that is guaranteed not to be used concurrently
by another call to [f]. *)
let map_with_resource ~resources f l =
match l with
| [] -> []
| _ ->
if CCList.is_empty resources then invalid_arg "map_with_resource: ~resources";
die_on_sigterm();
let jobs = List.length resources in
let queue = CCBlockingQueue.create jobs in
List.iter (CCBlockingQueue.push queue) resources;
let f x =
let resource = CCBlockingQueue.take queue in
Fun.protect
~finally:(fun () -> CCBlockingQueue.push queue resource)
(fun () -> f resource x)
in
Logs.debug (fun m -> m "par-map: create pool j=%d" jobs);
let module P = CCPool.Make(struct
let max_size = jobs
end) in
let res = P.Fut.map_l (P.Fut.make1 f) l |> P.Fut.get in
Logs.debug (fun m -> m "par-map: stop pool");
P.stop ();
res
end

module Git = struct
Expand Down Expand Up @@ -449,3 +476,72 @@ let start_server n server_fun sock =
let establish_server n server_fun sockaddr =
let sock, _ = mk_socket sockaddr in
start_server n server_fun sock

(* Will not work on windows *)
let ramdisk = "/dev/shm"

(* Turns out that [Filename.open_temp_file] is not thread-safe *)
let open_temp_file = CCLock.create Filename.open_temp_file

let open_ram_file ?dir prefix suffix =
let temp_dir =
match dir with
| None -> ramdisk
| Some dir -> ramdisk ^ "/" ^ dir
in
CCLock.with_lock open_temp_file @@ fun open_temp_file ->
open_temp_file ~temp_dir prefix suffix

let with_ram_file ?dir prefix suffix f =
let fname, oc = open_ram_file ?dir prefix suffix in
Fun.protect
~finally:(fun () ->
close_out oc;
try Sys.remove fname with Sys_error _ -> ()
)
(fun () -> f fname)

let with_copy_to_ram ?dir fname f =
let suffix = Filename.basename fname in
let fname = begin
CCIO.with_in ~flags:[Open_binary] fname @@ fun ic ->
let fname, oc = open_ram_file ?dir "" suffix in
Fun.protect
~finally:(fun () -> close_out oc)
(fun () ->
CCIO.with_out ~flags:[Open_binary] fname @@ fun oc ->
CCIO.copy_into ~bufsize:(64 * 1024) ic oc;
fname)
end in
Fun.protect
~finally:(fun () -> (* try Sys.remove fname with Sys_error _ -> () *) () ) (fun () -> f fname)

let with_copy_from_ram ?dir fname f =
let suffix = Filename.basename fname in
let ram_fname, oc = open_ram_file ?dir "" suffix in
Fun.protect
~finally:(fun () ->
begin
CCIO.with_in ~flags:[Open_binary] ram_fname @@ fun ic ->
CCIO.copy_into ~bufsize:(64 * 1024) ic oc
end;
close_out oc;
Sys.remove ram_fname)
(fun () -> f ram_fname)

let with_copy_from_ram_opt ?dir fname f =
match fname with
| Some fname -> with_copy_from_ram ?dir fname (fun fname -> f (Some fname))
| None -> f None

let with_affinity cpu f =
let aff = Processor.Affinity.get_ids () in
Processor.Affinity.set_ids [cpu];
Fun.protect
~finally:(fun () -> Processor.Affinity.set_ids aff)
f

let with_affinity_opt cpu f =
match cpu with
| None -> f ()
| Some cpu -> with_affinity cpu f
Loading

0 comments on commit 19a6a1e

Please sign in to comment.