From 2020f0c4168ad7d872191bd6f311e91def39e792 Mon Sep 17 00:00:00 2001
From: CapCap
Date: Thu, 1 Feb 2024 11:49:23 -0800
Subject: [PATCH] cdb compatible migrations

Remove transactions and make migrations not async
lint
fmt
clippy
diesel async
TLS sslrootcert
lint
lint
support optional tls
format
more log
parallel inserts
lint
bigger pool
pool size 200
try bigger buffer
try fixed 100 insert size
use ahash + update rust
smaller batches, bigger pool
increase pool size to 800
small refac for readability
increase buffer to 150
try batch size 20
back to 100 buffer
refactor grpc into separate file
lint
try 40mb buffers
insert of 10 again
ARC instead of cloning txns
lint
avoid another clone
try size 50
try 100
try 65
Change threading model for higher parallelism and throughput (#249)
Co-authored-by: jillxuu
clean
cleanup
try 200 connections
coin processor spawn blocking
sleep well
ARC and consistent parallelism
database parallelism
undo no CDB compat
Use gap detector
Don't panic in gaps
TEMP CHANGE FOR LOAD TEST
send in chunks
gap detector
bigger parallel writes to db
try chunks of 40
5k gap fix
channel length
post load test cleanup
temporary execute in chunks
cleanup and comments
Add config for table chunk size
cleanup
---
 rust/Cargo.lock | 24 +-
 rust/Cargo.toml | 2 +
 rust/processor/Cargo.toml | 1 +
 rust/processor/src/config.rs | 20 +
 rust/processor/src/gap_detector.rs | 13 +-
 rust/processor/src/grpc_stream.rs | 236 +++--
 .../models/token_v2_models/v2_token_datas.rs | 2 +-
 .../token_v2_models/v2_token_ownerships.rs | 2 +-
 .../account_transactions_processor.rs | 28 +-
 rust/processor/src/processors/ans_processor.rs | 140 +--
 .../src/processors/coin_processor.rs | 162 ++--
 .../src/processors/default_processor.rs | 229 +++--
 .../src/processors/events_processor.rs | 30 +-
 .../processors/fungible_asset_processor.rs | 79 +-
 rust/processor/src/processors/mod.rs | 10 +-
 .../src/processors/monitoring_processor.rs | 2 +-
 .../src/processors/nft_metadata_processor.rs | 4 +-
 .../src/processors/objects_processor.rs | 47 +-
 .../src/processors/stake_processor.rs | 150 ++--
 .../src/processors/token_processor.rs | 188 ++--
 .../src/processors/token_v2_processor.rs | 150 ++--
 .../processors/user_transaction_processor.rs | 49 +-
 rust/processor/src/utils/counters.rs | 38 +-
 rust/processor/src/utils/database.rs | 168 ++--
 rust/processor/src/worker.rs | 842 ++++++++----------
 25 files changed, 1486 insertions(+), 1130 deletions(-)

diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 0c9a839c..efc04446 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -535,6 +535,7 @@ dependencies = [
  "num-bigint",
  "num-integer",
  "num-traits",
+ "pq-sys",
  "serde_json",
 ]
 
@@ -1332,6 +1333,15 @@ dependencies = [
  "either",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.8"
@@ -1884,6 +1894,15 @@ version = "0.2.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
+[[package]]
+name = "pq-sys"
+version = "0.4.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
+dependencies = [
+ "vcpkg",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.76"
@@ -1919,6 +1938,7 @@ dependencies = [
  "google-cloud-googleapis",
"google-cloud-pubsub", "hex", + "itertools 0.12.1", "kanal", "native-tls", "num_cpus", @@ -1983,7 +2003,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -1996,7 +2016,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.48", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index fe95cb6d..6d400036 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -38,6 +38,7 @@ diesel = { version = "2.1", features = [ "chrono", "postgres_backend", "numeric", + "postgres", "serde_json", ] } diesel-async = { version = "0.4", features = ["postgres", "bb8", "tokio"] } @@ -55,6 +56,7 @@ cloud-storage = { version = "0.11.1", features = ["global-client"] } google-cloud-googleapis = "0.10.0" google-cloud-pubsub = "0.18.0" hex = "0.4.3" +itertools = "0.12.1" kanal = { version = "0.1.0-pre8", features = ["async"] } once_cell = "1.10.0" num_cpus = "1.16.0" diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index e13086c0..74fb894c 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -35,6 +35,7 @@ gcloud-sdk = { workspace = true } google-cloud-googleapis = { workspace = true } google-cloud-pubsub = { workspace = true } hex = { workspace = true } +itertools = { workspace = true } kanal = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 399c0542..316d072b 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -4,6 +4,7 @@ use crate::{ gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, worker::Worker, }; +use ahash::AHashMap; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use server_framework::RunnableConfig; @@ -20,12 +21,23 @@ pub struct IndexerGrpcProcessorConfig { #[serde(flatten)] pub grpc_http2_config: IndexerGrpcHttp2Config, pub auth_token: String, + // Version to start indexing from pub starting_version: Option, + // Version to end indexing at pub ending_version: Option, + // Number of tasks waiting to pull transaction batches from the channel and process them pub number_concurrent_processing_tasks: Option, + // Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight pub db_pool_size: Option, + // Maximum number of batches "missing" before we assume we have an issue with gaps and abort #[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")] pub gap_detection_batch_size: u64, + // Number of protobuff transactions to send per chunk to the processor tasks + #[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")] + pub pb_channel_txn_chunk_size: usize, + // Number of rows to insert, per chunk, for each DB table. 
Default per table is ~32,768 (2**16/2) + #[serde(default = "AHashMap::new")] + pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, } @@ -33,6 +45,12 @@ impl IndexerGrpcProcessorConfig { pub const fn default_gap_detection_batch_size() -> u64 { DEFAULT_GAP_DETECTION_BATCH_SIZE } + + /// Make the default very large on purpose so that by default it's not chunked + /// This prevents any unexpected changes in behavior + pub const fn default_pb_channel_txn_chunk_size() -> usize { + 100_000 + } } #[async_trait::async_trait] @@ -49,6 +67,8 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.number_concurrent_processing_tasks, self.db_pool_size, self.gap_detection_batch_size, + self.pb_channel_txn_chunk_size, + self.per_table_chunk_sizes.clone(), self.enable_verbose_logging, ) .await diff --git a/rust/processor/src/gap_detector.rs b/rust/processor/src/gap_detector.rs index c193938d..74b5771b 100644 --- a/rust/processor/src/gap_detector.rs +++ b/rust/processor/src/gap_detector.rs @@ -7,11 +7,10 @@ use crate::{ }; use ahash::AHashMap; use kanal::AsyncReceiver; -use std::sync::Arc; use tracing::{error, info}; // Number of batches processed before gap detected -pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 50; +pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 100; // Number of seconds between each processor status update const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; @@ -70,7 +69,7 @@ impl GapDetector { pub async fn create_gap_detector_status_tracker_loop( gap_detector_receiver: AsyncReceiver, - processor: Arc, + processor: Processor, starting_version: u64, gap_detection_batch_size: u64, ) { @@ -117,7 +116,7 @@ pub async fn create_gap_detector_status_tracker_loop( processor .update_last_processed_version( res_last_success_batch.end_version, - res_last_success_batch.last_transaction_timstamp.clone(), + res_last_success_batch.last_transaction_timestamp.clone(), ) .await .unwrap(); @@ -132,7 +131,7 @@ pub async fn create_gap_detector_status_tracker_loop( error = ?e, "[Parser] Gap detector task has panicked" ); - panic!(); + panic!("[Parser] Gap detector task has panicked: {:?}", e); }, } } @@ -152,7 +151,7 @@ mod test { let result = ProcessingResult { start_version: 100 + i * 100, end_version: 199 + i * 100, - last_transaction_timstamp: None, + last_transaction_timestamp: None, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, }; @@ -167,7 +166,7 @@ mod test { .process_versions(ProcessingResult { start_version: 0, end_version: 99, - last_transaction_timstamp: None, + last_transaction_timestamp: None, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, }) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 8f86545e..2a68029a 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -8,15 +8,20 @@ use crate::{ }, worker::TransactionsPBResponse, }; -use aptos_protos::indexer::v1::{ - raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse, +use aptos_moving_average::MovingAverage; +use aptos_protos::{ + indexer::v1::{raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse}, + transaction::v1::Transaction, }; +use bigdecimal::Zero; use futures_util::StreamExt; +use itertools::Itertools; use kanal::AsyncSender; use prost::Message; use std::time::Duration; +use tokio::time::timeout; use tonic::{Response, Streaming}; -use tracing::{error, info}; +use tracing::{debug, error, info}; use url::Url; /// GRPC request metadata key for the 
token ID. @@ -27,7 +32,7 @@ const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; /// GRPC connection id const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; /// We will try to reconnect to GRPC 5 times in case upstream connection is being updated -pub const RECONNECTION_MAX_RETRIES: u64 = 65; +pub const RECONNECTION_MAX_RETRIES: u64 = 5; /// 256MB pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; @@ -99,7 +104,39 @@ pub async fn get_stream( end_version = ending_version, "[Parser] Setting up GRPC client" ); - let mut rpc_client = match RawDataClient::connect(channel).await { + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + let connect_res = loop { + let res = timeout( + Duration::from_secs(5), + RawDataClient::connect(channel.clone()), + ) + .await; + match res { + Ok(client) => break Ok(client), + Err(e) => { + error!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + start_version = starting_version, + end_version = ending_version, + retries = connect_retries, + error = ?e, + "[Parser] Error connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(e); + } + }, + } + } + .expect("[Parser] Timeout connecting to GRPC server"); + + let mut rpc_client = match connect_res { Ok(client) => client .accept_compressed(tonic::codec::CompressionEncoding::Gzip) .send_compressed(tonic::codec::CompressionEncoding::Gzip) @@ -212,12 +249,9 @@ pub async fn create_fetcher_loop( request_ending_version: Option, auth_token: String, processor_name: String, - batch_start_version: u64, - buffer_size: usize, + // The number of transactions per protobuf batch + pb_channel_txn_chunk_size: usize, ) { - let mut grpc_channel_recv_latency = std::time::Instant::now(); - let mut next_version_to_fetch = batch_start_version; - let mut reconnection_retries = 0; info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, @@ -251,8 +285,13 @@ pub async fn create_fetcher_loop( "[Parser] Successfully connected to GRPC stream", ); - let mut last_fetched_version = batch_start_version as i64 - 1; - let mut batch_start_version = batch_start_version; + let mut grpc_channel_recv_latency = std::time::Instant::now(); + let mut next_version_to_fetch = starting_version; + let mut reconnection_retries = 0; + let mut last_fetched_version = starting_version as i64 - 1; + let mut fetch_ma = MovingAverage::new(3000); + let mut send_ma = MovingAverage::new(3000); + loop { let is_success = match resp_stream.next().await { Some(Ok(r)) => { @@ -262,10 +301,17 @@ pub async fn create_fetcher_loop( r.transactions.as_slice().first().unwrap().timestamp.clone(); let end_version = r.transactions.as_slice().last().unwrap().version; let end_txn_timestamp = r.transactions.as_slice().last().unwrap().timestamp.clone(); + next_version_to_fetch = end_version + 1; + let size_in_bytes = r.encoded_len() as u64; let chain_id: u64 = r.chain_id.expect("[Parser] Chain Id doesn't exist."); + let num_txns = r.transactions.len(); + let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(); + fetch_ma.tick_now(num_txns as u64); + let step = ProcessorStep::ReceivedTxnsFromGrpc.get_step(); + let label = ProcessorStep::ReceivedTxnsFromGrpc.get_label(); info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, @@ -281,110 
+327,126 @@ pub async fn create_fetcher_loop( .map(|t| timestamp_to_iso(&t)) .unwrap_or_default(), num_of_transactions = end_version - start_version + 1, - channel_size = buffer_size - txn_sender.capacity(), - size_in_bytes = r.encoded_len() as f64, - duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(), - tps = (r.transactions.len() as f64 - / grpc_channel_recv_latency.elapsed().as_secs_f64()) - as u64, - bytes_per_sec = - r.encoded_len() as f64 / grpc_channel_recv_latency.elapsed().as_secs_f64(), - step = ProcessorStep::ReceivedTxnsFromGrpc.get_step(), + channel_size = txn_sender.len(), + size_in_bytes, + duration_in_secs, + tps = fetch_ma.avg() as u64, + bytes_per_sec = size_in_bytes as f64 / duration_in_secs, + step, "{}", - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), + label, ); - let current_fetched_version = start_version; - if last_fetched_version + 1 != current_fetched_version as i64 { + if last_fetched_version + 1 != start_version as i64 { error!( - batch_start_version = batch_start_version, - last_fetched_version = last_fetched_version, - current_fetched_version = current_fetched_version, + batch_start_version = last_fetched_version + 1, + last_fetched_version, + current_fetched_version = start_version, "[Parser] Received batch with gap from GRPC stream" ); panic!("[Parser] Received batch with gap from GRPC stream"); } last_fetched_version = end_version as i64; - batch_start_version = (last_fetched_version + 1) as u64; LATEST_PROCESSED_VERSION - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .set(end_version as i64); TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .set( start_txn_timestamp .map(|t| timestamp_to_unixtime(&t)) .unwrap_or_default(), ); PROCESSED_BYTES_COUNT - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .inc_by(size_in_bytes); NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .inc_by(end_version - start_version + 1); let txn_channel_send_latency = std::time::Instant::now(); - let txn_pb = TransactionsPBResponse { - transactions: r.transactions, - chain_id, - size_in_bytes, - }; - let size_in_bytes = txn_pb.size_in_bytes; - let duration_in_secs = txn_channel_send_latency.elapsed().as_secs_f64(); - let tps = (txn_pb.transactions.len() as f64 - / txn_channel_send_latency.elapsed().as_secs_f64()) - as u64; - let bytes_per_sec = - txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); - match txn_sender.send(txn_pb).await { - Ok(()) => {}, - Err(e) => { - error!( - processor_name = processor_name, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - channel_size = buffer_size - txn_sender.capacity(), - error = ?e, - "[Parser] Error sending GRPC response to channel." 
- ); - panic!("[Parser] Error sending GRPC response to channel.") - }, + //potentially break txn_pb into many `TransactionsPBResponse` that are each 100 txns max in size + if num_txns < pb_channel_txn_chunk_size { + // We only need to send one; avoid the chunk/clone + let txn_pb = TransactionsPBResponse { + transactions: r.transactions, + chain_id, + size_in_bytes, + }; + + match txn_sender.send(txn_pb).await { + Ok(()) => {}, + Err(e) => { + error!( + processor_name = processor_name, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?e, + "[Parser] Error sending GRPC response to channel." + ); + panic!("[Parser] Error sending GRPC response to channel.") + }, + } + } else { + // We are breaking down a big batch into small batches; this involves an iterator + let average_size_in_bytes = size_in_bytes / num_txns as u64; + + let pb_txn_chunks: Vec> = r + .transactions + .into_iter() + .chunks(pb_channel_txn_chunk_size) + .into_iter() + .map(|chunk| chunk.collect()) + .collect(); + for txns in pb_txn_chunks { + let size_in_bytes = average_size_in_bytes * txns.len() as u64; + let txn_pb = TransactionsPBResponse { + transactions: txns, + chain_id, + size_in_bytes, + }; + + match txn_sender.send(txn_pb).await { + Ok(()) => {}, + Err(e) => { + error!( + processor_name = processor_name, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?e, + "[Parser] Error sending GRPC response to channel." + ); + panic!("[Parser] Error sending GRPC response to channel.") + }, + } + } } - info!( + + let duration_in_secs = txn_channel_send_latency.elapsed().as_secs_f64(); + send_ma.tick_now(num_txns as u64); + let tps = send_ma.avg() as u64; + let bytes_per_sec = size_in_bytes as f64 / duration_in_secs; + + let channel_size = txn_sender.len(); + debug!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, stream_address = indexer_grpc_data_service_address.to_string(), connection_id, - start_version = start_version, - end_version = end_version, - channel_size = buffer_size - txn_sender.capacity(), - size_in_bytes = size_in_bytes, - duration_in_secs = duration_in_secs, - bytes_per_sec = bytes_per_sec, - tps = tps, + start_version, + end_version, + channel_size, + size_in_bytes, + duration_in_secs, + bytes_per_sec, + tps, "[Parser] Successfully sent transactions to channel." 
); FETCHER_THREAD_CHANNEL_SIZE .with_label_values(&[&processor_name]) - .set((buffer_size - txn_sender.capacity()) as i64); + .set(channel_size as i64); grpc_channel_recv_latency = std::time::Instant::now(); true }, @@ -432,19 +494,19 @@ pub async fn create_fetcher_loop( ); // Wait for the fetched transactions to finish processing before closing the channel loop { - let channel_capacity = txn_sender.capacity(); + let channel_size = txn_sender.len(); info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, stream_address = indexer_grpc_data_service_address.to_string(), connection_id, - channel_size = buffer_size - channel_capacity, + channel_size, "[Parser] Waiting for channel to be empty" ); - if channel_capacity == buffer_size { + if channel_size.is_zero() { break; } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } info!( processor_name = processor_name, @@ -462,7 +524,7 @@ pub async fn create_fetcher_loop( // Sleep for 100ms between reconnect tries // TODO: Turn this into exponential backoff - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; if reconnection_retries >= RECONNECTION_MAX_RETRIES { error!( diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index cd040e3d..b7c62ceb 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -94,7 +94,7 @@ impl TokenDataV2 { } token_properties = object_metadata .property_map - .clone() + .as_ref() .map(|m| m.inner.clone()) .unwrap_or(token_properties); // In aggregator V2 name is now derived from a separate struct diff --git a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs index db6a50f0..425c651d 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs @@ -88,7 +88,7 @@ pub struct NFTOwnershipV2 { } /// Need a separate struct for queryable because we don't want to define the inserted_at column (letting DB fill) -#[derive(Debug, Identifiable, Queryable, Clone)] +#[derive(Clone, Debug, Identifiable, Queryable)] #[diesel(primary_key(token_data_id, property_version_v1, owner_address, storage_id))] #[diesel(table_name = current_token_ownerships_v2)] pub struct CurrentTokenOwnershipV2Query { diff --git a/rust/processor/src/processors/account_transactions_processor.rs b/rust/processor/src/processors/account_transactions_processor.rs index 60771ca4..1b984b17 100644 --- a/rust/processor/src/processors/account_transactions_processor.rs +++ b/rust/processor/src/processors/account_transactions_processor.rs @@ -5,24 +5,27 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ models::account_transaction_models::account_transactions::AccountTransaction, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{pg::Pg, query_builder::QueryFragment}; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct AccountTransactionsProcessor { connection_pool: PgDbPool, + 
per_table_chunk_sizes: AHashMap, } impl AccountTransactionsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -42,7 +45,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - account_transactions: Vec, + account_transactions: &[AccountTransaction], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -54,7 +58,10 @@ async fn insert_to_db( conn.clone(), insert_account_transactions_query, account_transactions, - AccountTransaction::field_count(), + get_config_table_chunk_size::( + "account_transactions", + per_table_chunk_sizes, + ), ) .await?; Ok(()) @@ -88,9 +95,11 @@ impl ProcessorTrait for AccountTransactionsProcessor { transactions: Vec, start_version: u64, end_version: u64, - _: Option, + _db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut account_transactions = AHashMap::new(); for txn in &transactions { @@ -113,7 +122,8 @@ impl ProcessorTrait for AccountTransactionsProcessor { self.name(), start_version, end_version, - account_transactions, + &account_transactions, + &self.per_table_chunk_sizes, ) .await; @@ -124,7 +134,7 @@ impl ProcessorTrait for AccountTransactionsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index 31e64377..a448ac3c 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -13,7 +13,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::standardize_address, }, }; @@ -28,7 +28,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; @@ -44,10 +43,15 @@ pub struct AnsProcessorConfig { pub struct AnsProcessor { connection_pool: PgDbPool, config: AnsProcessorConfig, + per_table_chunk_sizes: AHashMap, } impl AnsProcessor { - pub fn new(connection_pool: PgDbPool, config: AnsProcessorConfig) -> Self { + pub fn new( + connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, + config: AnsProcessorConfig, + ) -> Self { tracing::info!( ans_v1_primary_names_table_handle = config.ans_v1_primary_names_table_handle, ans_v1_name_records_table_handle = config.ans_v1_name_records_table_handle, @@ -56,6 +60,7 @@ impl AnsProcessor { ); Self { connection_pool, + per_table_chunk_sizes, config, } } @@ -77,14 +82,15 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - current_ans_lookups: Vec, - ans_lookups: Vec, - current_ans_primary_names: Vec, - ans_primary_names: Vec, - current_ans_lookups_v2: Vec, - ans_lookups_v2: Vec, - current_ans_primary_names_v2: Vec, - ans_primary_names_v2: Vec, + current_ans_lookups: &[CurrentAnsLookup], + ans_lookups: &[AnsLookup], + current_ans_primary_names: &[CurrentAnsPrimaryName], + ans_primary_names: &[AnsPrimaryName], + 
current_ans_lookups_v2: &[CurrentAnsLookupV2], + ans_lookups_v2: &[AnsLookupV2], + current_ans_primary_names_v2: &[CurrentAnsPrimaryNameV2], + ans_primary_names_v2: &[AnsPrimaryNameV2], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -92,62 +98,86 @@ async fn insert_to_db( end_version = end_version, "Inserting to db", ); - execute_in_chunks( + let cal = execute_in_chunks( conn.clone(), insert_current_ans_lookups_query, current_ans_lookups, - CurrentAnsLookup::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_lookup", + per_table_chunk_sizes, + ), + ); + let al = execute_in_chunks( conn.clone(), insert_ans_lookups_query, ans_lookups, - AnsLookup::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_lookup", per_table_chunk_sizes), + ); + let capn = execute_in_chunks( conn.clone(), insert_current_ans_primary_names_query, current_ans_primary_names, - CurrentAnsPrimaryName::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_primary_name", + per_table_chunk_sizes, + ), + ); + let apn = execute_in_chunks( conn.clone(), insert_ans_primary_names_query, ans_primary_names, - AnsPrimaryName::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_primary_name", per_table_chunk_sizes), + ); + let cal_v2 = execute_in_chunks( conn.clone(), insert_current_ans_lookups_v2_query, current_ans_lookups_v2, - CurrentAnsLookupV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_lookup_v2", + per_table_chunk_sizes, + ), + ); + let al_v2 = execute_in_chunks( conn.clone(), insert_ans_lookups_v2_query, ans_lookups_v2, - AnsLookupV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_lookup_v2", per_table_chunk_sizes), + ); + let capn_v2 = execute_in_chunks( conn.clone(), insert_current_ans_primary_names_v2_query, current_ans_primary_names_v2, - CurrentAnsPrimaryNameV2::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_ans_primary_name_v2", + per_table_chunk_sizes, + ), + ); + let apn_v2 = execute_in_chunks( + conn, insert_ans_primary_names_v2_query, ans_primary_names_v2, - AnsPrimaryNameV2::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "ans_primary_name_v2", + per_table_chunk_sizes, + ), + ); + + let (cal_res, al_res, capn_res, apn_res, cal_v2_res, al_v2_res, capn_v2_res, apn_v2_res) = + tokio::join!(cal, al, capn, apn, cal_v2, al_v2, capn_v2, apn_v2); + + for res in vec![ + cal_res, + al_res, + capn_res, + apn_res, + cal_v2_res, + al_v2_res, + capn_v2_res, + apn_v2_res, + ] { + res?; + } + Ok(()) } @@ -333,6 +363,7 @@ impl ProcessorTrait for AnsProcessor { _db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let ( all_current_ans_lookups, @@ -359,14 +390,15 @@ impl ProcessorTrait for AnsProcessor { self.name(), start_version, end_version, - all_current_ans_lookups, - all_ans_lookups, - all_current_ans_primary_names, - all_ans_primary_names, - all_current_ans_lookups_v2, - all_ans_lookups_v2, - all_current_ans_primary_names_v2, - all_ans_primary_names_v2, + &all_current_ans_lookups, + &all_ans_lookups, + &all_current_ans_primary_names, + &all_ans_primary_names, + &all_current_ans_lookups_v2, + 
&all_ans_lookups_v2, + &all_current_ans_primary_names_v2, + &all_ans_primary_names_v2, + &self.per_table_chunk_sizes, ) .await; @@ -378,7 +410,7 @@ impl ProcessorTrait for AnsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( @@ -431,7 +463,7 @@ fn parse_ans( .with_label_values(&["AnsProcessor"]) .inc(); tracing::warn!( - transaction_version = transaction.version, + transaction_version = txn_version, "Transaction data doesn't exist", ); continue; diff --git a/rust/processor/src/processors/coin_processor.rs b/rust/processor/src/processors/coin_processor.rs index 55ba7ada..99928e5b 100644 --- a/rust/processor/src/processors/coin_processor.rs +++ b/rust/processor/src/processors/coin_processor.rs @@ -13,10 +13,10 @@ use crate::{ fungible_asset_models::v2_fungible_asset_activities::CurrentCoinBalancePK, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; -use anyhow::bail; +use anyhow::{bail, Context}; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{ @@ -24,7 +24,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; @@ -32,11 +31,15 @@ pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; pub struct CoinProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl CoinProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -56,11 +59,12 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - coin_activities: Vec, - coin_infos: Vec, - coin_balances: Vec, - current_coin_balances: Vec, - coin_supply: Vec, + coin_activities: &[CoinActivity], + coin_infos: &[CoinInfo], + coin_balances: &[CoinBalance], + current_coin_balances: &[CurrentCoinBalance], + coin_supply: &[CoinSupply], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -69,42 +73,44 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let ca = execute_in_chunks( conn.clone(), insert_coin_activities_query, coin_activities, - CoinActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_activities", per_table_chunk_sizes), + ); + let ci = execute_in_chunks( conn.clone(), insert_coin_infos_query, coin_infos, - CoinInfo::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_infos", per_table_chunk_sizes), + ); + let cb = execute_in_chunks( conn.clone(), insert_coin_balances_query, coin_balances, - CoinBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_balances", per_table_chunk_sizes), + ); + let ccb = execute_in_chunks( conn.clone(), insert_current_coin_balances_query, current_coin_balances, - CurrentCoinBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_coin_balances", + per_table_chunk_sizes, + ), + ); + let cs = execute_in_chunks( + conn, inset_coin_supply_query, coin_supply, - CoinSupply::field_count(), - ) - .await?; + 
get_config_table_chunk_size::("coin_supply", per_table_chunk_sizes), + ); + let (ca_res, ci_res, cb_res, ccb_res, cs_res) = tokio::join!(ca, ci, cb, ccb, cs); + for res in [ca_res, ci_res, cb_res, ccb_res, cs_res] { + res?; + } Ok(()) } @@ -233,41 +239,60 @@ impl ProcessorTrait for CoinProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut all_coin_activities = vec![]; - let mut all_coin_balances = vec![]; - let mut all_coin_infos: AHashMap = AHashMap::new(); - let mut all_current_coin_balances: AHashMap = - AHashMap::new(); - let mut all_coin_supply = vec![]; + let ( + all_coin_activities, + all_coin_infos, + all_coin_balances, + all_current_coin_balances, + all_coin_supply, + ) = tokio::task::spawn_blocking(move || { + let mut all_coin_activities = vec![]; + let mut all_coin_balances = vec![]; + let mut all_coin_infos: AHashMap = AHashMap::new(); + let mut all_current_coin_balances: AHashMap = + AHashMap::new(); + let mut all_coin_supply = vec![]; - for txn in &transactions { - let ( - mut coin_activities, - mut coin_balances, - coin_infos, - current_coin_balances, - mut coin_supply, - ) = CoinActivity::from_transaction(txn); - all_coin_activities.append(&mut coin_activities); - all_coin_balances.append(&mut coin_balances); - all_coin_supply.append(&mut coin_supply); - // For coin infos, we only want to keep the first version, so insert only if key is not present already - for (key, value) in coin_infos { - all_coin_infos.entry(key).or_insert(value); + for txn in &transactions { + let ( + mut coin_activities, + mut coin_balances, + coin_infos, + current_coin_balances, + mut coin_supply, + ) = CoinActivity::from_transaction(txn); + all_coin_activities.append(&mut coin_activities); + all_coin_balances.append(&mut coin_balances); + all_coin_supply.append(&mut coin_supply); + // For coin infos, we only want to keep the first version, so insert only if key is not present already + for (key, value) in coin_infos { + all_coin_infos.entry(key).or_insert(value); + } + all_current_coin_balances.extend(current_coin_balances); } - all_current_coin_balances.extend(current_coin_balances); - } - let mut all_coin_infos = all_coin_infos.into_values().collect::>(); - let mut all_current_coin_balances = all_current_coin_balances - .into_values() - .collect::>(); + let mut all_coin_infos = all_coin_infos.into_values().collect::>(); + let mut all_current_coin_balances = all_current_coin_balances + .into_values() + .collect::>(); - // Sort by PK - all_coin_infos.sort_by(|a, b| a.coin_type.cmp(&b.coin_type)); - all_current_coin_balances.sort_by(|a, b| { - (&a.owner_address, &a.coin_type).cmp(&(&b.owner_address, &b.coin_type)) - }); + // Sort by PK + all_coin_infos.sort_by(|a, b| a.coin_type.cmp(&b.coin_type)); + all_current_coin_balances.sort_by(|a, b| { + (&a.owner_address, &a.coin_type).cmp(&(&b.owner_address, &b.coin_type)) + }); + + ( + all_coin_activities, + all_coin_infos, + all_coin_balances, + all_current_coin_balances, + all_coin_supply, + ) + }) + .await + .context("spawn_blocking for CoinProcessor thread failed")?; let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -277,11 +302,12 @@ impl ProcessorTrait for CoinProcessor { self.name(), start_version, end_version, - all_coin_activities, - all_coin_infos, - all_coin_balances, - all_current_coin_balances, - all_coin_supply, + 
&all_coin_activities, + &all_coin_infos, + &all_coin_balances, + &all_current_coin_balances, + &all_coin_supply, + &self.per_table_chunk_sizes, ) .await; @@ -293,7 +319,7 @@ impl ProcessorTrait for CoinProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 72aab1ef..b5c6bb07 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -12,7 +12,7 @@ use crate::{ write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; @@ -23,17 +23,21 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; +use tokio::join; use tracing::error; pub struct DefaultProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl DefaultProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -53,16 +57,17 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - txns: Vec, - block_metadata_transactions: Vec, - wscs: Vec, + txns: &[TransactionModel], + block_metadata_transactions: &[BlockMetadataTransactionModel], + wscs: &[WriteSetChangeModel], (move_modules, move_resources, table_items, current_table_items, table_metadata): ( - Vec, - Vec, - Vec, - Vec, - Vec, + &[MoveModule], + &[MoveResource], + &[TableItem], + &[CurrentTableItem], + &[TableMetadata], ), + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -71,62 +76,76 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let txns_res = execute_in_chunks( conn.clone(), insert_transactions_query, txns, - TransactionModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("transactions", per_table_chunk_sizes), + ); + let bmt_res = execute_in_chunks( conn.clone(), insert_block_metadata_transactions_query, block_metadata_transactions, - BlockMetadataTransactionModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "block_metadata_transactions", + per_table_chunk_sizes, + ), + ); + let wst_res = execute_in_chunks( conn.clone(), insert_write_set_changes_query, wscs, - WriteSetChangeModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "write_set_changes", + per_table_chunk_sizes, + ), + ); + let mm_res = execute_in_chunks( conn.clone(), insert_move_modules_query, move_modules, - MoveModule::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("move_modules", per_table_chunk_sizes), + ); + + let mr_res = execute_in_chunks( conn.clone(), insert_move_resources_query, move_resources, - MoveResource::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("move_resources", per_table_chunk_sizes), + ); + + let ti_res = execute_in_chunks( conn.clone(), insert_table_items_query, table_items, - TableItem::field_count(), - ) 
- .await?; - execute_in_chunks( + get_config_table_chunk_size::("table_items", per_table_chunk_sizes), + ); + + let cti_res = execute_in_chunks( conn.clone(), insert_current_table_items_query, current_table_items, - CurrentTableItem::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_table_items", + per_table_chunk_sizes, + ), + ); + + let tm_res = execute_in_chunks( conn.clone(), insert_table_metadata_query, table_metadata, - TableMetadata::field_count(), - ) - .await?; + get_config_table_chunk_size::("table_metadatas", per_table_chunk_sizes), + ); + + let (txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res) = + join!(txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res); + + for res in [ + txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res, + ] { + res?; + } Ok(()) } @@ -293,47 +312,72 @@ impl ProcessorTrait for DefaultProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); - let (txns, block_metadata_txns, write_set_changes, wsc_details) = - TransactionModel::from_transactions(&transactions); - - let mut block_metadata_transactions = vec![]; - for block_metadata_txn in block_metadata_txns { - block_metadata_transactions.push(block_metadata_txn.clone()); - } - let mut move_modules = vec![]; - let mut move_resources = vec![]; - let mut table_items = vec![]; - let mut current_table_items = AHashMap::new(); - let mut table_metadata = AHashMap::new(); - for detail in wsc_details { - match detail { - WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()), - WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()), - WriteSetChangeDetail::Table(item, current_item, metadata) => { - table_items.push(item.clone()); - current_table_items.insert( - ( - current_item.table_handle.clone(), - current_item.key_hash.clone(), - ), - current_item.clone(), - ); - if let Some(meta) = metadata { - table_metadata.insert(meta.handle.clone(), meta.clone()); - } - }, + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let ( + txns, + block_metadata_transactions, + write_set_changes, + (move_modules, move_resources, table_items, current_table_items, table_metadata), + ) = tokio::task::spawn_blocking(move || { + let (txns, block_metadata_txns, write_set_changes, wsc_details) = + TransactionModel::from_transactions(&transactions); + let mut block_metadata_transactions = vec![]; + for block_metadata_txn in block_metadata_txns { + block_metadata_transactions.push(block_metadata_txn.clone()); + } + let mut move_modules = vec![]; + let mut move_resources = vec![]; + let mut table_items = vec![]; + let mut current_table_items = AHashMap::new(); + let mut table_metadata = AHashMap::new(); + for detail in wsc_details { + match detail { + WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()), + WriteSetChangeDetail::Resource(resource) => { + move_resources.push(resource.clone()) + }, + WriteSetChangeDetail::Table(item, current_item, metadata) => { + table_items.push(item.clone()); + current_table_items.insert( + ( + current_item.table_handle.clone(), + current_item.key_hash.clone(), + ), + current_item.clone(), + ); + if let Some(meta) = metadata { + table_metadata.insert(meta.handle.clone(), meta.clone()); + } + }, + } } - } - // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes - let mut current_table_items = current_table_items 
- .into_values() - .collect::>(); - let mut table_metadata = table_metadata.into_values().collect::>(); - // Sort by PK - current_table_items - .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); - table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes + let mut current_table_items = current_table_items + .into_values() + .collect::>(); + let mut table_metadata = table_metadata.into_values().collect::>(); + // Sort by PK + current_table_items.sort_by(|a, b| { + (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)) + }); + table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + + ( + txns, + block_metadata_transactions, + write_set_changes, + ( + move_modules, + move_resources, + table_items, + current_table_items, + table_metadata, + ), + ) + }) + .await + .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -343,16 +387,17 @@ impl ProcessorTrait for DefaultProcessor { self.name(), start_version, end_version, - txns, - block_metadata_transactions, - write_set_changes, + &txns, + &block_metadata_transactions, + &write_set_changes, ( - move_modules, - move_resources, - table_items, - current_table_items, - table_metadata, + &move_modules, + &move_resources, + &table_items, + ¤t_table_items, + &table_metadata, ), + &self.per_table_chunk_sizes, ) .await; @@ -363,7 +408,7 @@ impl ProcessorTrait for DefaultProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index 2bc567c9..c382f82e 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ -7,9 +7,10 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }, }; +use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use async_trait::async_trait; @@ -18,17 +19,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct EventsProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl EventsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -48,7 +52,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - events: Vec, + events: &[EventModel], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -56,7 +61,13 @@ async fn insert_to_db( end_version = end_version, "Inserting to db", ); - execute_in_chunks(conn, insert_events_query, events, EventModel::field_count()).await?; + execute_in_chunks( + conn, + insert_events_query, + events, + get_config_table_chunk_size::("events", per_table_chunk_sizes), + ) + .await?; Ok(()) } @@ 
-94,6 +105,8 @@ impl ProcessorTrait for EventsProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut events = vec![]; for txn in &transactions { let txn_version = txn.version as i64; @@ -131,7 +144,8 @@ impl ProcessorTrait for EventsProcessor { self.name(), start_version, end_version, - events, + &events, + &self.per_table_chunk_sizes, ) .await; @@ -142,7 +156,7 @@ impl ProcessorTrait for EventsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 73256aa1..0db7fd99 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -20,7 +20,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool, PgPoolConnection}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection}, util::{get_entry_function_from_user_request, standardize_address}, }, }; @@ -34,7 +34,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; @@ -42,11 +41,15 @@ pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; pub struct FungibleAssetProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl FungibleAssetProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -66,10 +69,11 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - fungible_asset_activities: Vec, - fungible_asset_metadata: Vec, - fungible_asset_balances: Vec, - current_fungible_asset_balances: Vec, + fungible_asset_activities: &[FungibleAssetActivity], + fungible_asset_metadata: &[FungibleAssetMetadataModel], + fungible_asset_balances: &[FungibleAssetBalance], + current_fungible_asset_balances: &[CurrentFungibleAssetBalance], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -78,34 +82,46 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let faa = execute_in_chunks( conn.clone(), insert_fungible_asset_activities_query, fungible_asset_activities, - FungibleAssetActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "fungible_asset_activities", + per_table_chunk_sizes, + ), + ); + let fam = execute_in_chunks( conn.clone(), insert_fungible_asset_metadata_query, fungible_asset_metadata, - FungibleAssetMetadataModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "fungible_asset_metadata", + per_table_chunk_sizes, + ), + ); + let fab = execute_in_chunks( conn.clone(), insert_fungible_asset_balances_query, fungible_asset_balances, - FungibleAssetBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let cfab = execute_in_chunks( + conn, 
insert_current_fungible_asset_balances_query, current_fungible_asset_balances, - CurrentFungibleAssetBalance::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let (faa_res, fam_res, fab_res, cfab_res) = tokio::join!(faa, fam, fab, cfab); + for res in [faa_res, fam_res, fab_res, cfab_res] { + res?; + } Ok(()) } @@ -225,6 +241,8 @@ impl ProcessorTrait for FungibleAssetProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; let ( fungible_asset_activities, @@ -241,10 +259,11 @@ impl ProcessorTrait for FungibleAssetProcessor { self.name(), start_version, end_version, - fungible_asset_activities, - fungible_asset_metadata, - fungible_asset_balances, - current_fungible_asset_balances, + &fungible_asset_activities, + &fungible_asset_metadata, + &fungible_asset_balances, + ¤t_fungible_asset_balances, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -254,7 +273,7 @@ impl ProcessorTrait for FungibleAssetProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index c42fe518..63a9e1b6 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -47,18 +47,16 @@ use crate::{ }; use aptos_protos::transaction::v1::Transaction as ProtoTransaction; use async_trait::async_trait; -use diesel::{upsert::excluded, ExpressionMethods}; +use diesel::{pg::upsert::excluded, ExpressionMethods}; use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -type StartVersion = u64; -type EndVersion = u64; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct ProcessingResult { - pub start_version: StartVersion, - pub end_version: EndVersion, - pub last_transaction_timstamp: Option, + pub start_version: u64, + pub end_version: u64, + pub last_transaction_timestamp: Option, pub processing_duration_in_secs: f64, pub db_insertion_duration_in_secs: f64, } diff --git a/rust/processor/src/processors/monitoring_processor.rs b/rust/processor/src/processors/monitoring_processor.rs index 45d7421c..b8245388 100644 --- a/rust/processor/src/processors/monitoring_processor.rs +++ b/rust/processor/src/processors/monitoring_processor.rs @@ -46,7 +46,7 @@ impl ProcessorTrait for MonitoringProcessor { end_version, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), }) } diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index bc094a43..d65c1ce8 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -93,6 +93,8 @@ impl ProcessorTrait for NftMetadataProcessor { db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; 
// First get all token related table metadata from the batch of transactions. This is in case @@ -178,7 +180,7 @@ impl ProcessorTrait for NftMetadataProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }) } diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 81a31e0e..ad4c2536 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -4,7 +4,10 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ models::{ - fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_utils::FungibleAssetStore, + }, object_models::{ v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, @@ -15,7 +18,7 @@ use crate::{ }, schema, utils::{ - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::standardize_address, }, }; @@ -28,17 +31,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct ObjectsProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl ObjectsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -58,7 +64,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - (objects, current_objects): (Vec, Vec), + (objects, current_objects): (&[Object], &[CurrentObject]), + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -67,20 +74,25 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let io = execute_in_chunks( conn.clone(), insert_objects_query, objects, - Object::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("objects", per_table_chunk_sizes), + ); + let co = execute_in_chunks( conn, insert_current_objects_query, current_objects, - CurrentObject::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_objects", + per_table_chunk_sizes, + ), + ); + let (io_res, co_res) = tokio::join!(io, co); + for res in [io_res, co_res] { + res?; + } Ok(()) } @@ -149,6 +161,8 @@ impl ProcessorTrait for ObjectsProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // Moving object handling here because we need a single object @@ -274,7 +288,8 @@ impl ProcessorTrait for ObjectsProcessor { self.name(), start_version, end_version, - (all_objects, all_current_objects), + (&all_objects, &all_current_objects), + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -285,7 +300,7 @@ impl ProcessorTrait for ObjectsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git 
a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index 9b585914..774d3b05 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -18,7 +18,7 @@ use crate::{ }, schema, utils::{ - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::{parse_timestamp, standardize_address}, }, }; @@ -31,17 +31,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct StakeProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl StakeProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -61,15 +64,16 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - current_stake_pool_voters: Vec, - proposal_votes: Vec, - delegator_actvities: Vec, - delegator_balances: Vec, - current_delegator_balances: Vec, - delegator_pools: Vec, - delegator_pool_balances: Vec, - current_delegator_pool_balances: Vec, - current_delegated_voter: Vec, + current_stake_pool_voters: &[CurrentStakingPoolVoter], + proposal_votes: &[ProposalVote], + delegator_actvities: &[DelegatedStakingActivity], + delegator_balances: &[DelegatorBalance], + current_delegator_balances: &[CurrentDelegatorBalance], + delegator_pools: &[DelegatorPool], + delegator_pool_balances: &[DelegatorPoolBalance], + current_delegator_pool_balances: &[CurrentDelegatorPoolBalance], + current_delegated_voter: &[CurrentDelegatedVoter], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -78,69 +82,92 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let cspv = execute_in_chunks( conn.clone(), insert_current_stake_pool_voter_query, current_stake_pool_voters, - CurrentStakingPoolVoter::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_staking_pool_voter", + per_table_chunk_sizes, + ), + ); + let pv = execute_in_chunks( conn.clone(), insert_proposal_votes_query, proposal_votes, - ProposalVote::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("proposal_votes", per_table_chunk_sizes), + ); + let da = execute_in_chunks( conn.clone(), insert_delegator_activities_query, delegator_actvities, - DelegatedStakingActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegated_staking_activities", + per_table_chunk_sizes, + ), + ); + let db = execute_in_chunks( conn.clone(), insert_delegator_balances_query, delegator_balances, - DelegatorBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegator_balances", + per_table_chunk_sizes, + ), + ); + let cdb = execute_in_chunks( conn.clone(), insert_current_delegator_balances_query, current_delegator_balances, - CurrentDelegatorBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_delegator_balances", + per_table_chunk_sizes, + ), + ); + let dp = execute_in_chunks( conn.clone(), insert_delegator_pools_query, delegator_pools, - DelegatorPool::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + 
"delegated_staking_pools", + per_table_chunk_sizes, + ), + ); + let dpb = execute_in_chunks( conn.clone(), insert_delegator_pool_balances_query, delegator_pool_balances, - DelegatorPoolBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegated_staking_pool_balances", + per_table_chunk_sizes, + ), + ); + let cdpb = execute_in_chunks( conn.clone(), insert_current_delegator_pool_balances_query, current_delegator_pool_balances, - CurrentDelegatorPoolBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_delegated_staking_pool_balances", + per_table_chunk_sizes, + ), + ); + let cdv = execute_in_chunks( + conn, insert_current_delegated_voter_query, current_delegated_voter, - CurrentDelegatedVoter::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_delegated_voter", + per_table_chunk_sizes, + ), + ); + + let (cspv_res, pv_res, da_res, db_res, cdb_res, dp_res, dpb_res, cdpb_res, cdv_res) = + futures::join!(cspv, pv, da, db, cdb, dp, dpb, cdpb, cdv); + for res in [ + cspv_res, pv_res, da_res, db_res, cdb_res, dp_res, dpb_res, cdpb_res, cdv_res, + ] { + res?; + } Ok(()) } @@ -351,6 +378,8 @@ impl ProcessorTrait for StakeProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; let mut all_current_stake_pool_voters: StakingPoolVoterMap = AHashMap::new(); @@ -510,15 +539,16 @@ impl ProcessorTrait for StakeProcessor { self.name(), start_version, end_version, - all_current_stake_pool_voters, - all_proposal_votes, - all_delegator_activities, - all_delegator_balances, - all_current_delegator_balances, - all_delegator_pools, - all_delegator_pool_balances, - all_current_delegator_pool_balances, - all_current_delegated_voter, + &all_current_stake_pool_voters, + &all_proposal_votes, + &all_delegator_activities, + &all_delegator_balances, + &all_current_delegator_balances, + &all_delegator_pools, + &all_delegator_pool_balances, + &all_current_delegator_pool_balances, + &all_current_delegated_voter, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -528,7 +558,7 @@ impl ProcessorTrait for StakeProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/token_processor.rs b/rust/processor/src/processors/token_processor.rs index a42241a3..c383d5ff 100644 --- a/rust/processor/src/processors/token_processor.rs +++ b/rust/processor/src/processors/token_processor.rs @@ -16,7 +16,7 @@ use crate::{ }, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; @@ -27,7 +27,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; @@ -41,13 +40,19 @@ pub struct TokenProcessorConfig { pub struct TokenProcessor { connection_pool: PgDbPool, config: TokenProcessorConfig, + per_table_chunk_sizes: AHashMap, } impl TokenProcessor { - pub fn new(connection_pool: PgDbPool, config: TokenProcessorConfig) -> Self { + pub fn new( + 
connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, + config: TokenProcessorConfig, + ) -> Self { Self { connection_pool, config, + per_table_chunk_sizes, } } } @@ -69,19 +74,20 @@ async fn insert_to_db( start_version: u64, end_version: u64, (tokens, token_ownerships, token_datas, collection_datas): ( - Vec, - Vec, - Vec, - Vec, + &[Token], + &[TokenOwnership], + &[TokenData], + &[CollectionData], ), (current_token_ownerships, current_token_datas, current_collection_datas): ( - Vec, - Vec, - Vec, + &[CurrentTokenOwnership], + &[CurrentTokenData], + &[CurrentCollectionData], ), - token_activities: Vec, - current_token_claims: Vec, - nft_points: Vec, + token_activities: &[TokenActivity], + current_token_claims: &[CurrentTokenPendingClaim], + nft_points: &[NftPoints], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -90,77 +96,89 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let t = execute_in_chunks( conn.clone(), insert_tokens_query, tokens, - Token::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("tokens", per_table_chunk_sizes), + ); + let to = execute_in_chunks( conn.clone(), insert_token_ownerships_query, token_ownerships, - TokenOwnership::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_ownerships", per_table_chunk_sizes), + ); + let td = execute_in_chunks( conn.clone(), insert_token_datas_query, token_datas, - TokenData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_datas", per_table_chunk_sizes), + ); + let cd = execute_in_chunks( conn.clone(), insert_collection_datas_query, collection_datas, - CollectionData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("collection_datas", per_table_chunk_sizes), + ); + let cto = execute_in_chunks( conn.clone(), insert_current_token_ownerships_query, current_token_ownerships, - CurrentTokenOwnership::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_ownerships", + per_table_chunk_sizes, + ), + ); + let ctd = execute_in_chunks( conn.clone(), insert_current_token_datas_query, current_token_datas, - CurrentTokenData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_datas", + per_table_chunk_sizes, + ), + ); + let ccd = execute_in_chunks( conn.clone(), insert_current_collection_datas_query, current_collection_datas, - CurrentCollectionData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_collection_datas", + per_table_chunk_sizes, + ), + ); + + let ta = execute_in_chunks( conn.clone(), insert_token_activities_query, token_activities, - TokenActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_activities", per_table_chunk_sizes), + ); + + let ctc = execute_in_chunks( conn.clone(), insert_current_token_claims_query, current_token_claims, - CurrentTokenPendingClaim::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_pending_claims", + per_table_chunk_sizes, + ), + ); + let np = execute_in_chunks( conn, insert_nft_points_query, nft_points, - NftPoints::field_count(), - ) - .await?; + get_config_table_chunk_size::("nft_points", per_table_chunk_sizes), + ); + + let (t_res, to_res, td_res, cd_res, cto_res, ctd_res, ccd_res, ta_res, ctc_res, np) = + tokio::join!(t, to, 
td, cd, cto, ctd, ccd, ta, ctc, np); + for res in [ + t_res, to_res, td_res, cd_res, cto_res, ctd_res, ccd_res, ta_res, ctc_res, np, + ] { + res?; + } Ok(()) } @@ -359,25 +377,26 @@ fn insert_current_token_claims_query( ) { use schema::current_token_pending_claims::dsl::*; - (diesel::insert_into(schema::current_token_pending_claims::table) - .values(items_to_insert) - .on_conflict(( - token_data_id_hash, property_version, from_address, to_address - )) - .do_update() - .set(( - collection_data_id_hash.eq(excluded(collection_data_id_hash)), - creator_address.eq(excluded(creator_address)), - collection_name.eq(excluded(collection_name)), - name.eq(excluded(name)), - amount.eq(excluded(amount)), - table_handle.eq(excluded(table_handle)), - last_transaction_version.eq(excluded(last_transaction_version)), - inserted_at.eq(excluded(inserted_at)), - token_data_id.eq(excluded(token_data_id)), - collection_id.eq(excluded(collection_id)), - )), - Some(" WHERE current_token_pending_claims.last_transaction_version <= excluded.last_transaction_version "), + ( + diesel::insert_into(schema::current_token_pending_claims::table) + .values(items_to_insert) + .on_conflict(( + token_data_id_hash, property_version, from_address, to_address + )) + .do_update() + .set(( + collection_data_id_hash.eq(excluded(collection_data_id_hash)), + creator_address.eq(excluded(creator_address)), + collection_name.eq(excluded(collection_name)), + name.eq(excluded(name)), + amount.eq(excluded(amount)), + table_handle.eq(excluded(table_handle)), + last_transaction_version.eq(excluded(last_transaction_version)), + inserted_at.eq(excluded(inserted_at)), + token_data_id.eq(excluded(token_data_id)), + collection_id.eq(excluded(collection_id)), + )), + Some(" WHERE current_token_pending_claims.last_transaction_version <= excluded.last_transaction_version "), ) } @@ -412,6 +431,8 @@ impl ProcessorTrait for TokenProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // First get all token related table metadata from the batch of transactions. 
This is in case @@ -527,19 +548,20 @@ impl ProcessorTrait for TokenProcessor { start_version, end_version, ( - all_tokens, - all_token_ownerships, - all_token_datas, - all_collection_datas, + &all_tokens, + &all_token_ownerships, + &all_token_datas, + &all_collection_datas, ), ( - all_current_token_ownerships, - all_current_token_datas, - all_current_collection_datas, + &all_current_token_ownerships, + &all_current_token_datas, + &all_current_collection_datas, ), - all_token_activities, - all_current_token_claims, - all_nft_points, + &all_token_activities, + &all_current_token_claims, + &all_nft_points, + &self.per_table_chunk_sizes, ) .await; @@ -550,7 +572,7 @@ impl ProcessorTrait for TokenProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 3b7799d0..d781b516 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -30,7 +30,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool, PgPoolConnection}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection}, util::{get_entry_function_from_user_request, parse_timestamp, standardize_address}, }, }; @@ -43,17 +43,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct TokenV2Processor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl TokenV2Processor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -73,14 +76,15 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - collections_v2: Vec, - token_datas_v2: Vec, - token_ownerships_v2: Vec, - current_collections_v2: Vec, - current_token_datas_v2: Vec, - current_token_ownerships_v2: Vec, - token_activities_v2: Vec, - current_token_v2_metadata: Vec, + collections_v2: &[CollectionV2], + token_datas_v2: &[TokenDataV2], + token_ownerships_v2: &[TokenOwnershipV2], + current_collections_v2: &[CurrentCollectionV2], + current_token_datas_v2: &[CurrentTokenDataV2], + current_token_ownerships_v2: &[CurrentTokenOwnershipV2], + token_activities_v2: &[TokenActivityV2], + current_token_v2_metadata: &[CurrentTokenV2Metadata], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -89,62 +93,97 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let coll_v2 = execute_in_chunks( conn.clone(), insert_collections_v2_query, collections_v2, - CollectionV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("collections_v2", per_table_chunk_sizes), + ); + let td_v2 = execute_in_chunks( conn.clone(), insert_token_datas_v2_query, token_datas_v2, - TokenDataV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_datas_v2", per_table_chunk_sizes), + ); + let to_v2 = execute_in_chunks( conn.clone(), insert_token_ownerships_v2_query, token_ownerships_v2, - TokenOwnershipV2::field_count(), - ) - .await?; - 
execute_in_chunks( + get_config_table_chunk_size::( + "token_ownerships_v2", + per_table_chunk_sizes, + ), + ); + let cc_v2 = execute_in_chunks( conn.clone(), insert_current_collections_v2_query, current_collections_v2, - CurrentCollectionV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_collections_v2", + per_table_chunk_sizes, + ), + ); + let ctd_v2 = execute_in_chunks( conn.clone(), insert_current_token_datas_v2_query, current_token_datas_v2, - CurrentTokenDataV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_datas_v2", + per_table_chunk_sizes, + ), + ); + let cto_v2 = execute_in_chunks( conn.clone(), insert_current_token_ownerships_v2_query, current_token_ownerships_v2, - CurrentTokenOwnershipV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_ownerships_v2", + per_table_chunk_sizes, + ), + ); + let ta_v2 = execute_in_chunks( conn.clone(), insert_token_activities_v2_query, token_activities_v2, - TokenActivityV2::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "token_activities_v2", + per_table_chunk_sizes, + ), + ); + let ct_v2 = execute_in_chunks( + conn, insert_current_token_v2_metadatas_query, current_token_v2_metadata, - CurrentTokenV2Metadata::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_token_v2_metadata", + per_table_chunk_sizes, + ), + ); + + let ( + coll_v2_res, + td_v2_res, + to_v2_res, + cc_v2_res, + ctd_v2_res, + cto_v2_res, + ta_v2_res, + ct_v2_res, + ) = tokio::join!(coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cto_v2, ta_v2, ct_v2,); + + for res in [ + coll_v2_res, + td_v2_res, + to_v2_res, + cc_v2_res, + ctd_v2_res, + cto_v2_res, + ta_v2_res, + ct_v2_res, + ] { + res?; + } + Ok(()) } @@ -351,6 +390,8 @@ impl ProcessorTrait for TokenV2Processor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // First get all token related table metadata from the batch of transactions. 
This is in case @@ -378,14 +419,15 @@ impl ProcessorTrait for TokenV2Processor { self.name(), start_version, end_version, - collections_v2, - token_datas_v2, - token_ownerships_v2, - current_collections_v2, - current_token_ownerships_v2, - current_token_datas_v2, - token_activities_v2, - current_token_v2_metadata, + &collections_v2, + &token_datas_v2, + &token_ownerships_v2, + ¤t_collections_v2, + ¤t_token_ownerships_v2, + ¤t_token_datas_v2, + &token_activities_v2, + ¤t_token_v2_metadata, + &self.per_table_chunk_sizes, ) .await; @@ -396,7 +438,7 @@ impl ProcessorTrait for TokenV2Processor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/user_transaction_processor.rs b/rust/processor/src/processors/user_transaction_processor.rs index 1e8665dd..5571eedb 100644 --- a/rust/processor/src/processors/user_transaction_processor.rs +++ b/rust/processor/src/processors/user_transaction_processor.rs @@ -9,9 +9,10 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }, }; +use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use async_trait::async_trait; @@ -20,17 +21,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct UserTransactionProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl UserTransactionProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -50,8 +54,9 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - user_transactions: Vec, - signatures: Vec, + user_transactions: &[UserTransactionModel], + signatures: &[Signature], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -60,21 +65,26 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let ut = execute_in_chunks( conn.clone(), insert_user_transactions_query, user_transactions, - UserTransactionModel::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "user_transactions", + per_table_chunk_sizes, + ), + ); + let is = execute_in_chunks( + conn, insert_signatures_query, signatures, - Signature::field_count(), - ) - .await?; + get_config_table_chunk_size::("signatures", per_table_chunk_sizes), + ); + let (ut_res, is_res) = futures::join!(ut, is); + for res in [ut_res, is_res] { + res?; + } Ok(()) } @@ -133,6 +143,8 @@ impl ProcessorTrait for UserTransactionProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut signatures = vec![]; let mut user_transactions = vec![]; for txn in &transactions { @@ -172,8 +184,9 @@ impl ProcessorTrait for UserTransactionProcessor { self.name(), start_version, end_version, - user_transactions, - signatures, + &user_transactions, + &signatures, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = 
db_insertion_start.elapsed().as_secs_f64(); @@ -183,7 +196,7 @@ end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index 6584dc52..992eb29f 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -132,7 +132,7 @@ pub static LATEST_PROCESSED_VERSION: Lazy = Lazy::new(|| { register_int_gauge_vec!( "indexer_processor_latest_version", "Latest version a processor has fully consumed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -142,7 +142,17 @@ pub static PROCESSED_BYTES_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( "indexer_processor_processed_bytes_count", "Count of bytes processed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); + +/// The amount of time that a task spent waiting for a protobuf bundle of transactions +pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy = Lazy::new(|| { + register_gauge_vec!( + "indexer_processor_pb_channel_fetch_wait_time_secs", + "Time (in seconds) that a task spent waiting to fetch a batch of transactions from the protobuf channel", + &["processor_name", "task_index"] + ) + .unwrap() +}); @@ -152,7 +162,7 @@ pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy = Lazy::new(|| register_int_counter_vec!( "indexer_processor_num_transactions_processed_count", "Number of transactions processed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -167,22 +177,12 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy = Lazy::new(|| { .unwrap() }); -/// Overall processing time for multiple (n = number_concurrent_processing_tasks) batch of transactions -pub static MULTI_BATCH_PROCESSING_TIME_IN_SECS: Lazy = Lazy::new(|| { - register_gauge_vec!( - "indexer_processor_multi_batch_processing_time_in_secs", - "Time taken to process multiple batches of transactions", - &["processor_name"] - ) - .unwrap() -}); - -/// Overall processing time for a single batch of transactions +/// Overall processing time for a single batch of transactions (per task) pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_single_batch_processing_time_in_secs", "Time taken to process a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -192,7 +192,7 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_single_batch_parsing_time_in_secs", "Time taken to parse a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -202,7 +202,7 @@ pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy = Lazy::new(|| register_gauge_vec!( "indexer_processor_single_batch_db_insertion_time_in_secs", "Time taken to insert to DB for a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -212,7 +212,7 @@ pub static TRANSACTION_UNIX_TIMESTAMP: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_transaction_unix_timestamp", "Transaction timestamp in unixtime", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -230,7 +230,7 @@ 
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_grpc_latency_in_secs", "GRPC latency observed by processor", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index fd230b15..9a8e4dac 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -3,9 +3,11 @@ //! Database-related functions #![allow(clippy::extra_unused_lifetimes)] + use crate::utils::util::remove_null_bytes; +use ahash::AHashMap; use diesel::{ - pg::Pg, + backend::Backend, query_builder::{AstPass, Query, QueryFragment}, ConnectionResult, QueryResult, }; @@ -17,9 +19,8 @@ use diesel_async::{ }, RunQueryDsl, }; -use diesel_async_migrations::{embed_migrations, EmbeddedMigrations}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::{future::BoxFuture, FutureExt}; -use once_cell::sync::Lazy; use std::{cmp::min, sync::Arc}; pub type MyDbConnection = AsyncPgConnection; @@ -27,9 +28,9 @@ pub type PgPool = Pool; pub type PgDbPool = Arc; pub type PgPoolConnection<'a> = PooledConnection<'a, MyDbConnection>; -pub static MIGRATIONS: Lazy = Lazy::new(|| embed_migrations!()); +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); -pub const DEFAULT_MAX_POOL_SIZE: u32 = 30; +pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] /// Using this will append a where clause at the end of the string upsert function, e.g. @@ -41,19 +42,16 @@ pub struct UpsertFilterLatestTransactionQuery { } // the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit -pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2; +pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; -/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX) -/// we may need to chunk an array of items based on how many columns are in the table. 
/// This function returns boundaries of chunks in the form of (start_index, end_index) -pub fn get_chunks(num_items_to_insert: usize, column_count: usize) -> Vec<(usize, usize)> { - let max_item_size = MAX_DIESEL_PARAM_SIZE as usize / column_count; - let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size)); +pub fn get_chunks(num_items_to_insert: usize, chunk_size: usize) -> Vec<(usize, usize)> { + let mut chunk: (usize, usize) = (0, min(num_items_to_insert, chunk_size)); let mut chunks = vec![chunk]; while chunk.1 != num_items_to_insert { chunk = ( - chunk.0 + max_item_size, - min(num_items_to_insert, chunk.1 + max_item_size), + chunk.0 + chunk_size, + min(num_items_to_insert, chunk.1 + chunk_size), ); chunks.push(chunk); } @@ -139,49 +137,38 @@ pub async fn new_db_pool( Ok(Arc::new(pool)) } -/* -pub async fn get_connection(pool: &PgPool) -> Result, PoolError> { - let connection = pool.get().await.unwrap(); - Ok(AsyncConnectionWrapper::from(connection)) -} -*/ - pub async fn execute_in_chunks( conn: PgDbPool, build_query: fn(Vec) -> (U, Option<&'static str>), - items_to_insert: Vec, + items_to_insert: &[T], chunk_size: usize, ) -> Result<(), diesel::result::Error> where - U: QueryFragment + diesel::query_builder::QueryId + Send, - T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, + U: QueryFragment + diesel::query_builder::QueryId + Send + 'static, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static, { let chunks = get_chunks(items_to_insert.len(), chunk_size); - for (start_ind, end_ind) in chunks { - let items = &items_to_insert[start_ind..end_ind]; - - let (query, additional_where_clause) = build_query(items.to_vec()); - match execute_with_better_error(conn.clone(), query, additional_where_clause).await { - Ok(_) => {}, - Err(_) => { - let cleaned_items = clean_data_for_db(items.to_vec(), true); - let (cleaned_query, additional_where_clause) = build_query(cleaned_items); - match execute_with_better_error( - conn.clone(), - cleaned_query, - additional_where_clause, - ) - .await - { - Ok(_) => {}, - Err(e) => { - return Err(e); - }, - } - }, - } + let tasks = chunks + .into_iter() + .map(|(start_ind, end_ind)| { + let items = items_to_insert[start_ind..end_ind].to_vec(); + let conn = conn.clone(); + tokio::spawn(async move { + let (query, additional_where_clause) = build_query(items.clone()); + execute_or_retry_cleaned(conn, build_query, items, query, additional_where_clause) + .await + }) + }) + .collect::>(); + + let results = futures_util::future::try_join_all(tasks) + .await + .expect("Task panicked executing in chunks"); + for res in results { + res? 
} + Ok(()) } @@ -191,7 +178,7 @@ pub async fn execute_with_better_error( mut additional_where_clause: Option<&'static str>, ) -> QueryResult where - U: QueryFragment + diesel::query_builder::QueryId + Send, + U: QueryFragment + diesel::query_builder::QueryId + Send, { let original_query = diesel::debug_query::(&query).to_string(); // This is needed because if we don't insert any row, then diesel makes a call like this @@ -205,7 +192,6 @@ where }; let debug_string = diesel::debug_query::(&final_query).to_string(); tracing::debug!("Executing query: {:?}", debug_string); - let conn = &mut pool.get().await.map_err(|e| { tracing::warn!("Error getting connection from pool: {:?}", e); diesel::result::Error::DatabaseError( @@ -213,7 +199,46 @@ where Box::new(e.to_string()), ) })?; + let res = final_query.execute(conn).await; + if let Err(ref e) = res { + tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); + } + res +} +/// Returns the entry for the config hashmap, or the default field count for the insert +/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), +/// we default to chunk an array of items based on how many columns are in the table. +pub fn get_config_table_chunk_size( + table_name: &str, + per_table_chunk_sizes: &AHashMap, +) -> usize { + per_table_chunk_sizes + .get(table_name) + .copied() + .unwrap_or_else(|| MAX_DIESEL_PARAM_SIZE / T::field_count()) +} + +pub async fn execute_with_better_error_conn( + conn: &mut MyDbConnection, + query: U, + mut additional_where_clause: Option<&'static str>, +) -> QueryResult +where + U: QueryFragment + diesel::query_builder::QueryId + Send, +{ + let original_query = diesel::debug_query::(&query).to_string(); + // This is needed because if we don't insert any row, then diesel makes a call like this + // SELECT 1 FROM TABLE WHERE 1=0 + if original_query.to_lowercase().contains("where") { + additional_where_clause = None; + } + let final_query = UpsertFilterLatestTransactionQuery { + query, + where_clause: additional_where_clause, + }; + let debug_string = diesel::debug_query::(&final_query).to_string(); + tracing::debug!("Executing query: {:?}", debug_string); let res = final_query.execute(conn).await; if let Err(ref e) = res { tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); @@ -221,10 +246,37 @@ where res } -pub async fn run_pending_migrations(conn: &mut MyDbConnection) { - MIGRATIONS - .run_pending_migrations(conn) - .await +async fn execute_or_retry_cleaned( + conn: PgDbPool, + build_query: fn(Vec) -> (U, Option<&'static str>), + items: Vec, + query: U, + additional_where_clause: Option<&'static str>, +) -> Result<(), diesel::result::Error> +where + U: QueryFragment + diesel::query_builder::QueryId + Send, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, +{ + match execute_with_better_error(conn.clone(), query, additional_where_clause).await { + Ok(_) => {}, + Err(_) => { + let cleaned_items = clean_data_for_db(items, true); + let (cleaned_query, additional_where_clause) = build_query(cleaned_items); + match execute_with_better_error(conn.clone(), cleaned_query, additional_where_clause) + .await + { + Ok(_) => {}, + Err(e) => { + return Err(e); + }, + } + }, + } + Ok(()) +} + +pub async fn run_pending_migrations(conn: &mut impl MigrationHarness) { + conn.run_pending_migrations(MIGRATIONS) .expect("[Parser] Migrations failed!"); } @@ -235,11 +287,11 @@ impl Query for UpsertFilterLatestTransactionQuery { //impl RunQueryDsl for 
UpsertFilterLatestTransactionQuery {} -impl QueryFragment for UpsertFilterLatestTransactionQuery +impl QueryFragment for UpsertFilterLatestTransactionQuery where - T: QueryFragment, + T: QueryFragment, { - fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, diesel::pg::Pg>) -> QueryResult<()> { self.query.walk_ast(out.reborrow())?; if let Some(w) = self.where_clause { out.push_sql(w); @@ -258,7 +310,7 @@ mod test { assert_eq!(get_chunks(65535, 1), vec![ (0, 32767), (32767, 65534), - (65534, 65535) + (65534, 65535), ]); // 200,000 total items will take 6 buckets. Each bucket can only be 3276 size. assert_eq!(get_chunks(10000, 20), vec![ @@ -268,14 +320,14 @@ mod test { (4914, 6552), (6552, 8190), (8190, 9828), - (9828, 10000) + (9828, 10000), ]); assert_eq!(get_chunks(65535, 2), vec![ (0, 16383), (16383, 32766), (32766, 49149), (49149, 65532), - (65532, 65535) + (65532, 65535), ]); } } diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index bb6b1fa6..b30d53d9 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -18,32 +18,33 @@ use crate::{ utils::{ counters::{ ProcessorStep, GRPC_LATENCY_BY_PROCESSOR_IN_SECS, LATEST_PROCESSED_VERSION, - MULTI_BATCH_PROCESSING_TIME_IN_SECS, NUM_TRANSACTIONS_PROCESSED_COUNT, + NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS, PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS, PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT, PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT, SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS, SINGLE_BATCH_PARSING_TIME_IN_SECS, SINGLE_BATCH_PROCESSING_TIME_IN_SECS, TRANSACTION_UNIX_TIMESTAMP, }, - database::{execute_with_better_error, new_db_pool, run_pending_migrations, PgDbPool}, + database::{execute_with_better_error_conn, new_db_pool, run_pending_migrations, PgDbPool}, util::{time_diff_since_pb_timestamp_in_secs, timestamp_to_iso, timestamp_to_unixtime}, }, }; +use ahash::AHashMap; use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; use aptos_protos::transaction::v1::Transaction; -use std::{sync::Arc, time::Duration}; -use tokio::time::timeout; -use tracing::{error, info}; +use diesel::Connection; +use tokio::task::JoinHandle; +use tracing::{debug, error, info}; use url::Url; // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. 
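// With BUFFER_SIZE raised to 100 (see the constant below), that same per-batch estimate roughly doubles the worst-case memory figure quoted above, so machines should be provisioned against the new constant.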
-const BUFFER_SIZE: usize = 50; +pub const BUFFER_SIZE: usize = 100; // Consumer thread will wait X seconds before panicking if it doesn't receive any data -const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; -pub(crate) const PROCESSOR_SERVICE_TYPE: &str = "processor"; +pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; +pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; #[derive(Clone)] pub struct TransactionsPBResponse { @@ -63,6 +64,9 @@ pub struct Worker { pub ending_version: Option, pub number_concurrent_processing_tasks: usize, pub gap_detection_batch_size: u64, + pub grpc_chain_id: Option, + pub pb_channel_txn_chunk_size: usize, + pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, } @@ -78,6 +82,9 @@ impl Worker { number_concurrent_processing_tasks: Option, db_pool_size: Option, gap_detection_batch_size: u64, + // The number of transactions per protobuf batch + pb_channel_txn_chunk_size: usize, + per_table_chunk_sizes: AHashMap, enable_verbose_logging: Option, ) -> Result { let processor_name = processor_config.name(); @@ -108,6 +115,9 @@ impl Worker { auth_token, number_concurrent_processing_tasks, gap_detection_batch_size, + grpc_chain_id: None, + pb_channel_txn_chunk_size, + per_table_chunk_sizes, enable_verbose_logging, }) } @@ -120,7 +130,6 @@ impl Worker { /// 4. We will keep track of the last processed version and monitoring things like TPS pub async fn run(&mut self) { let processor_name = self.processor_config.name(); - let enable_verbose_logging = self.enable_verbose_logging.unwrap_or(false); info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, @@ -148,10 +157,7 @@ impl Worker { 0 }); - let starting_version = match self.starting_version { - None => starting_version_from_db, - Some(version) => version, - }; + let starting_version = self.starting_version.unwrap_or(starting_version_from_db); info!( processor_name = processor_name, @@ -165,13 +171,20 @@ impl Worker { let concurrent_tasks = self.number_concurrent_processing_tasks; - // Build the processor based on the config. 
- let processor = build_processor(&self.processor_config, self.db_pool.clone()); - let processor = Arc::new(processor); + // get the chain id + let chain_id = crate::grpc_stream::get_chain_id( + self.indexer_grpc_data_service_address.clone(), + self.grpc_http2_config.grpc_http2_ping_interval_in_secs(), + self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(), + self.auth_token.clone(), + processor_name.to_string(), + ) + .await; + self.check_or_update_chain_id(chain_id as i64) + .await + .unwrap(); - // This is the moving average that we use to calculate TPS - let mut ma = MovingAverage::new(10); - let mut batch_start_version = starting_version; + self.grpc_chain_id = Some(chain_id); let ending_version = self.ending_version; let indexer_grpc_data_service_address = self.indexer_grpc_data_service_address.clone(); @@ -179,31 +192,33 @@ impl Worker { self.grpc_http2_config.grpc_http2_ping_interval_in_secs(); let indexer_grpc_http2_ping_timeout = self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(); + let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size; + // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream // and write into a channel - // The each item will be (chain_id, batch of transactions) + // TODO: change channel size based on number_concurrent_processing_tasks let (tx, receiver) = kanal::bounded_async::(BUFFER_SIZE); let request_ending_version = self.ending_version; let auth_token = self.auth_token.clone(); - tokio::spawn(async move { + let fetcher_task = tokio::spawn(async move { info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, end_version = ending_version, - start_version = batch_start_version, + start_version = starting_version, "[Parser] Starting fetcher thread" ); + crate::grpc_stream::create_fetcher_loop( - tx, - indexer_grpc_data_service_address, + tx.clone(), + indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, starting_version, request_ending_version, - auth_token, + auth_token.clone(), processor_name.to_string(), - batch_start_version, - BUFFER_SIZE, + pb_channel_txn_chunk_size, ) .await }); @@ -211,13 +226,17 @@ impl Worker { // Create a gap detector task that will panic if there is a gap in the processing let (gap_detector_sender, gap_detector_receiver) = kanal::bounded_async::(BUFFER_SIZE); - let processor_clone = processor.clone(); let gap_detection_batch_size = self.gap_detection_batch_size; + let processor = build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.db_pool.clone(), + ); tokio::spawn(async move { crate::gap_detector::create_gap_detector_status_tracker_loop( gap_detector_receiver, - processor_clone, - batch_start_version, + processor, + starting_version, gap_detection_batch_size, ) .await; @@ -229,347 +248,134 @@ impl Worker { // 3. We have received either an empty batch or a batch with a gap. We should panic. // 4. We have not received anything in X seconds, we should panic. // 5. If it's the wrong chain, panic. 
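The block removed below is the old single consumer loop that pulled up to concurrent_tasks batches per iteration and fanned them out; the replacement (fetcher_task plus launch_processor_task, shown further down) instead spawns one fetcher and N long-lived consumer tasks that all clone the same bounded kanal receiver. A minimal, self-contained sketch of that fan-out shape, assuming kanal with the async feature, tokio, and futures as dependencies; the payload type, task count, and version range are placeholders:

#[tokio::main]
async fn main() {
    // Bounded channel: the producer blocks once consumers fall behind by 100 items.
    let (tx, rx) = kanal::bounded_async::<u64>(100);

    // Single producer, standing in for the GRPC fetcher loop.
    let fetcher = tokio::spawn(async move {
        for version in 0..1_000u64 {
            tx.send(version).await.expect("channel closed");
        }
        // tx is dropped here, which lets the consumers drain the channel and exit.
    });

    // N identical consumers sharing the same receiver, standing in for launch_processor_task.
    let mut tasks = vec![fetcher];
    for task_index in 0..4usize {
        let rx = rx.clone();
        tasks.push(tokio::spawn(async move {
            while let Ok(version) = rx.recv().await {
                // process_transactions(...) and per-task metrics would run here.
                let _ = (task_index, version);
            }
        }));
    }

    // In the real worker this only returns if a task dies.
    futures::future::try_join_all(tasks)
        .await
        .expect("a processor task panicked");
}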
- let mut db_chain_id = None; - loop { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser] Fetching transaction batches from channel", - ); - let txn_channel_fetch_latency = std::time::Instant::now(); - let mut transactions_batches = vec![]; - let mut last_fetched_version = batch_start_version as i64 - 1; - for task_index in 0..concurrent_tasks { - let txn_pb_res = match task_index { - 0 => { - // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed. - let txn_pb_timeout_res = timeout( - Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), - receiver.recv(), - ) - .await; - match txn_pb_timeout_res { - Ok(txn_pb_res) => txn_pb_res.map(Some), - // Outer `Err` is a timeout - Err(_) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = - self.indexer_grpc_data_service_address.as_str(), - "[Parser] Consumer thread timed out waiting for transactions", - ); - panic!( - "[Parser] Consumer thread timed out waiting for transactions" - ); - }, - } - }, - _ => { - // If we're not the first task, we should poll to see if we get any data. - receiver.try_recv() - }, - }; - let txn_pb = match txn_pb_res { - Ok(txn_pb) => txn_pb, - // This happens when the channel is closed. We should panic. - Err(_e) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser][T#{}] Channel closed; stream ended.", - task_index - ); - panic!("[Parser][T#{}] Channel closed", task_index); - }, - }; + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + concurrent_tasks, + "[Parser] Spawning concurrent parallel processor tasks", + ); - // If we didn't get any data, break; we'll retry later - let txn_pb = match txn_pb { - Some(txn_pb) => txn_pb, - None => break, - }; + let mut processor_tasks = vec![fetcher_task]; + for task_index in 0..concurrent_tasks { + let join_handle = self + .launch_processor_task(task_index, receiver.clone(), gap_detector_sender.clone()) + .await; + processor_tasks.push(join_handle); + } - if let Some(existing_id) = db_chain_id { - if txn_pb.chain_id != existing_id { - error!( - processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.as_str(), - chain_id = txn_pb.chain_id, - existing_id = existing_id, - "[Parser] Stream somehow changed chain id!", - ); - panic!("[Parser] Stream somehow changed chain id!"); - } - } else { - db_chain_id = Some( - self.check_or_update_chain_id(txn_pb.chain_id as i64) - .await - .unwrap(), - ); - } - let current_fetched_version = - txn_pb.transactions.as_slice().first().unwrap().version; - if last_fetched_version + 1 != current_fetched_version as i64 { + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + concurrent_tasks, + "[Parser] Processor tasks spawned", + ); + + // Await the processor tasks: this is forever + futures::future::try_join_all(processor_tasks) + .await + .expect("[Processor] Processor tasks have died"); + } + + async fn launch_processor_task( + &self, + task_index: usize, + receiver: kanal::AsyncReceiver, + gap_detector_sender: kanal::AsyncSender, + ) -> JoinHandle<()> { + let processor_name = 
self.processor_config.name(); + let stream_address = self.indexer_grpc_data_service_address.to_string(); + let receiver_clone = receiver.clone(); + let auth_token = self.auth_token.clone(); + + // Build the processor based on the config. + let processor = build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.db_pool.clone(), + ); + + let concurrent_tasks = self.number_concurrent_processing_tasks; + + let chain_id = self + .grpc_chain_id + .expect("GRPC chain ID has not been fetched yet!"); + + tokio::spawn(async move { + let task_index_str = task_index.to_string(); + let step = ProcessorStep::ProcessedBatch.get_step(); + let label = ProcessorStep::ProcessedBatch.get_label(); + let mut ma = MovingAverage::new(3000); + + loop { + let txn_channel_fetch_latency = std::time::Instant::now(); + + let txn_pb = fetch_transactions( + processor_name, + &stream_address, + receiver_clone.clone(), + task_index, + ) + .await; + + let size_in_bytes = txn_pb.size_in_bytes as f64; + let first_txn = txn_pb.transactions.as_slice().first().unwrap(); + let first_txn_version = first_txn.version; + let last_txn = txn_pb.transactions.as_slice().last().unwrap(); + let last_txn_version = last_txn.version; + let start_txn_timestamp = first_txn.timestamp.clone(); + let end_txn_timestamp = last_txn.timestamp.clone(); + let txn_channel_fetch_latency_sec = + txn_channel_fetch_latency.elapsed().as_secs_f64(); + + debug!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = first_txn_version, + end_version = last_txn_version, + num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1, + size_in_bytes, + task_index, + duration_in_secs = txn_channel_fetch_latency_sec, + tps = (last_txn_version as f64 - first_txn_version as f64) + / txn_channel_fetch_latency_sec, + bytes_per_sec = size_in_bytes / txn_channel_fetch_latency_sec, + "[Parser][T#{}] Successfully fetched transactions from channel.", + task_index + ); + + // Ensure chain_id has not changed + if txn_pb.chain_id != chain_id { error!( - batch_start_version = batch_start_version, - last_fetched_version = last_fetched_version, - current_fetched_version = current_fetched_version, - "[Parser] Received batch with gap from GRPC stream" + processor_name = processor_name, + stream_address = stream_address.as_str(), + chain_id = txn_pb.chain_id, + existing_id = chain_id, + task_index, + "[Parser][T#{}] Stream somehow changed chain id!", + task_index + ); + panic!( + "[Parser][T#{}] Stream somehow changed chain id!", + task_index ); - panic!("[Parser] Received batch with gap from GRPC stream"); } - last_fetched_version = - txn_pb.transactions.as_slice().last().unwrap().version as i64; - transactions_batches.push(txn_pb); - } - let size_in_bytes = transactions_batches - .iter() - .fold(0.0, |acc, txn_batch| acc + txn_batch.size_in_bytes as f64); - let batch_start_txn_timestamp = transactions_batches - .first() - .unwrap() - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let batch_end_txn_timestamp = transactions_batches - .last() - .unwrap() - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - GRPC_LATENCY_BY_PROCESSOR_IN_SECS - .with_label_values(&[processor_name]) - .set(time_diff_since_pb_timestamp_in_secs( - batch_end_txn_timestamp.as_ref().unwrap(), - )); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start_version, - end_version = last_fetched_version, - 
num_of_transactions = last_fetched_version - batch_start_version as i64 + 1, - size_in_bytes, - duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), - tps = (last_fetched_version as f64 - batch_start_version as f64) - / txn_channel_fetch_latency.elapsed().as_secs_f64(), - bytes_per_sec = size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), - "[Parser] Successfully fetched transaction batches from channel." - ); + let processing_time = std::time::Instant::now(); - // Process the transactions in parallel - let mut tasks = vec![]; - for transactions_pb in transactions_batches { - let processor_clone = processor.clone(); - let gap_detector_sender = gap_detector_sender.clone(); - let auth_token = self.auth_token.clone(); - let task = tokio::spawn(async move { - let start_version = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .version; - let end_version = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .version; - let start_txn_timestamp = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let end_txn_timestamp = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - let txn_time = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - if let Some(ref t) = txn_time { - PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - PROCESSOR_INVOCATIONS_COUNT - .with_label_values(&[processor_name]) - .inc(); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - size_in_bytes = transactions_pb.size_in_bytes, - "[Parser] Started processing one batch of transactions" - ); - } - - let processing_duration = std::time::Instant::now(); - - let processed_result = processor_clone - .process_transactions( - transactions_pb.transactions, - start_version, - end_version, - db_chain_id, - ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. 
so they can have a chain_id field set on new() funciton) - .await; - if let Some(ref t) = txn_time { - PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - - let start_txn_timestamp_unix = start_txn_timestamp - .clone() - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(); - let start_txn_timestamp_iso = start_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - let end_txn_timestamp_iso = end_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(end_version as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(start_txn_timestamp_unix); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(transactions_pb.size_in_bytes); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(end_version - start_version + 1); - - if let Ok(ref res) = processed_result { - gap_detector_sender - .send(res.clone()) - .await - .expect("[Parser] Failed to send versions to gap detector"); - - // Logging and metrics - SINGLE_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_duration.elapsed().as_secs_f64()); - SINGLE_BATCH_PARSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.processing_duration_in_secs); - SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.db_insertion_duration_in_secs); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.db_insertion_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] DB insertion time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.processing_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] Parsing time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - num_of_transactions = end_version - start_version + 1, - size_in_bytes = transactions_pb.size_in_bytes, - processing_duration_in_secs = res.processing_duration_in_secs, - db_insertion_duration_in_secs = res.db_insertion_duration_in_secs, - duration_in_secs = processing_duration.elapsed().as_secs_f64(), - 
tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedBatch.get_step(), - "{}", - ProcessorStep::ProcessedBatch.get_label(), - ); - } - } - - processed_result - }); - tasks.push(task); - } - let processing_time = std::time::Instant::now(); - let task_count = tasks.len(); - let batches = match futures::future::try_join_all(tasks).await { - Ok(res) => res, - Err(err) => panic!("[Parser] Error processing transaction batches: {:?}", err), - }; - - // Update states depending on results of the batch processing - let mut processed_versions = vec![]; - for res in batches { - let processed: ProcessingResult = match res { + let res = do_processor( + txn_pb, + &processor, + chain_id, + processor_name, + &auth_token, + false, // enable_verbose_logging + ) + .await; + + let processing_result = match res { Ok(versions) => { PROCESSOR_SUCCESSES_COUNT .with_label_values(&[processor_name]) @@ -579,93 +385,112 @@ impl Worker { Err(e) => { error!( processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.to_string(), + stream_address = stream_address.as_str(), error = ?e, - "[Parser] Error processing transactions" + task_index, + "[Parser][T#{}] Error processing transactions", task_index ); PROCESSOR_ERRORS_COUNT .with_label_values(&[processor_name]) .inc(); - panic!(); + panic!( + "[Parser][T#{}] Error processing '{:}' transactions: {:?}", + task_index, processor_name, e + ); }, }; - processed_versions.push(processed); - } - // Log the metrics for processed batch - processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version)); - let processed_versions_sorted = processed_versions.clone(); - let batch_start = processed_versions_sorted.first().unwrap().start_version; - let batch_end = processed_versions_sorted.last().unwrap().end_version; - batch_start_version = batch_end + 1; + let processing_time = processing_time.elapsed().as_secs_f64(); - ma.tick_now(batch_end - batch_start + 1); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start, - end_version = batch_end, - start_txn_timestamp_iso = batch_start_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - end_txn_timestamp_iso = batch_end_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - num_of_transactions = batch_end - batch_start + 1, - task_count, - size_in_bytes, - duration_in_secs = processing_time.elapsed().as_secs_f64(), - tps = (ma.avg() * 1000.0) as u64, - bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedMultipleBatches.get_step(), - "{}", - ProcessorStep::ProcessedMultipleBatches.get_label(), - ); - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set(batch_end as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set( - batch_start_txn_timestamp - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(), + // We've processed things: do some data and metrics + let start_version = processing_result.start_version; + let end_version = processing_result.end_version; + + let 
start_txn_timestamp_unix = start_txn_timestamp + .as_ref() + .map(timestamp_to_unixtime) + .unwrap_or_default(); + let start_txn_timestamp_iso = start_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(); + let end_txn_timestamp_iso = end_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(); + + ma.tick_now(end_version - start_version + 1); + let tps = (ma.avg() * 1000.0) as u64; + + let num_processed = end_version - start_version + 1; + + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = start_version, + end_version = end_version, + start_txn_timestamp_iso = start_txn_timestamp_iso, + end_txn_timestamp_iso = end_txn_timestamp_iso, + num_of_transactions = num_processed, + concurrent_tasks, + task_index, + size_in_bytes, + processing_duration_in_secs = processing_result.processing_duration_in_secs, + db_insertion_duration_in_secs = processing_result.db_insertion_duration_in_secs, + duration_in_secs = processing_time, + tps = tps, + bytes_per_sec = size_in_bytes / processing_time, + step = &step, + "{}", + label, ); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(size_in_bytes as u64); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(batch_end - batch_start + 1); - MULTI_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_time.elapsed().as_secs_f64()); - } + + // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector! 
+ GRPC_LATENCY_BY_PROCESSOR_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(time_diff_since_pb_timestamp_in_secs( + end_txn_timestamp.as_ref().unwrap(), + )); + LATEST_PROCESSED_VERSION + .with_label_values(&[processor_name, step, label, &task_index_str]) + .set(end_version as i64); + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[processor_name, step, label, &task_index_str]) + .set(start_txn_timestamp_unix); + + // Single batch metrics + PROCESSED_BYTES_COUNT + .with_label_values(&[processor_name, step, label, &task_index_str]) + .inc_by(size_in_bytes as u64); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[processor_name, step, label, &task_index_str]) + .inc_by(num_processed); + + SINGLE_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_time); + SINGLE_BATCH_PARSING_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_result.processing_duration_in_secs); + SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_result.db_insertion_duration_in_secs); + + // Send the result to the gap detector + gap_detector_sender + .send(processing_result) + .await + .expect("[Parser] Failed to send versions to gap detector"); + } + }) } async fn run_migrations(&self) { - let mut conn = self - .db_pool - .get() - .await - .expect("[Parser] Failed to get connection"); + use diesel::pg::PgConnection; + + println!("Running migrations: {:?}", self.postgres_connection_string); + let mut conn = + PgConnection::establish(&self.postgres_connection_string).expect("migrations failed!"); run_pending_migrations(&mut conn).await; } @@ -708,8 +533,8 @@ impl Worker { chain_id = grpc_chain_id, "[Parser] Adding chain id to db, continue to index..." 
); - execute_with_better_error( - self.db_pool.clone(), + execute_with_better_error_conn( + &mut conn, diesel::insert_into(ledger_infos::table) .values(LedgerInfo { chain_id: grpc_chain_id, @@ -725,36 +550,143 @@ impl Worker { } } +async fn fetch_transactions( + processor_name: &str, + stream_address: &str, + receiver: kanal::AsyncReceiver, + task_index: usize, +) -> TransactionsPBResponse { + let pb_channel_fetch_time = std::time::Instant::now(); + let txn_pb_res = receiver.recv().await; + // Track how much time this task spent waiting for a pb bundle + PB_CHANNEL_FETCH_WAIT_TIME_SECS + .with_label_values(&[processor_name, &task_index.to_string()]) + .set(pb_channel_fetch_time.elapsed().as_secs_f64()); + + match txn_pb_res { + Ok(txn_pb) => txn_pb, + Err(_e) => { + error!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index + ); + panic!( + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index + ); + }, + } +} + +pub async fn do_processor( + transactions_pb: TransactionsPBResponse, + processor: &Processor, + db_chain_id: u64, + processor_name: &str, + auth_token: &str, + enable_verbose_logging: bool, +) -> Result { + let transactions_pb_slice = transactions_pb.transactions.as_slice(); + let first_txn = transactions_pb_slice.first().unwrap(); + + let last_txn = transactions_pb_slice.last().unwrap(); + + let start_version = first_txn.version; + let end_version = last_txn.version; + let txn_time = first_txn.timestamp.clone(); + + if let Some(ref t) = txn_time { + PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS + .with_label_values(&[auth_token, processor_name]) + .set(time_diff_since_pb_timestamp_in_secs(t)); + } + PROCESSOR_INVOCATIONS_COUNT + .with_label_values(&[processor_name]) + .inc(); + + if enable_verbose_logging { + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version, + end_version, + size_in_bytes = transactions_pb.size_in_bytes, + "[Parser] Started processing one batch of transactions" + ); + } + + let processed_result = processor + .process_transactions( + transactions_pb.transactions, + start_version, + end_version, + Some(db_chain_id), + ) + .await; + + if let Some(ref t) = txn_time { + PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS + .with_label_values(&[auth_token, processor_name]) + .set(time_diff_since_pb_timestamp_in_secs(t)); + } + + processed_result +} + /// Given a config and a db pool, build a concrete instance of a processor. // As time goes on there might be other things that we need to provide to certain // processors. As that happens we can revist whether this function (which tends to // couple processors together based on their args) makes sense. 
-pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor { +// TODO: This is not particularly easily extensible; better to refactor to use a trait, and then share one extensible config model (allowing for only one arity) +pub fn build_processor( + config: &ProcessorConfig, + per_table_chunk_sizes: AHashMap, + db_pool: PgDbPool, +) -> Processor { match config { - ProcessorConfig::AccountTransactionsProcessor => { - Processor::from(AccountTransactionsProcessor::new(db_pool)) + ProcessorConfig::AccountTransactionsProcessor => Processor::from( + AccountTransactionsProcessor::new(db_pool, per_table_chunk_sizes), + ), + ProcessorConfig::AnsProcessor(config) => Processor::from(AnsProcessor::new( + db_pool, + per_table_chunk_sizes, + config.clone(), + )), + ProcessorConfig::CoinProcessor => { + Processor::from(CoinProcessor::new(db_pool, per_table_chunk_sizes)) + }, + ProcessorConfig::DefaultProcessor => { + Processor::from(DefaultProcessor::new(db_pool, per_table_chunk_sizes)) }, - ProcessorConfig::AnsProcessor(config) => { - Processor::from(AnsProcessor::new(db_pool, config.clone())) + ProcessorConfig::EventsProcessor => { + Processor::from(EventsProcessor::new(db_pool, per_table_chunk_sizes)) }, - ProcessorConfig::CoinProcessor => Processor::from(CoinProcessor::new(db_pool)), - ProcessorConfig::DefaultProcessor => Processor::from(DefaultProcessor::new(db_pool)), - ProcessorConfig::EventsProcessor => Processor::from(EventsProcessor::new(db_pool)), ProcessorConfig::FungibleAssetProcessor => { - Processor::from(FungibleAssetProcessor::new(db_pool)) + Processor::from(FungibleAssetProcessor::new(db_pool, per_table_chunk_sizes)) }, ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)), ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) }, - ProcessorConfig::ObjectsProcessor => Processor::from(ObjectsProcessor::new(db_pool)), - ProcessorConfig::StakeProcessor => Processor::from(StakeProcessor::new(db_pool)), - ProcessorConfig::TokenProcessor(config) => { - Processor::from(TokenProcessor::new(db_pool, config.clone())) + ProcessorConfig::ObjectsProcessor => { + Processor::from(ObjectsProcessor::new(db_pool, per_table_chunk_sizes)) + }, + ProcessorConfig::StakeProcessor => { + Processor::from(StakeProcessor::new(db_pool, per_table_chunk_sizes)) }, - ProcessorConfig::TokenV2Processor => Processor::from(TokenV2Processor::new(db_pool)), - ProcessorConfig::UserTransactionProcessor => { - Processor::from(UserTransactionProcessor::new(db_pool)) + ProcessorConfig::TokenProcessor(config) => Processor::from(TokenProcessor::new( + db_pool, + per_table_chunk_sizes, + config.clone(), + )), + ProcessorConfig::TokenV2Processor => { + Processor::from(TokenV2Processor::new(db_pool, per_table_chunk_sizes)) }, + ProcessorConfig::UserTransactionProcessor => Processor::from( + UserTransactionProcessor::new(db_pool, per_table_chunk_sizes), + ), } }
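
Note on the new `build_processor` signature above: the hunk elides the generic parameters of `AHashMap`, so the sketch below assumes `AHashMap<String, usize>` keyed by table name with a rows-per-insert-chunk value. The helper name, the "events" key, and the chunk size of 100 are illustrative assumptions, not taken from this patch; `ProcessorConfig`, `PgDbPool`, `Processor`, and the argument order (config, chunk sizes, pool) are as introduced in the hunk.

    use ahash::AHashMap;

    // Hypothetical caller: wire the refactored build_processor end-to-end.
    fn example_build_events_processor(db_pool: PgDbPool) -> Processor {
        // Assumed convention: table name -> number of rows per insert chunk.
        let mut per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        per_table_chunk_sizes.insert("events".to_string(), 100);

        build_processor(
            &ProcessorConfig::EventsProcessor,
            per_table_chunk_sizes,
            db_pool,
        )
    }

Tables without an entry in the map would presumably fall back to whatever default chunk size the individual processors use; that fallback lives in the per-processor files changed elsewhere in this patch.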