Skip to content

Commit

Permalink
Use number of workers, not number of processors, to determine merge b…
Browse files Browse the repository at this point in the history
…ucket size

Summary:
This logic is intended to divide work roughly evenly between the available
workers. Normally the number of workers is the same as the number of
processors, but can be different through configuration.

If the configured number of workers is less than the number of processors, the
existing logic will cause oversmall buckets. If the configured number of
workers is greater than the number of processors, the buckets will be too
large. In both cases, the end result is poor utilization of workers.

Reviewed By: gabelevi

Differential Revision: D13945046

fbshipit-source-id: eabe2403bb689fde783912d9cada971d9be075f0
  • Loading branch information
samwgoldman authored and facebook-github-bot committed Feb 5, 2019
1 parent 2a5cf71 commit 5ef67eb
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/services/inference/merge_service.ml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ let merge_strict_job ~worker_mutator ~reader ~job ~options merged elements =
let merge_runner
~job ~master_mutator ~worker_mutator ~reader ~intermediate_result_callback ~options ~workers
dependency_graph component_map recheck_map =
let num_workers = Options.max_workers options in
(* make a map from files to their component leaders *)
let leader_map =
FilenameMap.fold (fun file component acc ->
Expand All @@ -415,7 +416,7 @@ let merge_runner
) component_map in

let start_time = Unix.gettimeofday () in
let {Merge_stream.next; merge; stats} = Merge_stream.make
let {Merge_stream.next; merge; stats} = Merge_stream.make ~num_workers
~dependency_graph ~leader_map ~component_map ~recheck_leader_map ~intermediate_result_callback
in
(* returns parallel lists of filenames, error sets, and suppression sets *)
Expand Down
7 changes: 4 additions & 3 deletions src/services/inference/merge_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type 'a merge_stream = {
}

let make
~num_workers
~dependency_graph
~leader_map
~component_map
Expand Down Expand Up @@ -173,14 +174,14 @@ let make
(* component_map is a map from leaders to components *)
(* dependency_graph is a map from files to dependencies *)
let next =
let procs = Sys_utils.nbr_procs in
fun () ->
let jobs = Stream.length !stream in
if jobs = 0 && !blocked <> 0 then Bucket.Wait
else
(* NB: num_workers can be zero *)
let bucket_size =
if jobs < procs * max_bucket_size
then 1 + (jobs / procs)
if jobs < num_workers * max_bucket_size
then 1 + (jobs / num_workers)
else max_bucket_size
in
let n = min bucket_size jobs in
Expand Down
1 change: 1 addition & 0 deletions src/services/inference/merge_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type 'a merge_stream = {
}

val make :
num_workers: int ->
dependency_graph: FilenameSet.t FilenameMap.t ->
leader_map: File_key.t FilenameMap.t ->
component_map: File_key.t Nel.t FilenameMap.t ->
Expand Down

0 comments on commit 5ef67eb

Please sign in to comment.