Skip to content

Implement Work Partitioner V2 #17111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dba44c6
Work_partitioner: set up library
glyh Apr 19, 2025
06693b6
(Work Partitioner) Add module `Id_generator` to `Work_partitioner`
glyh Apr 19, 2025
139b539
(Work Partitioner) Add functor Job_pool
glyh Apr 19, 2025
a75bd0f
Snark Worker, Work Partitioner: Factor out common function extract_zk…
glyh Apr 30, 2025
f89b177
Work Partitioner: add Work_partitioner.Pending_zkapp_command
glyh Apr 30, 2025
f116bdb
Work Partitioner: Add {Zkapp_command_job_pool, Sent_job_pool}
glyh Apr 30, 2025
54a7b94
(Work Partitioner) Add Mergable_single_work
glyh Apr 25, 2025
9f530d0
Work Partitioner: add Work_partitioner.t and corresponding initializer
glyh Apr 30, 2025
ae7c6e6
Work Partitioner: Add Work_partitioner.reissue_old_task
glyh Apr 25, 2025
1952c76
Work Partitioner: Add Work_partitioner.issue_from_zkapp_command_work_…
glyh Apr 30, 2025
d976deb
Work Partitioner: Add Work_partitioner.Work_partitioner.{issue_from_t…
glyh Apr 30, 2025
87c7314
Work Partitioner: Add function Work_partitioner.consume_job_from_sel…
glyh Apr 25, 2025
51920ed
Work Partitioner: Implement {request_from_selector_and_consume_by_pa…
glyh Apr 30, 2025
77d72e9
Work Partitioner: add Work_partitioner.submit_result
glyh Apr 30, 2025
f49b4d4
Work Partitioner: implement submit_single
glyh Apr 26, 2025
cd50d63
Work Partitioner: implement submit_into_pending_zkapp_command
glyh Apr 30, 2025
9d26f6b
Work Partitioner: implement submit_partitioned_work
glyh Apr 26, 2025
7bfb930
Work Partitioner: Refactor Job_pool.{find_first_ready -> fold_until}
glyh May 1, 2025
8c501fe
FIX(Work Partitioner): go through all pending zkapp command to issue …
glyh May 1, 2025
daa37a2
FIX(Work Partitioner): we should prioritize completion of work on han…
glyh May 1, 2025
a348a99
Work Partitioner: remove recycle mechanism in ID generator and use a …
glyh May 12, 2025
866ae90
Work Partitioner: accept timeouted SNARK worker to submit a proof
glyh May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@
(package (name webkit_trace_event))
(package (name with_hash))
(package (name work_selector))
(package (name work_partitioner))
(package (name zkapp_command_builder))
(package (name zkapps_examples))
(package (name zkapp_limits))
Expand Down
4 changes: 2 additions & 2 deletions src/lib/snark_work_lib/partitioned.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Pairing = struct
module Stable = struct
module V1 = struct
(* this identifies a One_or_two work from Work_selector's perspective *)
type t = Pairing_ID of int
type t = Pairing_ID of int64
[@@deriving compare, hash, sexp, yojson, equal]

let to_latest = Fn.id
Expand Down Expand Up @@ -82,7 +82,7 @@ module Zkapp_command_job = struct
[%%versioned
module Stable = struct
module V1 = struct
type t = Job_ID of int [@@deriving compare, hash, sexp, yojson]
type t = Job_ID of int64 [@@deriving compare, hash, sexp, yojson]

let to_latest = Fn.id
end
Expand Down
3 changes: 2 additions & 1 deletion src/lib/snark_worker/dune
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
transaction_protocol_state
transaction_snark
transaction_snark_work
transaction_witness)
transaction_witness
work_partitioner)
(preprocess
(pps
ppx_bin_prot
Expand Down
48 changes: 14 additions & 34 deletions src/lib/snark_worker/prod.ml
Original file line number Diff line number Diff line change
Expand Up @@ -161,36 +161,16 @@ module Impl : Intf.Worker = struct
let open Deferred.Or_error.Let_syntax in
let (module M) = m in
match single with
| Transition (input, (w : Transaction_witness.Stable.Latest.t)) -> (
match w.transaction with
| Transition (input, (witness : Transaction_witness.Stable.Latest.t)) -> (
match witness.transaction with
| Command (Zkapp_command zkapp_command) -> (
let zkapp_command_cached =
Zkapp_command.write_all_proofs_to_disk ~proof_cache_db
zkapp_command
in
let%bind witnesses_specs_stmts =
Or_error.try_with (fun () ->
Transaction_snark.zkapp_command_witnesses_exn
~constraint_constants:M.constraint_constants
~global_slot:w.block_global_slot
~state_body:w.protocol_state_body
~fee_excess:Currency.Amount.Signed.zero
[ ( `Pending_coinbase_init_stack w.init_stack
, `Pending_coinbase_of_statement
{ Transaction_snark.Pending_coinbase_stack_state
.source = input.source.pending_coinbase_stack
; target = input.target.pending_coinbase_stack
}
, `Sparse_ledger w.first_pass_ledger
, `Sparse_ledger w.second_pass_ledger
, `Connecting_ledger_hash input.connecting_ledger_left
, Zkapp_command.write_all_proofs_to_disk ~proof_cache_db
zkapp_command )
]
|> List.rev )
|> Result.map_error ~f:(fun e ->
Error.createf
!"Failed to generate inputs for zkapp_command : %s: %s"
( Zkapp_command.Stable.Latest.to_yojson zkapp_command
|> Yojson.Safe.to_string )
(Error.to_string_hum e) )
|> Deferred.return
Work_partitioner.Snark_worker_shared.extract_zkapp_segment_works
~m ~input ~witness ~zkapp_command:zkapp_command_cached
in
match witnesses_specs_stmts with
| [] ->
Expand Down Expand Up @@ -239,7 +219,7 @@ module Impl : Intf.Worker = struct
Deferred.return
@@
(* Validate the received transaction *)
match w.transaction with
match witness.transaction with
| Command (Signed_command cmd) -> (
match Signed_command.check cmd with
| Some cmd ->
Expand All @@ -258,13 +238,13 @@ module Impl : Intf.Worker = struct
M.of_non_zkapp_command_transaction
~statement:{ input with sok_digest }
{ Transaction_protocol_state.Poly.transaction = t
; block_data = w.protocol_state_body
; global_slot = w.block_global_slot
; block_data = witness.protocol_state_body
; global_slot = witness.block_global_slot
}
~init_stack:w.init_stack
~init_stack:witness.init_stack
(unstage
(Mina_ledger.Sparse_ledger.handler w.first_pass_ledger) ) )
)
(Mina_ledger.Sparse_ledger.handler
witness.first_pass_ledger ) ) ) )
| Merge (_, proof1, proof2) ->
M.merge ~sok_digest proof1 proof2

Expand Down
24 changes: 24 additions & 0 deletions src/lib/work_partitioner/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(library
(name work_partitioner)
(public_name work_partitioner)
(library_flags -linkall)
(libraries
;; OPAM libraries
async
core_kernel
;; Local libraries
mina_base
snark_work_lib
transaction_snark
transaction_witness
work_selector)

(preprocess
(pps
ppx_custom_printf
ppx_let
ppx_mina))

(instrumentation
(backend bisect_ppx))
(synopsis "Partition work returned by Work_selector and issue them to real Snark Worker"))
21 changes: 21 additions & 0 deletions src/lib/work_partitioner/id_generator.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
open Core_kernel

(* NOTE: both ends of [range] are inclusive *)
type t = { logger : Logger.t; range : int64 * int64; mutable next_id : int64 }

(* [create ~logger] builds a generator covering the full int64 range,
   with the next ID to hand out starting at [Int64.min_value]. *)
let create ~logger =
  { logger
  ; range = (Int64.min_value, Int64.max_value)
  ; next_id = Int64.min_value
  }

(* [next_id t] returns the current counter value and advances the counter.
   On reaching the inclusive upper bound of [t.range], the counter wraps
   around to the lower bound, so IDs may repeat after exhaustion. *)
let next_id (t : t) : Int64.t =
  let open Int64 in
  let result = t.next_id in
  let lower, upper = t.range in
  if equal t.next_id upper then (
    let logger = t.logger in
    (* FIX: previous message had typos ("boundart", "recuring") and wrongly
       claimed wrap-around restarts "from 0"; it restarts from the range's
       lower bound ([Int64.min_value] for generators built by [create]). *)
    [%log warn]
      "ID generator reached the int64 upper bound; wrapping around to the \
       lower bound, IDs may repeat" ;
    t.next_id <- lower )
  else t.next_id <- succ t.next_id ;
  result
101 changes: 101 additions & 0 deletions src/lib/work_partitioner/job_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
open Core_kernel

(* Result of one [fold_until] step: [action] either continues the fold with a
   new accumulator or stops with a final value; [slashed] requests that the
   visited job be dropped from the pool. *)
type ('accum, 'final) fold_action =
  { action : [ `Continue of 'accum | `Stop of 'final ]; slashed : bool }

(* NOTE: Maybe this is reusable with Work Selector as they also have reissue mechanism *)
module Make (ID : Hashtbl.Key) (Job : T) = struct
  (* Cell shared between [timeline] and [index]; [None] marks a job that has
     been completed or slashed without removing its timeline entry. *)
  type job_item = Job.t option ref

  type t =
    { timeline : (ID.t * job_item) Deque.t (* For iteration *)
    ; index : (ID.t, job_item) Hashtbl.t (* For marking job as done *)
    }

  let create () =
    { timeline = Deque.create (); index = Hashtbl.create (module ID) }

  (* Oldest live job, if any, without removing it; lazily discards dead
     entries found at the front of the timeline. *)
  let rec peek (t : t) =
    match Deque.dequeue_front t.timeline with
    | None ->
        None
    | Some (_, { contents = None }) ->
        peek t
    | Some ((id, { contents = Some pending }) as item) ->
        Deque.enqueue_front t.timeline item ;
        Some (id, pending)

  (* Fold over live jobs, oldest first. Visited jobs are restored to the
     front of the timeline in their original order, except those whose step
     reported [slashed = true]. *)
  let fold_until ~(init : 'accum)
      ~(f : 'accum -> ID.t * Job.t -> ('accum, 'final) fold_action)
      ~(finish : 'accum -> 'final) t : 'final =
    let stack = ref [] in
    let acc = ref init in
    let result = ref None in
    while Option.is_none !result do
      match Deque.dequeue_front t.timeline with
      | None ->
          (* FIX: wrap in [Some]. Storing the bare ['final] forced ['final]
             to unify with an option type and looped forever whenever
             [finish] itself returned [None]. *)
          result := Some (finish !acc)
      | Some (_, { contents = None }) ->
          (* Job done *)
          ()
      | Some ((id, { contents = Some job }) as item) -> (
          (* FIX: step from the running accumulator [!acc], not from [init];
             the previous code discarded every [`Continue] accumulation. *)
          let { action; slashed } = f !acc (id, job) in
          if not slashed then stack := item :: !stack ;
          match action with
          | `Continue new_acc ->
              acc := new_acc
          | `Stop final ->
              result := Some final )
    done ;
    List.iter ~f:(fun item -> Deque.enqueue_front t.timeline item) !stack ;
    (* The loop only exits once [result] is [Some _]. *)
    Option.value_exn !result

  (* Register [job] under [key]; [`Duplicate] if [key] is already bound. *)
  let attempt_add ~(key : ID.t) ~(job : Job.t) (t : t) =
    let data = ref (Some job) in
    match Hashtbl.add ~key ~data t.index with
    | `Ok ->
        Deque.enqueue_back t.timeline (key, data) ;
        `Ok
    | `Duplicate ->
        `Duplicate

  (* Remove the job bound to [id], returning it if it was still live; its
     timeline entry is invalidated in place and skipped on later traversal. *)
  let slash (t : t) (id : ID.t) =
    match Hashtbl.find_and_remove t.index id with
    | None ->
        None
    | Some job_item ->
        let result = !job_item in
        job_item := None ;
        result

  (* Apply [f] to the job currently bound to [id] (or [None] if absent). The
     old timeline entry is invalidated; a new job, if produced, is enqueued
     at the back of the timeline. *)
  let change ~(id : ID.t) ~(f : Job.t option -> Job.t option) (t : t) =
    let process (current : job_item option) =
      let output =
        match current with
        | None ->
            f None
        | Some job_already ->
            let tmp = f !job_already in
            job_already := None ;
            tmp
      in
      match output with
      | None ->
          None
      | Some data ->
          let new_item = ref (Some data) in
          Deque.enqueue_back t.timeline (id, new_item) ;
          Some new_item
    in

    Hashtbl.change t.index id ~f:process

  (* Unconditionally bind [job] to [id], displacing any previous binding. *)
  let replace ~(id : ID.t) ~(job : Job.t) = change ~id ~f:(const (Some job))

  (* Job bound to [id], if present and still live. *)
  let find (t : t) (id : ID.t) =
    match Hashtbl.find t.index id with
    | Some { contents = Some job } ->
        Some job
    | _ ->
        None
end
31 changes: 31 additions & 0 deletions src/lib/work_partitioner/mergable_single_work.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
(* This is one half of a work pair, to be merged into a [`Two] of a
   [_ One_or_two.t] *)

open Core_kernel
module Work = Snark_work_lib

(* A completed single proof plus the metadata needed to pair it with its
   sibling half (see [merge_to_one_result_exn]). *)
type t =
  { which_half : [ `First | `Second ] (* position within the original pair *)
  ; proof : Ledger_proof.Cached.t
  ; metric : Core.Time.Span.t * [ `Merge | `Transition ]
        (* presumably (elapsed time, kind of proof produced) — confirm
           against the snark-worker metrics producer *)
  ; spec : Work.Selector.Single.Spec.t
  ; prover : Signature_lib.Public_key.Compressed.t
  ; common : Work.Partitioned.Spec_common.t (* carries [fee_of_full] *)
  }

(* [merge_to_one_result_exn left right] combines the two halves of a work
   pair into a single [`Two] result. Raises [Assert_failure] when the halves
   are mis-ordered or the prover / fee metadata of the halves disagree. *)
let merge_to_one_result_exn (left : t) (right : t) : Work.Selector.Result.t =
  assert (phys_equal left.which_half `First) ;
  assert (phys_equal right.which_half `Second) ;
  assert (Signature_lib.Public_key.Compressed.equal left.prover right.prover) ;
  assert (Currency.Fee.equal left.common.fee_of_full right.common.fee_of_full) ;
  let fee = left.common.fee_of_full in
  { proofs = `Two (left.proof, right.proof)
  ; metrics = `Two (left.metric, right.metric)
  ; spec = { instances = `Two (left.spec, right.spec); fee }
  ; prover = left.prover
  }
50 changes: 50 additions & 0 deletions src/lib/work_partitioner/pending_zkapp_command.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
(* NOTE: this module is where the real optimization happens. One assumption we
   make is that the order of merging is irrelevant to the correctness of the
   final proof. Hence we only use a counter [merge_remaining] to track whether
   we have reached the final proof.
*)

open Core_kernel
module Work = Snark_work_lib

(* State of a zkapp command whose proof is being assembled piecewise from
   segment proofs and pairwise merges. *)
type t =
  { spec : Work.Selector.Single.Spec.t
        (* the original work being split; should be identical to
           Work_selector's work *)
  ; fee_of_full : Currency.Fee.t
        (* presumably the fee quoted for the whole (unsplit) work — confirm *)
  ; unscheduled_segments : Work.Partitioned.Zkapp_command_job.Spec.t Queue.t
        (* segment jobs not yet handed to any worker *)
  ; pending_mergable_proofs : Ledger_proof.Cached.t Deque.t
        (* we may need to insert proofs to merge back into the queue, hence a
           Deque *)
  ; mutable elapsed : Time.Stable.Span.V1.t
        (* total prover time accumulated via [submit_proof] *)
  ; mutable merge_remaining : int
        (* decremented on each [submit_proof]; tracks progress toward the
           final proof *)
  }

(* [generate_merge ~t ()] builds a merge job from the two oldest pending
   proofs when at least two are available; otherwise it returns [None] and
   leaves the deque unchanged. *)
let generate_merge ~(t : t) () =
  let pending = t.pending_mergable_proofs in
  match Deque.dequeue_front pending with
  | None ->
      None
  | Some first -> (
      match Deque.dequeue_front pending with
      | None ->
          (* Only one proof available: put it back and report no merge. *)
          Deque.enqueue_front pending first ;
          None
      | Some second ->
          Some
            (Work.Partitioned.Zkapp_command_job.Spec.Poly.Merge
               { proof1 = first; proof2 = second } ) )

(* [generate_segment ~t ()] hands out the next unscheduled segment job, if
   any. The previous body wrapped [Queue.dequeue] in an identity
   [let%map x = e in x]; the monadic plumbing was redundant. *)
let generate_segment ~(t : t) () = Queue.dequeue t.unscheduled_segments

(* [generate_job_spec t] produces the next job for [t]: it first tries to
   pair pending proofs into a merge job, and only otherwise hands out a
   fresh unscheduled segment. *)
let generate_job_spec (t : t) : Work.Partitioned.Zkapp_command_job.Spec.t option
    =
  match generate_merge ~t () with
  | Some merge ->
      Some merge
  | None ->
      generate_segment ~t ()

(* [submit_proof t ~proof ~elapsed] records a finished sub-proof: the proof
   becomes available for merging, the outstanding-merge counter goes down by
   one, and the prover time is accumulated. *)
let submit_proof (t : t) ~(proof : Ledger_proof.Cached.t)
    ~(elapsed : Time.Stable.Span.V1.t) =
  t.merge_remaining <- t.merge_remaining - 1 ;
  t.elapsed <- Time.Span.(t.elapsed + elapsed) ;
  Deque.enqueue_back t.pending_mergable_proofs proof
46 changes: 46 additions & 0 deletions src/lib/work_partitioner/snark_worker_shared.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
(* This module contains logic that may be shared across the coordinator and
   the worker. This is needed for backward-compatibility reasons. *)

open Core_kernel
open Async
open Mina_base
open Transaction_snark

(* NOTE:
   This is used in both Work_partitioner and Snark_worker for compatibility
   reasons
*)
(* [extract_zkapp_segment_works ~m ~input ~witness ~zkapp_command] computes,
   via [Transaction_snark.zkapp_command_witnesses_exn], the list of
   (segment witness, segment kind, statement-with-sok) triples needed to
   prove [zkapp_command]. The pending-coinbase stacks are taken from
   [input]'s source/target, and the two pass ledgers and init stack from
   [witness]. Any exception is converted into an [Error] that embeds the
   serialized command and the underlying message. *)
let extract_zkapp_segment_works ~m:(module M : S)
    ~(input : Mina_state.Snarked_ledger_state.t)
    ~(witness : Transaction_witness.Stable.Latest.t)
    ~(zkapp_command : Zkapp_command.t) :
    ( Zkapp_command_segment.Witness.t
    * Zkapp_command_segment.Basic.t
    * Statement.With_sok.t )
    list
    Deferred.Or_error.t =
  Or_error.try_with (fun () ->
      Transaction_snark.zkapp_command_witnesses_exn
        ~constraint_constants:M.constraint_constants
        ~global_slot:witness.block_global_slot
        ~state_body:witness.protocol_state_body
        ~fee_excess:Currency.Amount.Signed.zero
        [ ( `Pending_coinbase_init_stack witness.init_stack
          , `Pending_coinbase_of_statement
              { Transaction_snark.Pending_coinbase_stack_state.source =
                  input.source.pending_coinbase_stack
              ; target = input.target.pending_coinbase_stack
              }
          , `Sparse_ledger witness.first_pass_ledger
          , `Sparse_ledger witness.second_pass_ledger
          , `Connecting_ledger_hash input.connecting_ledger_left
          , zkapp_command )
        ]
      (* the helper returns segments in reverse order of execution *)
      |> List.rev )
  |> Result.map_error ~f:(fun e ->
         Error.createf
           !"Failed to generate inputs for zkapp_command : %s: %s"
           ( Zkapp_command.read_all_proofs_from_disk zkapp_command
           |> Zkapp_command.Stable.Latest.to_yojson |> Yojson.Safe.to_string )
           (Error.to_string_hum e) )
  |> Deferred.return
Loading