From eb84a0935ca80499c1962951df9c2b149dfca5a0 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 15:25:43 -0800 Subject: [PATCH] small refac for readability --- rust/processor/src/worker.rs | 347 +++++++++++++++++------------------ 1 file changed, 172 insertions(+), 175 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 2e303632..c0fe976c 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -363,181 +363,15 @@ impl Worker { let processor_clone = processor.clone(); let auth_token = self.auth_token.clone(); let task = tokio::spawn(async move { - let start_version = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .version; - let end_version = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .version; - let start_txn_timestamp = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let end_txn_timestamp = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - let txn_time = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - if let Some(ref t) = txn_time { - PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - PROCESSOR_INVOCATIONS_COUNT - .with_label_values(&[processor_name]) - .inc(); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - size_in_bytes = transactions_pb.size_in_bytes, - "[Parser] Started processing one batch of transactions" - ); - } - - let processing_duration = std::time::Instant::now(); - - let processed_result = processor_clone - .process_transactions( - transactions_pb.transactions, - start_version, - end_version, - db_chain_id, - ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton) - .await; - if let Some(ref t) = txn_time { - PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - - let start_txn_timestamp_unix = start_txn_timestamp - .clone() - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(); - let start_txn_timestamp_iso = start_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - let end_txn_timestamp_iso = end_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(end_version as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(start_txn_timestamp_unix); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(transactions_pb.size_in_bytes); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(end_version - start_version + 1); - - if let Ok(res) = processed_result { - SINGLE_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_duration.elapsed().as_secs_f64()); - SINGLE_BATCH_PARSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.processing_duration_in_secs); - SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.db_insertion_duration_in_secs); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.db_insertion_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] DB insertion time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.processing_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] Parsing time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - num_of_transactions = end_version - start_version + 1, - size_in_bytes = transactions_pb.size_in_bytes, - processing_duration_in_secs = res.processing_duration_in_secs, - db_insertion_duration_in_secs = res.db_insertion_duration_in_secs, - duration_in_secs = processing_duration.elapsed().as_secs_f64(), - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedBatch.get_step(), - "{}", - ProcessorStep::ProcessedBatch.get_label(), - ); - } - } - - processed_result + do_processor( + transactions_pb, + processor_clone, + db_chain_id, + processor_name, + &auth_token, + enable_verbose_logging, + ) + .await }); tasks.push(task); } @@ -750,6 +584,169 @@ impl Worker { } } +pub async fn do_processor( + transactions_pb: TransactionsPBResponse, + processor: Arc, + db_chain_id: Option, + processor_name: &str, + auth_token: &str, + enable_verbose_logging: bool, +) -> Result { + let transactions_pb_slice = transactions_pb.transactions.as_slice(); + let first_txn = transactions_pb_slice.first().unwrap(); + + let last_txn = transactions_pb_slice.last().unwrap(); + + let start_version = first_txn.version; + let end_version = last_txn.version; + let start_txn_timestamp = first_txn.timestamp.clone(); + let end_txn_timestamp = last_txn.timestamp.clone(); + let txn_time = first_txn.timestamp.clone(); + + if let Some(ref t) = txn_time { + PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS + .with_label_values(&[auth_token, processor_name]) + .set(time_diff_since_pb_timestamp_in_secs(t)); + } + PROCESSOR_INVOCATIONS_COUNT + .with_label_values(&[processor_name]) + .inc(); + + if enable_verbose_logging { + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version, + end_version, + size_in_bytes = transactions_pb.size_in_bytes, + "[Parser] Started processing one batch of transactions" + ); + } + + let processing_duration = std::time::Instant::now(); + + let processed_result = processor + .process_transactions( + transactions_pb.transactions, + start_version, + end_version, + db_chain_id, + ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton) + .await; + if let Some(ref t) = txn_time { + PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS + .with_label_values(&[auth_token, processor_name]) + .set(time_diff_since_pb_timestamp_in_secs(t)); + } + + let start_txn_timestamp_unix = start_txn_timestamp + .clone() + .map(|t| timestamp_to_unixtime(&t)) + .unwrap_or_default(); + let start_txn_timestamp_iso = start_txn_timestamp + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(); + let end_txn_timestamp_iso = end_txn_timestamp + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(); + + LATEST_PROCESSED_VERSION + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedBatch.get_step(), + ProcessorStep::ProcessedBatch.get_label(), + ]) + .set(end_version as i64); + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedBatch.get_step(), + ProcessorStep::ProcessedBatch.get_label(), + ]) + .set(start_txn_timestamp_unix); + PROCESSED_BYTES_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedBatch.get_step(), + ProcessorStep::ProcessedBatch.get_label(), + ]) + .inc_by(transactions_pb.size_in_bytes); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedBatch.get_step(), + ProcessorStep::ProcessedBatch.get_label(), + ]) + .inc_by(end_version - start_version + 1); + + if let Ok(res) = processed_result { + SINGLE_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(processing_duration.elapsed().as_secs_f64()); + SINGLE_BATCH_PARSING_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(res.processing_duration_in_secs); + SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(res.db_insertion_duration_in_secs); + + if enable_verbose_logging { + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version, + end_version, + start_txn_timestamp_iso, + end_txn_timestamp_iso, + size_in_bytes = transactions_pb.size_in_bytes, + duration_in_secs = res.db_insertion_duration_in_secs, + tps = (end_version - start_version) as f64 + / processing_duration.elapsed().as_secs_f64(), + bytes_per_sec = transactions_pb.size_in_bytes as f64 + / processing_duration.elapsed().as_secs_f64(), + "[Parser] DB insertion time of one batch of transactions" + ); + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version, + end_version, + start_txn_timestamp_iso, + end_txn_timestamp_iso, + size_in_bytes = transactions_pb.size_in_bytes, + duration_in_secs = res.processing_duration_in_secs, + tps = (end_version - start_version) as f64 + / processing_duration.elapsed().as_secs_f64(), + bytes_per_sec = transactions_pb.size_in_bytes as f64 + / processing_duration.elapsed().as_secs_f64(), + "[Parser] Parsing time of one batch of transactions" + ); + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version, + end_version, + start_txn_timestamp_iso, + end_txn_timestamp_iso, + num_of_transactions = end_version - start_version + 1, + size_in_bytes = transactions_pb.size_in_bytes, + processing_duration_in_secs = res.processing_duration_in_secs, + db_insertion_duration_in_secs = res.db_insertion_duration_in_secs, + duration_in_secs = processing_duration.elapsed().as_secs_f64(), + tps = (end_version - start_version) as f64 + / processing_duration.elapsed().as_secs_f64(), + bytes_per_sec = transactions_pb.size_in_bytes as f64 + / processing_duration.elapsed().as_secs_f64(), + step = ProcessorStep::ProcessedBatch.get_step(), + "{}", + ProcessorStep::ProcessedBatch.get_label(), + ); + } + } + + processed_result +} + /// Given a config and a db pool, build a concrete instance of a processor. // As time goes on there might be other things that we need to provide to certain // processors. As that happens we can revist whether this function (which tends to