diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 0c9a839c..efc04446 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -535,6 +535,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", + "pq-sys", "serde_json", ] @@ -1332,6 +1333,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.8" @@ -1884,6 +1894,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pq-sys" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd" +dependencies = [ + "vcpkg", +] + [[package]] name = "proc-macro2" version = "1.0.76" @@ -1919,6 +1938,7 @@ dependencies = [ "google-cloud-googleapis", "google-cloud-pubsub", "hex", + "itertools 0.12.1", "kanal", "native-tls", "num_cpus", @@ -1983,7 +2003,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -1996,7 +2016,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.48", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index fe95cb6d..6d400036 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -38,6 +38,7 @@ diesel = { version = "2.1", features = [ "chrono", "postgres_backend", "numeric", + "postgres", "serde_json", ] } diesel-async = { version = "0.4", features = ["postgres", "bb8", "tokio"] } @@ -55,6 +56,7 @@ cloud-storage = { version = "0.11.1", features = ["global-client"] } google-cloud-googleapis = "0.10.0" google-cloud-pubsub = "0.18.0" hex = "0.4.3" +itertools = "0.12.1" kanal = { version = "0.1.0-pre8", features = ["async"] } once_cell = "1.10.0" num_cpus = "1.16.0" diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index e13086c0..74fb894c 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -35,6 +35,7 @@ gcloud-sdk = { workspace = true } google-cloud-googleapis = { workspace = true } google-cloud-pubsub = { workspace = true } hex = { workspace = true } +itertools = { workspace = true } kanal = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 399c0542..316d072b 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -4,6 +4,7 @@ use crate::{ gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, worker::Worker, }; +use ahash::AHashMap; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use server_framework::RunnableConfig; @@ -20,12 +21,23 @@ pub struct IndexerGrpcProcessorConfig { #[serde(flatten)] pub grpc_http2_config: IndexerGrpcHttp2Config, pub auth_token: String, + // Version to start indexing from pub starting_version: Option, + // Version to end indexing at pub ending_version: Option, + // Number of tasks waiting to pull transaction batches 
from the channel and process them pub number_concurrent_processing_tasks: Option, + // Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight pub db_pool_size: Option, + // Maximum number of batches "missing" before we assume we have an issue with gaps and abort #[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")] pub gap_detection_batch_size: u64, + // Number of protobuff transactions to send per chunk to the processor tasks + #[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")] + pub pb_channel_txn_chunk_size: usize, + // Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2) + #[serde(default = "AHashMap::new")] + pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, } @@ -33,6 +45,12 @@ impl IndexerGrpcProcessorConfig { pub const fn default_gap_detection_batch_size() -> u64 { DEFAULT_GAP_DETECTION_BATCH_SIZE } + + /// Make the default very large on purpose so that by default it's not chunked + /// This prevents any unexpected changes in behavior + pub const fn default_pb_channel_txn_chunk_size() -> usize { + 100_000 + } } #[async_trait::async_trait] @@ -49,6 +67,8 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.number_concurrent_processing_tasks, self.db_pool_size, self.gap_detection_batch_size, + self.pb_channel_txn_chunk_size, + self.per_table_chunk_sizes.clone(), self.enable_verbose_logging, ) .await diff --git a/rust/processor/src/gap_detector.rs b/rust/processor/src/gap_detector.rs index c193938d..74b5771b 100644 --- a/rust/processor/src/gap_detector.rs +++ b/rust/processor/src/gap_detector.rs @@ -7,11 +7,10 @@ use crate::{ }; use ahash::AHashMap; use kanal::AsyncReceiver; -use std::sync::Arc; use tracing::{error, info}; // Number of batches processed before gap detected -pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 50; +pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 100; // Number of seconds between each processor status update const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; @@ -70,7 +69,7 @@ impl GapDetector { pub async fn create_gap_detector_status_tracker_loop( gap_detector_receiver: AsyncReceiver, - processor: Arc, + processor: Processor, starting_version: u64, gap_detection_batch_size: u64, ) { @@ -117,7 +116,7 @@ pub async fn create_gap_detector_status_tracker_loop( processor .update_last_processed_version( res_last_success_batch.end_version, - res_last_success_batch.last_transaction_timstamp.clone(), + res_last_success_batch.last_transaction_timestamp.clone(), ) .await .unwrap(); @@ -132,7 +131,7 @@ pub async fn create_gap_detector_status_tracker_loop( error = ?e, "[Parser] Gap detector task has panicked" ); - panic!(); + panic!("[Parser] Gap detector task has panicked: {:?}", e); }, } } @@ -152,7 +151,7 @@ mod test { let result = ProcessingResult { start_version: 100 + i * 100, end_version: 199 + i * 100, - last_transaction_timstamp: None, + last_transaction_timestamp: None, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, }; @@ -167,7 +166,7 @@ mod test { .process_versions(ProcessingResult { start_version: 0, end_version: 99, - last_transaction_timstamp: None, + last_transaction_timestamp: None, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, }) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 8f86545e..2a68029a 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -8,15 +8,20 @@ use 
crate::{ }, worker::TransactionsPBResponse, }; -use aptos_protos::indexer::v1::{ - raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse, +use aptos_moving_average::MovingAverage; +use aptos_protos::{ + indexer::v1::{raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse}, + transaction::v1::Transaction, }; +use bigdecimal::Zero; use futures_util::StreamExt; +use itertools::Itertools; use kanal::AsyncSender; use prost::Message; use std::time::Duration; +use tokio::time::timeout; use tonic::{Response, Streaming}; -use tracing::{error, info}; +use tracing::{debug, error, info}; use url::Url; /// GRPC request metadata key for the token ID. @@ -27,7 +32,7 @@ const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; /// GRPC connection id const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; /// We will try to reconnect to GRPC 5 times in case upstream connection is being updated -pub const RECONNECTION_MAX_RETRIES: u64 = 65; +pub const RECONNECTION_MAX_RETRIES: u64 = 5; /// 256MB pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; @@ -99,7 +104,39 @@ pub async fn get_stream( end_version = ending_version, "[Parser] Setting up GRPC client" ); - let mut rpc_client = match RawDataClient::connect(channel).await { + + // TODO: move this to a config file + // Retry this connection a few times before giving up + let mut connect_retries = 0; + let connect_res = loop { + let res = timeout( + Duration::from_secs(5), + RawDataClient::connect(channel.clone()), + ) + .await; + match res { + Ok(client) => break Ok(client), + Err(e) => { + error!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + start_version = starting_version, + end_version = ending_version, + retries = connect_retries, + error = ?e, + "[Parser] Error connecting to GRPC client" + ); + connect_retries += 1; + if connect_retries >= RECONNECTION_MAX_RETRIES { + break Err(e); + } + }, + } + } + .expect("[Parser] Timeout connecting to GRPC server"); + + let mut rpc_client = match connect_res { Ok(client) => client .accept_compressed(tonic::codec::CompressionEncoding::Gzip) .send_compressed(tonic::codec::CompressionEncoding::Gzip) @@ -212,12 +249,9 @@ pub async fn create_fetcher_loop( request_ending_version: Option, auth_token: String, processor_name: String, - batch_start_version: u64, - buffer_size: usize, + // The number of transactions per protobuf batch + pb_channel_txn_chunk_size: usize, ) { - let mut grpc_channel_recv_latency = std::time::Instant::now(); - let mut next_version_to_fetch = batch_start_version; - let mut reconnection_retries = 0; info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, @@ -251,8 +285,13 @@ pub async fn create_fetcher_loop( "[Parser] Successfully connected to GRPC stream", ); - let mut last_fetched_version = batch_start_version as i64 - 1; - let mut batch_start_version = batch_start_version; + let mut grpc_channel_recv_latency = std::time::Instant::now(); + let mut next_version_to_fetch = starting_version; + let mut reconnection_retries = 0; + let mut last_fetched_version = starting_version as i64 - 1; + let mut fetch_ma = MovingAverage::new(3000); + let mut send_ma = MovingAverage::new(3000); + loop { let is_success = match resp_stream.next().await { Some(Ok(r)) => { @@ -262,10 +301,17 @@ pub async fn create_fetcher_loop( r.transactions.as_slice().first().unwrap().timestamp.clone(); let end_version = 
r.transactions.as_slice().last().unwrap().version; let end_txn_timestamp = r.transactions.as_slice().last().unwrap().timestamp.clone(); + next_version_to_fetch = end_version + 1; + let size_in_bytes = r.encoded_len() as u64; let chain_id: u64 = r.chain_id.expect("[Parser] Chain Id doesn't exist."); + let num_txns = r.transactions.len(); + let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(); + fetch_ma.tick_now(num_txns as u64); + let step = ProcessorStep::ReceivedTxnsFromGrpc.get_step(); + let label = ProcessorStep::ReceivedTxnsFromGrpc.get_label(); info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, @@ -281,110 +327,126 @@ pub async fn create_fetcher_loop( .map(|t| timestamp_to_iso(&t)) .unwrap_or_default(), num_of_transactions = end_version - start_version + 1, - channel_size = buffer_size - txn_sender.capacity(), - size_in_bytes = r.encoded_len() as f64, - duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64(), - tps = (r.transactions.len() as f64 - / grpc_channel_recv_latency.elapsed().as_secs_f64()) - as u64, - bytes_per_sec = - r.encoded_len() as f64 / grpc_channel_recv_latency.elapsed().as_secs_f64(), - step = ProcessorStep::ReceivedTxnsFromGrpc.get_step(), + channel_size = txn_sender.len(), + size_in_bytes, + duration_in_secs, + tps = fetch_ma.avg() as u64, + bytes_per_sec = size_in_bytes as f64 / duration_in_secs, + step, "{}", - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), + label, ); - let current_fetched_version = start_version; - if last_fetched_version + 1 != current_fetched_version as i64 { + if last_fetched_version + 1 != start_version as i64 { error!( - batch_start_version = batch_start_version, - last_fetched_version = last_fetched_version, - current_fetched_version = current_fetched_version, + batch_start_version = last_fetched_version + 1, + last_fetched_version, + current_fetched_version = start_version, "[Parser] Received batch with gap from GRPC stream" ); panic!("[Parser] Received batch with gap from GRPC stream"); } last_fetched_version = end_version as i64; - batch_start_version = (last_fetched_version + 1) as u64; LATEST_PROCESSED_VERSION - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .set(end_version as i64); TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .set( start_txn_timestamp .map(|t| timestamp_to_unixtime(&t)) .unwrap_or_default(), ); PROCESSED_BYTES_COUNT - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .inc_by(size_in_bytes); NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - &processor_name, - ProcessorStep::ReceivedTxnsFromGrpc.get_step(), - ProcessorStep::ReceivedTxnsFromGrpc.get_label(), - ]) + .with_label_values(&[&processor_name, step, label, "-"]) .inc_by(end_version - start_version + 1); let txn_channel_send_latency = std::time::Instant::now(); - let txn_pb = TransactionsPBResponse { - transactions: r.transactions, - chain_id, - size_in_bytes, - }; - let size_in_bytes = txn_pb.size_in_bytes; - let duration_in_secs = 
txn_channel_send_latency.elapsed().as_secs_f64(); - let tps = (txn_pb.transactions.len() as f64 - / txn_channel_send_latency.elapsed().as_secs_f64()) - as u64; - let bytes_per_sec = - txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); - match txn_sender.send(txn_pb).await { - Ok(()) => {}, - Err(e) => { - error!( - processor_name = processor_name, - stream_address = indexer_grpc_data_service_address.to_string(), - connection_id, - channel_size = buffer_size - txn_sender.capacity(), - error = ?e, - "[Parser] Error sending GRPC response to channel." - ); - panic!("[Parser] Error sending GRPC response to channel.") - }, + //potentially break txn_pb into many `TransactionsPBResponse` that are each 100 txns max in size + if num_txns < pb_channel_txn_chunk_size { + // We only need to send one; avoid the chunk/clone + let txn_pb = TransactionsPBResponse { + transactions: r.transactions, + chain_id, + size_in_bytes, + }; + + match txn_sender.send(txn_pb).await { + Ok(()) => {}, + Err(e) => { + error!( + processor_name = processor_name, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?e, + "[Parser] Error sending GRPC response to channel." + ); + panic!("[Parser] Error sending GRPC response to channel.") + }, + } + } else { + // We are breaking down a big batch into small batches; this involves an iterator + let average_size_in_bytes = size_in_bytes / num_txns as u64; + + let pb_txn_chunks: Vec> = r + .transactions + .into_iter() + .chunks(pb_channel_txn_chunk_size) + .into_iter() + .map(|chunk| chunk.collect()) + .collect(); + for txns in pb_txn_chunks { + let size_in_bytes = average_size_in_bytes * txns.len() as u64; + let txn_pb = TransactionsPBResponse { + transactions: txns, + chain_id, + size_in_bytes, + }; + + match txn_sender.send(txn_pb).await { + Ok(()) => {}, + Err(e) => { + error!( + processor_name = processor_name, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?e, + "[Parser] Error sending GRPC response to channel." + ); + panic!("[Parser] Error sending GRPC response to channel.") + }, + } + } } - info!( + + let duration_in_secs = txn_channel_send_latency.elapsed().as_secs_f64(); + send_ma.tick_now(num_txns as u64); + let tps = send_ma.avg() as u64; + let bytes_per_sec = size_in_bytes as f64 / duration_in_secs; + + let channel_size = txn_sender.len(); + debug!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, stream_address = indexer_grpc_data_service_address.to_string(), connection_id, - start_version = start_version, - end_version = end_version, - channel_size = buffer_size - txn_sender.capacity(), - size_in_bytes = size_in_bytes, - duration_in_secs = duration_in_secs, - bytes_per_sec = bytes_per_sec, - tps = tps, + start_version, + end_version, + channel_size, + size_in_bytes, + duration_in_secs, + bytes_per_sec, + tps, "[Parser] Successfully sent transactions to channel." 
); FETCHER_THREAD_CHANNEL_SIZE .with_label_values(&[&processor_name]) - .set((buffer_size - txn_sender.capacity()) as i64); + .set(channel_size as i64); grpc_channel_recv_latency = std::time::Instant::now(); true }, @@ -432,19 +494,19 @@ pub async fn create_fetcher_loop( ); // Wait for the fetched transactions to finish processing before closing the channel loop { - let channel_capacity = txn_sender.capacity(); + let channel_size = txn_sender.len(); info!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, stream_address = indexer_grpc_data_service_address.to_string(), connection_id, - channel_size = buffer_size - channel_capacity, + channel_size, "[Parser] Waiting for channel to be empty" ); - if channel_capacity == buffer_size { + if channel_size.is_zero() { break; } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } info!( processor_name = processor_name, @@ -462,7 +524,7 @@ pub async fn create_fetcher_loop( // Sleep for 100ms between reconnect tries // TODO: Turn this into exponential backoff - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; if reconnection_retries >= RECONNECTION_MAX_RETRIES { error!( diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index cd040e3d..b7c62ceb 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -94,7 +94,7 @@ impl TokenDataV2 { } token_properties = object_metadata .property_map - .clone() + .as_ref() .map(|m| m.inner.clone()) .unwrap_or(token_properties); // In aggregator V2 name is now derived from a separate struct diff --git a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs index db6a50f0..425c651d 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs @@ -88,7 +88,7 @@ pub struct NFTOwnershipV2 { } /// Need a separate struct for queryable because we don't want to define the inserted_at column (letting DB fill) -#[derive(Debug, Identifiable, Queryable, Clone)] +#[derive(Clone, Debug, Identifiable, Queryable)] #[diesel(primary_key(token_data_id, property_version_v1, owner_address, storage_id))] #[diesel(table_name = current_token_ownerships_v2)] pub struct CurrentTokenOwnershipV2Query { diff --git a/rust/processor/src/processors/account_transactions_processor.rs b/rust/processor/src/processors/account_transactions_processor.rs index 60771ca4..1b984b17 100644 --- a/rust/processor/src/processors/account_transactions_processor.rs +++ b/rust/processor/src/processors/account_transactions_processor.rs @@ -5,24 +5,27 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ models::account_transaction_models::account_transactions::AccountTransaction, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{pg::Pg, query_builder::QueryFragment}; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct AccountTransactionsProcessor { connection_pool: PgDbPool, + 
per_table_chunk_sizes: AHashMap, } impl AccountTransactionsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -42,7 +45,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - account_transactions: Vec, + account_transactions: &[AccountTransaction], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -54,7 +58,10 @@ async fn insert_to_db( conn.clone(), insert_account_transactions_query, account_transactions, - AccountTransaction::field_count(), + get_config_table_chunk_size::( + "account_transactions", + per_table_chunk_sizes, + ), ) .await?; Ok(()) @@ -88,9 +95,11 @@ impl ProcessorTrait for AccountTransactionsProcessor { transactions: Vec, start_version: u64, end_version: u64, - _: Option, + _db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut account_transactions = AHashMap::new(); for txn in &transactions { @@ -113,7 +122,8 @@ impl ProcessorTrait for AccountTransactionsProcessor { self.name(), start_version, end_version, - account_transactions, + &account_transactions, + &self.per_table_chunk_sizes, ) .await; @@ -124,7 +134,7 @@ impl ProcessorTrait for AccountTransactionsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/ans_processor.rs b/rust/processor/src/processors/ans_processor.rs index 31e64377..a448ac3c 100644 --- a/rust/processor/src/processors/ans_processor.rs +++ b/rust/processor/src/processors/ans_processor.rs @@ -13,7 +13,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::standardize_address, }, }; @@ -28,7 +28,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; @@ -44,10 +43,15 @@ pub struct AnsProcessorConfig { pub struct AnsProcessor { connection_pool: PgDbPool, config: AnsProcessorConfig, + per_table_chunk_sizes: AHashMap, } impl AnsProcessor { - pub fn new(connection_pool: PgDbPool, config: AnsProcessorConfig) -> Self { + pub fn new( + connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, + config: AnsProcessorConfig, + ) -> Self { tracing::info!( ans_v1_primary_names_table_handle = config.ans_v1_primary_names_table_handle, ans_v1_name_records_table_handle = config.ans_v1_name_records_table_handle, @@ -56,6 +60,7 @@ impl AnsProcessor { ); Self { connection_pool, + per_table_chunk_sizes, config, } } @@ -77,14 +82,15 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - current_ans_lookups: Vec, - ans_lookups: Vec, - current_ans_primary_names: Vec, - ans_primary_names: Vec, - current_ans_lookups_v2: Vec, - ans_lookups_v2: Vec, - current_ans_primary_names_v2: Vec, - ans_primary_names_v2: Vec, + current_ans_lookups: &[CurrentAnsLookup], + ans_lookups: &[AnsLookup], + current_ans_primary_names: &[CurrentAnsPrimaryName], + ans_primary_names: &[AnsPrimaryName], + 
current_ans_lookups_v2: &[CurrentAnsLookupV2], + ans_lookups_v2: &[AnsLookupV2], + current_ans_primary_names_v2: &[CurrentAnsPrimaryNameV2], + ans_primary_names_v2: &[AnsPrimaryNameV2], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -92,62 +98,86 @@ async fn insert_to_db( end_version = end_version, "Inserting to db", ); - execute_in_chunks( + let cal = execute_in_chunks( conn.clone(), insert_current_ans_lookups_query, current_ans_lookups, - CurrentAnsLookup::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_lookup", + per_table_chunk_sizes, + ), + ); + let al = execute_in_chunks( conn.clone(), insert_ans_lookups_query, ans_lookups, - AnsLookup::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_lookup", per_table_chunk_sizes), + ); + let capn = execute_in_chunks( conn.clone(), insert_current_ans_primary_names_query, current_ans_primary_names, - CurrentAnsPrimaryName::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_primary_name", + per_table_chunk_sizes, + ), + ); + let apn = execute_in_chunks( conn.clone(), insert_ans_primary_names_query, ans_primary_names, - AnsPrimaryName::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_primary_name", per_table_chunk_sizes), + ); + let cal_v2 = execute_in_chunks( conn.clone(), insert_current_ans_lookups_v2_query, current_ans_lookups_v2, - CurrentAnsLookupV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_ans_lookup_v2", + per_table_chunk_sizes, + ), + ); + let al_v2 = execute_in_chunks( conn.clone(), insert_ans_lookups_v2_query, ans_lookups_v2, - AnsLookupV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("ans_lookup_v2", per_table_chunk_sizes), + ); + let capn_v2 = execute_in_chunks( conn.clone(), insert_current_ans_primary_names_v2_query, current_ans_primary_names_v2, - CurrentAnsPrimaryNameV2::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_ans_primary_name_v2", + per_table_chunk_sizes, + ), + ); + let apn_v2 = execute_in_chunks( + conn, insert_ans_primary_names_v2_query, ans_primary_names_v2, - AnsPrimaryNameV2::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "ans_primary_name_v2", + per_table_chunk_sizes, + ), + ); + + let (cal_res, al_res, capn_res, apn_res, cal_v2_res, al_v2_res, capn_v2_res, apn_v2_res) = + tokio::join!(cal, al, capn, apn, cal_v2, al_v2, capn_v2, apn_v2); + + for res in vec![ + cal_res, + al_res, + capn_res, + apn_res, + cal_v2_res, + al_v2_res, + capn_v2_res, + apn_v2_res, + ] { + res?; + } + Ok(()) } @@ -333,6 +363,7 @@ impl ProcessorTrait for AnsProcessor { _db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); let ( all_current_ans_lookups, @@ -359,14 +390,15 @@ impl ProcessorTrait for AnsProcessor { self.name(), start_version, end_version, - all_current_ans_lookups, - all_ans_lookups, - all_current_ans_primary_names, - all_ans_primary_names, - all_current_ans_lookups_v2, - all_ans_lookups_v2, - all_current_ans_primary_names_v2, - all_ans_primary_names_v2, + &all_current_ans_lookups, + &all_ans_lookups, + &all_current_ans_primary_names, + &all_ans_primary_names, + &all_current_ans_lookups_v2, + 
&all_ans_lookups_v2, + &all_current_ans_primary_names_v2, + &all_ans_primary_names_v2, + &self.per_table_chunk_sizes, ) .await; @@ -378,7 +410,7 @@ impl ProcessorTrait for AnsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( @@ -431,7 +463,7 @@ fn parse_ans( .with_label_values(&["AnsProcessor"]) .inc(); tracing::warn!( - transaction_version = transaction.version, + transaction_version = txn_version, "Transaction data doesn't exist", ); continue; diff --git a/rust/processor/src/processors/coin_processor.rs b/rust/processor/src/processors/coin_processor.rs index 55ba7ada..99928e5b 100644 --- a/rust/processor/src/processors/coin_processor.rs +++ b/rust/processor/src/processors/coin_processor.rs @@ -13,10 +13,10 @@ use crate::{ fungible_asset_models::v2_fungible_asset_activities::CurrentCoinBalancePK, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; -use anyhow::bail; +use anyhow::{bail, Context}; use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{ @@ -24,7 +24,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; @@ -32,11 +31,15 @@ pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; pub struct CoinProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl CoinProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -56,11 +59,12 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - coin_activities: Vec, - coin_infos: Vec, - coin_balances: Vec, - current_coin_balances: Vec, - coin_supply: Vec, + coin_activities: &[CoinActivity], + coin_infos: &[CoinInfo], + coin_balances: &[CoinBalance], + current_coin_balances: &[CurrentCoinBalance], + coin_supply: &[CoinSupply], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -69,42 +73,44 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let ca = execute_in_chunks( conn.clone(), insert_coin_activities_query, coin_activities, - CoinActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_activities", per_table_chunk_sizes), + ); + let ci = execute_in_chunks( conn.clone(), insert_coin_infos_query, coin_infos, - CoinInfo::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_infos", per_table_chunk_sizes), + ); + let cb = execute_in_chunks( conn.clone(), insert_coin_balances_query, coin_balances, - CoinBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("coin_balances", per_table_chunk_sizes), + ); + let ccb = execute_in_chunks( conn.clone(), insert_current_coin_balances_query, current_coin_balances, - CurrentCoinBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_coin_balances", + per_table_chunk_sizes, + ), + ); + let cs = execute_in_chunks( + conn, inset_coin_supply_query, coin_supply, - CoinSupply::field_count(), - ) - .await?; + 
get_config_table_chunk_size::("coin_supply", per_table_chunk_sizes), + ); + let (ca_res, ci_res, cb_res, ccb_res, cs_res) = tokio::join!(ca, ci, cb, ccb, cs); + for res in [ca_res, ci_res, cb_res, ccb_res, cs_res] { + res?; + } Ok(()) } @@ -233,41 +239,60 @@ impl ProcessorTrait for CoinProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); - let mut all_coin_activities = vec![]; - let mut all_coin_balances = vec![]; - let mut all_coin_infos: AHashMap = AHashMap::new(); - let mut all_current_coin_balances: AHashMap = - AHashMap::new(); - let mut all_coin_supply = vec![]; + let ( + all_coin_activities, + all_coin_infos, + all_coin_balances, + all_current_coin_balances, + all_coin_supply, + ) = tokio::task::spawn_blocking(move || { + let mut all_coin_activities = vec![]; + let mut all_coin_balances = vec![]; + let mut all_coin_infos: AHashMap = AHashMap::new(); + let mut all_current_coin_balances: AHashMap = + AHashMap::new(); + let mut all_coin_supply = vec![]; - for txn in &transactions { - let ( - mut coin_activities, - mut coin_balances, - coin_infos, - current_coin_balances, - mut coin_supply, - ) = CoinActivity::from_transaction(txn); - all_coin_activities.append(&mut coin_activities); - all_coin_balances.append(&mut coin_balances); - all_coin_supply.append(&mut coin_supply); - // For coin infos, we only want to keep the first version, so insert only if key is not present already - for (key, value) in coin_infos { - all_coin_infos.entry(key).or_insert(value); + for txn in &transactions { + let ( + mut coin_activities, + mut coin_balances, + coin_infos, + current_coin_balances, + mut coin_supply, + ) = CoinActivity::from_transaction(txn); + all_coin_activities.append(&mut coin_activities); + all_coin_balances.append(&mut coin_balances); + all_coin_supply.append(&mut coin_supply); + // For coin infos, we only want to keep the first version, so insert only if key is not present already + for (key, value) in coin_infos { + all_coin_infos.entry(key).or_insert(value); + } + all_current_coin_balances.extend(current_coin_balances); } - all_current_coin_balances.extend(current_coin_balances); - } - let mut all_coin_infos = all_coin_infos.into_values().collect::>(); - let mut all_current_coin_balances = all_current_coin_balances - .into_values() - .collect::>(); + let mut all_coin_infos = all_coin_infos.into_values().collect::>(); + let mut all_current_coin_balances = all_current_coin_balances + .into_values() + .collect::>(); - // Sort by PK - all_coin_infos.sort_by(|a, b| a.coin_type.cmp(&b.coin_type)); - all_current_coin_balances.sort_by(|a, b| { - (&a.owner_address, &a.coin_type).cmp(&(&b.owner_address, &b.coin_type)) - }); + // Sort by PK + all_coin_infos.sort_by(|a, b| a.coin_type.cmp(&b.coin_type)); + all_current_coin_balances.sort_by(|a, b| { + (&a.owner_address, &a.coin_type).cmp(&(&b.owner_address, &b.coin_type)) + }); + + ( + all_coin_activities, + all_coin_infos, + all_coin_balances, + all_current_coin_balances, + all_coin_supply, + ) + }) + .await + .context("spawn_blocking for CoinProcessor thread failed")?; let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -277,11 +302,12 @@ impl ProcessorTrait for CoinProcessor { self.name(), start_version, end_version, - all_coin_activities, - all_coin_infos, - all_coin_balances, - all_current_coin_balances, - all_coin_supply, + 
&all_coin_activities, + &all_coin_infos, + &all_coin_balances, + &all_current_coin_balances, + &all_coin_supply, + &self.per_table_chunk_sizes, ) .await; @@ -293,7 +319,7 @@ impl ProcessorTrait for CoinProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index 72aab1ef..b5c6bb07 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -12,7 +12,7 @@ use crate::{ write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; @@ -23,17 +23,21 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; +use tokio::join; use tracing::error; pub struct DefaultProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl DefaultProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -53,16 +57,17 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - txns: Vec, - block_metadata_transactions: Vec, - wscs: Vec, + txns: &[TransactionModel], + block_metadata_transactions: &[BlockMetadataTransactionModel], + wscs: &[WriteSetChangeModel], (move_modules, move_resources, table_items, current_table_items, table_metadata): ( - Vec, - Vec, - Vec, - Vec, - Vec, + &[MoveModule], + &[MoveResource], + &[TableItem], + &[CurrentTableItem], + &[TableMetadata], ), + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -71,62 +76,76 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let txns_res = execute_in_chunks( conn.clone(), insert_transactions_query, txns, - TransactionModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("transactions", per_table_chunk_sizes), + ); + let bmt_res = execute_in_chunks( conn.clone(), insert_block_metadata_transactions_query, block_metadata_transactions, - BlockMetadataTransactionModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "block_metadata_transactions", + per_table_chunk_sizes, + ), + ); + let wst_res = execute_in_chunks( conn.clone(), insert_write_set_changes_query, wscs, - WriteSetChangeModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "write_set_changes", + per_table_chunk_sizes, + ), + ); + let mm_res = execute_in_chunks( conn.clone(), insert_move_modules_query, move_modules, - MoveModule::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("move_modules", per_table_chunk_sizes), + ); + + let mr_res = execute_in_chunks( conn.clone(), insert_move_resources_query, move_resources, - MoveResource::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("move_resources", per_table_chunk_sizes), + ); + + let ti_res = execute_in_chunks( conn.clone(), insert_table_items_query, table_items, - TableItem::field_count(), - ) 
- .await?; - execute_in_chunks( + get_config_table_chunk_size::("table_items", per_table_chunk_sizes), + ); + + let cti_res = execute_in_chunks( conn.clone(), insert_current_table_items_query, current_table_items, - CurrentTableItem::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_table_items", + per_table_chunk_sizes, + ), + ); + + let tm_res = execute_in_chunks( conn.clone(), insert_table_metadata_query, table_metadata, - TableMetadata::field_count(), - ) - .await?; + get_config_table_chunk_size::("table_metadatas", per_table_chunk_sizes), + ); + + let (txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res) = + join!(txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res); + + for res in [ + txns_res, bmt_res, wst_res, mm_res, mr_res, ti_res, cti_res, tm_res, + ] { + res?; + } Ok(()) } @@ -293,47 +312,72 @@ impl ProcessorTrait for DefaultProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); - let (txns, block_metadata_txns, write_set_changes, wsc_details) = - TransactionModel::from_transactions(&transactions); - - let mut block_metadata_transactions = vec![]; - for block_metadata_txn in block_metadata_txns { - block_metadata_transactions.push(block_metadata_txn.clone()); - } - let mut move_modules = vec![]; - let mut move_resources = vec![]; - let mut table_items = vec![]; - let mut current_table_items = AHashMap::new(); - let mut table_metadata = AHashMap::new(); - for detail in wsc_details { - match detail { - WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()), - WriteSetChangeDetail::Resource(resource) => move_resources.push(resource.clone()), - WriteSetChangeDetail::Table(item, current_item, metadata) => { - table_items.push(item.clone()); - current_table_items.insert( - ( - current_item.table_handle.clone(), - current_item.key_hash.clone(), - ), - current_item.clone(), - ); - if let Some(meta) = metadata { - table_metadata.insert(meta.handle.clone(), meta.clone()); - } - }, + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let ( + txns, + block_metadata_transactions, + write_set_changes, + (move_modules, move_resources, table_items, current_table_items, table_metadata), + ) = tokio::task::spawn_blocking(move || { + let (txns, block_metadata_txns, write_set_changes, wsc_details) = + TransactionModel::from_transactions(&transactions); + let mut block_metadata_transactions = vec![]; + for block_metadata_txn in block_metadata_txns { + block_metadata_transactions.push(block_metadata_txn.clone()); + } + let mut move_modules = vec![]; + let mut move_resources = vec![]; + let mut table_items = vec![]; + let mut current_table_items = AHashMap::new(); + let mut table_metadata = AHashMap::new(); + for detail in wsc_details { + match detail { + WriteSetChangeDetail::Module(module) => move_modules.push(module.clone()), + WriteSetChangeDetail::Resource(resource) => { + move_resources.push(resource.clone()) + }, + WriteSetChangeDetail::Table(item, current_item, metadata) => { + table_items.push(item.clone()); + current_table_items.insert( + ( + current_item.table_handle.clone(), + current_item.key_hash.clone(), + ), + current_item.clone(), + ); + if let Some(meta) = metadata { + table_metadata.insert(meta.handle.clone(), meta.clone()); + } + }, + } } - } - // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes - let mut current_table_items = current_table_items 
- .into_values() - .collect::>(); - let mut table_metadata = table_metadata.into_values().collect::>(); - // Sort by PK - current_table_items - .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); - table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes + let mut current_table_items = current_table_items + .into_values() + .collect::>(); + let mut table_metadata = table_metadata.into_values().collect::>(); + // Sort by PK + current_table_items.sort_by(|a, b| { + (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash)) + }); + table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); + + ( + txns, + block_metadata_transactions, + write_set_changes, + ( + move_modules, + move_resources, + table_items, + current_table_items, + table_metadata, + ), + ) + }) + .await + .expect("Failed to spawn_blocking for TransactionModel::from_transactions"); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -343,16 +387,17 @@ impl ProcessorTrait for DefaultProcessor { self.name(), start_version, end_version, - txns, - block_metadata_transactions, - write_set_changes, + &txns, + &block_metadata_transactions, + &write_set_changes, ( - move_modules, - move_resources, - table_items, - current_table_items, - table_metadata, + &move_modules, + &move_resources, + &table_items, + ¤t_table_items, + &table_metadata, ), + &self.per_table_chunk_sizes, ) .await; @@ -363,7 +408,7 @@ impl ProcessorTrait for DefaultProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index 2bc567c9..c382f82e 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ -7,9 +7,10 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }, }; +use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use async_trait::async_trait; @@ -18,17 +19,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct EventsProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl EventsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -48,7 +52,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - events: Vec, + events: &[EventModel], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -56,7 +61,13 @@ async fn insert_to_db( end_version = end_version, "Inserting to db", ); - execute_in_chunks(conn, insert_events_query, events, EventModel::field_count()).await?; + execute_in_chunks( + conn, + insert_events_query, + events, + get_config_table_chunk_size::("events", per_table_chunk_sizes), + ) + .await?; Ok(()) } @@ 
-94,6 +105,8 @@ impl ProcessorTrait for EventsProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut events = vec![]; for txn in &transactions { let txn_version = txn.version as i64; @@ -131,7 +144,8 @@ impl ProcessorTrait for EventsProcessor { self.name(), start_version, end_version, - events, + &events, + &self.per_table_chunk_sizes, ) .await; @@ -142,7 +156,7 @@ impl ProcessorTrait for EventsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 73256aa1..0db7fd99 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -20,7 +20,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool, PgPoolConnection}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection}, util::{get_entry_function_from_user_request, standardize_address}, }, }; @@ -34,7 +34,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; @@ -42,11 +41,15 @@ pub const APTOS_COIN_TYPE_STR: &str = "0x1::aptos_coin::AptosCoin"; pub struct FungibleAssetProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl FungibleAssetProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -66,10 +69,11 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - fungible_asset_activities: Vec, - fungible_asset_metadata: Vec, - fungible_asset_balances: Vec, - current_fungible_asset_balances: Vec, + fungible_asset_activities: &[FungibleAssetActivity], + fungible_asset_metadata: &[FungibleAssetMetadataModel], + fungible_asset_balances: &[FungibleAssetBalance], + current_fungible_asset_balances: &[CurrentFungibleAssetBalance], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -78,34 +82,46 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let faa = execute_in_chunks( conn.clone(), insert_fungible_asset_activities_query, fungible_asset_activities, - FungibleAssetActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "fungible_asset_activities", + per_table_chunk_sizes, + ), + ); + let fam = execute_in_chunks( conn.clone(), insert_fungible_asset_metadata_query, fungible_asset_metadata, - FungibleAssetMetadataModel::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "fungible_asset_metadata", + per_table_chunk_sizes, + ), + ); + let fab = execute_in_chunks( conn.clone(), insert_fungible_asset_balances_query, fungible_asset_balances, - FungibleAssetBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let cfab = execute_in_chunks( + conn, 
insert_current_fungible_asset_balances_query, current_fungible_asset_balances, - CurrentFungibleAssetBalance::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_fungible_asset_balances", + per_table_chunk_sizes, + ), + ); + let (faa_res, fam_res, fab_res, cfab_res) = tokio::join!(faa, fam, fab, cfab); + for res in [faa_res, fam_res, fab_res, cfab_res] { + res?; + } Ok(()) } @@ -225,6 +241,8 @@ impl ProcessorTrait for FungibleAssetProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; let ( fungible_asset_activities, @@ -241,10 +259,11 @@ impl ProcessorTrait for FungibleAssetProcessor { self.name(), start_version, end_version, - fungible_asset_activities, - fungible_asset_metadata, - fungible_asset_balances, - current_fungible_asset_balances, + &fungible_asset_activities, + &fungible_asset_metadata, + &fungible_asset_balances, + ¤t_fungible_asset_balances, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -254,7 +273,7 @@ impl ProcessorTrait for FungibleAssetProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(err) => { error!( diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index c42fe518..63a9e1b6 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -47,18 +47,16 @@ use crate::{ }; use aptos_protos::transaction::v1::Transaction as ProtoTransaction; use async_trait::async_trait; -use diesel::{upsert::excluded, ExpressionMethods}; +use diesel::{pg::upsert::excluded, ExpressionMethods}; use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -type StartVersion = u64; -type EndVersion = u64; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct ProcessingResult { - pub start_version: StartVersion, - pub end_version: EndVersion, - pub last_transaction_timstamp: Option, + pub start_version: u64, + pub end_version: u64, + pub last_transaction_timestamp: Option, pub processing_duration_in_secs: f64, pub db_insertion_duration_in_secs: f64, } diff --git a/rust/processor/src/processors/monitoring_processor.rs b/rust/processor/src/processors/monitoring_processor.rs index 45d7421c..b8245388 100644 --- a/rust/processor/src/processors/monitoring_processor.rs +++ b/rust/processor/src/processors/monitoring_processor.rs @@ -46,7 +46,7 @@ impl ProcessorTrait for MonitoringProcessor { end_version, processing_duration_in_secs: 0.0, db_insertion_duration_in_secs: 0.0, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp: transactions.last().unwrap().timestamp.clone(), }) } diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index bc094a43..d65c1ce8 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -93,6 +93,8 @@ impl ProcessorTrait for NftMetadataProcessor { db_chain_id: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; 
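Across the processors in this change, the old `Model::field_count()` chunk sizing is replaced by calls to `get_config_table_chunk_size::<Model>(table_name, &per_table_chunk_sizes)`. The helper's body is not part of this diff; the sketch below shows the lookup its call sites imply, assuming the default divides a Postgres bind-parameter budget by the model's field count. `MAX_DIESEL_PARAM_SIZE` and the exact fallback are assumptions, not taken from this diff.

```rust
use ahash::AHashMap;
use field_count::FieldCount;

// Assumed parameter budget per INSERT statement; Postgres caps bind
// parameters at u16::MAX, and half of that lines up with the
// "~32,768 (2**16/2)" note in the config comment.
const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;

// Sketch: prefer an operator-supplied per-table override, otherwise fall
// back to "how many rows fit in the parameter budget for this model".
pub fn get_config_table_chunk_size<T: FieldCount>(
    table_name: &str,
    per_table_chunk_sizes: &AHashMap<String, usize>,
) -> usize {
    per_table_chunk_sizes
        .get(table_name)
        .copied()
        .unwrap_or_else(|| MAX_DIESEL_PARAM_SIZE / T::field_count())
}
```

A fallback tied to `field_count()` would keep the spirit of the old per-model sizing while letting operators override individual tables, which is consistent with the processors dropping their direct `field_count::FieldCount` imports.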
// First get all token related table metadata from the batch of transactions. This is in case @@ -178,7 +180,7 @@ impl ProcessorTrait for NftMetadataProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }) } diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs index 81a31e0e..ad4c2536 100644 --- a/rust/processor/src/processors/objects_processor.rs +++ b/rust/processor/src/processors/objects_processor.rs @@ -4,7 +4,10 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ models::{ - fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_utils::FungibleAssetStore, + }, object_models::{ v2_object_utils::{ ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, @@ -15,7 +18,7 @@ use crate::{ }, schema, utils::{ - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::standardize_address, }, }; @@ -28,17 +31,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct ObjectsProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl ObjectsProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -58,7 +64,8 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - (objects, current_objects): (Vec, Vec), + (objects, current_objects): (&[Object], &[CurrentObject]), + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -67,20 +74,25 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let io = execute_in_chunks( conn.clone(), insert_objects_query, objects, - Object::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("objects", per_table_chunk_sizes), + ); + let co = execute_in_chunks( conn, insert_current_objects_query, current_objects, - CurrentObject::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_objects", + per_table_chunk_sizes, + ), + ); + let (io_res, co_res) = tokio::join!(io, co); + for res in [io_res, co_res] { + res?; + } Ok(()) } @@ -149,6 +161,8 @@ impl ProcessorTrait for ObjectsProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // Moving object handling here because we need a single object @@ -274,7 +288,8 @@ impl ProcessorTrait for ObjectsProcessor { self.name(), start_version, end_version, - (all_objects, all_current_objects), + (&all_objects, &all_current_objects), + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -285,7 +300,7 @@ impl ProcessorTrait for ObjectsProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git 
a/rust/processor/src/processors/stake_processor.rs b/rust/processor/src/processors/stake_processor.rs index 9b585914..774d3b05 100644 --- a/rust/processor/src/processors/stake_processor.rs +++ b/rust/processor/src/processors/stake_processor.rs @@ -18,7 +18,7 @@ use crate::{ }, schema, utils::{ - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, util::{parse_timestamp, standardize_address}, }, }; @@ -31,17 +31,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct StakeProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl StakeProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -61,15 +64,16 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - current_stake_pool_voters: Vec, - proposal_votes: Vec, - delegator_actvities: Vec, - delegator_balances: Vec, - current_delegator_balances: Vec, - delegator_pools: Vec, - delegator_pool_balances: Vec, - current_delegator_pool_balances: Vec, - current_delegated_voter: Vec, + current_stake_pool_voters: &[CurrentStakingPoolVoter], + proposal_votes: &[ProposalVote], + delegator_actvities: &[DelegatedStakingActivity], + delegator_balances: &[DelegatorBalance], + current_delegator_balances: &[CurrentDelegatorBalance], + delegator_pools: &[DelegatorPool], + delegator_pool_balances: &[DelegatorPoolBalance], + current_delegator_pool_balances: &[CurrentDelegatorPoolBalance], + current_delegated_voter: &[CurrentDelegatedVoter], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -78,69 +82,92 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let cspv = execute_in_chunks( conn.clone(), insert_current_stake_pool_voter_query, current_stake_pool_voters, - CurrentStakingPoolVoter::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_staking_pool_voter", + per_table_chunk_sizes, + ), + ); + let pv = execute_in_chunks( conn.clone(), insert_proposal_votes_query, proposal_votes, - ProposalVote::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("proposal_votes", per_table_chunk_sizes), + ); + let da = execute_in_chunks( conn.clone(), insert_delegator_activities_query, delegator_actvities, - DelegatedStakingActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegated_staking_activities", + per_table_chunk_sizes, + ), + ); + let db = execute_in_chunks( conn.clone(), insert_delegator_balances_query, delegator_balances, - DelegatorBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegator_balances", + per_table_chunk_sizes, + ), + ); + let cdb = execute_in_chunks( conn.clone(), insert_current_delegator_balances_query, current_delegator_balances, - CurrentDelegatorBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_delegator_balances", + per_table_chunk_sizes, + ), + ); + let dp = execute_in_chunks( conn.clone(), insert_delegator_pools_query, delegator_pools, - DelegatorPool::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + 
"delegated_staking_pools", + per_table_chunk_sizes, + ), + ); + let dpb = execute_in_chunks( conn.clone(), insert_delegator_pool_balances_query, delegator_pool_balances, - DelegatorPoolBalance::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "delegated_staking_pool_balances", + per_table_chunk_sizes, + ), + ); + let cdpb = execute_in_chunks( conn.clone(), insert_current_delegator_pool_balances_query, current_delegator_pool_balances, - CurrentDelegatorPoolBalance::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "current_delegated_staking_pool_balances", + per_table_chunk_sizes, + ), + ); + let cdv = execute_in_chunks( + conn, insert_current_delegated_voter_query, current_delegated_voter, - CurrentDelegatedVoter::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_delegated_voter", + per_table_chunk_sizes, + ), + ); + + let (cspv_res, pv_res, da_res, db_res, cdb_res, dp_res, dpb_res, cdpb_res, cdv_res) = + futures::join!(cspv, pv, da, db, cdb, dp, dpb, cdpb, cdv); + for res in [ + cspv_res, pv_res, da_res, db_res, cdb_res, dp_res, dpb_res, cdpb_res, cdv_res, + ] { + res?; + } Ok(()) } @@ -351,6 +378,8 @@ impl ProcessorTrait for StakeProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; let mut all_current_stake_pool_voters: StakingPoolVoterMap = AHashMap::new(); @@ -510,15 +539,16 @@ impl ProcessorTrait for StakeProcessor { self.name(), start_version, end_version, - all_current_stake_pool_voters, - all_proposal_votes, - all_delegator_activities, - all_delegator_balances, - all_current_delegator_balances, - all_delegator_pools, - all_delegator_pool_balances, - all_current_delegator_pool_balances, - all_current_delegated_voter, + &all_current_stake_pool_voters, + &all_proposal_votes, + &all_delegator_activities, + &all_delegator_balances, + &all_current_delegator_balances, + &all_delegator_pools, + &all_delegator_pool_balances, + &all_current_delegator_pool_balances, + &all_current_delegated_voter, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); @@ -528,7 +558,7 @@ impl ProcessorTrait for StakeProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/token_processor.rs b/rust/processor/src/processors/token_processor.rs index a42241a3..c383d5ff 100644 --- a/rust/processor/src/processors/token_processor.rs +++ b/rust/processor/src/processors/token_processor.rs @@ -16,7 +16,7 @@ use crate::{ }, }, schema, - utils::database::{execute_in_chunks, PgDbPool}, + utils::database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }; use ahash::AHashMap; use anyhow::bail; @@ -27,7 +27,6 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use tracing::error; @@ -41,13 +40,19 @@ pub struct TokenProcessorConfig { pub struct TokenProcessor { connection_pool: PgDbPool, config: TokenProcessorConfig, + per_table_chunk_sizes: AHashMap, } impl TokenProcessor { - pub fn new(connection_pool: PgDbPool, config: TokenProcessorConfig) -> Self { + pub fn new( + 
connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, + config: TokenProcessorConfig, + ) -> Self { Self { connection_pool, config, + per_table_chunk_sizes, } } } @@ -69,19 +74,20 @@ async fn insert_to_db( start_version: u64, end_version: u64, (tokens, token_ownerships, token_datas, collection_datas): ( - Vec, - Vec, - Vec, - Vec, + &[Token], + &[TokenOwnership], + &[TokenData], + &[CollectionData], ), (current_token_ownerships, current_token_datas, current_collection_datas): ( - Vec, - Vec, - Vec, + &[CurrentTokenOwnership], + &[CurrentTokenData], + &[CurrentCollectionData], ), - token_activities: Vec, - current_token_claims: Vec, - nft_points: Vec, + token_activities: &[TokenActivity], + current_token_claims: &[CurrentTokenPendingClaim], + nft_points: &[NftPoints], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -90,77 +96,89 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let t = execute_in_chunks( conn.clone(), insert_tokens_query, tokens, - Token::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("tokens", per_table_chunk_sizes), + ); + let to = execute_in_chunks( conn.clone(), insert_token_ownerships_query, token_ownerships, - TokenOwnership::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_ownerships", per_table_chunk_sizes), + ); + let td = execute_in_chunks( conn.clone(), insert_token_datas_query, token_datas, - TokenData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_datas", per_table_chunk_sizes), + ); + let cd = execute_in_chunks( conn.clone(), insert_collection_datas_query, collection_datas, - CollectionData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("collection_datas", per_table_chunk_sizes), + ); + let cto = execute_in_chunks( conn.clone(), insert_current_token_ownerships_query, current_token_ownerships, - CurrentTokenOwnership::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_ownerships", + per_table_chunk_sizes, + ), + ); + let ctd = execute_in_chunks( conn.clone(), insert_current_token_datas_query, current_token_datas, - CurrentTokenData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_datas", + per_table_chunk_sizes, + ), + ); + let ccd = execute_in_chunks( conn.clone(), insert_current_collection_datas_query, current_collection_datas, - CurrentCollectionData::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_collection_datas", + per_table_chunk_sizes, + ), + ); + + let ta = execute_in_chunks( conn.clone(), insert_token_activities_query, token_activities, - TokenActivity::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_activities", per_table_chunk_sizes), + ); + + let ctc = execute_in_chunks( conn.clone(), insert_current_token_claims_query, current_token_claims, - CurrentTokenPendingClaim::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_pending_claims", + per_table_chunk_sizes, + ), + ); + let np = execute_in_chunks( conn, insert_nft_points_query, nft_points, - NftPoints::field_count(), - ) - .await?; + get_config_table_chunk_size::("nft_points", per_table_chunk_sizes), + ); + + let (t_res, to_res, td_res, cd_res, cto_res, ctd_res, ccd_res, ta_res, ctc_res, np) = + tokio::join!(t, to, 
td, cd, cto, ctd, ccd, ta, ctc, np); + for res in [ + t_res, to_res, td_res, cd_res, cto_res, ctd_res, ccd_res, ta_res, ctc_res, np, + ] { + res?; + } Ok(()) } @@ -359,25 +377,26 @@ fn insert_current_token_claims_query( ) { use schema::current_token_pending_claims::dsl::*; - (diesel::insert_into(schema::current_token_pending_claims::table) - .values(items_to_insert) - .on_conflict(( - token_data_id_hash, property_version, from_address, to_address - )) - .do_update() - .set(( - collection_data_id_hash.eq(excluded(collection_data_id_hash)), - creator_address.eq(excluded(creator_address)), - collection_name.eq(excluded(collection_name)), - name.eq(excluded(name)), - amount.eq(excluded(amount)), - table_handle.eq(excluded(table_handle)), - last_transaction_version.eq(excluded(last_transaction_version)), - inserted_at.eq(excluded(inserted_at)), - token_data_id.eq(excluded(token_data_id)), - collection_id.eq(excluded(collection_id)), - )), - Some(" WHERE current_token_pending_claims.last_transaction_version <= excluded.last_transaction_version "), + ( + diesel::insert_into(schema::current_token_pending_claims::table) + .values(items_to_insert) + .on_conflict(( + token_data_id_hash, property_version, from_address, to_address + )) + .do_update() + .set(( + collection_data_id_hash.eq(excluded(collection_data_id_hash)), + creator_address.eq(excluded(creator_address)), + collection_name.eq(excluded(collection_name)), + name.eq(excluded(name)), + amount.eq(excluded(amount)), + table_handle.eq(excluded(table_handle)), + last_transaction_version.eq(excluded(last_transaction_version)), + inserted_at.eq(excluded(inserted_at)), + token_data_id.eq(excluded(token_data_id)), + collection_id.eq(excluded(collection_id)), + )), + Some(" WHERE current_token_pending_claims.last_transaction_version <= excluded.last_transaction_version "), ) } @@ -412,6 +431,8 @@ impl ProcessorTrait for TokenProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // First get all token related table metadata from the batch of transactions. 
This is in case @@ -527,19 +548,20 @@ impl ProcessorTrait for TokenProcessor { start_version, end_version, ( - all_tokens, - all_token_ownerships, - all_token_datas, - all_collection_datas, + &all_tokens, + &all_token_ownerships, + &all_token_datas, + &all_collection_datas, ), ( - all_current_token_ownerships, - all_current_token_datas, - all_current_collection_datas, + &all_current_token_ownerships, + &all_current_token_datas, + &all_current_collection_datas, ), - all_token_activities, - all_current_token_claims, - all_nft_points, + &all_token_activities, + &all_current_token_claims, + &all_nft_points, + &self.per_table_chunk_sizes, ) .await; @@ -550,7 +572,7 @@ impl ProcessorTrait for TokenProcessor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index 3b7799d0..d781b516 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -30,7 +30,7 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool, PgPoolConnection}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool, PgPoolConnection}, util::{get_entry_function_from_user_request, parse_timestamp, standardize_address}, }, }; @@ -43,17 +43,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct TokenV2Processor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl TokenV2Processor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -73,14 +76,15 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - collections_v2: Vec, - token_datas_v2: Vec, - token_ownerships_v2: Vec, - current_collections_v2: Vec, - current_token_datas_v2: Vec, - current_token_ownerships_v2: Vec, - token_activities_v2: Vec, - current_token_v2_metadata: Vec, + collections_v2: &[CollectionV2], + token_datas_v2: &[TokenDataV2], + token_ownerships_v2: &[TokenOwnershipV2], + current_collections_v2: &[CurrentCollectionV2], + current_token_datas_v2: &[CurrentTokenDataV2], + current_token_ownerships_v2: &[CurrentTokenOwnershipV2], + token_activities_v2: &[TokenActivityV2], + current_token_v2_metadata: &[CurrentTokenV2Metadata], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -89,62 +93,97 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let coll_v2 = execute_in_chunks( conn.clone(), insert_collections_v2_query, collections_v2, - CollectionV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("collections_v2", per_table_chunk_sizes), + ); + let td_v2 = execute_in_chunks( conn.clone(), insert_token_datas_v2_query, token_datas_v2, - TokenDataV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::("token_datas_v2", per_table_chunk_sizes), + ); + let to_v2 = execute_in_chunks( conn.clone(), insert_token_ownerships_v2_query, token_ownerships_v2, - TokenOwnershipV2::field_count(), - ) - .await?; - 
execute_in_chunks( + get_config_table_chunk_size::( + "token_ownerships_v2", + per_table_chunk_sizes, + ), + ); + let cc_v2 = execute_in_chunks( conn.clone(), insert_current_collections_v2_query, current_collections_v2, - CurrentCollectionV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_collections_v2", + per_table_chunk_sizes, + ), + ); + let ctd_v2 = execute_in_chunks( conn.clone(), insert_current_token_datas_v2_query, current_token_datas_v2, - CurrentTokenDataV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_datas_v2", + per_table_chunk_sizes, + ), + ); + let cto_v2 = execute_in_chunks( conn.clone(), insert_current_token_ownerships_v2_query, current_token_ownerships_v2, - CurrentTokenOwnershipV2::field_count(), - ) - .await?; - execute_in_chunks( + get_config_table_chunk_size::( + "current_token_ownerships_v2", + per_table_chunk_sizes, + ), + ); + let ta_v2 = execute_in_chunks( conn.clone(), insert_token_activities_v2_query, token_activities_v2, - TokenActivityV2::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "token_activities_v2", + per_table_chunk_sizes, + ), + ); + let ct_v2 = execute_in_chunks( + conn, insert_current_token_v2_metadatas_query, current_token_v2_metadata, - CurrentTokenV2Metadata::field_count(), - ) - .await?; + get_config_table_chunk_size::( + "current_token_v2_metadata", + per_table_chunk_sizes, + ), + ); + + let ( + coll_v2_res, + td_v2_res, + to_v2_res, + cc_v2_res, + ctd_v2_res, + cto_v2_res, + ta_v2_res, + ct_v2_res, + ) = tokio::join!(coll_v2, td_v2, to_v2, cc_v2, ctd_v2, cto_v2, ta_v2, ct_v2,); + + for res in [ + coll_v2_res, + td_v2_res, + to_v2_res, + cc_v2_res, + ctd_v2_res, + cto_v2_res, + ta_v2_res, + ct_v2_res, + ] { + res?; + } + Ok(()) } @@ -351,6 +390,8 @@ impl ProcessorTrait for TokenV2Processor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut conn = self.get_conn().await; // First get all token related table metadata from the batch of transactions. 
This is in case @@ -378,14 +419,15 @@ impl ProcessorTrait for TokenV2Processor { self.name(), start_version, end_version, - collections_v2, - token_datas_v2, - token_ownerships_v2, - current_collections_v2, - current_token_ownerships_v2, - current_token_datas_v2, - token_activities_v2, - current_token_v2_metadata, + &collections_v2, + &token_datas_v2, + &token_ownerships_v2, + ¤t_collections_v2, + ¤t_token_ownerships_v2, + ¤t_token_datas_v2, + &token_activities_v2, + ¤t_token_v2_metadata, + &self.per_table_chunk_sizes, ) .await; @@ -396,7 +438,7 @@ impl ProcessorTrait for TokenV2Processor { end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/processors/user_transaction_processor.rs b/rust/processor/src/processors/user_transaction_processor.rs index 1e8665dd..5571eedb 100644 --- a/rust/processor/src/processors/user_transaction_processor.rs +++ b/rust/processor/src/processors/user_transaction_processor.rs @@ -9,9 +9,10 @@ use crate::{ schema, utils::{ counters::PROCESSOR_UNKNOWN_TYPE_COUNT, - database::{execute_in_chunks, PgDbPool}, + database::{execute_in_chunks, get_config_table_chunk_size, PgDbPool}, }, }; +use ahash::AHashMap; use anyhow::bail; use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; use async_trait::async_trait; @@ -20,17 +21,20 @@ use diesel::{ query_builder::QueryFragment, ExpressionMethods, }; -use field_count::FieldCount; use std::fmt::Debug; use tracing::error; pub struct UserTransactionProcessor { connection_pool: PgDbPool, + per_table_chunk_sizes: AHashMap, } impl UserTransactionProcessor { - pub fn new(connection_pool: PgDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: PgDbPool, per_table_chunk_sizes: AHashMap) -> Self { + Self { + connection_pool, + per_table_chunk_sizes, + } } } @@ -50,8 +54,9 @@ async fn insert_to_db( name: &'static str, start_version: u64, end_version: u64, - user_transactions: Vec, - signatures: Vec, + user_transactions: &[UserTransactionModel], + signatures: &[Signature], + per_table_chunk_sizes: &AHashMap, ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -60,21 +65,26 @@ async fn insert_to_db( "Inserting to db", ); - execute_in_chunks( + let ut = execute_in_chunks( conn.clone(), insert_user_transactions_query, user_transactions, - UserTransactionModel::field_count(), - ) - .await?; - execute_in_chunks( - conn.clone(), + get_config_table_chunk_size::( + "user_transactions", + per_table_chunk_sizes, + ), + ); + let is = execute_in_chunks( + conn, insert_signatures_query, signatures, - Signature::field_count(), - ) - .await?; + get_config_table_chunk_size::("signatures", per_table_chunk_sizes), + ); + let (ut_res, is_res) = futures::join!(ut, is); + for res in [ut_res, is_res] { + res?; + } Ok(()) } @@ -133,6 +143,8 @@ impl ProcessorTrait for UserTransactionProcessor { _: Option, ) -> anyhow::Result { let processing_start = std::time::Instant::now(); + let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone(); + let mut signatures = vec![]; let mut user_transactions = vec![]; for txn in &transactions { @@ -172,8 +184,9 @@ impl ProcessorTrait for UserTransactionProcessor { self.name(), start_version, end_version, - user_transactions, - signatures, + &user_transactions, + &signatures, + &self.per_table_chunk_sizes, ) .await; let db_insertion_duration_in_secs = 
db_insertion_start.elapsed().as_secs_f64(); @@ -183,7 +196,7 @@ end_version, processing_duration_in_secs, db_insertion_duration_in_secs, - last_transaction_timstamp: transactions.last().unwrap().timestamp.clone(), + last_transaction_timestamp, }), Err(e) => { error!( diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index 6584dc52..992eb29f 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -132,7 +132,7 @@ pub static LATEST_PROCESSED_VERSION: Lazy = Lazy::new(|| { register_int_gauge_vec!( "indexer_processor_latest_version", "Latest version a processor has fully consumed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -142,7 +142,17 @@ pub static PROCESSED_BYTES_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( "indexer_processor_processed_bytes_count", "Count of bytes processed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); + +/// The amount of time that a task spent waiting for a protobuf bundle of transactions +pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy = Lazy::new(|| { + register_gauge_vec!( + "indexer_processor_pb_channel_fetch_wait_time_secs", + "Time in seconds that a task spent waiting for a protobuf bundle of transactions from the channel", + &["processor_name", "task_index"] ) .unwrap() }); @@ -152,7 +162,7 @@ pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy = Lazy::new(|| register_int_counter_vec!( "indexer_processor_num_transactions_processed_count", "Number of transactions processed", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -167,22 +177,12 @@ pub static FETCHER_THREAD_CHANNEL_SIZE: Lazy = Lazy::new(|| { .unwrap() }); -/// Overall processing time for multiple (n = number_concurrent_processing_tasks) batch of transactions -pub static MULTI_BATCH_PROCESSING_TIME_IN_SECS: Lazy = Lazy::new(|| { - register_gauge_vec!( - "indexer_processor_multi_batch_processing_time_in_secs", - "Time taken to process multiple batches of transactions", - &["processor_name"] - ) - .unwrap() -}); - -/// Overall processing time for a single batch of transactions +/// Overall processing time for a single batch of transactions (per task) pub static SINGLE_BATCH_PROCESSING_TIME_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_single_batch_processing_time_in_secs", "Time taken to process a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -192,7 +192,7 @@ pub static SINGLE_BATCH_PARSING_TIME_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_single_batch_parsing_time_in_secs", "Time taken to parse a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -202,7 +202,7 @@ pub static SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS: Lazy = Lazy::new(|| register_gauge_vec!( "indexer_processor_single_batch_db_insertion_time_in_secs", "Time taken to insert to DB for a single batch of transactions", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); @@ -212,7 +212,7 @@ pub static TRANSACTION_UNIX_TIMESTAMP: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_transaction_unix_timestamp", "Transaction timestamp in unixtime", - &["processor_name", "step", "message"] + &["processor_name", "step", "message", "task_index"] ) .unwrap() }); @@ -230,7 +230,7 @@
pub static GRPC_LATENCY_BY_PROCESSOR_IN_SECS: Lazy = Lazy::new(|| { register_gauge_vec!( "indexer_processor_grpc_latency_in_secs", "GRPC latency observed by processor", - &["processor_name"] + &["processor_name", "task_index"] ) .unwrap() }); diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index fd230b15..9a8e4dac 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -3,9 +3,11 @@ //! Database-related functions #![allow(clippy::extra_unused_lifetimes)] + use crate::utils::util::remove_null_bytes; +use ahash::AHashMap; use diesel::{ - pg::Pg, + backend::Backend, query_builder::{AstPass, Query, QueryFragment}, ConnectionResult, QueryResult, }; @@ -17,9 +19,8 @@ use diesel_async::{ }, RunQueryDsl, }; -use diesel_async_migrations::{embed_migrations, EmbeddedMigrations}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use futures_util::{future::BoxFuture, FutureExt}; -use once_cell::sync::Lazy; use std::{cmp::min, sync::Arc}; pub type MyDbConnection = AsyncPgConnection; @@ -27,9 +28,9 @@ pub type PgPool = Pool; pub type PgDbPool = Arc; pub type PgPoolConnection<'a> = PooledConnection<'a, MyDbConnection>; -pub static MIGRATIONS: Lazy = Lazy::new(|| embed_migrations!()); +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); -pub const DEFAULT_MAX_POOL_SIZE: u32 = 30; +pub const DEFAULT_MAX_POOL_SIZE: u32 = 150; #[derive(QueryId)] /// Using this will append a where clause at the end of the string upsert function, e.g. @@ -41,19 +42,16 @@ pub struct UpsertFilterLatestTransactionQuery { } // the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit -pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2; +pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize; -/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX) -/// we may need to chunk an array of items based on how many columns are in the table. 
/// This function returns boundaries of chunks in the form of (start_index, end_index) -pub fn get_chunks(num_items_to_insert: usize, column_count: usize) -> Vec<(usize, usize)> { - let max_item_size = MAX_DIESEL_PARAM_SIZE as usize / column_count; - let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size)); +pub fn get_chunks(num_items_to_insert: usize, chunk_size: usize) -> Vec<(usize, usize)> { + let mut chunk: (usize, usize) = (0, min(num_items_to_insert, chunk_size)); let mut chunks = vec![chunk]; while chunk.1 != num_items_to_insert { chunk = ( - chunk.0 + max_item_size, - min(num_items_to_insert, chunk.1 + max_item_size), + chunk.0 + chunk_size, + min(num_items_to_insert, chunk.1 + chunk_size), ); chunks.push(chunk); } @@ -139,49 +137,38 @@ pub async fn new_db_pool( Ok(Arc::new(pool)) } -/* -pub async fn get_connection(pool: &PgPool) -> Result, PoolError> { - let connection = pool.get().await.unwrap(); - Ok(AsyncConnectionWrapper::from(connection)) -} -*/ - pub async fn execute_in_chunks( conn: PgDbPool, build_query: fn(Vec) -> (U, Option<&'static str>), - items_to_insert: Vec, + items_to_insert: &[T], chunk_size: usize, ) -> Result<(), diesel::result::Error> where - U: QueryFragment + diesel::query_builder::QueryId + Send, - T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, + U: QueryFragment + diesel::query_builder::QueryId + Send + 'static, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static, { let chunks = get_chunks(items_to_insert.len(), chunk_size); - for (start_ind, end_ind) in chunks { - let items = &items_to_insert[start_ind..end_ind]; - - let (query, additional_where_clause) = build_query(items.to_vec()); - match execute_with_better_error(conn.clone(), query, additional_where_clause).await { - Ok(_) => {}, - Err(_) => { - let cleaned_items = clean_data_for_db(items.to_vec(), true); - let (cleaned_query, additional_where_clause) = build_query(cleaned_items); - match execute_with_better_error( - conn.clone(), - cleaned_query, - additional_where_clause, - ) - .await - { - Ok(_) => {}, - Err(e) => { - return Err(e); - }, - } - }, - } + let tasks = chunks + .into_iter() + .map(|(start_ind, end_ind)| { + let items = items_to_insert[start_ind..end_ind].to_vec(); + let conn = conn.clone(); + tokio::spawn(async move { + let (query, additional_where_clause) = build_query(items.clone()); + execute_or_retry_cleaned(conn, build_query, items, query, additional_where_clause) + .await + }) + }) + .collect::>(); + + let results = futures_util::future::try_join_all(tasks) + .await + .expect("Task panicked executing in chunks"); + for res in results { + res? 
} + Ok(()) } @@ -191,7 +178,7 @@ pub async fn execute_with_better_error( mut additional_where_clause: Option<&'static str>, ) -> QueryResult where - U: QueryFragment + diesel::query_builder::QueryId + Send, + U: QueryFragment + diesel::query_builder::QueryId + Send, { let original_query = diesel::debug_query::(&query).to_string(); // This is needed because if we don't insert any row, then diesel makes a call like this @@ -205,7 +192,6 @@ where }; let debug_string = diesel::debug_query::(&final_query).to_string(); tracing::debug!("Executing query: {:?}", debug_string); - let conn = &mut pool.get().await.map_err(|e| { tracing::warn!("Error getting connection from pool: {:?}", e); diesel::result::Error::DatabaseError( @@ -213,7 +199,46 @@ where Box::new(e.to_string()), ) })?; + let res = final_query.execute(conn).await; + if let Err(ref e) = res { + tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); + } + res +} +/// Returns the entry for the config hashmap, or the default field count for the insert +/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX), +/// we default to chunk an array of items based on how many columns are in the table. +pub fn get_config_table_chunk_size( + table_name: &str, + per_table_chunk_sizes: &AHashMap, +) -> usize { + per_table_chunk_sizes + .get(table_name) + .copied() + .unwrap_or_else(|| MAX_DIESEL_PARAM_SIZE / T::field_count()) +} + +pub async fn execute_with_better_error_conn( + conn: &mut MyDbConnection, + query: U, + mut additional_where_clause: Option<&'static str>, +) -> QueryResult +where + U: QueryFragment + diesel::query_builder::QueryId + Send, +{ + let original_query = diesel::debug_query::(&query).to_string(); + // This is needed because if we don't insert any row, then diesel makes a call like this + // SELECT 1 FROM TABLE WHERE 1=0 + if original_query.to_lowercase().contains("where") { + additional_where_clause = None; + } + let final_query = UpsertFilterLatestTransactionQuery { + query, + where_clause: additional_where_clause, + }; + let debug_string = diesel::debug_query::(&final_query).to_string(); + tracing::debug!("Executing query: {:?}", debug_string); let res = final_query.execute(conn).await; if let Err(ref e) = res { tracing::warn!("Error running query: {:?}\n{:?}", e, debug_string); @@ -221,10 +246,37 @@ where res } -pub async fn run_pending_migrations(conn: &mut MyDbConnection) { - MIGRATIONS - .run_pending_migrations(conn) - .await +async fn execute_or_retry_cleaned( + conn: PgDbPool, + build_query: fn(Vec) -> (U, Option<&'static str>), + items: Vec, + query: U, + additional_where_clause: Option<&'static str>, +) -> Result<(), diesel::result::Error> +where + U: QueryFragment + diesel::query_builder::QueryId + Send, + T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, +{ + match execute_with_better_error(conn.clone(), query, additional_where_clause).await { + Ok(_) => {}, + Err(_) => { + let cleaned_items = clean_data_for_db(items, true); + let (cleaned_query, additional_where_clause) = build_query(cleaned_items); + match execute_with_better_error(conn.clone(), cleaned_query, additional_where_clause) + .await + { + Ok(_) => {}, + Err(e) => { + return Err(e); + }, + } + }, + } + Ok(()) +} + +pub async fn run_pending_migrations(conn: &mut impl MigrationHarness) { + conn.run_pending_migrations(MIGRATIONS) .expect("[Parser] Migrations failed!"); } @@ -235,11 +287,11 @@ impl Query for UpsertFilterLatestTransactionQuery { //impl RunQueryDsl for 
UpsertFilterLatestTransactionQuery {} -impl QueryFragment for UpsertFilterLatestTransactionQuery +impl QueryFragment for UpsertFilterLatestTransactionQuery where - T: QueryFragment, + T: QueryFragment, { - fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, diesel::pg::Pg>) -> QueryResult<()> { self.query.walk_ast(out.reborrow())?; if let Some(w) = self.where_clause { out.push_sql(w); @@ -258,7 +310,7 @@ mod test { assert_eq!(get_chunks(65535, 1), vec![ (0, 32767), (32767, 65534), - (65534, 65535) + (65534, 65535), ]); // 200,000 total items will take 6 buckets. Each bucket can only be 3276 size. assert_eq!(get_chunks(10000, 20), vec![ @@ -268,14 +320,14 @@ mod test { (4914, 6552), (6552, 8190), (8190, 9828), - (9828, 10000) + (9828, 10000), ]); assert_eq!(get_chunks(65535, 2), vec![ (0, 16383), (16383, 32766), (32766, 49149), (49149, 65532), - (65532, 65535) + (65532, 65535), ]); } } diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index bb6b1fa6..b30d53d9 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -18,32 +18,33 @@ use crate::{ utils::{ counters::{ ProcessorStep, GRPC_LATENCY_BY_PROCESSOR_IN_SECS, LATEST_PROCESSED_VERSION, - MULTI_BATCH_PROCESSING_TIME_IN_SECS, NUM_TRANSACTIONS_PROCESSED_COUNT, + NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS, PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS, PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT, PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT, SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS, SINGLE_BATCH_PARSING_TIME_IN_SECS, SINGLE_BATCH_PROCESSING_TIME_IN_SECS, TRANSACTION_UNIX_TIMESTAMP, }, - database::{execute_with_better_error, new_db_pool, run_pending_migrations, PgDbPool}, + database::{execute_with_better_error_conn, new_db_pool, run_pending_migrations, PgDbPool}, util::{time_diff_since_pb_timestamp_in_secs, timestamp_to_iso, timestamp_to_unixtime}, }, }; +use ahash::AHashMap; use anyhow::{Context, Result}; use aptos_moving_average::MovingAverage; use aptos_protos::transaction::v1::Transaction; -use std::{sync::Arc, time::Duration}; -use tokio::time::timeout; -use tracing::{error, info}; +use diesel::Connection; +use tokio::task::JoinHandle; +use tracing::{debug, error, info}; use url::Url; // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. 
-const BUFFER_SIZE: usize = 50; +pub const BUFFER_SIZE: usize = 100; // Consumer thread will wait X seconds before panicking if it doesn't receive any data -const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; -pub(crate) const PROCESSOR_SERVICE_TYPE: &str = "processor"; +pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; +pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; #[derive(Clone)] pub struct TransactionsPBResponse { @@ -63,6 +64,9 @@ pub struct Worker { pub ending_version: Option, pub number_concurrent_processing_tasks: usize, pub gap_detection_batch_size: u64, + pub grpc_chain_id: Option, + pub pb_channel_txn_chunk_size: usize, + pub per_table_chunk_sizes: AHashMap, pub enable_verbose_logging: Option, } @@ -78,6 +82,9 @@ impl Worker { number_concurrent_processing_tasks: Option, db_pool_size: Option, gap_detection_batch_size: u64, + // The number of transactions per protobuf batch + pb_channel_txn_chunk_size: usize, + per_table_chunk_sizes: AHashMap, enable_verbose_logging: Option, ) -> Result { let processor_name = processor_config.name(); @@ -108,6 +115,9 @@ impl Worker { auth_token, number_concurrent_processing_tasks, gap_detection_batch_size, + grpc_chain_id: None, + pb_channel_txn_chunk_size, + per_table_chunk_sizes, enable_verbose_logging, }) } @@ -120,7 +130,6 @@ impl Worker { /// 4. We will keep track of the last processed version and monitoring things like TPS pub async fn run(&mut self) { let processor_name = self.processor_config.name(); - let enable_verbose_logging = self.enable_verbose_logging.unwrap_or(false); info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, @@ -148,10 +157,7 @@ impl Worker { 0 }); - let starting_version = match self.starting_version { - None => starting_version_from_db, - Some(version) => version, - }; + let starting_version = self.starting_version.unwrap_or(starting_version_from_db); info!( processor_name = processor_name, @@ -165,13 +171,20 @@ impl Worker { let concurrent_tasks = self.number_concurrent_processing_tasks; - // Build the processor based on the config. 
- let processor = build_processor(&self.processor_config, self.db_pool.clone()); - let processor = Arc::new(processor); + // get the chain id + let chain_id = crate::grpc_stream::get_chain_id( + self.indexer_grpc_data_service_address.clone(), + self.grpc_http2_config.grpc_http2_ping_interval_in_secs(), + self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(), + self.auth_token.clone(), + processor_name.to_string(), + ) + .await; + self.check_or_update_chain_id(chain_id as i64) + .await + .unwrap(); - // This is the moving average that we use to calculate TPS - let mut ma = MovingAverage::new(10); - let mut batch_start_version = starting_version; + self.grpc_chain_id = Some(chain_id); let ending_version = self.ending_version; let indexer_grpc_data_service_address = self.indexer_grpc_data_service_address.clone(); @@ -179,31 +192,33 @@ impl Worker { self.grpc_http2_config.grpc_http2_ping_interval_in_secs(); let indexer_grpc_http2_ping_timeout = self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(); + let pb_channel_txn_chunk_size = self.pb_channel_txn_chunk_size; + // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream // and write into a channel - // The each item will be (chain_id, batch of transactions) + // TODO: change channel size based on number_concurrent_processing_tasks let (tx, receiver) = kanal::bounded_async::(BUFFER_SIZE); let request_ending_version = self.ending_version; let auth_token = self.auth_token.clone(); - tokio::spawn(async move { + let fetcher_task = tokio::spawn(async move { info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, end_version = ending_version, - start_version = batch_start_version, + start_version = starting_version, "[Parser] Starting fetcher thread" ); + crate::grpc_stream::create_fetcher_loop( - tx, - indexer_grpc_data_service_address, + tx.clone(), + indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, starting_version, request_ending_version, - auth_token, + auth_token.clone(), processor_name.to_string(), - batch_start_version, - BUFFER_SIZE, + pb_channel_txn_chunk_size, ) .await }); @@ -211,13 +226,17 @@ impl Worker { // Create a gap detector task that will panic if there is a gap in the processing let (gap_detector_sender, gap_detector_receiver) = kanal::bounded_async::(BUFFER_SIZE); - let processor_clone = processor.clone(); let gap_detection_batch_size = self.gap_detection_batch_size; + let processor = build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.db_pool.clone(), + ); tokio::spawn(async move { crate::gap_detector::create_gap_detector_status_tracker_loop( gap_detector_receiver, - processor_clone, - batch_start_version, + processor, + starting_version, gap_detection_batch_size, ) .await; @@ -229,347 +248,134 @@ impl Worker { // 3. We have received either an empty batch or a batch with a gap. We should panic. // 4. We have not received anything in X seconds, we should panic. // 5. If it's the wrong chain, panic. 
- let mut db_chain_id = None; - loop { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser] Fetching transaction batches from channel", - ); - let txn_channel_fetch_latency = std::time::Instant::now(); - let mut transactions_batches = vec![]; - let mut last_fetched_version = batch_start_version as i64 - 1; - for task_index in 0..concurrent_tasks { - let txn_pb_res = match task_index { - 0 => { - // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed. - let txn_pb_timeout_res = timeout( - Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), - receiver.recv(), - ) - .await; - match txn_pb_timeout_res { - Ok(txn_pb_res) => txn_pb_res.map(Some), - // Outer `Err` is a timeout - Err(_) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = - self.indexer_grpc_data_service_address.as_str(), - "[Parser] Consumer thread timed out waiting for transactions", - ); - panic!( - "[Parser] Consumer thread timed out waiting for transactions" - ); - }, - } - }, - _ => { - // If we're not the first task, we should poll to see if we get any data. - receiver.try_recv() - }, - }; - let txn_pb = match txn_pb_res { - Ok(txn_pb) => txn_pb, - // This happens when the channel is closed. We should panic. - Err(_e) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser][T#{}] Channel closed; stream ended.", - task_index - ); - panic!("[Parser][T#{}] Channel closed", task_index); - }, - }; + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + concurrent_tasks, + "[Parser] Spawning concurrent parallel processor tasks", + ); - // If we didn't get any data, break; we'll retry later - let txn_pb = match txn_pb { - Some(txn_pb) => txn_pb, - None => break, - }; + let mut processor_tasks = vec![fetcher_task]; + for task_index in 0..concurrent_tasks { + let join_handle = self + .launch_processor_task(task_index, receiver.clone(), gap_detector_sender.clone()) + .await; + processor_tasks.push(join_handle); + } - if let Some(existing_id) = db_chain_id { - if txn_pb.chain_id != existing_id { - error!( - processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.as_str(), - chain_id = txn_pb.chain_id, - existing_id = existing_id, - "[Parser] Stream somehow changed chain id!", - ); - panic!("[Parser] Stream somehow changed chain id!"); - } - } else { - db_chain_id = Some( - self.check_or_update_chain_id(txn_pb.chain_id as i64) - .await - .unwrap(), - ); - } - let current_fetched_version = - txn_pb.transactions.as_slice().first().unwrap().version; - if last_fetched_version + 1 != current_fetched_version as i64 { + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + concurrent_tasks, + "[Parser] Processor tasks spawned", + ); + + // Await the processor tasks: this is forever + futures::future::try_join_all(processor_tasks) + .await + .expect("[Processor] Processor tasks have died"); + } + + async fn launch_processor_task( + &self, + task_index: usize, + receiver: kanal::AsyncReceiver, + gap_detector_sender: kanal::AsyncSender, + ) -> JoinHandle<()> { + let processor_name = 
self.processor_config.name(); + let stream_address = self.indexer_grpc_data_service_address.to_string(); + let receiver_clone = receiver.clone(); + let auth_token = self.auth_token.clone(); + + // Build the processor based on the config. + let processor = build_processor( + &self.processor_config, + self.per_table_chunk_sizes.clone(), + self.db_pool.clone(), + ); + + let concurrent_tasks = self.number_concurrent_processing_tasks; + + let chain_id = self + .grpc_chain_id + .expect("GRPC chain ID has not been fetched yet!"); + + tokio::spawn(async move { + let task_index_str = task_index.to_string(); + let step = ProcessorStep::ProcessedBatch.get_step(); + let label = ProcessorStep::ProcessedBatch.get_label(); + let mut ma = MovingAverage::new(3000); + + loop { + let txn_channel_fetch_latency = std::time::Instant::now(); + + let txn_pb = fetch_transactions( + processor_name, + &stream_address, + receiver_clone.clone(), + task_index, + ) + .await; + + let size_in_bytes = txn_pb.size_in_bytes as f64; + let first_txn = txn_pb.transactions.as_slice().first().unwrap(); + let first_txn_version = first_txn.version; + let last_txn = txn_pb.transactions.as_slice().last().unwrap(); + let last_txn_version = last_txn.version; + let start_txn_timestamp = first_txn.timestamp.clone(); + let end_txn_timestamp = last_txn.timestamp.clone(); + let txn_channel_fetch_latency_sec = + txn_channel_fetch_latency.elapsed().as_secs_f64(); + + debug!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = first_txn_version, + end_version = last_txn_version, + num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1, + size_in_bytes, + task_index, + duration_in_secs = txn_channel_fetch_latency_sec, + tps = (last_txn_version as f64 - first_txn_version as f64) + / txn_channel_fetch_latency_sec, + bytes_per_sec = size_in_bytes / txn_channel_fetch_latency_sec, + "[Parser][T#{}] Successfully fetched transactions from channel.", + task_index + ); + + // Ensure chain_id has not changed + if txn_pb.chain_id != chain_id { error!( - batch_start_version = batch_start_version, - last_fetched_version = last_fetched_version, - current_fetched_version = current_fetched_version, - "[Parser] Received batch with gap from GRPC stream" + processor_name = processor_name, + stream_address = stream_address.as_str(), + chain_id = txn_pb.chain_id, + existing_id = chain_id, + task_index, + "[Parser][T#{}] Stream somehow changed chain id!", + task_index + ); + panic!( + "[Parser][T#{}] Stream somehow changed chain id!", + task_index ); - panic!("[Parser] Received batch with gap from GRPC stream"); } - last_fetched_version = - txn_pb.transactions.as_slice().last().unwrap().version as i64; - transactions_batches.push(txn_pb); - } - let size_in_bytes = transactions_batches - .iter() - .fold(0.0, |acc, txn_batch| acc + txn_batch.size_in_bytes as f64); - let batch_start_txn_timestamp = transactions_batches - .first() - .unwrap() - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let batch_end_txn_timestamp = transactions_batches - .last() - .unwrap() - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - GRPC_LATENCY_BY_PROCESSOR_IN_SECS - .with_label_values(&[processor_name]) - .set(time_diff_since_pb_timestamp_in_secs( - batch_end_txn_timestamp.as_ref().unwrap(), - )); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start_version, - end_version = last_fetched_version, - 
num_of_transactions = last_fetched_version - batch_start_version as i64 + 1, - size_in_bytes, - duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), - tps = (last_fetched_version as f64 - batch_start_version as f64) - / txn_channel_fetch_latency.elapsed().as_secs_f64(), - bytes_per_sec = size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), - "[Parser] Successfully fetched transaction batches from channel." - ); + let processing_time = std::time::Instant::now(); - // Process the transactions in parallel - let mut tasks = vec![]; - for transactions_pb in transactions_batches { - let processor_clone = processor.clone(); - let gap_detector_sender = gap_detector_sender.clone(); - let auth_token = self.auth_token.clone(); - let task = tokio::spawn(async move { - let start_version = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .version; - let end_version = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .version; - let start_txn_timestamp = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let end_txn_timestamp = transactions_pb - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - let txn_time = transactions_pb - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - if let Some(ref t) = txn_time { - PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - PROCESSOR_INVOCATIONS_COUNT - .with_label_values(&[processor_name]) - .inc(); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - size_in_bytes = transactions_pb.size_in_bytes, - "[Parser] Started processing one batch of transactions" - ); - } - - let processing_duration = std::time::Instant::now(); - - let processed_result = processor_clone - .process_transactions( - transactions_pb.transactions, - start_version, - end_version, - db_chain_id, - ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. 
so they can have a chain_id field set on new() funciton) - .await; - if let Some(ref t) = txn_time { - PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS - .with_label_values(&[auth_token.as_str(), processor_name]) - .set(time_diff_since_pb_timestamp_in_secs(t)); - } - - let start_txn_timestamp_unix = start_txn_timestamp - .clone() - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(); - let start_txn_timestamp_iso = start_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - let end_txn_timestamp_iso = end_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(); - - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(end_version as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .set(start_txn_timestamp_unix); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(transactions_pb.size_in_bytes); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedBatch.get_step(), - ProcessorStep::ProcessedBatch.get_label(), - ]) - .inc_by(end_version - start_version + 1); - - if let Ok(ref res) = processed_result { - gap_detector_sender - .send(res.clone()) - .await - .expect("[Parser] Failed to send versions to gap detector"); - - // Logging and metrics - SINGLE_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_duration.elapsed().as_secs_f64()); - SINGLE_BATCH_PARSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.processing_duration_in_secs); - SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(res.db_insertion_duration_in_secs); - - if enable_verbose_logging { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.db_insertion_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] DB insertion time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - size_in_bytes = transactions_pb.size_in_bytes, - duration_in_secs = res.processing_duration_in_secs, - tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - "[Parser] Parsing time of one batch of transactions" - ); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version, - end_version, - start_txn_timestamp_iso, - end_txn_timestamp_iso, - num_of_transactions = end_version - start_version + 1, - size_in_bytes = transactions_pb.size_in_bytes, - processing_duration_in_secs = res.processing_duration_in_secs, - db_insertion_duration_in_secs = res.db_insertion_duration_in_secs, - duration_in_secs = processing_duration.elapsed().as_secs_f64(), - 
tps = (end_version - start_version) as f64 - / processing_duration.elapsed().as_secs_f64(), - bytes_per_sec = transactions_pb.size_in_bytes as f64 - / processing_duration.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedBatch.get_step(), - "{}", - ProcessorStep::ProcessedBatch.get_label(), - ); - } - } - - processed_result - }); - tasks.push(task); - } - let processing_time = std::time::Instant::now(); - let task_count = tasks.len(); - let batches = match futures::future::try_join_all(tasks).await { - Ok(res) => res, - Err(err) => panic!("[Parser] Error processing transaction batches: {:?}", err), - }; - - // Update states depending on results of the batch processing - let mut processed_versions = vec![]; - for res in batches { - let processed: ProcessingResult = match res { + let res = do_processor( + txn_pb, + &processor, + chain_id, + processor_name, + &auth_token, + false, // enable_verbose_logging + ) + .await; + + let processing_result = match res { Ok(versions) => { PROCESSOR_SUCCESSES_COUNT .with_label_values(&[processor_name]) @@ -579,93 +385,112 @@ impl Worker { Err(e) => { error!( processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.to_string(), + stream_address = stream_address.as_str(), error = ?e, - "[Parser] Error processing transactions" + task_index, + "[Parser][T#{}] Error processing transactions", task_index ); PROCESSOR_ERRORS_COUNT .with_label_values(&[processor_name]) .inc(); - panic!(); + panic!( + "[Parser][T#{}] Error processing '{:}' transactions: {:?}", + task_index, processor_name, e + ); }, }; - processed_versions.push(processed); - } - // Log the metrics for processed batch - processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version)); - let processed_versions_sorted = processed_versions.clone(); - let batch_start = processed_versions_sorted.first().unwrap().start_version; - let batch_end = processed_versions_sorted.last().unwrap().end_version; - batch_start_version = batch_end + 1; + let processing_time = processing_time.elapsed().as_secs_f64(); - ma.tick_now(batch_end - batch_start + 1); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start, - end_version = batch_end, - start_txn_timestamp_iso = batch_start_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - end_txn_timestamp_iso = batch_end_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - num_of_transactions = batch_end - batch_start + 1, - task_count, - size_in_bytes, - duration_in_secs = processing_time.elapsed().as_secs_f64(), - tps = (ma.avg() * 1000.0) as u64, - bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedMultipleBatches.get_step(), - "{}", - ProcessorStep::ProcessedMultipleBatches.get_label(), - ); - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set(batch_end as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set( - batch_start_txn_timestamp - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(), + // We've processed things: do some data and metrics + let start_version = processing_result.start_version; + let end_version = processing_result.end_version; + + let 
start_txn_timestamp_unix = start_txn_timestamp + .as_ref() + .map(timestamp_to_unixtime) + .unwrap_or_default(); + let start_txn_timestamp_iso = start_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(); + let end_txn_timestamp_iso = end_txn_timestamp + .as_ref() + .map(timestamp_to_iso) + .unwrap_or_default(); + + ma.tick_now(end_version - start_version + 1); + let tps = (ma.avg() * 1000.0) as u64; + + let num_processed = end_version - start_version + 1; + + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = start_version, + end_version = end_version, + start_txn_timestamp_iso = start_txn_timestamp_iso, + end_txn_timestamp_iso = end_txn_timestamp_iso, + num_of_transactions = num_processed, + concurrent_tasks, + task_index, + size_in_bytes, + processing_duration_in_secs = processing_result.processing_duration_in_secs, + db_insertion_duration_in_secs = processing_result.db_insertion_duration_in_secs, + duration_in_secs = processing_time, + tps = tps, + bytes_per_sec = size_in_bytes / processing_time, + step = &step, + "{}", + label, ); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(size_in_bytes as u64); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(batch_end - batch_start + 1); - MULTI_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_time.elapsed().as_secs_f64()); - } + + // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector! 
+ GRPC_LATENCY_BY_PROCESSOR_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(time_diff_since_pb_timestamp_in_secs( + end_txn_timestamp.as_ref().unwrap(), + )); + LATEST_PROCESSED_VERSION + .with_label_values(&[processor_name, step, label, &task_index_str]) + .set(end_version as i64); + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[processor_name, step, label, &task_index_str]) + .set(start_txn_timestamp_unix); + + // Single batch metrics + PROCESSED_BYTES_COUNT + .with_label_values(&[processor_name, step, label, &task_index_str]) + .inc_by(size_in_bytes as u64); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[processor_name, step, label, &task_index_str]) + .inc_by(num_processed); + + SINGLE_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_time); + SINGLE_BATCH_PARSING_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_result.processing_duration_in_secs); + SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS + .with_label_values(&[processor_name, &task_index_str]) + .set(processing_result.db_insertion_duration_in_secs); + + // Send the result to the gap detector + gap_detector_sender + .send(processing_result) + .await + .expect("[Parser] Failed to send versions to gap detector"); + } + }) } async fn run_migrations(&self) { - let mut conn = self - .db_pool - .get() - .await - .expect("[Parser] Failed to get connection"); + use diesel::pg::PgConnection; + + println!("Running migrations: {:?}", self.postgres_connection_string); + let mut conn = + PgConnection::establish(&self.postgres_connection_string).expect("migrations failed!"); run_pending_migrations(&mut conn).await; } @@ -708,8 +533,8 @@ impl Worker { chain_id = grpc_chain_id, "[Parser] Adding chain id to db, continue to index..." 
             );
-            execute_with_better_error(
-                self.db_pool.clone(),
+            execute_with_better_error_conn(
+                &mut conn,
                 diesel::insert_into(ledger_infos::table)
                     .values(LedgerInfo {
                         chain_id: grpc_chain_id,
@@ -725,36 +550,143 @@ impl Worker {
         }
     }
 
+async fn fetch_transactions(
+    processor_name: &str,
+    stream_address: &str,
+    receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
+    task_index: usize,
+) -> TransactionsPBResponse {
+    let pb_channel_fetch_time = std::time::Instant::now();
+    let txn_pb_res = receiver.recv().await;
+    // Track how much time this task spent waiting for a pb bundle
+    PB_CHANNEL_FETCH_WAIT_TIME_SECS
+        .with_label_values(&[processor_name, &task_index.to_string()])
+        .set(pb_channel_fetch_time.elapsed().as_secs_f64());
+
+    match txn_pb_res {
+        Ok(txn_pb) => txn_pb,
+        Err(_e) => {
+            error!(
+                processor_name = processor_name,
+                service_type = PROCESSOR_SERVICE_TYPE,
+                stream_address = stream_address,
+                "[Parser][T#{}] Consumer thread timed out waiting for transactions",
+                task_index
+            );
+            panic!(
+                "[Parser][T#{}] Consumer thread timed out waiting for transactions",
+                task_index
+            );
+        },
+    }
+}
+
+pub async fn do_processor(
+    transactions_pb: TransactionsPBResponse,
+    processor: &Processor,
+    db_chain_id: u64,
+    processor_name: &str,
+    auth_token: &str,
+    enable_verbose_logging: bool,
+) -> Result<ProcessingResult> {
+    let transactions_pb_slice = transactions_pb.transactions.as_slice();
+    let first_txn = transactions_pb_slice.first().unwrap();
+
+    let last_txn = transactions_pb_slice.last().unwrap();
+
+    let start_version = first_txn.version;
+    let end_version = last_txn.version;
+    let txn_time = first_txn.timestamp.clone();
+
+    if let Some(ref t) = txn_time {
+        PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS
+            .with_label_values(&[auth_token, processor_name])
+            .set(time_diff_since_pb_timestamp_in_secs(t));
+    }
+    PROCESSOR_INVOCATIONS_COUNT
+        .with_label_values(&[processor_name])
+        .inc();
+
+    if enable_verbose_logging {
+        info!(
+            processor_name = processor_name,
+            service_type = PROCESSOR_SERVICE_TYPE,
+            start_version,
+            end_version,
+            size_in_bytes = transactions_pb.size_in_bytes,
+            "[Parser] Started processing one batch of transactions"
+        );
+    }
+
+    let processed_result = processor
+        .process_transactions(
+            transactions_pb.transactions,
+            start_version,
+            end_version,
+            Some(db_chain_id),
+        )
+        .await;
+
+    if let Some(ref t) = txn_time {
+        PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS
+            .with_label_values(&[auth_token, processor_name])
+            .set(time_diff_since_pb_timestamp_in_secs(t));
+    }
+
+    processed_result
+}
+
 /// Given a config and a db pool, build a concrete instance of a processor.
 // As time goes on there might be other things that we need to provide to certain
 // processors. As that happens we can revist whether this function (which tends to
 // couple processors together based on their args) makes sense.
-pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor {
+// TODO: This is not particularly easily extensible; better to refactor to use a trait, and then share one extensible config model (allowing for only one arity)
+pub fn build_processor(
+    config: &ProcessorConfig,
+    per_table_chunk_sizes: AHashMap<String, usize>,
+    db_pool: PgDbPool,
+) -> Processor {
     match config {
-        ProcessorConfig::AccountTransactionsProcessor => {
-            Processor::from(AccountTransactionsProcessor::new(db_pool))
+        ProcessorConfig::AccountTransactionsProcessor => Processor::from(
+            AccountTransactionsProcessor::new(db_pool, per_table_chunk_sizes),
+        ),
+        ProcessorConfig::AnsProcessor(config) => Processor::from(AnsProcessor::new(
+            db_pool,
+            per_table_chunk_sizes,
+            config.clone(),
+        )),
+        ProcessorConfig::CoinProcessor => {
+            Processor::from(CoinProcessor::new(db_pool, per_table_chunk_sizes))
+        },
+        ProcessorConfig::DefaultProcessor => {
+            Processor::from(DefaultProcessor::new(db_pool, per_table_chunk_sizes))
         },
-        ProcessorConfig::AnsProcessor(config) => {
-            Processor::from(AnsProcessor::new(db_pool, config.clone()))
+        ProcessorConfig::EventsProcessor => {
+            Processor::from(EventsProcessor::new(db_pool, per_table_chunk_sizes))
         },
-        ProcessorConfig::CoinProcessor => Processor::from(CoinProcessor::new(db_pool)),
-        ProcessorConfig::DefaultProcessor => Processor::from(DefaultProcessor::new(db_pool)),
-        ProcessorConfig::EventsProcessor => Processor::from(EventsProcessor::new(db_pool)),
         ProcessorConfig::FungibleAssetProcessor => {
-            Processor::from(FungibleAssetProcessor::new(db_pool))
+            Processor::from(FungibleAssetProcessor::new(db_pool, per_table_chunk_sizes))
         },
         ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)),
         ProcessorConfig::NftMetadataProcessor(config) => {
             Processor::from(NftMetadataProcessor::new(db_pool, config.clone()))
         },
-        ProcessorConfig::ObjectsProcessor => Processor::from(ObjectsProcessor::new(db_pool)),
-        ProcessorConfig::StakeProcessor => Processor::from(StakeProcessor::new(db_pool)),
-        ProcessorConfig::TokenProcessor(config) => {
-            Processor::from(TokenProcessor::new(db_pool, config.clone()))
+        ProcessorConfig::ObjectsProcessor => {
+            Processor::from(ObjectsProcessor::new(db_pool, per_table_chunk_sizes))
+        },
+        ProcessorConfig::StakeProcessor => {
+            Processor::from(StakeProcessor::new(db_pool, per_table_chunk_sizes))
        },
-        ProcessorConfig::TokenV2Processor => Processor::from(TokenV2Processor::new(db_pool)),
-        ProcessorConfig::UserTransactionProcessor => {
-            Processor::from(UserTransactionProcessor::new(db_pool))
+        ProcessorConfig::TokenProcessor(config) => Processor::from(TokenProcessor::new(
+            db_pool,
+            per_table_chunk_sizes,
+            config.clone(),
+        )),
+        ProcessorConfig::TokenV2Processor => {
+            Processor::from(TokenV2Processor::new(db_pool, per_table_chunk_sizes))
+        },
+        ProcessorConfig::UserTransactionProcessor => Processor::from(
+            UserTransactionProcessor::new(db_pool, per_table_chunk_sizes),
+        ),
     }
 }
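For orientation on the tps gauge logged in the processing loop above: tick_now records how many transactions the batch contained, and avg() appears to report a per-millisecond rate, which is why the code multiplies by 1000.0 before casting. The sketch below is a self-contained stand-in that makes the arithmetic concrete; SimpleMovingAverage is illustrative only (the real helper is a windowed moving average, not a cumulative one).

    // Stand-in for the moving-average helper behind `ma` in the diff above.
    // Assumption: avg() reports transactions per millisecond, hence * 1000.0 for tps.
    use std::time::{Duration, Instant};

    struct SimpleMovingAverage {
        started: Instant,
        total_txns: u64,
    }

    impl SimpleMovingAverage {
        fn new() -> Self {
            Self {
                started: Instant::now(),
                total_txns: 0,
            }
        }

        // Record `count` newly processed transactions.
        fn tick_now(&mut self, count: u64) {
            self.total_txns += count;
        }

        // Average rate in transactions per millisecond since start.
        fn avg(&self) -> f64 {
            let elapsed_ms = self.started.elapsed().as_millis().max(1) as f64;
            self.total_txns as f64 / elapsed_ms
        }
    }

    fn main() {
        let mut ma = SimpleMovingAverage::new();
        ma.tick_now(100);
        std::thread::sleep(Duration::from_millis(50));
        let tps = (ma.avg() * 1000.0) as u64; // same scaling as in the diff
        println!("~{} tps", tps);
    }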
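The run_migrations change above swaps the async pool connection for a plain synchronous diesel PgConnection built from the configured connection string. Independent of this repository's own run_pending_migrations helper, driving embedded Diesel migrations over such a connection usually looks roughly like the following; the "migrations" directory and the DATABASE_URL environment variable are assumptions for the sketch, not part of this diff.

    // Sketch: embedded Diesel migrations over a synchronous PgConnection.
    use diesel::{pg::PgConnection, Connection};
    use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};

    // Path is an assumption; it is resolved relative to the crate's Cargo.toml.
    pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

    fn main() {
        let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
        let mut conn = PgConnection::establish(&database_url)
            .expect("failed to connect to postgres for migrations");
        // Applies any migrations that have not run yet; a no-op when up to date.
        conn.run_pending_migrations(MIGRATIONS)
            .expect("failed to run pending migrations");
    }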
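fetch_transactions and do_processor together describe the per-task pipeline: block on the kanal channel for the next protobuf batch, record how long the wait took, process the batch, and hand the ProcessingResult to the gap detector. The toy program below mirrors that channel flow with plain integers standing in for TransactionsPBResponse and ProcessingResult; it assumes tokio and kanal (with the async feature) as dependencies and is an illustration, not code from this diff.

    // Toy fetch -> process -> report pipeline in the shape used by the worker.
    #[tokio::main]
    async fn main() {
        let (txn_tx, txn_rx) = kanal::bounded_async::<u64>(10);
        let (result_tx, result_rx) = kanal::bounded_async::<u64>(10);

        // Producer: stand-in for the grpc stream pushing transaction batches.
        tokio::spawn(async move {
            for batch in 0..5u64 {
                txn_tx.send(batch).await.expect("send batch");
            }
        });

        // Consumer task: the analogue of fetch_transactions + do_processor.
        tokio::spawn(async move {
            while let Ok(batch) = txn_rx.recv().await {
                let processed = batch * 2; // stand-in for process_transactions
                result_tx.send(processed).await.expect("send result");
            }
        });

        // Gap-detector side: drain the processing results.
        let mut seen = 0;
        while seen < 5 {
            let r = result_rx.recv().await.expect("recv result");
            println!("processed result: {r}");
            seen += 1;
        }
    }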
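build_processor now threads per_table_chunk_sizes into every processor so each table's inserts can be split into bounded batches. The snippet below illustrates the lookup-then-chunk pattern that map enables; the table name "events", the fallback size, and the chunk_counts helper are all illustrative assumptions, not part of this diff.

    // Illustration of what per_table_chunk_sizes feeds into: rows bound for a
    // table are split into bounded insert batches.
    use ahash::AHashMap;

    fn chunk_counts(num_rows: usize, chunk_size: usize) -> usize {
        (num_rows + chunk_size - 1) / chunk_size
    }

    fn main() {
        let mut per_table_chunk_sizes: AHashMap<String, usize> = AHashMap::new();
        per_table_chunk_sizes.insert("events".to_string(), 5_000);

        // Fallback when a table has no explicit entry (value is illustrative).
        let chunk_size = per_table_chunk_sizes
            .get("events")
            .copied()
            .unwrap_or(10_000);

        let rows: Vec<u64> = (0u64..12_345).collect();

        // Each chunk would become one INSERT statement in a processor.
        for (i, chunk) in rows.chunks(chunk_size).enumerate() {
            println!("chunk {} has {} rows", i, chunk.len());
        }
        assert_eq!(chunk_counts(rows.len(), chunk_size), 3);
    }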