Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign specific jobs to dedicated workers #564

Merged
merged 20 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
471 changes: 250 additions & 221 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ num-bigint = "0.4.5"
num-traits = "0.2.19"
nunny = "0.2.1"
once_cell = "1.19.0"
paladin-core = "0.4.2"
paladin-core = { git = "https://github.com/0xPolygonZero/paladin.git", branch = "arpit/507-2" }
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.12.3"
paste = "1.0.15"
pest = "2.7.10"
Expand Down
1 change: 1 addition & 0 deletions zero_bin/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ clap = { workspace = true }
futures = { workspace = true }
lru = { workspace = true }
once_cell = { workspace = true }
paladin-core = { workspace = true }
plonky2 = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions zero_bin/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod debug_utils;
pub mod fs;
pub mod parsing;
pub mod pre_checks;
pub mod proof_runtime;
pub mod prover_state;
pub mod provider;
pub mod version;
Expand Down
6 changes: 6 additions & 0 deletions zero_bin/common/src/proof_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use paladin::runtime::Runtime;

pub struct ProofRuntime {
pub block_proof_runtime: Runtime,
pub segment_proof_runtime: Runtime,
}
14 changes: 13 additions & 1 deletion zero_bin/leader/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::path::PathBuf;

use alloy::transports::http::reqwest::Url;
use clap::{Parser, Subcommand, ValueHint};
use clap::{Parser, Subcommand, ValueEnum, ValueHint};
use prover::cli::CliProverConfig;
use rpc::RpcType;
use zero_bin_common::prover_state::cli::CliProverStateConfig;

const WORKER_HELP_HEADING: &str = "Worker Config options";

