diff --git a/rust/processor/src/gap_detector.rs b/rust/processor/src/gap_detector.rs
index 96f84b3c..5aa1cccd 100644
--- a/rust/processor/src/gap_detector.rs
+++ b/rust/processor/src/gap_detector.rs
@@ -10,8 +10,8 @@
 use ahash::AHashMap;
 use kanal::AsyncReceiver;
 use tracing::{error, info};
 
-// Size of a gap (in txn version) before gap detected
-pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 500;
+// Number of batches processed before gap detected
+pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 100;
 // Number of seconds between each processor status update
 const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index b3305221..aada94e4 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -318,7 +318,7 @@ impl Worker {
                         receiver_clone.clone(),
                         task_index,
                     )
-                        .await;
+                    .await;
 
                     let size_in_bytes = transactions_pb.size_in_bytes as f64;
                     let first_txn_version = transactions_pb
@@ -336,33 +336,21 @@ impl Worker {
 
                     let start_txn_timestamp = transactions_pb.start_txn_timestamp.clone();
                     let end_txn_timestamp = transactions_pb.end_txn_timestamp.clone();
-                    let start_txn_timestamp_unix = start_txn_timestamp
-                        .as_ref()
-                        .map(timestamp_to_unixtime)
-                        .unwrap_or_default();
-                    let start_txn_timestamp_iso = start_txn_timestamp
-                        .as_ref()
-                        .map(timestamp_to_iso)
-                        .unwrap_or_default();
-                    let end_txn_timestamp_iso = end_txn_timestamp
-                        .as_ref()
-                        .map(timestamp_to_iso)
-                        .unwrap_or_default();
-
                     let txn_channel_fetch_latency_sec =
                         txn_channel_fetch_latency.elapsed().as_secs_f64();
 
                     debug!(
                         processor_name = processor_name,
                         service_type = PROCESSOR_SERVICE_TYPE,
-                        start_version = batch_first_txn_version,
-                        end_version = batch_last_txn_version,
-                        num_of_transactions =
-                            (batch_last_txn_version - batch_first_txn_version) as i64 + 1,
+                        first_txn_version,
+                        batch_first_txn_version,
+                        last_txn_version,
+                        batch_last_txn_version,
+                        num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1,
                         size_in_bytes,
                         task_index,
                         duration_in_secs = txn_channel_fetch_latency_sec,
-                        tps = (batch_last_txn_version as f64 - batch_first_txn_version as f64)
+                        tps = (last_txn_version as f64 - first_txn_version as f64)
                             / txn_channel_fetch_latency_sec,
                         bytes_per_sec = size_in_bytes / txn_channel_fetch_latency_sec,
                         "[Parser][T#{}] Successfully fetched transactions from channel.",
@@ -426,11 +414,22 @@ impl Worker {
                     let processing_time = processing_time.elapsed().as_secs_f64();
 
                     // We've processed things: do some data and metrics
-
-                    ma.tick_now((last_txn_version - first_txn_version) + 1);
-                    let tps = (ma.avg() * 1000.0) as u64;
+                    let start_txn_timestamp_unix = start_txn_timestamp
+                        .as_ref()
+                        .map(timestamp_to_unixtime)
+                        .unwrap_or_default();
+                    let start_txn_timestamp_iso = start_txn_timestamp
+                        .as_ref()
+                        .map(timestamp_to_iso)
+                        .unwrap_or_default();
+                    let end_txn_timestamp_iso = end_txn_timestamp
+                        .as_ref()
+                        .map(timestamp_to_iso)
+                        .unwrap_or_default();
 
                     let num_processed = (last_txn_version - first_txn_version) + 1;
+                    ma.tick_now(num_processed);
+                    let tps = (ma.avg() * 1000.0) as u64;
 
                     debug!(
                         processor_name = processor_name,
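
Note: the gap_detector.rs hunk above only changes the constant's comment and value, so the detector's internals are not shown here. The sketch below is an illustrative guess at how a "number of batches processed before a gap is reported" threshold could be consumed; the GapDetector and Batch types, their field names, and GAP_DETECTION_BATCH_COUNT are all hypothetical and are not taken from this repository.

// Illustrative sketch only, not the repo's gap_detector.rs.
use std::collections::BTreeMap;

// Analogous in spirit to DEFAULT_GAP_DETECTION_BATCH_SIZE after this diff:
// how many batches may arrive without contiguous progress before we give up.
const GAP_DETECTION_BATCH_COUNT: u64 = 100;

/// A processed batch of transactions, identified by its version range.
#[derive(Debug, Clone, Copy)]
struct Batch {
    start_version: u64,
    end_version: u64,
}

struct GapDetector {
    /// Lowest transaction version that has not yet been covered contiguously.
    next_version_to_process: u64,
    /// Batches that arrived out of order, keyed by their start version.
    pending: BTreeMap<u64, Batch>,
    /// Batches seen since the contiguous watermark last advanced.
    batches_since_last_advance: u64,
}

impl GapDetector {
    fn new(starting_version: u64) -> Self {
        Self {
            next_version_to_process: starting_version,
            pending: BTreeMap::new(),
            batches_since_last_advance: 0,
        }
    }

    /// Returns Err once too many batches arrive without closing the gap.
    fn process_batch(&mut self, batch: Batch) -> Result<(), String> {
        self.pending.insert(batch.start_version, batch);

        // Drain every batch that is now contiguous with the processed prefix.
        let mut advanced = false;
        while let Some(b) = self.pending.remove(&self.next_version_to_process) {
            self.next_version_to_process = b.end_version + 1;
            advanced = true;
        }

        if advanced {
            self.batches_since_last_advance = 0;
            Ok(())
        } else {
            self.batches_since_last_advance += 1;
            if self.batches_since_last_advance >= GAP_DETECTION_BATCH_COUNT {
                Err(format!(
                    "Gap detected: version {} still missing after {} batches",
                    self.next_version_to_process, self.batches_since_last_advance
                ))
            } else {
                Ok(())
            }
        }
    }
}

fn main() {
    let mut detector = GapDetector::new(0);
    // An in-order batch advances the watermark; an out-of-order one is held back.
    detector.process_batch(Batch { start_version: 0, end_version: 99 }).unwrap();
    detector.process_batch(Batch { start_version: 200, end_version: 299 }).unwrap();
    assert_eq!(detector.next_version_to_process, 100);
}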