-
Notifications
You must be signed in to change notification settings - Fork 90
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First working version of SP1 Distributed Prover
Optimized prototype Remove unecessary complexity on async send/receive Fix the commitment of shards public values Setting the shard_batch_size to 1 and processing multiple checkpoints in the workers Send only the first checkpoint and reexecute the runtime for the next ones Make the worker computation stateless Share the shard_batch_size and shard_size with workers Make worker able to receive multiple requests Redistribute a request when a worker fails Reducing the size of the shard public values Better request data structure Keep a single instance of program and machine Remove debugs about time duration
- Loading branch information
Showing
23 changed files
with
1,311 additions
and
137 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
use crate::ProverState; | ||
use raiko_lib::prover::{ProverError, WorkerError}; | ||
use sp1_driver::{ | ||
sp1_specifics::{Challenger, CoreSC, Machine, Program, ProvingKey, RiscvAir}, | ||
RequestData, WorkerProtocol, WorkerRequest, WorkerResponse, WorkerSocket, ELF, | ||
}; | ||
use tokio::net::TcpListener; | ||
use tracing::{error, info, warn}; | ||
|
||
pub async fn serve(state: ProverState) { | ||
if state.opts.orchestrator_address.is_some() { | ||
tokio::spawn(listen_worker(state)); | ||
} | ||
} | ||
|
||
async fn listen_worker(state: ProverState) { | ||
info!( | ||
"Listening as a SP1 worker on: {}", | ||
state.opts.worker_address | ||
); | ||
|
||
let listener = TcpListener::bind(state.opts.worker_address).await.unwrap(); | ||
|
||
loop { | ||
let Ok((socket, addr)) = listener.accept().await else { | ||
error!("Error while accepting connection from orchestrator: Closing socket"); | ||
|
||
return; | ||
}; | ||
|
||
if let Some(orchestrator_address) = &state.opts.orchestrator_address { | ||
if addr.ip().to_string() != *orchestrator_address { | ||
warn!("Unauthorized orchestrator connection from: {}", addr); | ||
|
||
continue; | ||
} | ||
} | ||
|
||
// We purposely don't spawn the task here, as we want to block to limit the number | ||
// of concurrent connections to one. | ||
if let Err(e) = handle_worker_socket(WorkerSocket::from_stream(socket)).await { | ||
error!("Error while handling worker socket: {:?}", e); | ||
} | ||
} | ||
} | ||
|
||
async fn handle_worker_socket(mut socket: WorkerSocket) -> Result<(), ProverError> { | ||
let program = Program::from(ELF); | ||
let config = CoreSC::default(); | ||
|
||
let machine = RiscvAir::machine(config.clone()); | ||
let (pk, _vk) = machine.setup(&program); | ||
|
||
while let Ok(protocol) = socket.receive().await { | ||
match protocol { | ||
WorkerProtocol::Request(request) => match request { | ||
WorkerRequest::Ping => handle_ping(&mut socket).await?, | ||
WorkerRequest::Commit(request_data) => { | ||
handle_commit(&mut socket, &program, &machine, request_data).await? | ||
} | ||
WorkerRequest::Prove { | ||
request_data, | ||
challenger, | ||
} => { | ||
handle_prove( | ||
&mut socket, | ||
&program, | ||
&machine, | ||
&pk, | ||
request_data, | ||
challenger, | ||
) | ||
.await? | ||
} | ||
}, | ||
_ => Err(WorkerError::InvalidRequest)?, | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn handle_ping(socket: &mut WorkerSocket) -> Result<(), WorkerError> { | ||
socket | ||
.send(WorkerProtocol::Response(WorkerResponse::Pong)) | ||
.await | ||
} | ||
|
||
async fn handle_commit( | ||
socket: &mut WorkerSocket, | ||
program: &Program, | ||
machine: &Machine, | ||
request_data: RequestData, | ||
) -> Result<(), WorkerError> { | ||
let (commitments, shards_public_values) = sp1_driver::sp1_specifics::commit( | ||
program, | ||
machine, | ||
request_data.checkpoint, | ||
request_data.nb_checkpoints, | ||
request_data.public_values, | ||
request_data.shard_batch_size, | ||
request_data.shard_size, | ||
)?; | ||
|
||
socket | ||
.send(WorkerProtocol::Response(WorkerResponse::Commitment { | ||
commitments, | ||
shards_public_values, | ||
})) | ||
.await | ||
} | ||
|
||
async fn handle_prove( | ||
socket: &mut WorkerSocket, | ||
program: &Program, | ||
machine: &Machine, | ||
pk: &ProvingKey, | ||
request_data: RequestData, | ||
challenger: Challenger, | ||
) -> Result<(), WorkerError> { | ||
let proof = sp1_driver::sp1_specifics::prove( | ||
program, | ||
machine, | ||
pk, | ||
request_data.checkpoint, | ||
request_data.nb_checkpoints, | ||
request_data.public_values, | ||
request_data.shard_batch_size, | ||
request_data.shard_size, | ||
challenger, | ||
)?; | ||
|
||
socket | ||
.send(WorkerProtocol::Response(WorkerResponse::Proof(proof))) | ||
.await | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,4 +71,4 @@ std = [ | |
sgx = [] | ||
sp1 = [] | ||
risc0 = [] | ||
sp1-cycle-tracker = [] | ||
sp1-cycle-tracker = [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
mod prover; | ||
pub mod sp1_specifics; | ||
mod worker; | ||
|
||
pub use prover::Sp1DistributedProver; | ||
pub use worker::{ | ||
RequestData, WorkerEnvelope, WorkerPool, WorkerProtocol, WorkerRequest, WorkerResponse, | ||
WorkerSocket, | ||
}; |
Oops, something went wrong.