/// zero-bin leader config
#[derive(Parser)]
pub(crate) struct Cli {
Expand All @@ -22,6 +24,16 @@ pub(crate) struct Cli {
// mode.
#[clap(flatten)]
pub(crate) prover_state_config: CliProverStateConfig,

// Mode to use for worker for setup (split or unified)
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
#[arg(long = "worker-run-mode", help_heading = WORKER_HELP_HEADING, value_enum, default_value = "unified")]
pub(crate) worker_run_mode: WorkerRunMode,
}

#[derive(ValueEnum, Clone, PartialEq, Debug)]
pub enum WorkerRunMode {
Split,
Unified,
}

#[derive(Subcommand)]
Expand Down
11 changes: 6 additions & 5 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::sync::Arc;
use alloy::rpc::types::{BlockId, BlockNumberOrTag, BlockTransactionsKind};
use alloy::transports::http::reqwest::Url;
use anyhow::{anyhow, Result};
use paladin::runtime::Runtime;
use proof_gen::proof_types::GeneratedBlockProof;
use prover::{BlockProverInput, ProverConfig};
use rpc::{retry::build_http_retry_provider, RpcType};
use tokio::sync::mpsc;
use tracing::info;
use zero_bin_common::block_interval::{BlockInterval, BlockIntervalStream};
use zero_bin_common::pre_checks::check_previous_proof_and_checkpoint;
use zero_bin_common::proof_runtime::ProofRuntime;

#[derive(Debug)]
pub struct RpcParams {
Expand All @@ -30,7 +30,7 @@ pub struct LeaderConfig {

/// The main function for the client.
pub(crate) async fn client_main(
runtime: Arc<Runtime>,
proof_runtime: Arc<ProofRuntime>,
rpc_params: RpcParams,
block_interval: BlockInterval,
mut leader_config: LeaderConfig,
Expand Down Expand Up @@ -67,10 +67,10 @@ pub(crate) async fn client_main(
let test_only = leader_config.prover_config.test_only;

// Run proving task
let runtime_ = runtime.clone();
let proof_runtime_ = proof_runtime.clone();
let proving_task = tokio::spawn(prover::prove(
block_rx,
runtime_,
proof_runtime_,
leader_config.previous_proof.take(),
Arc::new(leader_config.prover_config),
));
Expand Down Expand Up @@ -116,7 +116,8 @@ pub(crate) async fn client_main(
}
}

runtime.close().await?;
proof_runtime.block_proof_runtime.close().await?;
proof_runtime.segment_proof_runtime.close().await?;

if test_only {
info!("All proof witnesses have been generated successfully.");
Expand Down
15 changes: 6 additions & 9 deletions zero_bin/leader/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use alloy::primitives::U256;
use anyhow::{bail, Result};
use axum::{http::StatusCode, routing::post, Json, Router};
use paladin::runtime::Runtime;
use proof_gen::proof_types::GeneratedBlockProof;
use prover::{BlockProverInput, ProverConfig};
use serde::{Deserialize, Serialize};
use serde_json::to_writer;
use tracing::{debug, error, info};
use zero_bin_common::proof_runtime::ProofRuntime;

/// The main function for the HTTP mode.
pub(crate) async fn http_main(
runtime: Arc<Runtime>,
proof_runtime: Arc<ProofRuntime>,
port: u16,
output_dir: PathBuf,
prover_config: Arc<ProverConfig>,
Expand All @@ -22,10 +22,7 @@ pub(crate) async fn http_main(

let app = Router::new().route(
"/prove",
post({
let runtime = runtime.clone();
move |body| prove(body, runtime, output_dir.clone(), prover_config)
}),
post(move |body| prove(body, proof_runtime, output_dir.clone(), prover_config)),
);
let listener = tokio::net::TcpListener::bind(&addr).await?;
Ok(axum::serve(listener, app).await?)
Expand Down Expand Up @@ -62,7 +59,7 @@ struct HttpProverInput {

async fn prove(
Json(payload): Json<HttpProverInput>,
runtime: Arc<Runtime>,
proof_runtime: Arc<ProofRuntime>,
output_dir: PathBuf,
prover_config: Arc<ProverConfig>,
) -> StatusCode {
Expand All @@ -74,7 +71,7 @@ async fn prove(
payload
.prover_input
.prove_test(
runtime,
proof_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand All @@ -83,7 +80,7 @@ async fn prove(
payload
.prover_input
.prove(
runtime,
proof_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand Down
39 changes: 34 additions & 5 deletions zero_bin/leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use proof_gen::proof_types::GeneratedBlockProof;
use prover::ProverConfig;
use tracing::{info, warn};
use zero_bin_common::{
block_interval::BlockInterval, prover_state::persistence::set_circuit_cache_dir_env_if_not_set,
block_interval::BlockInterval, proof_runtime::ProofRuntime,
prover_state::persistence::set_circuit_cache_dir_env_if_not_set,
};
use zero_bin_common::{prover_state::persistence::CIRCUIT_VERSION, version};

Expand All @@ -37,6 +38,10 @@ fn get_previous_proof(path: Option<PathBuf>) -> Result<Option<GeneratedBlockProo
Ok(Some(proof))
}

const SEGMENT_PROOF_ROUTING_KEY: &str = "segment-proof";
const BLOCK_PROOF_ROUTING_KEY: &str = "block-proof";
const DEFAULT_ROUTING_KEY: &str = paladin::runtime::DEFAULT_ROUTING_KEY;
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

#[tokio::main]
async fn main() -> Result<()> {
load_dotenvy_vars_if_present();
Expand All @@ -56,11 +61,35 @@ async fn main() -> Result<()> {

let args = cli::Cli::parse();

let mut block_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
let mut segment_proof_routing_key = DEFAULT_ROUTING_KEY.to_string();
if args.worker_run_mode == cli::WorkerRunMode::Split {
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
// If we're running in split mode, we need to set the routing key for the
// block proof and segment proof.
info!("Workers running in split mode");
block_proof_routing_key = BLOCK_PROOF_ROUTING_KEY.to_string();
segment_proof_routing_key = SEGMENT_PROOF_ROUTING_KEY.to_string();
}

let mut block_proof_paladin_args = args.paladin.clone();
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
block_proof_paladin_args.task_bus_routing_key = Some(block_proof_routing_key);

let mut segment_proof_paladin_args = args.paladin.clone();
segment_proof_paladin_args.task_bus_routing_key = Some(segment_proof_routing_key);

let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?;
let segment_proof_runtime =
Runtime::from_config(&segment_proof_paladin_args, register()).await?;
if let Command::Clean = args.command {
return zero_bin_common::prover_state::persistence::delete_all();
}
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

let runtime = Arc::new(Runtime::from_config(&args.paladin, register()).await?);
let proof_runtime = ProofRuntime {
block_proof_runtime,
segment_proof_runtime,
};

let proof_runtime = Arc::new(proof_runtime);
let prover_config: ProverConfig = args.prover_config.into();

// If not in test_only mode and running in emulation mode, we'll need to
Expand All @@ -76,7 +105,7 @@ async fn main() -> Result<()> {
match args.command {
Command::Stdio { previous_proof } => {
let previous_proof = get_previous_proof(previous_proof)?;
stdio::stdio_main(runtime, previous_proof, Arc::new(prover_config)).await?;
stdio::stdio_main(proof_runtime, previous_proof, Arc::new(prover_config)).await?;
}
Command::Http { port, output_dir } => {
// check if output_dir exists, is a directory, and is writable
Expand All @@ -88,7 +117,7 @@ async fn main() -> Result<()> {
panic!("output-dir is not a writable directory");
}

http::http_main(runtime, port, output_dir, Arc::new(prover_config)).await?;
http::http_main(proof_runtime, port, output_dir, Arc::new(prover_config)).await?;
}
Command::Rpc {
rpc_url,
Expand All @@ -105,7 +134,7 @@ async fn main() -> Result<()> {

info!("Proving interval {block_interval}");
client_main(
runtime,
proof_runtime,
RpcParams {
rpc_url,
rpc_type,
Expand Down
16 changes: 11 additions & 5 deletions zero_bin/leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::io::Read;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use paladin::runtime::Runtime;
use proof_gen::proof_types::GeneratedBlockProof;
use prover::{BlockProverInput, ProverConfig};
use tokio::sync::mpsc;
use tracing::info;
use zero_bin_common::proof_runtime::ProofRuntime;

/// The main function for the stdio mode.
pub(crate) async fn stdio_main(
runtime: Arc<Runtime>,
proof_runtime: Arc<ProofRuntime>,
previous: Option<GeneratedBlockProof>,
prover_config: Arc<ProverConfig>,
) -> Result<()> {
Expand All @@ -25,9 +25,14 @@ pub(crate) async fn stdio_main(
let (block_tx, block_rx) =
mpsc::channel::<(BlockProverInput, bool)>(zero_bin_common::BLOCK_CHANNEL_SIZE);

let runtime_ = runtime.clone();
let proof_runtime_ = proof_runtime.clone();
let prover_config_ = prover_config.clone();
let proving_task = tokio::spawn(prover::prove(block_rx, runtime_, previous, prover_config_));
let proving_task = tokio::spawn(prover::prove(
block_rx,
proof_runtime_,
previous,
prover_config_,
));

let interval_len = block_prover_inputs.len();
for (index, block_prover_input) in block_prover_inputs.into_iter().enumerate() {
Expand All @@ -49,7 +54,8 @@ pub(crate) async fn stdio_main(
}
}

runtime.close().await?;
proof_runtime.block_proof_runtime.close().await?;
proof_runtime.segment_proof_runtime.close().await?;

if prover_config.test_only {
info!("All proof witnesses have been generated successfully.");
Expand Down
Loading
Loading