diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index 6a52e23e..e905ba11 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -506,12 +506,9 @@ dependencies = [

 [[package]]
 name = "crossbeam-utils"
-version = "0.8.16"
+version = "0.8.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
-dependencies = [
- "cfg-if",
-]
+checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"

 [[package]]
 name = "crypto-common"
@@ -1365,6 +1362,16 @@ dependencies = [
  "simple_asn1",
 ]

+[[package]]
+name = "kanal"
+version = "0.1.0-pre8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7"
+dependencies = [
+ "futures-core",
+ "lock_api",
+]
+
 [[package]]
 name = "keccak"
 version = "0.1.4"
@@ -1794,9 +1801,9 @@ dependencies = [

 [[package]]
 name = "pin-project-lite"
-version = "0.2.10"
+version = "0.2.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57"
+checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"

 [[package]]
 name = "pin-utils"
@@ -1922,7 +1929,9 @@ dependencies = [
  "google-cloud-googleapis",
  "google-cloud-pubsub",
  "hex",
+ "kanal",
  "native-tls",
+ "num_cpus",
  "once_cell",
  "postgres-native-tls",
  "prometheus",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 77dcc133..d45809a4 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -27,6 +27,7 @@ ahash = { version = "0.8.7", features = ["serde"] }
 anyhow = "1.0.62"
 async-trait = "0.1.53"
 aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "af0dcea7144225a709e4f595e58f8026b99e901c" }
+kanal = { version = "0.1.0-pre8", features = ["async"] }
 backtrace = "0.3.58"
 base64 = "0.13.0"
 bb8 = "0.8.1"
diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml
index 2274e3ea..1ab6b511 100644
--- a/rust/processor/Cargo.toml
+++ b/rust/processor/Cargo.toml
@@ -35,6 +35,7 @@ gcloud-sdk = { workspace = true }
 google-cloud-googleapis = { workspace = true }
 google-cloud-pubsub = { workspace = true }
 hex = { workspace = true }
+kanal = { workspace = true }
 once_cell = { workspace = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
@@ -55,4 +56,5 @@ url = { workspace = true }

 # Postgres SSL
 native-tls = "0.2.11"
+num_cpus = "1.16.0"
 postgres-native-tls = "0.5.0"
diff --git a/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql b/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql
index e822f960..ac6ea59d 100644
--- a/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql
+++ b/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql
@@ -1,2 +1,2 @@
 -- Your SQL goes here
-CREATE INDEX cdb_insat_index ON current_delegator_balances (inserted_at);
+CREATE INDEX IF NOT EXISTS cdb_insat_index ON current_delegator_balances (inserted_at);
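Worth pausing on the dependency swap above before reading the code changes: unlike `tokio::sync::mpsc`, kanal's async channel is multi-producer *multi-consumer*, so the receiving side can be cloned and shared across many worker tasks, which is exactly what the reworked `worker.rs` below relies on. A minimal sketch of that pattern, assuming kanal `0.1.0-pre8` with the `async` feature; the `String` payload, capacity, and task count are illustrative only:

```rust
// Minimal MPMC sketch (not the processor's real types): kanal's async
// receiver can be cloned across consumer tasks, unlike tokio::sync::mpsc.
#[tokio::main]
async fn main() {
    let (tx, rx) = kanal::bounded_async::<String>(100);

    let handles: Vec<_> = (0..4)
        .map(|task_index| {
            let rx = rx.clone(); // cloning the receiver is the whole point
            tokio::spawn(async move {
                while let Ok(item) = rx.recv().await {
                    println!("[T#{task_index}] got {item}");
                }
            })
        })
        .collect();

    for i in 0..10 {
        tx.send(format!("batch-{i}")).await.unwrap();
    }
    drop(tx); // closing the channel lets the consumers exit

    for h in handles {
        h.await.unwrap();
    }
}
```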
diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs
index 253bd262..3c451f71 100644
--- a/rust/processor/src/grpc_stream.rs
+++ b/rust/processor/src/grpc_stream.rs
@@ -12,6 +12,7 @@ use aptos_protos::indexer::v1::{
     raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse,
 };
 use futures_util::StreamExt;
+use kanal::AsyncSender;
 use prost::Message;
 use std::{sync::Arc, time::Duration};
 use tonic::{Response, Streaming};
@@ -25,6 +26,10 @@ const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization";
 const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name";
 /// GRPC connection id
 const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id";
+/// We will try to reconnect to GRPC up to RECONNECTION_MAX_RETRIES times in case the upstream connection is being updated
+pub const RECONNECTION_MAX_RETRIES: u64 = 65;
+/// 256MB
+pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256;

 pub fn grpc_request_builder(
     starting_version: u64,
@@ -98,8 +103,8 @@ pub async fn get_stream(
         Ok(client) => client
             .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
             .send_compressed(tonic::codec::CompressionEncoding::Gzip)
-            .max_decoding_message_size(crate::worker::MAX_RESPONSE_SIZE)
-            .max_encoding_message_size(crate::worker::MAX_RESPONSE_SIZE),
+            .max_decoding_message_size(MAX_RESPONSE_SIZE)
+            .max_encoding_message_size(MAX_RESPONSE_SIZE),
         Err(e) => {
             error!(
                 processor_name = processor_name,
@@ -130,6 +135,68 @@ pub async fn get_stream(
         .expect("[Parser] Failed to get grpc response. Is the server running?")
 }

+pub async fn get_chain_id(
+    indexer_grpc_data_service_address: Url,
+    indexer_grpc_http2_ping_interval: Duration,
+    indexer_grpc_http2_ping_timeout: Duration,
+    auth_token: String,
+    processor_name: String,
+) -> u64 {
+    info!(
+        processor_name = processor_name,
+        service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
+        stream_address = indexer_grpc_data_service_address.to_string(),
+        "[Parser] Connecting to GRPC stream to get chain id",
+    );
+    let response = get_stream(
+        indexer_grpc_data_service_address.clone(),
+        indexer_grpc_http2_ping_interval,
+        indexer_grpc_http2_ping_timeout,
+        1,
+        Some(2),
+        auth_token.clone(),
+        processor_name.to_string(),
+    )
+    .await;
+    let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) {
+        Some(connection_id) => connection_id.to_str().unwrap().to_string(),
+        None => "".to_string(),
+    };
+    let mut resp_stream = response.into_inner();
+    info!(
+        processor_name = processor_name,
+        service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
+        stream_address = indexer_grpc_data_service_address.to_string(),
+        connection_id,
+        "[Parser] Successfully connected to GRPC stream to get chain id",
+    );
+
+    match resp_stream.next().await {
+        Some(Ok(r)) => r.chain_id.expect("[Parser] Chain Id doesn't exist."),
+        Some(Err(rpc_error)) => {
+            error!(
+                processor_name = processor_name,
+                service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
+                stream_address = indexer_grpc_data_service_address.to_string(),
+                connection_id,
+                error = ?rpc_error,
+                "[Parser] Error receiving datastream response for chain id"
+            );
+            panic!("[Parser] Error receiving datastream response for chain id");
+        },
+        None => {
+            error!(
+                processor_name = processor_name,
+                service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
+                stream_address = indexer_grpc_data_service_address.to_string(),
+                connection_id,
+                "[Parser] Stream ended before getting response for chain id"
+            );
+            panic!("[Parser] Stream ended before getting response for chain id");
+        },
+    }
+}
+
 /// Gets a batch of transactions from the stream. Batch size is set in the grpc server.
 /// The number of batches depends on our config
 /// There could be several special scenarios:
 /// 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing.
@@ -137,7 +204,7 @@ pub async fn get_stream(
 /// 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that
 ///    all existing transactions are processed
 pub async fn create_fetcher_loop(
-    txn_sender: tokio::sync::mpsc::Sender<TransactionsPBResponse>,
+    txn_sender: AsyncSender<TransactionsPBResponse>,
     indexer_grpc_data_service_address: Url,
     indexer_grpc_http2_ping_interval: Duration,
     indexer_grpc_http2_ping_timeout: Duration,
@@ -184,6 +251,8 @@ pub async fn create_fetcher_loop(
         "[Parser] Successfully connected to GRPC stream",
     );

+    let mut last_fetched_version = batch_start_version as i64 - 1;
+    let mut batch_start_version = batch_start_version;
     loop {
         let is_success = match resp_stream.next().await {
             Some(Ok(r)) => {
@@ -224,6 +293,21 @@ pub async fn create_fetcher_loop(
                     "{}",
                     ProcessorStep::ReceivedTxnsFromGrpc.get_label(),
                 );
+
+                let current_fetched_version = start_version;
+                // TODO: FIX THIS NONSENSICAL ERROR
+                if last_fetched_version + 1 != current_fetched_version as i64 {
+                    error!(
+                        batch_start_version = batch_start_version,
+                        last_fetched_version = last_fetched_version,
+                        current_fetched_version = current_fetched_version,
+                        "[Parser] Received batch with gap from GRPC stream"
+                    );
+                    // panic!("[Parser] Received batch with gap from GRPC stream");
+                }
+                last_fetched_version = end_version as i64;
+                batch_start_version = (last_fetched_version + 1) as u64;
+
                 LATEST_PROCESSED_VERSION
                     .with_label_values(&[
                         &processor_name,
@@ -382,7 +466,7 @@ pub async fn create_fetcher_loop(
                 // TODO: Turn this into exponential backoff
                 tokio::time::sleep(std::time::Duration::from_millis(100)).await;

-                if reconnection_retries >= crate::worker::RECONNECTION_MAX_RETRIES {
+                if reconnection_retries >= RECONNECTION_MAX_RETRIES {
                     error!(
                         processor_name = processor_name,
                         service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
diff --git a/rust/processor/src/main.rs b/rust/processor/src/main.rs
index dc1f4659..62e4de53 100644
--- a/rust/processor/src/main.rs
+++ b/rust/processor/src/main.rs
@@ -6,9 +6,24 @@ use clap::Parser;
 use processor::IndexerGrpcProcessorConfig;
 use server_framework::ServerArgs;

-#[tokio::main(flavor = "multi_thread")]
-async fn main() -> Result<()> {
-    let args = ServerArgs::parse();
-    args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
-        .await
+const RUNTIME_WORKER_MULTIPLIER: usize = 2;
+
+fn main() -> Result<()> {
+    let num_cpus = num_cpus::get();
+    let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16);
+    println!(
+        "[Processor] Starting processor tokio runtime: num_cpus={}, worker_threads={}",
+        num_cpus, worker_threads
+    );
+
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .worker_threads(worker_threads)
+        .build()
+        .unwrap()
+        .block_on(async {
+            let args = ServerArgs::parse();
+            args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
+                .await
+        })
 }
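A quick sanity check on the sizing rule introduced in `main.rs`: with `RUNTIME_WORKER_MULTIPLIER = 2` the runtime never starts with fewer than 16 worker threads, and scales linearly past 8 cores. A standalone sketch of the same arithmetic (assumes only the `num_cpus` crate added above):

```rust
// Sketch of the worker-thread sizing rule from main.rs; values depend on the host.
const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn worker_threads(num_cpus: usize) -> usize {
    (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16)
}

fn main() {
    assert_eq!(worker_threads(4), 16); // small hosts are padded up to the floor of 16
    assert_eq!(worker_threads(8), 16); // 8 * 2 == 16, exactly the floor
    assert_eq!(worker_threads(32), 64); // big hosts scale linearly
    println!("this host: {}", worker_threads(num_cpus::get()));
}
```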
diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs
index 6242d7f5..76536584 100644
--- a/rust/processor/src/processors/mod.rs
+++ b/rust/processor/src/processors/mod.rs
@@ -34,18 +34,12 @@ use self::{
     token_v2_processor::TokenV2Processor,
     user_transaction_processor::UserTransactionProcessor,
 };
-use crate::{
-    models::processor_status::ProcessorStatus,
-    schema::processor_status,
-    utils::{
-        counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT},
-        database::{execute_with_better_error, PgDbPool, PgPoolConnection},
-        util::parse_timestamp,
-    },
+use crate::utils::{
+    counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT},
+    database::{PgDbPool, PgPoolConnection},
 };
 use aptos_protos::transaction::v1::Transaction as ProtoTransaction;
 use async_trait::async_trait;
-use diesel::{pg::upsert::excluded, prelude::*};
 use enum_dispatch::enum_dispatch;
 use serde::{Deserialize, Serialize};
 use std::{fmt::Debug, sync::Arc};
@@ -115,9 +109,10 @@ pub trait ProcessorTrait: Send + Sync + Debug {
     /// versions are successful because any gap would cause the processor to panic
     async fn update_last_processed_version(
         &self,
-        version: u64,
-        last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
+        _version: u64,
+        _last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
     ) -> anyhow::Result<()> {
+        /*
         let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64));
         let status = ProcessorStatus {
             processor: self.name().to_string(),
@@ -139,7 +134,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
             )),
             Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
         )
-        .await?;
+        .await?;*/
         Ok(())
     }
 }
diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs
index 06e38494..84e73c0e 100644
--- a/rust/processor/src/utils/counters.rs
+++ b/rust/processor/src/utils/counters.rs
@@ -145,6 +145,16 @@ pub static PROCESSED_BYTES_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
     .unwrap()
 });

+/// The amount of time that a thread spent waiting for a protobuf bundle of transactions
+pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy<GaugeVec> = Lazy::new(|| {
+    register_gauge_vec!(
+        "indexer_processor_pb_channel_fetch_wait_time_secs",
+        "Amount of time a thread spent waiting for a protobuf bundle of transactions",
+        &["processor_name", "thread_number"]
+    )
+    .unwrap()
+});
+
 /// Count of transactions processed.
 pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
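Note that the new metric is a gauge that gets `set()`, not incremented: each consumer thread overwrites its own `(processor_name, thread_number)` slot with the duration of its latest wait, so the metric reads as "most recent wait per thread" rather than a running total. A hedged sketch of that usage pattern (the metric name and labels below are stand-ins, and the sleep stands in for a channel `recv()`):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_gauge_vec, GaugeVec};
use std::time::{Duration, Instant};

// Same shape as PB_CHANNEL_FETCH_WAIT_TIME_SECS registered above.
static WAIT_TIME_SECS: Lazy<GaugeVec> = Lazy::new(|| {
    register_gauge_vec!(
        "example_channel_fetch_wait_time_secs",
        "Amount of time a thread spent waiting for a bundle",
        &["processor_name", "thread_number"]
    )
    .unwrap()
});

fn main() {
    let started = Instant::now();
    std::thread::sleep(Duration::from_millis(50)); // stand-in for a channel recv()
    // set() overwrites the previous sample for this (processor, thread) pair.
    WAIT_TIME_SECS
        .with_label_values(&["example_processor", "0"])
        .set(started.elapsed().as_secs_f64());
}
```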
diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs
index f41e38d6..0c0f1a5c 100644
--- a/rust/processor/src/utils/database.rs
+++ b/rust/processor/src/utils/database.rs
@@ -45,7 +45,7 @@ pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2;
 /// we may need to chunk an array of items based on how many columns are in the table.
 /// This function returns boundaries of chunks in the form of (start_index, end_index)
 pub fn get_chunks(num_items_to_insert: usize, _column_count: usize) -> Vec<(usize, usize)> {
-    let max_item_size = 100; // MAX_DIESEL_PARAM_SIZE as usize / column_count;
+    let max_item_size = 40; // MAX_DIESEL_PARAM_SIZE as usize / column_count;
     let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size));
     let mut chunks = vec![chunk];
     while chunk.1 != num_items_to_insert {
@@ -205,15 +205,17 @@ where
 }

 pub async fn execute_in_chunks<U, T>(
-    conn: PgDbPool,
-    build_query: fn(Vec<T>) -> (U, Option<&'static str>),
-    items_to_insert: &[T],
-    chunk_size: usize,
+    _conn: PgDbPool,
+    _build_query: fn(Vec<T>) -> (U, Option<&'static str>),
+    _items_to_insert: &[T],
+    _chunk_size: usize,
 ) -> Result<(), diesel::result::Error>
 where
     U: QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
     T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone,
 {
+    // TEMPORARILY SKIP ACTUALLY WRITING, SO WE CAN TEST THE FETCHING SPEED
+    /*
     let chunks = get_chunks(items_to_insert.len(), chunk_size);

     let futures = chunks.into_iter().map(|(start_ind, end_ind)| {
@@ -230,10 +232,11 @@ where
     });

     for res in futures_util::future::join_all(futures).await {
         res?;
-    }
+    }*/
     Ok(())
 }

+#[allow(dead_code)]
 async fn execute_or_retry_cleaned<U, T>(
     conn: PgDbPool,
     build_query: fn(Vec<T>) -> (U, Option<&'static str>),
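To make the `100 → 40` change in `get_chunks` concrete: the function walks forward `max_item_size` items at a time, so a smaller constant simply yields more, smaller `(start_index, end_index)` windows, and therefore smaller INSERT statements. A worked example of the chunking walk, reimplemented standalone under the assumption that the elided loop body advances both bounds by `max_item_size`:

```rust
use std::cmp::min;

// Standalone copy of the chunking walk used by get_chunks above, with
// max_item_size as a parameter instead of a hardcoded constant.
fn get_chunks(num_items_to_insert: usize, max_item_size: usize) -> Vec<(usize, usize)> {
    let mut chunk = (0, min(num_items_to_insert, max_item_size));
    let mut chunks = vec![chunk];
    while chunk.1 != num_items_to_insert {
        chunk = (
            chunk.0 + max_item_size,
            min(num_items_to_insert, chunk.1 + max_item_size),
        );
        chunks.push(chunk);
    }
    chunks
}

fn main() {
    // 100 items in chunks of 40: three windows instead of one.
    assert_eq!(get_chunks(100, 40), vec![(0, 40), (40, 80), (80, 100)]);
    assert_eq!(get_chunks(100, 100), vec![(0, 100)]);
}
```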
diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs
index 43c3c2e8..68a3b665 100644
--- a/rust/processor/src/worker.rs
+++ b/rust/processor/src/worker.rs
@@ -17,9 +17,10 @@ use crate::{
     utils::{
         counters::{
             ProcessorStep, LATEST_PROCESSED_VERSION, MULTI_BATCH_PROCESSING_TIME_IN_SECS,
-            NUM_TRANSACTIONS_PROCESSED_COUNT, PROCESSED_BYTES_COUNT,
-            PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS, PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS,
-            PROCESSOR_ERRORS_COUNT, PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT,
+            NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS,
+            PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS,
+            PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT,
+            PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT,
             SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS, SINGLE_BATCH_PARSING_TIME_IN_SECS,
             SINGLE_BATCH_PROCESSING_TIME_IN_SECS, TRANSACTION_UNIX_TIMESTAMP,
         },
@@ -32,7 +33,7 @@ use aptos_moving_average::MovingAverage;
 use aptos_protos::transaction::v1::Transaction;
 use diesel::Connection;
 use std::{sync::Arc, time::Duration};
-use tokio::{sync::mpsc::error::TryRecvError, time::timeout};
+use tokio::{sync::Mutex, time::timeout};
 use tracing::{error, info};
 use url::Url;

@@ -40,10 +41,6 @@
 // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
 // machines accordingly.
 pub const BUFFER_SIZE: usize = 100;
-// 40MB
-pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 40;
-// We will try to reconnect to GRPC 5 times in case upstream connection is being updated
-pub const RECONNECTION_MAX_RETRIES: u64 = 65;
 // Consumer thread will wait X seconds before panicking if it doesn't receive any data
 pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5;
 pub const PROCESSOR_SERVICE_TYPE: &str = "processor";
@@ -162,12 +159,26 @@ impl Worker {
         let concurrent_tasks = self.number_concurrent_processing_tasks;

         // Build the processor based on the config.
-        let processor = build_processor(&self.processor_config, self.db_pool.clone());
-        let processor = Arc::new(processor);
+        let processor = Arc::new(build_processor(
+            &self.processor_config,
+            self.db_pool.clone(),
+        ));
+
+        // get the chain id
+        let chain_id = crate::grpc_stream::get_chain_id(
+            self.indexer_grpc_data_service_address.clone(),
+            self.grpc_http2_config.grpc_http2_ping_interval_in_secs(),
+            self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(),
+            self.auth_token.clone(),
+            processor_name.to_string(),
+        )
+        .await;
+        self.check_or_update_chain_id(chain_id as i64)
+            .await
+            .unwrap();

         // This is the moving average that we use to calculate TPS
-        let mut ma = MovingAverage::new(10);
-        let mut batch_start_version = starting_version;
+        let ma = Arc::new(Mutex::new(MovingAverage::new(10)));

         let ending_version = self.ending_version;
         let indexer_grpc_data_service_address = self.indexer_grpc_data_service_address.clone();
@@ -178,7 +189,7 @@ impl Worker {
         // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream
         // and write into a channel
         // The each item will be (chain_id, batch of transactions)
-        let (tx, mut receiver) = tokio::sync::mpsc::channel::<TransactionsPBResponse>(BUFFER_SIZE);
+        let (tx, receiver) = kanal::bounded_async::<TransactionsPBResponse>(BUFFER_SIZE);
         let request_ending_version = self.ending_version;
         let auth_token = self.auth_token.clone();
         tokio::spawn(async move {
@@ -186,19 +197,20 @@ impl Worker {
                 processor_name = processor_name,
                 service_type = PROCESSOR_SERVICE_TYPE,
                 end_version = ending_version,
-                start_version = batch_start_version,
+                start_version = starting_version,
                 "[Parser] Starting fetcher thread"
             );
+
             crate::grpc_stream::create_fetcher_loop(
-                tx,
-                indexer_grpc_data_service_address,
+                tx.clone(),
+                indexer_grpc_data_service_address.clone(),
                 indexer_grpc_http2_ping_interval,
                 indexer_grpc_http2_ping_timeout,
                 starting_version,
                 request_ending_version,
-                auth_token,
+                auth_token.clone(),
                 processor_name.to_string(),
-                batch_start_version,
+                starting_version,
                 BUFFER_SIZE,
             )
             .await
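The hunk below replaces the old poll-based consumer loop with one spawned task per `concurrent_tasks`, each owning a clone of the kanal receiver. The task body is easier to read first in a condensed, hedged sketch (`Batch` stands in for `TransactionsPBResponse`; logging and metrics omitted):

```rust
use std::time::Duration;
use tokio::time::timeout;

const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5;

struct Batch; // placeholder for TransactionsPBResponse

// One consumer task: loops until the channel closes or goes silent
// for too long -- both conditions are fatal by design.
async fn consumer(task_index: usize, receiver: kanal::AsyncReceiver<Batch>) {
    loop {
        let res = timeout(
            Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS),
            receiver.recv(),
        )
        .await;
        let _batch = match res {
            Ok(Ok(batch)) => batch,
            // Channel closed: the fetcher is gone, so this task should die loudly.
            Ok(Err(_)) => panic!("[Parser][T#{task_index}] Channel closed"),
            // Nothing arrived within the timeout: the stream is stuck.
            Err(_) => panic!("[Parser][T#{task_index}] Timed out waiting for transactions"),
        };
        // ... do_processor(batch) and metrics would go here ...
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = kanal::bounded_async::<Batch>(8);
    for i in 0..2 {
        tokio::spawn(consumer(i, rx.clone()));
    }
    tx.send(Batch).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```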
@@ -210,297 +222,275 @@ impl Worker {
         // 3. We have received either an empty batch or a batch with a gap. We should panic.
         // 4. We have not received anything in X seconds, we should panic.
         // 5. If it's the wrong chain, panic.
-        let mut db_chain_id = None;
-        loop {
-            info!(
-                processor_name = processor_name,
-                service_type = PROCESSOR_SERVICE_TYPE,
-                stream_address = self.indexer_grpc_data_service_address.as_str(),
-                "[Parser] Fetching transaction batches from channel",
-            );
-            let txn_channel_fetch_latency = std::time::Instant::now();
-            let mut transactions_batches = vec![];
-            let mut last_fetched_version = batch_start_version as i64 - 1;
-            for task_index in 0..concurrent_tasks {
-                let receive_status = match task_index {
-                    0 => {
-                        // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed.
-                        match timeout(
-                            Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS),
-                            receiver.recv(),
-                        )
-                        .await
-                        {
-                            Ok(result) => result.ok_or(TryRecvError::Disconnected),
-                            Err(_) => {
+
+        info!(
+            processor_name = processor_name,
+            service_type = PROCESSOR_SERVICE_TYPE,
+            stream_address = self.indexer_grpc_data_service_address.as_str(),
+            "[Parser] Fetching transaction batches from channel",
+        );
+        let txn_channel_fetch_latency = std::time::Instant::now();
+
+        let mut tasks = vec![];
+        for task_index in 0..concurrent_tasks {
+            let auth_token = self.auth_token.clone();
+            let stream_address = self.indexer_grpc_data_service_address.to_string();
+            let processor_clone = processor.clone();
+            let ma_clone = ma.clone();
+            let receiver_clone = receiver.clone();
+
+            let join_handle = tokio::spawn(async move {
+                loop {
+                    let pb_channel_fetch_time = std::time::Instant::now();
+                    let txn_pb_res = timeout(
+                        Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS),
+                        receiver_clone.recv(),
+                    )
+                    .await;
+                    // Track how much time this thread spent waiting for a pb bundle
+                    PB_CHANNEL_FETCH_WAIT_TIME_SECS
+                        .with_label_values(&[processor_name, &task_index.to_string()])
+                        .set(pb_channel_fetch_time.elapsed().as_secs_f64());
+
+                    let txn_pb = match txn_pb_res {
+                        Ok(txn_pb) => match txn_pb {
+                            Ok(txn_pb) => txn_pb,
+                            // This happens when the channel is closed. We should panic.
+                            Err(_e) => {
                                 error!(
                                     processor_name = processor_name,
                                     service_type = PROCESSOR_SERVICE_TYPE,
-                                    stream_address =
-                                        self.indexer_grpc_data_service_address.as_str(),
-                                    "[Parser] Consumer thread timed out waiting for transactions",
-                                );
-                                panic!(
-                                    "[Parser] Consumer thread timed out waiting for transactions"
+                                    stream_address = stream_address,
+                                    "[Parser][T#{}] Channel closed; stream ended.",
+                                    task_index
                                 );
+                                panic!("[Parser][T#{}] Channel closed", task_index);
                             },
-                        }
-                        // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed.
-                        // receiver.recv().await.ok_or(TryRecvError::Disconnected)
-                    },
-                    _ => {
-                        // If we're not the first task, we should poll to see if we get any data.
-                        receiver.try_recv()
-                    },
-                };
-                match receive_status {
-                    Ok(txn_pb) => {
-                        if let Some(existing_id) = db_chain_id {
-                            if txn_pb.chain_id != existing_id {
-                                error!(
-                                    processor_name = processor_name,
-                                    stream_address =
-                                        self.indexer_grpc_data_service_address.as_str(),
-                                    chain_id = txn_pb.chain_id,
-                                    existing_id = existing_id,
-                                    "[Parser] Stream somehow changed chain id!",
-                                );
-                                panic!("[Parser] Stream somehow changed chain id!");
-                            }
-                        } else {
-                            db_chain_id = Some(
-                                self.check_or_update_chain_id(txn_pb.chain_id as i64)
-                                    .await
-                                    .unwrap(),
-                            );
-                        }
-                        let current_fetched_version =
-                            txn_pb.transactions.as_slice().first().unwrap().version;
-                        if last_fetched_version + 1 != current_fetched_version as i64 {
+                        },
+                        Err(_e) => {
                             error!(
-                                batch_start_version = batch_start_version,
-                                last_fetched_version = last_fetched_version,
-                                current_fetched_version = current_fetched_version,
-                                "[Parser] Received batch with gap from GRPC stream"
+                                processor_name = processor_name,
+                                service_type = PROCESSOR_SERVICE_TYPE,
+                                stream_address = stream_address,
+                                "[Parser][T#{}] Consumer thread timed out waiting for transactions",
+                                task_index
+                            );
+                            panic!(
+                                "[Parser][T#{}] Consumer thread timed out waiting for transactions",
+                                task_index
                             );
-                            panic!("[Parser] Received batch with gap from GRPC stream");
-                        }
-                        last_fetched_version =
-                            txn_pb.transactions.as_slice().last().unwrap().version as i64;
-                        transactions_batches.push(txn_pb);
-                    },
-                    // Channel is empty and send is not drpped which we definitely expect. Wait for a bit and continue polling.
-                    Err(TryRecvError::Empty) => {
-                        break;
-                    },
-                    // This happens when the channel is closed. We should panic.
-                    Err(TryRecvError::Disconnected) => {
+                        },
+                    };
+
+                    let size_in_bytes = txn_pb.size_in_bytes as f64;
+                    let first_txn = txn_pb.transactions.as_slice().first().unwrap();
+                    let first_txn_version = first_txn.version;
+                    let first_txn_timestamp = first_txn.timestamp.clone();
+                    let last_txn = txn_pb.transactions.as_slice().last().unwrap();
+                    let last_txn_version = last_txn.version;
+                    let last_txn_timestamp = last_txn.timestamp.clone();
+
+                    info!(
+                        processor_name = processor_name,
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        start_version = first_txn_version,
+                        end_version = last_txn_version,
+                        num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1,
+                        size_in_bytes,
+                        duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(),
+                        tps = (last_txn_version as f64 - first_txn_version as f64)
+                            / txn_channel_fetch_latency.elapsed().as_secs_f64(),
+                        bytes_per_sec =
+                            size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(),
+                        "[Parser][T#{}] Successfully fetched transaction batches from channel.",
+                        task_index
+                    );
+
+                    // Ensure chain_id has not changed
+                    if txn_pb.chain_id != chain_id {
                         error!(
                             processor_name = processor_name,
-                            service_type = PROCESSOR_SERVICE_TYPE,
-                            stream_address = self.indexer_grpc_data_service_address.as_str(),
-                            "[Parser] Channel closed; stream ended."
+                            stream_address = stream_address,
+                            chain_id = txn_pb.chain_id,
+                            existing_id = chain_id,
+                            "[Parser][T#{}] Stream somehow changed chain id!",
+                            task_index
                         );
-                        panic!("[Parser] Channel closed");
-                    },
-                }
-            }
+                        panic!(
+                            "[Parser][T#{}] Stream somehow changed chain id!",
+                            task_index
+                        );
+                    }

-            let size_in_bytes = transactions_batches
-                .iter()
-                .fold(0.0, |acc, txn_batch| acc + txn_batch.size_in_bytes as f64);
-            let batch_start_txn_timestamp = transactions_batches
-                .first()
-                .unwrap()
-                .transactions
-                .as_slice()
-                .first()
-                .unwrap()
-                .timestamp
-                .clone();
-            let batch_end_txn_timestamp = transactions_batches
-                .last()
-                .unwrap()
-                .transactions
-                .as_slice()
-                .last()
-                .unwrap()
-                .timestamp
-                .clone();
-            info!(
-                processor_name = processor_name,
-                service_type = PROCESSOR_SERVICE_TYPE,
-                start_version = batch_start_version,
-                end_version = last_fetched_version,
-                num_of_transactions = last_fetched_version - batch_start_version as i64 + 1,
-                size_in_bytes,
-                duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(),
-                tps = (last_fetched_version as f64 - batch_start_version as f64)
-                    / txn_channel_fetch_latency.elapsed().as_secs_f64(),
-                bytes_per_sec = size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(),
-                "[Parser] Successfully fetched transaction batches from channel."
-            );
+                    let processing_time = std::time::Instant::now();

-            // Process the transactions in parallel
-            let mut tasks = vec![];
-            for transactions_pb in transactions_batches {
-                let processor_clone = processor.clone();
-                let auth_token = self.auth_token.clone();
-                let task = tokio::spawn(async move {
-                    do_processor(
-                        transactions_pb,
-                        processor_clone,
-                        db_chain_id,
+                    let res = do_processor(
+                        txn_pb,
+                        processor_clone.clone(),
+                        chain_id,
                         processor_name,
                         &auth_token,
                         enable_verbose_logging,
                     )
-                    .await
-                });
-                tasks.push(task);
-            }
-            let processing_time = std::time::Instant::now();
-            let task_count = tasks.len();
-            let batches = match futures::future::try_join_all(tasks).await {
-                Ok(res) => res,
-                Err(err) => panic!("[Parser] Error processing transaction batches: {:?}", err),
-            };
-
-            // Update states depending on results of the batch processing
-            let mut processed_versions = vec![];
-            for res in batches {
-                let processed: ProcessingResult = match res {
-                    Ok(versions) => {
-                        PROCESSOR_SUCCESSES_COUNT
-                            .with_label_values(&[processor_name])
-                            .inc();
-                        versions
-                    },
-                    Err(e) => {
-                        error!(
-                            processor_name = processor_name,
-                            stream_address = self.indexer_grpc_data_service_address.to_string(),
-                            error = ?e,
-                            "[Parser] Error processing transactions"
-                        );
-                        PROCESSOR_ERRORS_COUNT
-                            .with_label_values(&[processor_name])
-                            .inc();
-                        panic!(
-                            "[Parser] Error processing '{:}' transactions: {:?}",
-                            processor_name, e
-                        );
-                    },
-                };
-                processed_versions.push(processed);
-            }
-
-            // Make sure there are no gaps and advance states
-            processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version));
-            let mut prev_start = None;
-            let mut prev_end = None;
-            let mut max_processing_duration_in_secs: f64 = 0.0;
-            let mut max_db_insertion_duration_in_secs: f64 = 0.0;
-            let processed_versions_sorted = processed_versions.clone();
-            for processing_result in processed_versions {
-                let start = processing_result.start_version;
-                let end = processing_result.end_version;
-                max_processing_duration_in_secs = max_processing_duration_in_secs
-                    .max(processing_result.processing_duration_in_secs);
-                max_db_insertion_duration_in_secs = max_db_insertion_duration_in_secs
-                    .max(processing_result.db_insertion_duration_in_secs);
-                if prev_start.is_none() {
-                    prev_start = Some(start);
-                    prev_end = Some(end);
-                } else {
-                    if prev_end.unwrap() + 1 != start {
-                        error!(
-                            processor_name = processor_name,
-                            stream_address = self.indexer_grpc_data_service_address.to_string(),
-                            processed_versions = processed_versions_sorted
-                                .iter()
-                                .map(|result| format!(
-                                    "{}-{}",
-                                    result.start_version, result.end_version
-                                ))
-                                .collect::<Vec<_>>()
-                                .join(", "),
-                            "[Parser] Gaps in processing stream"
+                    .await;
+
+                    let _processed = match res {
+                        Ok(versions) => {
+                            PROCESSOR_SUCCESSES_COUNT
+                                .with_label_values(&[processor_name])
+                                .inc();
+                            versions
+                        },
+                        Err(e) => {
+                            error!(
+                                processor_name = processor_name,
+                                stream_address = stream_address,
+                                error = ?e,
+                                "[Parser][T#{}] Error processing transactions", task_index
+                            );
+                            PROCESSOR_ERRORS_COUNT
+                                .with_label_values(&[processor_name])
+                                .inc();
+                            panic!(
+                                "[Parser][T#{}] Error processing '{:}' transactions: {:?}",
+                                task_index, processor_name, e
+                            );
+                        },
+                    };
+
+                    /*
+                    let mut max_processing_duration_in_secs: f64 = processed.processing_duration_in_secs;
+                    let mut max_db_insertion_duration_in_secs: f64 = processed.db_insertion_duration_in_secs;
+                    max_processing_duration_in_secs =
+                        max_processing_duration_in_secs.max(processed.processing_duration_in_secs);
+                    max_db_insertion_duration_in_secs =
+                        max_db_insertion_duration_in_secs.max(processed.db_insertion_duration_in_secs);
+                    */
+
+                    processor_clone
+                        .update_last_processed_version(last_txn_version, last_txn_timestamp.clone())
+                        .await
+                        .unwrap();
+
+                    let mut ma_locked = ma_clone.lock().await;
+                    ma_locked.tick_now(last_txn_version - first_txn_version + 1);
+                    let tps = (ma_locked.avg() * 1000.0) as u64;
+                    info!(
+                        processor_name = processor_name,
+                        service_type = PROCESSOR_SERVICE_TYPE,
+                        start_version = first_txn_version,
+                        end_version = last_txn_version,
+                        start_txn_timestamp_iso = first_txn_timestamp
+                            .clone()
+                            .map(|t| timestamp_to_iso(&t))
+                            .unwrap_or_default(),
+                        end_txn_timestamp_iso = last_txn_timestamp
+                            .map(|t| timestamp_to_iso(&t))
+                            .unwrap_or_default(),
+                        num_of_transactions = last_txn_version - first_txn_version + 1,
+                        task_count = concurrent_tasks,
+                        size_in_bytes,
+                        duration_in_secs = processing_time.elapsed().as_secs_f64(),
+                        tps = tps,
+                        bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(),
+                        step = ProcessorStep::ProcessedMultipleBatches.get_step(),
+                        "{}",
+                        ProcessorStep::ProcessedMultipleBatches.get_label(),
+                    );
+                    LATEST_PROCESSED_VERSION
+                        .with_label_values(&[
+                            processor_name,
+                            ProcessorStep::ProcessedMultipleBatches.get_step(),
+                            ProcessorStep::ProcessedMultipleBatches.get_label(),
+                        ])
+                        .set(last_txn_version as i64);
+                    // TODO: do an atomic thing, or ideally move to an async metrics collector!
+                    TRANSACTION_UNIX_TIMESTAMP
+                        .with_label_values(&[
+                            processor_name,
+                            ProcessorStep::ProcessedMultipleBatches.get_step(),
+                            ProcessorStep::ProcessedMultipleBatches.get_label(),
+                        ])
+                        .set(
+                            first_txn_timestamp
+                                .map(|t| timestamp_to_unixtime(&t))
+                                .unwrap_or_default(),
                         );
-                        panic!();
-                    }
-                    prev_start = Some(start);
-                    prev_end = Some(end);
+                    PROCESSED_BYTES_COUNT
+                        .with_label_values(&[
+                            processor_name,
+                            ProcessorStep::ProcessedMultipleBatches.get_step(),
+                            ProcessorStep::ProcessedMultipleBatches.get_label(),
+                        ])
+                        .inc_by(size_in_bytes as u64);
+                    NUM_TRANSACTIONS_PROCESSED_COUNT
+                        .with_label_values(&[
+                            processor_name,
+                            ProcessorStep::ProcessedMultipleBatches.get_step(),
+                            ProcessorStep::ProcessedMultipleBatches.get_label(),
+                        ])
+                        .inc_by(last_txn_version - first_txn_version + 1);
+                    MULTI_BATCH_PROCESSING_TIME_IN_SECS
+                        .with_label_values(&[processor_name])
+                        .set(processing_time.elapsed().as_secs_f64());
                 }
-            }
-            let batch_start = processed_versions_sorted.first().unwrap().start_version;
-            let batch_end = processed_versions_sorted.last().unwrap().end_version;
-            batch_start_version = batch_end + 1;
+            });

-            processor
-                .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone())
-                .await
-                .unwrap();
+            tasks.push(join_handle);
+        }

-            ma.tick_now(batch_end - batch_start + 1);
-            info!(
-                processor_name = processor_name,
-                service_type = PROCESSOR_SERVICE_TYPE,
-                start_version = batch_start,
-                end_version = batch_end,
-                start_txn_timestamp_iso = batch_start_txn_timestamp
-                    .clone()
-                    .map(|t| timestamp_to_iso(&t))
-                    .unwrap_or_default(),
-                end_txn_timestamp_iso = batch_end_txn_timestamp
-                    .map(|t| timestamp_to_iso(&t))
-                    .unwrap_or_default(),
-                num_of_transactions = batch_end - batch_start + 1,
-                task_count,
-                size_in_bytes,
-                duration_in_secs = processing_time.elapsed().as_secs_f64(),
-                tps = (ma.avg() * 1000.0) as u64,
-                bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(),
-                step = ProcessorStep::ProcessedMultipleBatches.get_step(),
-                "{}",
-                ProcessorStep::ProcessedMultipleBatches.get_label(),
-            );
-            LATEST_PROCESSED_VERSION
-                .with_label_values(&[
-                    processor_name,
-                    ProcessorStep::ProcessedMultipleBatches.get_step(),
-                    ProcessorStep::ProcessedMultipleBatches.get_label(),
-                ])
-                .set(batch_end as i64);
-            TRANSACTION_UNIX_TIMESTAMP
-                .with_label_values(&[
-                    processor_name,
-                    ProcessorStep::ProcessedMultipleBatches.get_step(),
-                    ProcessorStep::ProcessedMultipleBatches.get_label(),
-                ])
-                .set(
-                    batch_start_txn_timestamp
-                        .map(|t| timestamp_to_unixtime(&t))
-                        .unwrap_or_default(),
-                );
-            PROCESSED_BYTES_COUNT
-                .with_label_values(&[
-                    processor_name,
-                    ProcessorStep::ProcessedMultipleBatches.get_step(),
-                    ProcessorStep::ProcessedMultipleBatches.get_label(),
-                ])
-                .inc_by(size_in_bytes as u64);
-            NUM_TRANSACTIONS_PROCESSED_COUNT
-                .with_label_values(&[
-                    processor_name,
-                    ProcessorStep::ProcessedMultipleBatches.get_step(),
-                    ProcessorStep::ProcessedMultipleBatches.get_label(),
-                ])
-                .inc_by(batch_end - batch_start + 1);
-            MULTI_BATCH_PROCESSING_TIME_IN_SECS
-                .with_label_values(&[processor_name])
-                .set(processing_time.elapsed().as_secs_f64());
+
+        // This will run forever, until all the processor tasks die
+        futures::future::try_join_all(tasks)
+            .await
+            .expect("[Processor] Processor tasks have died");
+
+        // Update states depending on results of the batch processing
+        // let mut processed_versions = vec![];
+
+        // TODO: MAKE THIS GAP HANDLING CODE WORK FOR OUR NEW ASYNC WORLD!
+        /*
+        // Make sure there are no gaps and advance states
+        processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version));
+        let mut prev_start = None;
+        let mut prev_end = None;
+        let mut max_processing_duration_in_secs: f64 = 0.0;
+        let mut max_db_insertion_duration_in_secs: f64 = 0.0;
+        let processed_versions_sorted = processed_versions.clone();
+        for processing_result in processed_versions {
+            let start = processing_result.start_version;
+            let end = processing_result.end_version;
+            max_processing_duration_in_secs =
+                max_processing_duration_in_secs.max(processing_result.processing_duration_in_secs);
+            max_db_insertion_duration_in_secs = max_db_insertion_duration_in_secs
+                .max(processing_result.db_insertion_duration_in_secs);
+
+            if prev_start.is_none() {
+                prev_start = Some(start);
+                prev_end = Some(end);
+            } else {
+                if prev_end.unwrap() + 1 != start {
+                    error!(
+                        processor_name = processor_name,
+                        stream_address = self.indexer_grpc_data_service_address.to_string(),
+                        processed_versions = processed_versions_sorted
+                            .iter()
+                            .map(|result| format!(
+                                "{}-{}",
+                                result.start_version, result.end_version
+                            ))
+                            .collect::<Vec<_>>()
+                            .join(", "),
+                        "[Parser] Gaps in processing stream"
+                    );
+                    panic!();
+                }
+                prev_start = Some(start);
+                prev_end = Some(end);
+            }
+        }
+        */
     }

     async fn run_migrations(&self) {
@@ -571,7 +561,7 @@ impl Worker {
 pub async fn do_processor(
     transactions_pb: TransactionsPBResponse,
     processor: Arc<Processor>,
-    db_chain_id: Option<u64>,
+    db_chain_id: u64,
     processor_name: &str,
     auth_token: &str,
     enable_verbose_logging: bool,
@@ -614,8 +604,9 @@ pub async fn do_processor(
             transactions_pb.transactions,
             start_version,
             end_version,
-            db_chain_id,
-        ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton)
+            // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initialized (e.g. so they can have a chain_id field set on new() function)
+            Some(db_chain_id),
+        )
         .await;
     if let Some(ref t) = txn_time {
         PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS