From 9126ececb535a2b80ad1fb0c32507f3813ece8a4 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 4 Sep 2024 15:14:15 +0200 Subject: [PATCH] feat: follow from block interval (#582) * fix: unify block interval stream api * wip: refactor of proving logic * feat: update leader * fix: test scripts * fix: ci * fix: redundand short arguments * fix: build * update: follow from * fix: cleanup * chore: update block polling time * fix: error handling * fix: improve error output * fix: use cli block_time * fix: reviews * fix: comment * fix: nit * chore: passing runtime * fix: tests * fix: optimize * fix: tests * fix: clean up --- .gitignore | 1 + zero_bin/common/src/block_interval.rs | 67 ++++---- zero_bin/common/src/lib.rs | 6 + zero_bin/leader/src/cli.rs | 15 +- zero_bin/leader/src/client.rs | 137 +++++++--------- zero_bin/leader/src/http.rs | 11 +- zero_bin/leader/src/main.rs | 29 +--- zero_bin/leader/src/stdio.rs | 51 ++++-- zero_bin/prover/Cargo.toml | 26 +-- zero_bin/prover/src/cli.rs | 30 +++- zero_bin/prover/src/lib.rs | 224 ++++++++++++++------------ zero_bin/rpc/src/main.rs | 6 +- zero_bin/tools/prove_rpc.sh | 23 ++- zero_bin/tools/prove_stdio.sh | 59 ++++--- 14 files changed, 378 insertions(+), 307 deletions(-) diff --git a/.gitignore b/.gitignore index 3d1dc8c49..667ccfc9c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ *.iml .idea/ .vscode +**/output.log diff --git a/zero_bin/common/src/block_interval.rs b/zero_bin/common/src/block_interval.rs index 5a7a79117..a659976a3 100644 --- a/zero_bin/common/src/block_interval.rs +++ b/zero_bin/common/src/block_interval.rs @@ -1,3 +1,6 @@ +use std::pin::Pin; +use std::sync::Arc; + use alloy::primitives::B256; use alloy::rpc::types::eth::BlockId; use alloy::{hex, providers::Provider, transports::Transport}; @@ -7,8 +10,11 @@ use futures::Stream; use tracing::info; use crate::parsing; +use crate::provider::CachedProvider; -const DEFAULT_BLOCK_TIME: u64 = 1000; +/// The async stream of block numbers. +/// The second bool flag indicates if the element is last in the interval. +pub type BlockIntervalStream = Pin>>>; /// Range of blocks to be processed and proven. #[derive(Debug, PartialEq, Clone)] @@ -21,9 +27,6 @@ pub enum BlockInterval { FollowFrom { // Interval starting block number start_block: u64, - // Block time specified in milliseconds. - // If not set, use the default block time to poll node. - block_time: Option, }, } @@ -44,7 +47,7 @@ impl BlockInterval { /// assert_eq!(BlockInterval::new("0..10").unwrap(), BlockInterval::Range(0..10)); /// assert_eq!(BlockInterval::new("0..=10").unwrap(), BlockInterval::Range(0..11)); /// assert_eq!(BlockInterval::new("32141").unwrap(), BlockInterval::SingleBlockId(BlockId::Number(32141.into()))); - /// assert_eq!(BlockInterval::new("100..").unwrap(), BlockInterval::FollowFrom{start_block: 100, block_time: None}); + /// assert_eq!(BlockInterval::new("100..").unwrap(), BlockInterval::FollowFrom{start_block: 100}); /// ``` pub fn new(s: &str) -> anyhow::Result { if (s.starts_with("0x") && s.len() == 66) || s.len() == 64 { @@ -77,10 +80,7 @@ impl BlockInterval { .map_err(|_| anyhow!("invalid block number '{num}'")) }) .ok_or(anyhow!("invalid block interval range '{s}'"))??; - return Ok(BlockInterval::FollowFrom { - start_block: num, - block_time: None, - }); + return Ok(BlockInterval::FollowFrom { start_block: num }); } // Only single block number is left to try to parse else { @@ -92,16 +92,24 @@ impl BlockInterval { } } - /// Convert the block interval into an async stream of block numbers. - pub fn into_bounded_stream(self) -> anyhow::Result> { + /// Convert the block interval into an async stream of block numbers. The + /// second bool flag indicates if the element is last in the interval. + pub fn into_bounded_stream(self) -> Result { match self { BlockInterval::SingleBlockId(BlockId::Number(num)) => { let num = num .as_number() .ok_or(anyhow!("invalid block number '{num}'"))?; - Ok(futures::stream::iter(num..num + 1)) + let range = (num..num + 1).map(|it| Ok((it, true))).collect::>(); + + Ok(Box::pin(futures::stream::iter(range))) + } + BlockInterval::Range(range) => { + let mut range = range.map(|it| Ok((it, false))).collect::>(); + // Set last element indicator to true + range.last_mut().map(|it| it.as_mut().map(|it| it.1 = true)); + Ok(Box::pin(futures::stream::iter(range))) } - BlockInterval::Range(range) => Ok(futures::stream::iter(range)), _ => Err(anyhow!( "could not create bounded stream from unbounded follow-from interval", )), @@ -126,36 +134,33 @@ impl BlockInterval { /// numbers. Query the blockchain node for the latest block number. pub async fn into_unbounded_stream( self, - provider: ProviderT, - ) -> Result>, anyhow::Error> + cached_provider: Arc>, + block_time: u64, + ) -> Result where - ProviderT: Provider, + ProviderT: Provider + 'static, TransportT: Transport + Clone, { match self { - BlockInterval::FollowFrom { - start_block, - block_time, - } => Ok(try_stream! { + BlockInterval::FollowFrom { start_block } => Ok(Box::pin(try_stream! { let mut current = start_block; loop { - let last_block_number = provider.get_block_number().await.map_err(|e: alloy::transports::RpcError<_>| { + let last_block_number = cached_provider.get_provider().await?.get_block_number().await.map_err(|e: alloy::transports::RpcError<_>| { anyhow!("could not retrieve latest block number from the provider: {e}") })?; if current < last_block_number { current += 1; - yield current; + yield (current, false); } else { info!("Waiting for the new blocks to be mined, requested block number: {current}, \ latest block number: {last_block_number}"); - let block_time = block_time.unwrap_or(DEFAULT_BLOCK_TIME); // No need to poll the node too frequently, waiting // a block time interval for a block to be mined should be enough tokio::time::sleep(tokio::time::Duration::from_millis(block_time)).await; } } - }), + })), _ => Err(anyhow!( "could not create unbounded follow-from stream from fixed bounded interval", )), @@ -214,10 +219,7 @@ mod test { fn can_create_follow_from_block_interval() { assert_eq!( BlockInterval::new("100..").unwrap(), - BlockInterval::FollowFrom { - start_block: 100, - block_time: None - } + BlockInterval::FollowFrom { start_block: 100 } ); } @@ -270,9 +272,14 @@ mod test { .into_bounded_stream() .unwrap(); while let Some(val) = stream.next().await { - result.push(val); + result.push(val.unwrap()); } - assert_eq!(result, Vec::from_iter(1u64..10u64)); + let mut expected = Vec::from_iter(1u64..10u64) + .into_iter() + .map(|it| (it, false)) + .collect::>(); + expected.last_mut().unwrap().1 = true; + assert_eq!(result, expected); } #[test] diff --git a/zero_bin/common/src/lib.rs b/zero_bin/common/src/lib.rs index 4d9fadbda..42ce661d7 100644 --- a/zero_bin/common/src/lib.rs +++ b/zero_bin/common/src/lib.rs @@ -6,3 +6,9 @@ pub mod pre_checks; pub mod prover_state; pub mod provider; pub mod version; + +/// Size of the channel used to send block prover inputs to the per block +/// proving task. If the proving task is slow and can not consume inputs fast +/// enough retrieval of the block prover inputs will block until the proving +/// task consumes some of the inputs. +pub const BLOCK_CHANNEL_SIZE: usize = 16; diff --git a/zero_bin/leader/src/cli.rs b/zero_bin/leader/src/cli.rs index 50c6e7ce1..9cc2de300 100644 --- a/zero_bin/leader/src/cli.rs +++ b/zero_bin/leader/src/cli.rs @@ -51,23 +51,10 @@ pub(crate) enum Command { /// The previous proof output. #[arg(long, short = 'f', value_hint = ValueHint::FilePath)] previous_proof: Option, - /// If provided, write the generated proofs to this directory instead of - /// stdout. - #[arg(long, short = 'o', value_hint = ValueHint::FilePath)] - proof_output_dir: Option, - /// Network block time in milliseconds. This value is used + /// Blockchain network block time in milliseconds. This value is used /// to determine the blockchain node polling interval. #[arg(short, long, env = "ZERO_BIN_BLOCK_TIME", default_value_t = 2000)] block_time: u64, - /// Keep intermediate proofs. Default action is to - /// delete them after the final proof is generated. - #[arg( - short, - long, - env = "ZERO_BIN_KEEP_INTERMEDIATE_PROOFS", - default_value_t = false - )] - keep_intermediate_proofs: bool, /// Backoff in milliseconds for retry requests #[arg(long, default_value_t = 0)] backoff: u64, diff --git a/zero_bin/leader/src/client.rs b/zero_bin/leader/src/client.rs index 61fe9f45f..ea4e5e9ae 100644 --- a/zero_bin/leader/src/client.rs +++ b/zero_bin/leader/src/client.rs @@ -1,17 +1,15 @@ -use std::io::Write; -use std::path::PathBuf; use std::sync::Arc; use alloy::rpc::types::{BlockId, BlockNumberOrTag, BlockTransactionsKind}; use alloy::transports::http::reqwest::Url; -use anyhow::Result; +use anyhow::{anyhow, Result}; use paladin::runtime::Runtime; use proof_gen::proof_types::GeneratedBlockProof; -use prover::ProverConfig; +use prover::{BlockProverInput, ProverConfig}; use rpc::{retry::build_http_retry_provider, RpcType}; -use tracing::{error, info, warn}; -use zero_bin_common::block_interval::BlockInterval; -use zero_bin_common::fs::generate_block_proof_file_name; +use tokio::sync::mpsc; +use tracing::info; +use zero_bin_common::block_interval::{BlockInterval, BlockIntervalStream}; use zero_bin_common::pre_checks::check_previous_proof_and_checkpoint; #[derive(Debug)] @@ -20,25 +18,24 @@ pub struct RpcParams { pub rpc_type: RpcType, pub backoff: u64, pub max_retries: u32, + pub block_time: u64, } #[derive(Debug)] -pub struct ProofParams { +pub struct LeaderConfig { pub checkpoint_block_number: u64, pub previous_proof: Option, - pub proof_output_dir: Option, pub prover_config: ProverConfig, - pub keep_intermediate_proofs: bool, } /// The main function for the client. pub(crate) async fn client_main( - runtime: Runtime, + runtime: Arc, rpc_params: RpcParams, block_interval: BlockInterval, - mut params: ProofParams, + mut leader_config: LeaderConfig, ) -> Result<()> { - use futures::{FutureExt, StreamExt}; + use futures::StreamExt; let cached_provider = Arc::new(zero_bin_common::provider::CachedProvider::new( build_http_retry_provider( @@ -48,94 +45,84 @@ pub(crate) async fn client_main( )?, )); check_previous_proof_and_checkpoint( - params.checkpoint_block_number, - ¶ms.previous_proof, + leader_config.checkpoint_block_number, + &leader_config.previous_proof, block_interval.get_start_block()?, )?; // Grab interval checkpoint block state trie. let checkpoint_state_trie_root = cached_provider .get_block( - params.checkpoint_block_number.into(), + leader_config.checkpoint_block_number.into(), BlockTransactionsKind::Hashes, ) .await? .header .state_root; - let mut block_prover_inputs = Vec::new(); - let mut block_interval = block_interval.into_bounded_stream()?; - while let Some(block_num) = block_interval.next().await { + // Create a channel for block prover input and use it to send prover input to + // the proving task. The second element of the tuple is a flag indicating + // whether the block is the last one in the interval. + let (block_tx, block_rx) = + mpsc::channel::<(BlockProverInput, bool)>(zero_bin_common::BLOCK_CHANNEL_SIZE); + let test_only = leader_config.prover_config.test_only; + + // Run proving task + let runtime_ = runtime.clone(); + let proving_task = tokio::spawn(prover::prove( + block_rx, + runtime_, + leader_config.previous_proof.take(), + Arc::new(leader_config.prover_config), + )); + + // Create block interval stream. Could be bounded or unbounded. + let mut block_interval_stream: BlockIntervalStream = match block_interval { + block_interval @ BlockInterval::FollowFrom { .. } => { + block_interval + .into_unbounded_stream(cached_provider.clone(), rpc_params.block_time) + .await? + } + _ => block_interval.into_bounded_stream()?, + }; + + // Iterate over the block interval, retrieve prover input + // and send it to the proving task + while let Some(block_interval_elem) = block_interval_stream.next().await { + let (block_num, is_last_block) = block_interval_elem?; let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num)); - // Get future of prover input for particular block. + // Get prover input for particular block. let block_prover_input = rpc::block_prover_input( cached_provider.clone(), block_id, checkpoint_state_trie_root, rpc_params.rpc_type, ) - .boxed(); - block_prover_inputs.push(block_prover_input); + .await?; + block_tx + .send((block_prover_input, is_last_block)) + .await + .map_err(|e| anyhow!("failed to send block prover input through the channel: {e}"))?; + } + + match proving_task.await { + Ok(Ok(_)) => { + info!("Proving task successfully finished"); + } + Ok(Err(e)) => { + anyhow::bail!("Proving task finished with error: {e:?}"); + } + Err(e) => { + anyhow::bail!("Unable to join proving task, error: {e:?}"); + } } - // If `keep_intermediate_proofs` is not set we only keep the last block - // proof from the interval. It contains all the necessary information to - // verify the whole sequence. - let proved_blocks = prover::prove( - block_prover_inputs, - &runtime, - params.previous_proof.take(), - params.prover_config, - params.proof_output_dir.clone(), - ) - .await; runtime.close().await?; - let proved_blocks = proved_blocks?; - if params.prover_config.test_only { + if test_only { info!("All proof witnesses have been generated successfully."); } else { info!("All proofs have been generated successfully."); } - if !params.prover_config.test_only { - if params.keep_intermediate_proofs { - if params.proof_output_dir.is_some() { - // All proof files (including intermediary) are written to disk and kept - warn!("Skipping cleanup, intermediate proof files are kept"); - } else { - // Output all proofs to stdout - std::io::stdout().write_all(&serde_json::to_vec( - &proved_blocks - .into_iter() - .filter_map(|(_, block)| block) - .collect::>(), - )?)?; - } - } else if let Some(proof_output_dir) = params.proof_output_dir.as_ref() { - // Remove intermediary proof files - proved_blocks - .into_iter() - .rev() - .skip(1) - .map(|(block_number, _)| { - generate_block_proof_file_name(&proof_output_dir.to_str(), block_number) - }) - .for_each(|path| { - if let Err(e) = std::fs::remove_file(path) { - error!("Failed to remove intermediate proof file: {e}"); - } - }); - } else { - // Output only last proof to stdout - if let Some(last_block) = proved_blocks - .into_iter() - .filter_map(|(_, block)| block) - .last() - { - std::io::stdout().write_all(&serde_json::to_vec(&last_block)?)?; - } - } - } - Ok(()) } diff --git a/zero_bin/leader/src/http.rs b/zero_bin/leader/src/http.rs index 39c7333e1..14fa965a4 100644 --- a/zero_bin/leader/src/http.rs +++ b/zero_bin/leader/src/http.rs @@ -12,15 +12,14 @@ use tracing::{debug, error, info}; /// The main function for the HTTP mode. pub(crate) async fn http_main( - runtime: Runtime, + runtime: Arc, port: u16, output_dir: PathBuf, - prover_config: ProverConfig, + prover_config: Arc, ) -> Result<()> { let addr = SocketAddr::from(([0, 0, 0, 0], port)); debug!("listening on {}", addr); - let runtime = Arc::new(runtime); let app = Router::new().route( "/prove", post({ @@ -65,7 +64,7 @@ async fn prove( Json(payload): Json, runtime: Arc, output_dir: PathBuf, - prover_config: ProverConfig, + prover_config: Arc, ) -> StatusCode { debug!("Received payload: {:#?}", payload); @@ -75,7 +74,7 @@ async fn prove( payload .prover_input .prove_test( - &runtime, + runtime, payload.previous.map(futures::future::ok), prover_config, ) @@ -84,7 +83,7 @@ async fn prove( payload .prover_input .prove( - &runtime, + runtime, payload.previous.map(futures::future::ok), prover_config, ) diff --git a/zero_bin/leader/src/main.rs b/zero_bin/leader/src/main.rs index f4a448a3b..dc65f0eac 100644 --- a/zero_bin/leader/src/main.rs +++ b/zero_bin/leader/src/main.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{env, io}; use std::{fs::File, path::PathBuf}; @@ -16,7 +17,7 @@ use zero_bin_common::{ }; use zero_bin_common::{prover_state::persistence::CIRCUIT_VERSION, version}; -use crate::client::{client_main, ProofParams}; +use crate::client::{client_main, LeaderConfig}; mod cli; mod client; @@ -54,9 +55,7 @@ async fn main() -> Result<()> { } let args = cli::Cli::parse(); - - let runtime = Runtime::from_config(&args.paladin, register()).await?; - + let runtime = Arc::new(Runtime::from_config(&args.paladin, register()).await?); let prover_config: ProverConfig = args.prover_config.into(); // If not in test_only mode and running in emulation mode, we'll need to @@ -73,7 +72,7 @@ async fn main() -> Result<()> { Command::Clean => zero_bin_common::prover_state::persistence::delete_all()?, Command::Stdio { previous_proof } => { let previous_proof = get_previous_proof(previous_proof)?; - stdio::stdio_main(runtime, previous_proof, prover_config).await?; + stdio::stdio_main(runtime, previous_proof, Arc::new(prover_config)).await?; } Command::Http { port, output_dir } => { // check if output_dir exists, is a directory, and is writable @@ -85,7 +84,7 @@ async fn main() -> Result<()> { panic!("output-dir is not a writable directory"); } - http::http_main(runtime, port, output_dir, prover_config).await?; + http::http_main(runtime, port, output_dir, Arc::new(prover_config)).await?; } Command::Rpc { rpc_url, @@ -93,23 +92,12 @@ async fn main() -> Result<()> { block_interval, checkpoint_block_number, previous_proof, - proof_output_dir, block_time, - keep_intermediate_proofs, backoff, max_retries, } => { - let runtime = Runtime::from_config(&args.paladin, register()).await?; let previous_proof = get_previous_proof(previous_proof)?; - let mut block_interval = BlockInterval::new(&block_interval)?; - - if let BlockInterval::FollowFrom { - start_block: _, - block_time: ref mut block_time_opt, - } = block_interval - { - *block_time_opt = Some(block_time); - } + let block_interval = BlockInterval::new(&block_interval)?; info!("Proving interval {block_interval}"); client_main( @@ -119,14 +107,13 @@ async fn main() -> Result<()> { rpc_type, backoff, max_retries, + block_time, }, block_interval, - ProofParams { + LeaderConfig { checkpoint_block_number, previous_proof, - proof_output_dir, prover_config, - keep_intermediate_proofs, }, ) .await?; diff --git a/zero_bin/leader/src/stdio.rs b/zero_bin/leader/src/stdio.rs index 88dd20aac..71db3c6ae 100644 --- a/zero_bin/leader/src/stdio.rs +++ b/zero_bin/leader/src/stdio.rs @@ -1,16 +1,18 @@ -use std::io::{Read, Write}; +use std::io::Read; +use std::sync::Arc; -use anyhow::Result; +use anyhow::{anyhow, Result}; use paladin::runtime::Runtime; use proof_gen::proof_types::GeneratedBlockProof; -use prover::{BlockProverInput, BlockProverInputFuture, ProverConfig}; +use prover::{BlockProverInput, ProverConfig}; +use tokio::sync::mpsc; use tracing::info; /// The main function for the stdio mode. pub(crate) async fn stdio_main( - runtime: Runtime, + runtime: Arc, previous: Option, - prover_config: ProverConfig, + prover_config: Arc, ) -> Result<()> { let mut buffer = String::new(); std::io::stdin().read_to_string(&mut buffer)?; @@ -18,13 +20,36 @@ pub(crate) async fn stdio_main( let des = &mut serde_json::Deserializer::from_str(&buffer); let block_prover_inputs = serde_path_to_error::deserialize::<_, Vec>(des)? .into_iter() - .map(Into::into) - .collect::>(); + .collect::>(); + + let (block_tx, block_rx) = + mpsc::channel::<(BlockProverInput, bool)>(zero_bin_common::BLOCK_CHANNEL_SIZE); + + let runtime_ = runtime.clone(); + let prover_config_ = prover_config.clone(); + let proving_task = tokio::spawn(prover::prove(block_rx, runtime_, previous, prover_config_)); + + let interval_len = block_prover_inputs.len(); + for (index, block_prover_input) in block_prover_inputs.into_iter().enumerate() { + block_tx + .send((block_prover_input, interval_len == index + 1)) + .await + .map_err(|e| anyhow!("Failed to send block prover input through the channel: {e}"))?; + } + + match proving_task.await { + Ok(Ok(_)) => { + info!("Proving task successfully finished"); + } + Ok(Err(e)) => { + anyhow::bail!("Proving task finished with error: {e:?}"); + } + Err(e) => { + anyhow::bail!("Unable to join proving task, error: {e:?}"); + } + } - let proved_blocks = - prover::prove(block_prover_inputs, &runtime, previous, prover_config, None).await; runtime.close().await?; - let proved_blocks = proved_blocks?; if prover_config.test_only { info!("All proof witnesses have been generated successfully."); @@ -32,11 +57,5 @@ pub(crate) async fn stdio_main( info!("All proofs have been generated successfully."); } - let proofs: Vec = proved_blocks - .into_iter() - .filter_map(|(_, proof)| proof) - .collect(); - std::io::stdout().write_all(&serde_json::to_vec(&proofs)?)?; - Ok(()) } diff --git a/zero_bin/prover/Cargo.toml b/zero_bin/prover/Cargo.toml index d96f2ff0f..6295f65b4 100644 --- a/zero_bin/prover/Cargo.toml +++ b/zero_bin/prover/Cargo.toml @@ -9,24 +9,24 @@ keywords.workspace = true categories.workspace = true [dependencies] -serde = { workspace = true } -proof_gen = { workspace = true } -plonky2 = { workspace = true } -plonky2_maybe_rayon = { workspace = true } -trace_decoder = { workspace = true } -tracing = { workspace = true } -paladin-core = { workspace = true } +alloy.workspace = true anyhow = { workspace = true } +clap = {workspace = true, features = ["derive", "string"] } evm_arithmetization = { workspace = true } futures = { workspace = true } -alloy.workspace = true -tokio = { workspace = true } -serde_json = { workspace = true } -ruint = { workspace = true, features = ["num-traits", "primitive-types"] } +num-traits = { workspace = true } ops = { workspace = true } +paladin-core = { workspace = true } +plonky2 = { workspace = true } +plonky2_maybe_rayon = { workspace = true } +proof_gen = { workspace = true } +ruint = { workspace = true, features = ["num-traits", "primitive-types"] } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +trace_decoder = { workspace = true } +tracing = { workspace = true } zero_bin_common = { workspace = true } -num-traits = { workspace = true } -clap = {workspace = true} [features] default = [] diff --git a/zero_bin/prover/src/cli.rs b/zero_bin/prover/src/cli.rs index a49eb2d1d..694194ad9 100644 --- a/zero_bin/prover/src/cli.rs +++ b/zero_bin/prover/src/cli.rs @@ -1,7 +1,16 @@ -use clap::Args; +use std::path::PathBuf; + +use clap::{Args, ValueHint}; const HELP_HEADING: &str = "Prover options"; +// If not provided, default output path is `./proofs/`. +fn get_default_output_path() -> PathBuf { + let mut path = std::env::current_dir().unwrap_or_default(); + path.push("proofs"); + path +} + /// Represents the main configuration structure for the runtime. #[derive(Args, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] pub struct CliProverConfig { @@ -18,6 +27,22 @@ pub struct CliProverConfig { /// generating a proof. #[arg(long, help_heading = HELP_HEADING, default_value_t = false)] test_only: bool, + /// Directory where the generated proofs will be written. + #[arg(long, short = 'o', value_hint = ValueHint::FilePath, default_value = get_default_output_path().into_os_string())] + proof_output_dir: PathBuf, + /// Keep intermediate proofs. Default action is to + /// delete them after the final proof is generated. + #[arg( + short, + long, + env = "ZERO_BIN_KEEP_INTERMEDIATE_PROOFS", + default_value_t = false + )] + keep_intermediate_proofs: bool, + /// Number of blocks in a batch. For every block batch, the prover will + /// generate one proof file. + #[arg(long, default_value_t = 8)] + block_batch_size: usize, } impl From for crate::ProverConfig { @@ -27,6 +52,9 @@ impl From for crate::ProverConfig { max_cpu_len_log: cli.max_cpu_len_log, save_inputs_on_error: cli.save_inputs_on_error, test_only: cli.test_only, + proof_output_dir: cli.proof_output_dir, + keep_intermediate_proofs: cli.keep_intermediate_proofs, + block_batch_size: cli.block_batch_size, } } } diff --git a/zero_bin/prover/src/lib.rs b/zero_bin/prover/src/lib.rs index 0d49ca733..21be514ad 100644 --- a/zero_bin/prover/src/lib.rs +++ b/zero_bin/prover/src/lib.rs @@ -1,40 +1,32 @@ pub mod cli; use std::future::Future; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::Arc; -use alloy::primitives::{BlockNumber, U256}; +use alloy::primitives::U256; use anyhow::{Context, Result}; -use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; +use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStreamExt}; use num_traits::ToPrimitive as _; use paladin::runtime::Runtime; use proof_gen::proof_types::GeneratedBlockProof; use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc::Receiver; use tokio::sync::oneshot; use trace_decoder::{BlockTrace, OtherBlockData}; -use tracing::info; +use tracing::{error, info}; use zero_bin_common::fs::generate_block_proof_file_name; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct ProverConfig { pub batch_size: usize, pub max_cpu_len_log: usize, pub save_inputs_on_error: bool, pub test_only: bool, -} - -pub type BlockProverInputFuture = std::pin::Pin< - Box> + Send>, ->; - -impl From for BlockProverInputFuture { - fn from(item: BlockProverInput) -> Self { - async fn _from(item: BlockProverInput) -> Result { - Ok(item) - } - Box::pin(_from(item)) - } + pub proof_output_dir: PathBuf, + pub keep_intermediate_proofs: bool, + pub block_batch_size: usize, } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -50,9 +42,9 @@ impl BlockProverInput { pub async fn prove( self, - runtime: &Runtime, + runtime: Arc, previous: Option>>, - prover_config: ProverConfig, + prover_config: Arc, ) -> Result { use anyhow::Context as _; use evm_arithmetization::SegmentDataIterator; @@ -63,8 +55,8 @@ impl BlockProverInput { max_cpu_len_log, batch_size, save_inputs_on_error, - test_only: _, - } = prover_config; + .. + } = *prover_config; let block_number = self.get_block_number(); @@ -104,7 +96,7 @@ impl BlockProverInput { Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops) .fold(&seg_agg_ops) - .run(runtime) + .run(&runtime) .map(move |e| { e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p))) }) @@ -114,7 +106,7 @@ impl BlockProverInput { // Fold the batch aggregated proof stream into a single proof. let final_batch_proof = Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops) - .run(runtime) + .run(&runtime) .await?; if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof { @@ -131,7 +123,7 @@ impl BlockProverInput { prev, save_inputs_on_error, }) - .run(runtime) + .run(&runtime) .await?; info!("Successfully proved block {block_number}"); @@ -144,9 +136,9 @@ impl BlockProverInput { pub async fn prove_test( self, - runtime: &Runtime, + runtime: Arc, previous: Option>>, - prover_config: ProverConfig, + prover_config: Arc, ) -> Result { use std::iter::repeat; @@ -157,8 +149,8 @@ impl BlockProverInput { max_cpu_len_log, batch_size, save_inputs_on_error, - test_only: _, - } = prover_config; + .. + } = *prover_config; let block_number = self.get_block_number(); info!("Testing witness generation for block {block_number}."); @@ -185,7 +177,7 @@ impl BlockProverInput { ); simulation - .run(runtime) + .run(&runtime) .await? .try_for_each(|_| future::ok(())) .await?; @@ -208,94 +200,112 @@ impl BlockProverInput { } } +async fn prove_block( + block: BlockProverInput, + runtime: Arc, + previous_block_proof: Option>>, + prover_config: Arc, +) -> Result { + if prover_config.test_only { + block + .prove_test(runtime, previous_block_proof, prover_config) + .await + } else { + block + .prove(runtime, previous_block_proof, prover_config) + .await + } +} + /// Prove all the blocks in the input, or simulate their execution depending on /// the selected prover configuration. Return the list of block numbers that are /// proved and if the proof data is not saved to disk, return the generated /// block proofs as well. pub async fn prove( - block_prover_inputs: Vec, - runtime: &Runtime, - previous_proof: Option, - prover_config: ProverConfig, - proof_output_dir: Option, -) -> Result)>> { - let mut prev: Option>> = - previous_proof.map(|proof| Box::pin(futures::future::ok(proof)) as BoxFuture<_>); - - let mut results = FuturesOrdered::new(); - for block_prover_input in block_prover_inputs { + mut block_receiver: Receiver<(BlockProverInput, bool)>, + runtime: Arc, + checkpoint_proof: Option, + prover_config: Arc, +) -> Result<()> { + use tokio::task::JoinSet; + let mut block_counter: u64 = 0; + let mut prev_proof: Option>> = + checkpoint_proof.map(|proof| Box::pin(futures::future::ok(proof)) as BoxFuture<_>); + + let mut task_set: JoinSet< + std::result::Result, anyhow::Error>, + > = JoinSet::new(); + while let Some((block_prover_input, is_last_block)) = block_receiver.recv().await { + block_counter += 1; let (tx, rx) = oneshot::channel::(); - let proof_output_dir = proof_output_dir.clone(); - let previous_block_proof = prev.take(); - let fut = async move { - // Get the prover input data from the external source (e.g. Erigon node). - let block = block_prover_input.await?; - let block_number = block.get_block_number(); + let prover_config = prover_config.clone(); + let previous_block_proof = prev_proof.take(); + let runtime = runtime.clone(); + let block_number = block_prover_input.get_block_number(); + let _abort_handle = task_set.spawn(async move { + let block_number = block_prover_input.get_block_number(); info!("Proving block {block_number}"); - // Prove the block - let block_proof = if prover_config.test_only { - block - .prove_test(runtime, previous_block_proof, prover_config) - .then(move |proof| async move { - let proof = proof?; - let block_number = proof.b_height; - - // Write latest generated proof to disk if proof_output_dir is provided - // or alternatively return proof as function result. - let return_proof: Option = - if let Some(output_dir) = proof_output_dir { - write_proof_to_dir(output_dir, proof.clone()).await?; - None - } else { - Some(proof.clone()) - }; - - if tx.send(proof).is_err() { - anyhow::bail!("Failed to send proof"); - } - - Ok((block_number, return_proof)) - }) - .await? - } else { - block - .prove(runtime, previous_block_proof, prover_config) - .then(move |proof| async move { - let proof = proof?; - let block_number = proof.b_height; - - // Write latest generated proof to disk if proof_output_dir is provided - // or alternatively return proof as function result. - let return_proof: Option = - if let Some(output_dir) = proof_output_dir { - write_proof_to_dir(output_dir, proof.clone()).await?; - None - } else { - Some(proof.clone()) - }; - - if tx.send(proof).is_err() { - anyhow::bail!("Failed to send proof"); - } - - Ok((block_number, return_proof)) - }) - .await? - }; + let block_proof = prove_block( + block_prover_input, + runtime, + previous_block_proof, + prover_config.clone(), + ) + .then(move |proof| async move { + let proof = proof.inspect_err(|e| { + error!("failed to generate proof for block {block_number}, error {e:?}") + })?; + let block_number = proof.b_height; + + // Write proof to disk if block is last in block batch, + // or if the block is last in the interval (it contains all the necessary + // information to verify the whole sequence). If flag + // `keep_intermediate_proofs` is set, output all block proofs to disk. + let is_block_batch_finished = + block_counter % prover_config.block_batch_size as u64 == 0; + if is_last_block + || prover_config.keep_intermediate_proofs + || is_block_batch_finished + { + write_proof_to_dir(&prover_config.proof_output_dir, proof.clone()) + .await + .inspect_err(|e| error!("failed to output proof for block {block_number} to directory {e:?}"))?; + } + + if tx.send(proof).is_err() { + anyhow::bail!("Failed to send proof for block {block_number}"); + } + + Ok(block_number) + }) + .await; Ok(block_proof) + }); + prev_proof = Some(Box::pin(rx.map_err(move |e| { + error!("failed to receive previous proof for block {block_number}: {e:?}"); + anyhow::Error::new(e) + }))); + if is_last_block { + break; } - .boxed(); - prev = Some(Box::pin(rx.map_err(anyhow::Error::new))); - results.push_back(fut); } - results.try_collect().await + while let Some(res) = task_set.join_next().await { + let _proved_block_height = res???; + } + Ok(()) } /// Write the proof to the `output_dir` directory. -async fn write_proof_to_dir(output_dir: PathBuf, proof: GeneratedBlockProof) -> Result<()> { +async fn write_proof_to_dir(output_dir: &Path, proof: GeneratedBlockProof) -> Result<()> { + // Check if output directory exists, and create one if it doesn't. + if !output_dir.exists() { + info!("Created output directory {:?}", output_dir.display()); + std::fs::create_dir(output_dir)?; + } + let block_proof_file_path = generate_block_proof_file_name(&output_dir.to_str(), proof.b_height); @@ -306,8 +316,14 @@ async fn write_proof_to_dir(output_dir: PathBuf, proof: GeneratedBlockProof) -> tokio::fs::create_dir_all(parent).await?; } - let mut f = tokio::fs::File::create(block_proof_file_path).await?; + let mut f = tokio::fs::File::create(block_proof_file_path.clone()).await?; f.write_all(&proof_serialized) .await - .context("Failed to write proof to disk") + .context("Failed to write proof to disk")?; + + info!( + "Successfully wrote to disk proof file {}", + block_proof_file_path.display() + ); + Ok(()) } diff --git a/zero_bin/rpc/src/main.rs b/zero_bin/rpc/src/main.rs index 7ac9db60c..9ec2b9b62 100644 --- a/zero_bin/rpc/src/main.rs +++ b/zero_bin/rpc/src/main.rs @@ -13,6 +13,7 @@ use prover::BlockProverInput; use rpc::{retry::build_http_retry_provider, RpcParams, RpcType}; use tracing_subscriber::{prelude::*, EnvFilter}; use url::Url; +use zero_bin_common::block_interval::BlockIntervalStream; use zero_bin_common::pre_checks::check_previous_proof_and_checkpoint; use zero_bin_common::provider::CachedProvider; use zero_bin_common::version; @@ -93,8 +94,9 @@ where let block_interval = BlockInterval::Range(params.start_block..params.end_block + 1); let mut block_prover_inputs = Vec::new(); - let mut block_interval = block_interval.clone().into_bounded_stream()?; - while let Some(block_num) = block_interval.next().await { + let mut block_interval: BlockIntervalStream = block_interval.into_bounded_stream()?; + while let Some(block_interval_elem) = block_interval.next().await { + let (block_num, _is_last_block) = block_interval_elem?; let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num)); // Get the prover input for particular block. let result = rpc::block_prover_input( diff --git a/zero_bin/tools/prove_rpc.sh b/zero_bin/tools/prove_rpc.sh index 941521a3c..1c7420491 100755 --- a/zero_bin/tools/prove_rpc.sh +++ b/zero_bin/tools/prove_rpc.sh @@ -17,6 +17,9 @@ export RUST_LOG=info # See also .cargo/config.toml. export RUSTFLAGS='-C target-cpu=native -Zlinker-features=-lld' +BLOCK_BATCH_SIZE="${BLOCK_BATCH_SIZE:-8}" +echo "Block batch size: $BLOCK_BATCH_SIZE" + # Circuit sizes only matter in non test_only mode. if ! [[ $8 == "test_only" ]]; then export ARITHMETIC_CIRCUIT_SIZE="16..21" @@ -76,10 +79,15 @@ if [[ $END_BLOCK == 0x* ]]; then fi # Define block interval -if [ $START_BLOCK == $END_BLOCK ]; then - BLOCK_INTERVAL=$START_BLOCK +if [ $END_BLOCK == '-' ]; then + # Follow from the start block to the end of the chain + BLOCK_INTERVAL=$START_BLOCK.. +elif [ $START_BLOCK == $END_BLOCK ]; then + # Single block + BLOCK_INTERVAL=$START_BLOCK else - BLOCK_INTERVAL=$START_BLOCK..=$END_BLOCK + # Block range + BLOCK_INTERVAL=$START_BLOCK..=$END_BLOCK fi # Print out a warning if the we're using `native` and our file descriptor limit is too low. Don't bother if we can't find `ulimit`. @@ -102,7 +110,7 @@ fi if [[ $8 == "test_only" ]]; then # test only run echo "Proving blocks ${BLOCK_INTERVAL} in a test_only mode now... (Total: ${TOT_BLOCKS})" - command='cargo r --release --bin leader -- --test-only --runtime in-memory --load-strategy on-demand rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$NODE_RPC_URL" --block-interval $BLOCK_INTERVAL --proof-output-dir $PROOF_OUTPUT_DIR $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' + command='cargo r --release --bin leader -- --test-only --runtime in-memory --load-strategy on-demand --proof-output-dir $PROOF_OUTPUT_DIR --block-batch-size $BLOCK_BATCH_SIZE rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$NODE_RPC_URL" --block-interval $BLOCK_INTERVAL $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' if [ "$OUTPUT_TO_TERMINAL" = true ]; then eval $command retVal=$? @@ -125,7 +133,7 @@ if [[ $8 == "test_only" ]]; then else # normal run echo "Proving blocks ${BLOCK_INTERVAL} now... (Total: ${TOT_BLOCKS})" - command='cargo r --release --bin leader -- --runtime in-memory --load-strategy on-demand rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$3" --block-interval $BLOCK_INTERVAL --proof-output-dir $PROOF_OUTPUT_DIR $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' + command='cargo r --release --bin leader -- --runtime in-memory --load-strategy on-demand --proof-output-dir $PROOF_OUTPUT_DIR --block-batch-size $BLOCK_BATCH_SIZE rpc --rpc-type "$NODE_RPC_TYPE" --rpc-url "$3" --block-interval $BLOCK_INTERVAL $PREV_PROOF_EXTRA_ARG --backoff "$BACKOFF" --max-retries "$RETRIES" ' if [ "$OUTPUT_TO_TERMINAL" = true ]; then eval $command echo -e "Proof generation finished with result: $?" @@ -149,14 +157,15 @@ fi # If we're running the verification, we'll do it here. if [ "$RUN_VERIFICATION" = true ]; then - echo "Running the verification" + echo "Running the verification for the last proof..." proof_file_name=$PROOF_OUTPUT_DIR/b$END_BLOCK.zkproof echo "Verifying the proof of the latest block in the interval:" $proof_file_name cargo r --release --bin verifier -- -f $proof_file_name > $PROOF_OUTPUT_DIR/verify.out 2>&1 if grep -q 'All proofs verified successfully!' $PROOF_OUTPUT_DIR/verify.out; then - echo "All proofs verified successfully!"; + echo "$proof_file_name verified successfully!"; + rm $PROOF_OUTPUT_DIR/verify.out else echo "there was an issue with proof verification"; exit 1 diff --git a/zero_bin/tools/prove_stdio.sh b/zero_bin/tools/prove_stdio.sh index 64c140023..d37f84bf9 100755 --- a/zero_bin/tools/prove_stdio.sh +++ b/zero_bin/tools/prove_stdio.sh @@ -19,10 +19,13 @@ fi # Force the working directory to always be the `tools/` directory. TOOLS_DIR=$(dirname $(realpath "$0")) +PROOF_OUTPUT_DIR="${TOOLS_DIR}/proofs" -LEADER_OUT_PATH="${TOOLS_DIR}/leader.out" -PROOFS_JSON_PATH="${TOOLS_DIR}/proofs.json" -VERIFY_OUT_PATH="${TOOLS_DIR}/verify.out" +BLOCK_BATCH_SIZE="${BLOCK_BATCH_SIZE:-8}" +echo "Block batch size: $BLOCK_BATCH_SIZE" + +OUTPUT_LOG="${TOOLS_DIR}/output.log" +PROOFS_FILE_LIST="${PROOF_OUTPUT_DIR}/proof_files.json" TEST_OUT_PATH="${TOOLS_DIR}/test.out" # Configured Rayon and Tokio with rough defaults @@ -88,7 +91,7 @@ fi # proof. This is useful for quickly testing decoding and all of the # other non-proving code. if [[ $TEST_ONLY == "test_only" ]]; then - cargo run --release --bin leader -- --test-only --runtime in-memory --load-strategy on-demand stdio < $INPUT_FILE &> $TEST_OUT_PATH + cargo run --release --bin leader -- --test-only --runtime in-memory --load-strategy on-demand --block-batch-size $BLOCK_BATCH_SIZE --proof-output-dir $PROOF_OUTPUT_DIR stdio < $INPUT_FILE &> $TEST_OUT_PATH if grep -q 'All proof witnesses have been generated successfully.' $TEST_OUT_PATH; then echo -e "\n\nSuccess - Note this was just a test, not a proof" rm $TEST_OUT_PATH @@ -101,24 +104,44 @@ fi cargo build --release --jobs "$num_procs" + start_time=$(date +%s%N) -"${TOOLS_DIR}/../../target/release/leader" --runtime in-memory --load-strategy on-demand stdio < $INPUT_FILE &> $LEADER_OUT_PATH +"${TOOLS_DIR}/../../target/release/leader" --runtime in-memory --load-strategy on-demand --block-batch-size $BLOCK_BATCH_SIZE \ + --proof-output-dir $PROOF_OUTPUT_DIR stdio < $INPUT_FILE &> $OUTPUT_LOG end_time=$(date +%s%N) -tail -n 1 $LEADER_OUT_PATH > $PROOFS_JSON_PATH +set +o pipefail +cat $OUTPUT_LOG | grep "Successfully wrote to disk proof file " | awk '{print $NF}' | tee $PROOFS_FILE_LIST +if [ ! -s "$PROOFS_FILE_LIST" ]; then + echo "Proof list not generated, some error happened. For more details check the log file $OUTPUT_LOG" + exit 1 +fi + +cat $PROOFS_FILE_LIST | while read proof_file; +do + echo "Verifying proof file $proof_file" + verify_file=$PROOF_OUTPUT_DIR/verify_$(basename $proof_file).out + "${TOOLS_DIR}/../../target/release/verifier" -f $proof_file | tee $verify_file + if grep -q 'All proofs verified successfully!' $verify_file; then + echo "Proof verification for file $proof_file successful"; + rm $verify_file # we keep the generated proof for potential reuse + else + echo "there was an issue with proof verification"; + exit 1 + fi +done + +duration_ns=$((end_time - start_time)) +duration_sec=$(echo "$duration_ns / 1000000000" | bc -l) + +echo "Success!" +echo "Proving duration:" $duration_sec " seconds" +echo "Note, this duration is inclusive of circuit handling and overall process initialization"; + +# Clean up in case of success +rm $OUTPUT_LOG + -"${TOOLS_DIR}/../../target/release/verifier" -f $PROOFS_JSON_PATH | tee $VERIFY_OUT_PATH -if grep -q 'All proofs verified successfully!' $VERIFY_OUT_PATH; then - duration_ns=$((end_time - start_time)) - duration_sec=$(echo "$duration_ns / 1000000000" | bc -l) - echo "Success!" - echo "Duration:" $duration_sec " seconds" - echo "Note, this duration is inclusive of circuit handling and overall process initialization"; - rm $LEADER_OUT_PATH $VERIFY_OUT_PATH # we keep the generated proof for potential reuse -else - echo "there was an issue with proof verification"; - exit 1 -fi