
refactor: job isolation done #204

Open
wants to merge 13 commits into main from refactor/job-isolation
Conversation

@Mohiiit (Contributor) commented Jan 22, 2025

Files and information needed by each job should be provided by the worker of the respective job; no inter-job dependency.

@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";

We have a lot of metadata constants here. Should we reuse this one?


+1

@@ -133,7 +133,11 @@ impl Job for DaJob {
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
let blob_data_path = format!("{}/{}", block_no, BLOB_DATA_FILE_NAME);

Let's change this so the worker specifies the full path, as discussed.

@@ -196,17 +218,17 @@ impl SnosJob {
SnosError::SnosOutputUnstorable { internal_id: internal_id.to_string(), message: e.to_string() }
})?;

let program_output_key = format!("{block_number}/{PROGRAM_OUTPUT_FILE_NAME}");

will come from metadata now?


let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;
// Get the array of program output paths from metadata
let program_paths: Vec<String> = serde_json::from_str(

  1. Pass the path directly.
  2. As discussed, should we use config structs for the metadata fields?

ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Add debug logging to see the raw document
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");

Suggested change
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
tracing::debug!(raw_document = ?doc, "Raw document from MongoDB");

tracing::info!(raw_document = ?doc, "Raw document from MongoDB");

// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {

We're cloning here so that we can log it in the error later? If so, should we add a comment explaining that's why we're cloning it?


These are also types, to be precise. Maybe we should add them to types.rs, or create a types folder with jobs.rs and metadata.rs?


Yes, something like:

src/
  types/
    mod.rs          // Re-exports all types
    job.rs          // Job-related types
    common.rs       // Common types

pub struct ProvingMetadata {
// Required fields
pub block_number: u64,
pub cairo_pie_path: Option<String>,

If it's required, it shouldn't be optional, right?
Why does the proving job care about block_number?
cross_verify can be renamed to ensure_on_chain_registration or something more understandable.
Let's delete verification_key_path.


Make an input field which is an enum:

enum Input {
Pie()...
}


We can remove snos_fact and fold it into ensure_on_chain_registration using an Option.


Also add comments explaining which fields in metadata are inputs and which are added by jobs on the fly.


// State tracking
pub last_failed_block_no: Option<u64>,
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes

Suggested change
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
pub tx_hashes: Vec<String>

block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::<Vec<u64>>();
}
// Filter block numbers if there was a previous failure
let block_numbers = if let JobSpecificMetadata::StateUpdate(state_metadata) = &job.metadata.specific {

we can move this part outside

.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;

Get the SNOS path here itself.

.await
.map_err(|e| {
let snos = self.fetch_snos_for_block(i, config.clone(), job).await?;
let txn_hash = match self.update_state_for_block(config.clone(), *block_no, i, snos, nonce, job).await {

same here

.unwrap();

// Update metadata in a single place
if let JobSpecificMetadata::StateUpdate(ref mut state_metadata) = job.metadata.specific {

here as well

Comment on lines 37 to 44
// Fetching the blob data (stored in remote storage during DA job) for a particular block
// pub async fn fetch_program_data_for_block(block_number: u64, config: Arc<Config>, job: &JobItem)
// -> color_eyre::Result<Vec<[u8; 32]>> { let storage_client = config.storage();
// let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
// let blob_data = storage_client.get_data(&key).await?;
// let transformed_blob_vec_u8 = bytes_to_vec_u8(blob_data.as_ref())?;
// Ok(transformed_blob_vec_u8)
// }

delete

Comment on lines 31 to 40
let proving_metadata = match &proving_job.metadata.specific {
JobSpecificMetadata::Proving(metadata) => metadata,
_ => {
tracing::error!(
job_id = %proving_job.internal_id,
"Invalid metadata type for proving job"
);
continue;
}
};

can just use internal id here

Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"),
for snos_job in successful_snos_jobs {
// Extract SNOS metadata
let snos_metadata = match &snos_job.metadata.specific {

We can try a TryInto implementation to make this cleaner.

@@ -96,26 +94,73 @@ impl Worker for UpdateStateWorker {
}
}
None => {
if blocks_to_process[0] != 0 {
if blocks_to_process[0] != 0 && blocks_to_process[0] != 1 {

need to remove this

@coveralls commented Jan 31, 2025

Coverage Status

coverage: 45.386% (-21.7%) from 67.086% when pulling 04e9324 on refactor/job-isolation into afb9afe on main.

@heemankv (Contributor) left a comment

Please feel free to make separate issues for comments you don't feel fit to resolve in this PR, and add the issue link in the respective comment.

Note: I have skipped reading the tests this time.

@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";

+1

match cursor.try_next().await? {
Some(doc) => {
let job: JobItem = mongodb::bson::from_document(doc)?;

IMO we should use .map_err?

let job: JobItem = mongodb::bson::from_document(doc.clone()).map_err(|e| {
    // Clone above so the original document is still available to log on failure.
    tracing::error!(error = %e, document = ?doc, "Failed to deserialize document into JobItem");
    e
})?;

pub verification_retry_attempt_no: u64,
/// Timestamp when job processing completed
#[serde(with = "chrono::serde::ts_seconds_option")]
pub process_completed_at: Option<DateTime<Utc>>,

As discussed, let's add process_started_at and verification_started_at as well


We can leave these out for create jobs, since creation is almost always instantaneous and we usually don't care how long it takes.

);

// Get DA-specific metadata
let mut da_metadata: DaMetadata = job.metadata.specific.clone().try_into().map_err(|e| {

I am not sure about this idea of "specific" and "common" inside metadata.
What is your opinion on two fields within the job itself?
i.e. instead of metadata.specific and metadata.common, we could have commonMetaData and JobMetadata directly in the job.
Do you feel the current structure is necessary?

if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
tracing::warn!(

This should be an error only if we are throwing an error after it, isn't it?

Ok(transformed_blob_vec_u8)
// Get the path for this block
let path =
blob_data_paths.get(block_index).ok_or_else(|| eyre!("Blob data path not found for index {}", block_index))?;

What are we gaining by passing the index and the blob_data_paths down the props?
We are sending them

  • from process_job
  • to update_state_for_block
  • to fetch_program_data_for_block

and then just fetching the value at the index?

IMO it seems simpler to just fetch the block_number inside process_job and send here.


The same also applies to:

  • fetch_snos_for_block
  • fetch_program_output_for_block

let tx_inclusion_status =
settlement_client.verify_tx_inclusion(tx_hash).await.map_err(|e| JobError::Other(OtherError(e)))?;

match tx_inclusion_status {
SettlementVerificationStatus::Rejected(_) => {

Nitpick: why are we not using recursion here?! I just realised it, haha!
It could be crazy if we implemented it.

@@ -336,36 +412,44 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
async fn fetch_snos_for_block(

  • fetch_snos_for_block
  • fetch_program_output_for_block

These functions don't need to be part of this impl; they don't use self anywhere, so we can move them to utils.rs.

let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;
if self.fact_checker.is_valid(&fact).await? {
Ok(TaskStatus::Succeeded)
if cross_verify {

I think we can make these nested conditions better:

  • use a guard clause for when cross_verify is false
  • use a guard clause for when fact is None

Here's a reply from Claude to help:

if !cross_verify {
    tracing::debug!("Skipping cross-verification as it's disabled");
    return Ok(TaskStatus::Succeeded);
}

let Some(fact_str) = fact else {
    return Ok(TaskStatus::Failed("Cross verification enabled but no fact provided".to_string()));
};

let fact = B256::from_str(&fact_str)
    .map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;

tracing::debug!(fact = %hex::encode(&fact), "Cross-verifying fact on chain");

if self.fact_checker.is_valid(&fact).await? {
    Ok(TaskStatus::Succeeded)
} else {
    Ok(TaskStatus::Failed(format!(
        "Fact {} is not valid or not registered",
        hex::encode(fact)
    )))
}

"Cairo PIE task status: ONCHAIN and fact is valid."
);
CairoJobStatus::ONCHAIN => match fact {
Some(fact_str) => {

Same as above: we can use a guard clause instead of match fact.
