Small refactor for readability
CapCap committed Jan 22, 2024
1 parent 7432bb8 commit eb84a09
Showing 1 changed file with 172 additions and 175 deletions.
347 changes: 172 additions & 175 deletions rust/processor/src/worker.rs
@@ -363,181 +363,15 @@ impl Worker {
let processor_clone = processor.clone();
let auth_token = self.auth_token.clone();
let task = tokio::spawn(async move {
let start_version = transactions_pb
.transactions
.as_slice()
.first()
.unwrap()
.version;
let end_version = transactions_pb
.transactions
.as_slice()
.last()
.unwrap()
.version;
let start_txn_timestamp = transactions_pb
.transactions
.as_slice()
.first()
.unwrap()
.timestamp
.clone();
let end_txn_timestamp = transactions_pb
.transactions
.as_slice()
.last()
.unwrap()
.timestamp
.clone();
let txn_time = transactions_pb
.transactions
.as_slice()
.first()
.unwrap()
.timestamp
.clone();
if let Some(ref t) = txn_time {
PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS
.with_label_values(&[auth_token.as_str(), processor_name])
.set(time_diff_since_pb_timestamp_in_secs(t));
}
PROCESSOR_INVOCATIONS_COUNT
.with_label_values(&[processor_name])
.inc();

if enable_verbose_logging {
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
size_in_bytes = transactions_pb.size_in_bytes,
"[Parser] Started processing one batch of transactions"
);
}

let processing_duration = std::time::Instant::now();

let processed_result = processor_clone
.process_transactions(
transactions_pb.transactions,
start_version,
end_version,
db_chain_id,
) // TODO: Change how we fetch chain_id; ideally it can be accessed by processors when they are initialized (e.g. so they can have a chain_id field set in the new() function)
.await;
if let Some(ref t) = txn_time {
PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS
.with_label_values(&[auth_token.as_str(), processor_name])
.set(time_diff_since_pb_timestamp_in_secs(t));
}

let start_txn_timestamp_unix = start_txn_timestamp
.clone()
.map(|t| timestamp_to_unixtime(&t))
.unwrap_or_default();
let start_txn_timestamp_iso = start_txn_timestamp
.map(|t| timestamp_to_iso(&t))
.unwrap_or_default();
let end_txn_timestamp_iso = end_txn_timestamp
.map(|t| timestamp_to_iso(&t))
.unwrap_or_default();

LATEST_PROCESSED_VERSION
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.set(end_version as i64);
TRANSACTION_UNIX_TIMESTAMP
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.set(start_txn_timestamp_unix);
PROCESSED_BYTES_COUNT
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.inc_by(transactions_pb.size_in_bytes);
NUM_TRANSACTIONS_PROCESSED_COUNT
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.inc_by(end_version - start_version + 1);

if let Ok(res) = processed_result {
SINGLE_BATCH_PROCESSING_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(processing_duration.elapsed().as_secs_f64());
SINGLE_BATCH_PARSING_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(res.processing_duration_in_secs);
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(res.db_insertion_duration_in_secs);

if enable_verbose_logging {
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
size_in_bytes = transactions_pb.size_in_bytes,
duration_in_secs = res.db_insertion_duration_in_secs,
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
"[Parser] DB insertion time of one batch of transactions"
);
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
size_in_bytes = transactions_pb.size_in_bytes,
duration_in_secs = res.processing_duration_in_secs,
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
"[Parser] Parsing time of one batch of transactions"
);
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
num_of_transactions = end_version - start_version + 1,
size_in_bytes = transactions_pb.size_in_bytes,
processing_duration_in_secs = res.processing_duration_in_secs,
db_insertion_duration_in_secs = res.db_insertion_duration_in_secs,
duration_in_secs = processing_duration.elapsed().as_secs_f64(),
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
step = ProcessorStep::ProcessedBatch.get_step(),
"{}",
ProcessorStep::ProcessedBatch.get_label(),
);
}
}

processed_result
do_processor(
transactions_pb,
processor_clone,
db_chain_id,
processor_name,
&auth_token,
enable_verbose_logging,
)
.await
});
tasks.push(task);
}
@@ -750,6 +584,169 @@ impl Worker {
}
}

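/// Process one batch of transactions end to end: record data-latency and
/// throughput metrics, run the processor over the batch, and emit optional
/// verbose per-batch logs. Extracted from the inline task body above.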
pub async fn do_processor(
transactions_pb: TransactionsPBResponse,
processor: Arc<Processor>,
db_chain_id: Option<u64>,
processor_name: &str,
auth_token: &str,
enable_verbose_logging: bool,
) -> Result<ProcessingResult> {
let transactions_pb_slice = transactions_pb.transactions.as_slice();
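// Batches are assumed non-empty here; these unwraps panic on an empty batch.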
let first_txn = transactions_pb_slice.first().unwrap();
let last_txn = transactions_pb_slice.last().unwrap();

let start_version = first_txn.version;
let end_version = last_txn.version;
let start_txn_timestamp = first_txn.timestamp.clone();
let end_txn_timestamp = last_txn.timestamp.clone();
let txn_time = first_txn.timestamp.clone();

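// txn_time mirrors start_txn_timestamp and feeds the data-latency gauges below.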
if let Some(ref t) = txn_time {
PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS
.with_label_values(&[auth_token, processor_name])
.set(time_diff_since_pb_timestamp_in_secs(t));
}
PROCESSOR_INVOCATIONS_COUNT
.with_label_values(&[processor_name])
.inc();

if enable_verbose_logging {
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
size_in_bytes = transactions_pb.size_in_bytes,
"[Parser] Started processing one batch of transactions"
);
}

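// Wall-clock timer for the whole batch (parsing plus DB insertion); feeds
// SINGLE_BATCH_PROCESSING_TIME_IN_SECS and the tps/bytes_per_sec log fields.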
let processing_duration = std::time::Instant::now();

let processed_result = processor
.process_transactions(
transactions_pb.transactions,
start_version,
end_version,
db_chain_id,
) // TODO: Change how we fetch chain_id; ideally it can be accessed by processors when they are initialized (e.g. so they can have a chain_id field set in the new() function)
.await;
if let Some(ref t) = txn_time {
PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS
.with_label_values(&[auth_token, processor_name])
.set(time_diff_since_pb_timestamp_in_secs(t));
}

let start_txn_timestamp_unix = start_txn_timestamp
.clone()
.map(|t| timestamp_to_unixtime(&t))
.unwrap_or_default();
let start_txn_timestamp_iso = start_txn_timestamp
.map(|t| timestamp_to_iso(&t))
.unwrap_or_default();
let end_txn_timestamp_iso = end_txn_timestamp
.map(|t| timestamp_to_iso(&t))
.unwrap_or_default();

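// Per-batch progress metrics, labeled with the ProcessedBatch step.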
LATEST_PROCESSED_VERSION
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.set(end_version as i64);
TRANSACTION_UNIX_TIMESTAMP
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.set(start_txn_timestamp_unix);
PROCESSED_BYTES_COUNT
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.inc_by(transactions_pb.size_in_bytes);
NUM_TRANSACTIONS_PROCESSED_COUNT
.with_label_values(&[
processor_name,
ProcessorStep::ProcessedBatch.get_step(),
ProcessorStep::ProcessedBatch.get_label(),
])
.inc_by(end_version - start_version + 1);

if let Ok(res) = processed_result {
SINGLE_BATCH_PROCESSING_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(processing_duration.elapsed().as_secs_f64());
SINGLE_BATCH_PARSING_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(res.processing_duration_in_secs);
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
.with_label_values(&[processor_name])
.set(res.db_insertion_duration_in_secs);

if enable_verbose_logging {
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
size_in_bytes = transactions_pb.size_in_bytes,
duration_in_secs = res.db_insertion_duration_in_secs,
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
"[Parser] DB insertion time of one batch of transactions"
);
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
size_in_bytes = transactions_pb.size_in_bytes,
duration_in_secs = res.processing_duration_in_secs,
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
"[Parser] Parsing time of one batch of transactions"
);
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version,
end_version,
start_txn_timestamp_iso,
end_txn_timestamp_iso,
num_of_transactions = end_version - start_version + 1,
size_in_bytes = transactions_pb.size_in_bytes,
processing_duration_in_secs = res.processing_duration_in_secs,
db_insertion_duration_in_secs = res.db_insertion_duration_in_secs,
duration_in_secs = processing_duration.elapsed().as_secs_f64(),
tps = (end_version - start_version) as f64
/ processing_duration.elapsed().as_secs_f64(),
bytes_per_sec = transactions_pb.size_in_bytes as f64
/ processing_duration.elapsed().as_secs_f64(),
step = ProcessorStep::ProcessedBatch.get_step(),
"{}",
ProcessorStep::ProcessedBatch.get_label(),
);
}
}

processed_result
}

/// Given a config and a db pool, build a concrete instance of a processor.
// As time goes on there might be other things that we need to provide to certain
// processors. As that happens we can revisit whether this function (which tends to
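For reference, the refactor follows a common pattern: move a long per-batch body out of the tokio::spawn closure into a free async fn, and have the task capture only the clones it needs before delegating. Below is a minimal, self-contained sketch of that pattern under simplified, hypothetical types — Batch, Processor, and run_batch are stand-ins for this repository's TransactionsPBResponse, processor object, and do_processor, not its actual definitions.

use std::sync::Arc;

// Hypothetical stand-in for TransactionsPBResponse.
struct Batch {
    size_in_bytes: u64,
}

// Hypothetical stand-in for the processor object shared across tasks.
struct Processor;

// The extracted helper: everything batch-scoped (metrics, logging,
// processing) lives here, so the spawn site stays a few lines long.
async fn run_batch(batch: Batch, processor: Arc<Processor>, name: &str) -> Result<(), String> {
    // ... metrics, process_transactions(), verbose logging ...
    let _ = (batch.size_in_bytes, processor, name);
    Ok(())
}

#[tokio::main]
async fn main() {
    let processor = Arc::new(Processor);
    let mut tasks = Vec::new();
    for batch in vec![Batch { size_in_bytes: 0 }] {
        // Clone what the task needs, then delegate to the helper,
        // mirroring the do_processor(...).await call site in the diff.
        let processor = processor.clone();
        tasks.push(tokio::spawn(async move {
            run_batch(batch, processor, "example_processor").await
        }));
    }
    for task in tasks {
        task.await.unwrap().unwrap();
    }
}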
