Skip to content

Commit

Permalink
Refactor merge stream API
Browse files Browse the repository at this point in the history
Summary:
This change packages up the merge stream API from a bag of functions and closed-over refs into an abstract record and functions parameterized over that type.

I tried to break out smaller functions that preserve invariants between the various state values.

I also added several counters which are useful for understanding the merge stream over time.

Reviewed By: nmote

Differential Revision: D13972924

fbshipit-source-id: 643782b39ad6a6353e50e526bc2a9fce7f86ecec
  • Loading branch information
samwgoldman authored and facebook-github-bot committed Feb 15, 2019
1 parent 1d0aa2c commit 44006e9
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 203 deletions.
22 changes: 14 additions & 8 deletions src/services/inference/merge_service.ml
Original file line number Diff line number Diff line change
Expand Up @@ -379,22 +379,28 @@ let merge_runner
) component_map in

let start_time = Unix.gettimeofday () in
let {Merge_stream.next; merge; stats} = Merge_stream.make ~num_workers
~dependency_graph ~leader_map ~component_map ~recheck_leader_map ~intermediate_result_callback
let stream = Merge_stream.create
~num_workers
~dependency_graph
~leader_map
~component_map
~recheck_leader_map
~intermediate_result_callback
in
Merge_stream.update_server_status stream;
(* returns parallel lists of filenames, error sets, and suppression sets *)
let%lwt ret = MultiWorkerLwt.call
workers
~job: (merge_strict_job ~worker_mutator ~reader ~options ~job)
~neutral: []
~merge:(merge ~master_mutator ~reader)
~next
~merge:(Merge_stream.merge ~master_mutator ~reader stream)
~next:(Merge_stream.next stream)
in
let total_number_of_files = Merge_stream.get_total_files stats in
let files_skipped = Merge_stream.get_skipped_files stats in
Hh_logger.info "Merge skipped %d of %d modules" files_skipped total_number_of_files;
let total_files = Merge_stream.total_files stream in
let skipped_files = Merge_stream.skipped_files stream in
Hh_logger.info "Merge skipped %d of %d modules" skipped_files total_files;
let elapsed = Unix.gettimeofday () -. start_time in
if Options.should_profile options then Hh_logger.info "merged (strict) in %f" elapsed;
Lwt.return (ret, files_skipped)
Lwt.return (ret, skipped_files)

let merge_strict = merge_runner ~job:merge_strict_component
Loading

0 comments on commit 44006e9

Please sign in to comment.