refactor: job isolation done #204
base: main
Conversation
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
We have a lot of metadata constants here. Should we reuse this?
+1
@@ -133,7 +133,11 @@ impl Job for DaJob {
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);
Let's change this so the worker specifies the full path, as discussed.
@@ -196,17 +218,17 @@ impl SnosJob {
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");
will come from metadata now?
let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of program output paths from metadata
let program_paths: Vec<String> = serde_json::from_str(
- Pass the path directly.
- As discussed, should we use config structs for metadata fields?
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Add debug logging to see the raw document
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
Suggested change:
- tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
+ tracing::debug!(raw_document = ?doc, "Raw document from MongoDB");
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");

// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {
Are we cloning here so that we can log it in the error later? If so, should we add a comment explaining that's why we're cloning?
These are also types, to be precise. Maybe we should add them to types.rs, or create a types folder with jobs.rs and metadata.rs?
Yes, something like:
src/
  types/
    mod.rs     // Re-exports all types
    job.rs     // Job-related types
    common.rs  // Common types
pub struct ProvingMetadata {
    // Required fields
    pub block_number: u64,
    pub cairo_pie_path: Option<String>,
If it's required, it should not be optional, right?
Why does the proving job care about block_number?
cross_verify can be renamed to ensure_on_chain_registration or something more understandable.
Let's delete verification_key_path.
Make an input field which is an enum:
enum Input {
    Pie()...
}
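A minimal sketch of what that input enum could look like. All names here are hypothetical illustrations, not from the PR:

```rust
// Hypothetical sketch of a single `input` field on the proving metadata,
// replacing separate optional path fields with one enum.
#[derive(Debug, Clone, PartialEq)]
pub enum ProvingInputType {
    /// Path to a Cairo PIE file in storage.
    CairoPie(String),
    /// Path to an existing proof, for jobs that skip the PIE stage.
    Proof(String),
}

impl ProvingInputType {
    /// Returns the storage path regardless of variant.
    pub fn path(&self) -> &str {
        match self {
            ProvingInputType::CairoPie(p) | ProvingInputType::Proof(p) => p,
        }
    }
}
```

With an enum, a job can only ever carry one well-formed input instead of several optional paths that must be validated together.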
We can remove snos_fact and club it with ensure_on_chain_registration using an Option.
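One way to read "club it using an Option" (struct and field names here are hypothetical): make the registration field carry the fact itself, so Some(fact) means "cross-verify on chain" and None means "skip verification".

```rust
// Hypothetical sketch: snos_fact folded into ensure_on_chain_registration.
#[derive(Debug, Clone, Default)]
pub struct ProvingMetadataSketch {
    /// When Some, the contained fact hash is cross-verified on chain;
    /// when None, on-chain registration checks are skipped entirely.
    pub ensure_on_chain_registration: Option<String>,
}

impl ProvingMetadataSketch {
    /// True when a fact is present and must be checked on chain.
    pub fn needs_registration_check(&self) -> bool {
        self.ensure_on_chain_registration.is_some()
    }
}
```

This removes the invalid state where cross-verification is enabled but no fact was provided.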
Also add comments explaining which metadata fields are inputs and which are added by jobs on the fly.
// State tracking
pub last_failed_block_no: Option<u64>,
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
Suggested change:
- pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
+ pub tx_hashes: Vec<String>
block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::<Vec<u64>>();
}
// Filter block numbers if there was a previous failure
let block_numbers = if let JobSpecificMetadata::StateUpdate(state_metadata) = &job.metadata.specific {
we can move this part outside
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
Get the SNOS path here itself.
.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
let txn_hash = match self.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job).await {
same here
.unwrap();

// Update metadata in a single place
if let JobSpecificMetadata::StateUpdate(ref mut state_metadata) = job.metadata.specific {
here as well
// Fetching the blob data (stored in remote storage during DA job) for a particular block
// pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>, job: &JobItem)
// -> color_eyre::Result<Vec<[u8; 32]>> { let storage_client = config.storage();
//     let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
//     let blob_data = storage_client.get_data(&key).await?;
//     let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref())?;
//     Ok(transformed_blob_vec_u8)
// }
delete
let proving_metadata = match &proving_job.metadata.specific {
    JobSpecificMetadata::Proving(metadata) => metadata,
    _ => {
        tracing::error!(
            job_id = %proving_job.internal_id,
            "Invalid metadata type for proving job"
        );
        continue;
    }
};
We can just use the internal id here.
Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
for snos_job in successful_snos_jobs {
// Extract SNOS metadata
let snos_metadata = match &snos_job.metadata.specific {
We can try a TryInto implementation to make this cleaner.
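A sketch of the TryFrom/TryInto approach using simplified stand-in types (the real JobSpecificMetadata variants carry more fields than shown here):

```rust
// Simplified stand-ins for the real metadata types, for illustration only.
#[derive(Debug, Clone)]
pub struct SnosMetadata {
    pub block_number: u64,
}

#[derive(Debug, Clone)]
pub enum JobSpecificMetadata {
    Snos(SnosMetadata),
    Other,
}

impl TryFrom<JobSpecificMetadata> for SnosMetadata {
    type Error = String;

    // Centralizes the "wrong variant" error so call sites don't need a match.
    fn try_from(value: JobSpecificMetadata) -> Result<Self, Self::Error> {
        match value {
            JobSpecificMetadata::Snos(m) => Ok(m),
            _ => Err("invalid metadata type: expected Snos".to_string()),
        }
    }
}
```

Implementing TryFrom gives TryInto for free, so the worker loop can do one `try_into()` with the error path handled in a single place instead of repeating the match at every call site.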
@@ -96,26 +94,73 @@ impl Worker for UpdateStateWorker {
        }
    }
    None => {
        if blocks_to_process[0] != 0 {
        if blocks_to_process[0] != 0 && blocks_to_process[0] != 1 {
need to remove this
Please feel free to make separate issues for comments you don't feel fit to resolve in this PR, and add the issue link in the respective comment.
Note: I have skipped reading the tests this time.
match cursor.try_next().await? {
    Some(doc) => {
        let job: JobItem = mongodb::bson::from_document(doc)?;
IMO we should use .map_err here:

let job: JobItem = mongodb::bson::from_document(doc.clone()).map_err(|e| {
    tracing::error!(error = %e, document = ?doc, "Failed to deserialize document into JobItem");
    e
})?;
pub verification_retry_attempt_no: u64,
/// Timestamp when job processing completed
#[serde(with = "chrono::serde::ts_seconds_option")]
pub process_completed_at: Option<DateTime<Utc>>,
As discussed, let's add process_started_at and verification_started_at as well.
We can leave these out for job creation since it's almost always instantaneous and we usually don't care about the time it takes.
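The paired start/completion stamps can be sketched with std::time in place of the crate's chrono types (the struct and field names below are illustrative, not the PR's actual definitions):

```rust
use std::time::{Duration, SystemTime};

// Hypothetical sketch of the common metadata with the proposed extra stamps.
#[derive(Debug, Default)]
pub struct CommonMetadataSketch {
    pub process_started_at: Option<SystemTime>,
    pub process_completed_at: Option<SystemTime>,
    pub verification_started_at: Option<SystemTime>,
    pub verification_completed_at: Option<SystemTime>,
}

impl CommonMetadataSketch {
    /// Wall-clock duration of the processing step, if both stamps exist.
    pub fn process_duration(&self) -> Option<Duration> {
        match (self.process_started_at, self.process_completed_at) {
            (Some(start), Some(end)) => end.duration_since(start).ok(),
            _ => None,
        }
    }
}
```

Having both stamps lets the orchestrator derive per-phase durations instead of only knowing when each phase finished.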
);

// Get DA-specific metadata
let mut da_metadata: DaMetadata = job.metadata.specific.clone().try_into().map_err(|e| {
I am not sure about this idea of "specific" and "common" inside metadata. What is your opinion on two fields within the job itself? I.e., instead of metadata.specific and metadata.common, we would have commonMetaData and JobMetadata directly in the job. Do you feel the current structure is necessary?
if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
tracing::warn!(
This should be an error.
Only if we are throwing an error after this, isn't it?
Ok(transformed_blob_vec_u8)
// Get the path for this block
let path =
    blob_data_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?;
What are we gaining by passing the index and the blob_data_paths down the call chain? We are sending them
- from process_job
- to update_state_for_block
- to fetch_program_data_for_block
and are just fetching the value at the index. IMO it seems simpler to just fetch the value for the block_number inside process_job and send it here.
The same also applies to:
- fetch_snos_for_block
- fetch_program_output_for_block
let tx_inclusion_status =
    settlement_client.verify_tx_inclusion(tx_hash).await.map_err(|e| JobError::Other(OtherError(e)))?;

match tx_inclusion_status {
    SettlementVerificationStatus::Rejected(_) => {
Nitpick : Why are we not using recursion here!!! I just realised it haha!
Could be crazy if we implement it.
@@ -336,36 +412,44 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
async fn fetch_snos_for_block(
- fetch_snos_for_block
- fetch_program_output_for_block

No need for these functions to be part of this impl; they are not using self anywhere, so we can move them to utils.rs.
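The refactor that comment describes, sketched with a deliberately simplified, hypothetical helper (the real functions take Arc<Config> and return JobError results):

```rust
// Before: a method on the job type that never touches `self`.
struct StateUpdateJob;

impl StateUpdateJob {
    fn snos_key_for_block(&self, block_no: u64) -> String {
        // `self` is unused: nothing here depends on job state.
        format!("{}/snos_output.json", block_no)
    }
}

// After: the same logic as a free function, e.g. living in utils.rs.
fn snos_key_for_block(block_no: u64) -> String {
    format!("{}/snos_output.json", block_no)
}
```

The free-function form is easier to test and reuse across jobs, and makes it obvious at the signature level that the logic is independent of any particular job instance.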
let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;
if self.fact_checker.is_valid(&fact).await? {
Ok(TaskStatus::Succeeded)
if cross_verify {
I think we can make these nested conditions better:
- by using a guard clause for when cross_verify is false,
- by using a guard clause for when fact is None.

Here's a reply from Claude to help:

if !cross_verify {
    tracing::debug!("Skipping cross-verification as it's disabled");
    return Ok(TaskStatus::Succeeded);
}

let Some(fact_str) = fact else {
    return Ok(TaskStatus::Failed("Cross verification enabled but no fact provided".to_string()));
};

let fact = B256::from_str(&fact_str)
    .map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;

tracing::debug!(fact = %hex::encode(&fact), "Cross-verifying fact on chain");

if self.fact_checker.is_valid(&fact).await? {
    Ok(TaskStatus::Succeeded)
} else {
    Ok(TaskStatus::Failed(format!(
        "Fact {} is not valid or not registered",
        hex::encode(fact)
    )))
}
"Cairo PIE task status: ONCHAIN and fact is valid."
);
CairoJobStatus::ONCHAIN => match fact {
    Some(fact_str) => {
Same as above: we can use a guard clause instead of matching on fact.
Files and information needed by each job should be provided by that job's worker; there should be no inter-job dependency.