From 6ccc1c644086912f0fe77d8417fbd7267a41bb74 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 21:11:39 -0800 Subject: [PATCH 01/28] max kanal --- rust/processor/Cargo.toml | 1 + rust/processor/src/grpc_stream.rs | 3 +- rust/processor/src/worker.rs | 174 ++++++++++++++---------------- 3 files changed, 84 insertions(+), 94 deletions(-) diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 2274e3ea..47e02a34 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -17,6 +17,7 @@ ahash = { workspace = true } anyhow = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } +kanal = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } bcs = { workspace = true } diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 253bd262..c898a712 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -12,6 +12,7 @@ use aptos_protos::indexer::v1::{ raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse, }; use futures_util::StreamExt; +use kanal::AsyncSender; use prost::Message; use std::{sync::Arc, time::Duration}; use tonic::{Response, Streaming}; @@ -137,7 +138,7 @@ pub async fn get_stream( /// 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that /// all existing transactions are processed pub async fn create_fetcher_loop( - txn_sender: tokio::sync::mpsc::Sender, + txn_sender: AsyncSender, indexer_grpc_data_service_address: Url, indexer_grpc_http2_ping_interval: Duration, indexer_grpc_http2_ping_timeout: Duration, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 43c3c2e8..29875bff 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -32,7 +32,7 @@ use aptos_moving_average::MovingAverage; use aptos_protos::transaction::v1::Transaction; use diesel::Connection; use std::{sync::Arc, time::Duration}; -use tokio::{sync::mpsc::error::TryRecvError, time::timeout}; +use tokio::time::timeout; use tracing::{error, info}; use url::Url; @@ -178,7 +178,7 @@ impl Worker { // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream // and write into a channel // The each item will be (chain_id, batch of transactions) - let (tx, mut receiver) = tokio::sync::mpsc::channel::(BUFFER_SIZE); + let (tx, mut receiver) = kanal::bounded_async::(BUFFER_SIZE); let request_ending_version = self.ending_version; let auth_token = self.auth_token.clone(); tokio::spawn(async move { @@ -201,7 +201,7 @@ impl Worker { batch_start_version, BUFFER_SIZE, ) - .await + .await }); // This is the consumer side of the channel. These are the major states: @@ -211,100 +211,88 @@ impl Worker { // 4. We have not received anything in X seconds, we should panic. // 5. If it's the wrong chain, panic. let mut db_chain_id = None; - loop { - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser] Fetching transaction batches from channel", - ); - let txn_channel_fetch_latency = std::time::Instant::now(); - let mut transactions_batches = vec![]; - let mut last_fetched_version = batch_start_version as i64 - 1; - for task_index in 0..concurrent_tasks { - let receive_status = match task_index { - 0 => { - // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed. - match timeout( - Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), - receiver.recv(), - ) - .await - { - Ok(result) => result.ok_or(TryRecvError::Disconnected), - Err(_) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = - self.indexer_grpc_data_service_address.as_str(), - "[Parser] Consumer thread timed out waiting for transactions", - ); - panic!( - "[Parser] Consumer thread timed out waiting for transactions" - ); - }, - } - // If we're the first task, we should wait until we get data. If `None`, it means the channel is closed. - // receiver.recv().await.ok_or(TryRecvError::Disconnected) - }, - _ => { - // If we're not the first task, we should poll to see if we get any data. - receiver.try_recv() - }, - }; - match receive_status { - Ok(txn_pb) => { - if let Some(existing_id) = db_chain_id { - if txn_pb.chain_id != existing_id { - error!( + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + "[Parser] Fetching transaction batches from channel", + ); + let txn_channel_fetch_latency = std::time::Instant::now(); + let mut transactions_batches = vec![]; + let mut last_fetched_version = batch_start_version as i64 - 1; + + // Make a new thread that copies from one channel, into another: if it doesn't see any new data for + // `CONSUMER_THREAD_TIMEOUT_IN_SECS`: panic + let stream_address = self.indexer_grpc_data_service_address.to_string(); + let processor_name_string = processor_name.to_string(); + + let tasks = vec![]; + for task_index in (0..concurrent_tasks) { + let join_handle = tokio::spawn(async move { + match timeout( + Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), + receiver.recv(), + ) + .await + { + let txn_pb = Ok(txn_pb) => match txn_pb { + Ok(txn_pb) => { + if let Some(existing_id) = db_chain_id { + if txn_pb.chain_id != existing_id { + error!( processor_name = processor_name, - stream_address = - self.indexer_grpc_data_service_address.as_str(), + stream_address = stream_address, chain_id = txn_pb.chain_id, existing_id = existing_id, "[Parser] Stream somehow changed chain id!", ); - panic!("[Parser] Stream somehow changed chain id!"); + panic!("[Parser] Stream somehow changed chain id!"); + } + } else { + db_chain_id = Some( + self.check_or_update_chain_id(txn_pb.chain_id as i64) + .await + .unwrap(), + ); } - } else { - db_chain_id = Some( - self.check_or_update_chain_id(txn_pb.chain_id as i64) - .await - .unwrap(), - ); - } - let current_fetched_version = - txn_pb.transactions.as_slice().first().unwrap().version; - if last_fetched_version + 1 != current_fetched_version as i64 { - error!( + let current_fetched_version = + txn_pb.transactions.as_slice().first().unwrap().version; + if last_fetched_version + 1 != current_fetched_version as i64 { + error!( batch_start_version = batch_start_version, last_fetched_version = last_fetched_version, current_fetched_version = current_fetched_version, "[Parser] Received batch with gap from GRPC stream" ); - panic!("[Parser] Received batch with gap from GRPC stream"); + panic!("[Parser] Received batch with gap from GRPC stream"); + } + last_fetched_version = + txn_pb.transactions.as_slice().last().unwrap().version as i64; + txn_pb } - last_fetched_version = - txn_pb.transactions.as_slice().last().unwrap().version as i64; - transactions_batches.push(txn_pb); - }, - // Channel is empty and send is not drpped which we definitely expect. Wait for a bit and continue polling. - Err(TryRecvError::Empty) => { - break; - }, - // This happens when the channel is closed. We should panic. - Err(TryRecvError::Disconnected) => { - error!( + // This happens when the channel is closed. We should panic. + Err(_e) => { + error!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, stream_address = self.indexer_grpc_data_service_address.as_str(), "[Parser] Channel closed; stream ended." ); - panic!("[Parser] Channel closed"); + panic!("[Parser] Channel closed"); + } }, - } - } + Err(_) => { + error!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser] Consumer thread timed out waiting for transactions", + ); + panic!("[Parser] Consumer thread timed out waiting for transactions"); + } + }; + }); + } let size_in_bytes = transactions_batches .iter() @@ -355,7 +343,7 @@ impl Worker { &auth_token, enable_verbose_logging, ) - .await + .await }); tasks.push(task); } @@ -375,7 +363,7 @@ impl Worker { .with_label_values(&[processor_name]) .inc(); versions - }, + } Err(e) => { error!( processor_name = processor_name, @@ -390,7 +378,7 @@ impl Worker { "[Parser] Error processing '{:}' transactions: {:?}", processor_name, e ); - }, + } }; processed_versions.push(processed); } @@ -544,7 +532,7 @@ impl Worker { "[Parser] Chain id matches! Continue to index...", ); Ok(chain_id as u64) - }, + } None => { info!( processor_name = processor_name, @@ -560,10 +548,10 @@ impl Worker { .on_conflict_do_nothing(), None, ) - .await - .context("[Parser] Error updating chain_id!") - .map(|_| grpc_chain_id as u64) - }, + .await + .context("[Parser] Error updating chain_id!") + .map(|_| grpc_chain_id as u64) + } } } } @@ -739,27 +727,27 @@ pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor match config { ProcessorConfig::AccountTransactionsProcessor => { Processor::from(AccountTransactionsProcessor::new(db_pool)) - }, + } ProcessorConfig::AnsProcessor(config) => { Processor::from(AnsProcessor::new(db_pool, config.clone())) - }, + } ProcessorConfig::CoinProcessor => Processor::from(CoinProcessor::new(db_pool)), ProcessorConfig::DefaultProcessor => Processor::from(DefaultProcessor::new(db_pool)), ProcessorConfig::EventsProcessor => Processor::from(EventsProcessor::new(db_pool)), ProcessorConfig::FungibleAssetProcessor => { Processor::from(FungibleAssetProcessor::new(db_pool)) - }, + } ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) - }, + } ProcessorConfig::ObjectsProcessor => Processor::from(ObjectsProcessor::new(db_pool)), ProcessorConfig::StakeProcessor => Processor::from(StakeProcessor::new(db_pool)), ProcessorConfig::TokenProcessor(config) => { Processor::from(TokenProcessor::new(db_pool, config.clone())) - }, + } ProcessorConfig::TokenV2Processor => Processor::from(TokenV2Processor::new(db_pool)), ProcessorConfig::UserTransactionProcessor => { Processor::from(UserTransactionProcessor::new(db_pool)) - }, + } } } From 619bf026c3806afecee1881b69f9f847e1db9cb5 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 21:11:53 -0800 Subject: [PATCH 02/28] add kanal cargo --- rust/Cargo.lock | 22 +++++++++++++++------- rust/Cargo.toml | 1 + 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6a52e23e..38dae1c4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -506,12 +506,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1365,6 +1362,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "keccak" version = "0.1.4" @@ -1794,9 +1801,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -1922,6 +1929,7 @@ dependencies = [ "google-cloud-googleapis", "google-cloud-pubsub", "hex", + "kanal", "native-tls", "once_cell", "postgres-native-tls", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 77dcc133..d45809a4 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -27,6 +27,7 @@ ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.62" async-trait = "0.1.53" aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "af0dcea7144225a709e4f595e58f8026b99e901c" } +kanal = { version = "0.1.0-pre8", features = ["async"] } backtrace = "0.3.58" base64 = "0.13.0" bb8 = "0.8.1" From 81967a252a33faeec2387c4d492a7d4bfd6cd197 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 21:51:22 -0800 Subject: [PATCH 03/28] more --- rust/processor/src/grpc_stream.rs | 46 +++- rust/processor/src/worker.rs | 420 ++++++++++++++---------------- 2 files changed, 232 insertions(+), 234 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index c898a712..210ee7f9 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -71,11 +71,11 @@ pub async fn get_stream( let channel = tonic::transport::Channel::from_shared( indexer_grpc_data_service_address.to_string(), ) - .expect( - "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", - ) - .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) - .keep_alive_timeout(indexer_grpc_http2_ping_timeout); + .expect( + "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) + .keep_alive_timeout(indexer_grpc_http2_ping_timeout); // If the scheme is https, add a TLS config. let channel = if indexer_grpc_data_service_address.scheme() == "https" { @@ -112,7 +112,7 @@ pub async fn get_stream( "[Parser] Error connecting to GRPC client" ); panic!("[Parser] Error connecting to GRPC client"); - }, + } }; let count = ending_version.map(|v| (v as i64 - starting_version as i64 + 1) as u64); info!( @@ -131,6 +131,10 @@ pub async fn get_stream( .expect("[Parser] Failed to get grpc response. Is the server running?") } +pub async fn get_chain_id(){ + +} + /// Gets a batch of transactions from the stream. Batch size is set in the grpc server. /// The number of batches depends on our config /// There could be several special scenarios: @@ -169,7 +173,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let mut connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -185,6 +189,8 @@ pub async fn create_fetcher_loop( "[Parser] Successfully connected to GRPC stream", ); + let mut last_fetched_version = batch_start_version as i64 - 1; + let mut batch_start_version = batch_start_version; loop { let is_success = match resp_stream.next().await { Some(Ok(r)) => { @@ -225,6 +231,20 @@ pub async fn create_fetcher_loop( "{}", ProcessorStep::ReceivedTxnsFromGrpc.get_label(), ); + + let current_fetched_version = start_version; + if last_fetched_version + 1 != current_fetched_version as i64 { + error!( + batch_start_version = batch_start_version, + last_fetched_version = last_fetched_version, + current_fetched_version = current_fetched_version, + "[Parser] Received batch with gap from GRPC stream" + ); + panic!("[Parser] Received batch with gap from GRPC stream"); + } + last_fetched_version = end_version as i64; + batch_start_version = (last_fetched_version + 1) as u64; + LATEST_PROCESSED_VERSION .with_label_values(&[ &processor_name, @@ -274,7 +294,7 @@ pub async fn create_fetcher_loop( txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); match txn_sender.send(txn_pb).await { - Ok(()) => {}, + Ok(()) => {} Err(e) => { error!( processor_name = processor_name, @@ -285,7 +305,7 @@ pub async fn create_fetcher_loop( "[Parser] Error sending GRPC response to channel." ); panic!("[Parser] Error sending GRPC response to channel.") - }, + } } info!( processor_name = processor_name, @@ -306,7 +326,7 @@ pub async fn create_fetcher_loop( .set((buffer_size - txn_sender.capacity()) as i64); grpc_channel_recv_latency = std::time::Instant::now(); true - }, + } Some(Err(rpc_error)) => { tracing::warn!( processor_name = processor_name, @@ -319,7 +339,7 @@ pub async fn create_fetcher_loop( "[Parser] Error receiving datastream response." ); false - }, + } None => { tracing::warn!( processor_name = processor_name, @@ -331,7 +351,7 @@ pub async fn create_fetcher_loop( "[Parser] Stream ended." ); false - }, + } }; // Check if we're at the end of the stream let is_end = if let Some(ending_version) = request_ending_version { @@ -411,7 +431,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 29875bff..d8e29208 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -218,146 +218,100 @@ impl Worker { "[Parser] Fetching transaction batches from channel", ); let txn_channel_fetch_latency = std::time::Instant::now(); - let mut transactions_batches = vec![]; - let mut last_fetched_version = batch_start_version as i64 - 1; - // Make a new thread that copies from one channel, into another: if it doesn't see any new data for - // `CONSUMER_THREAD_TIMEOUT_IN_SECS`: panic let stream_address = self.indexer_grpc_data_service_address.to_string(); let processor_name_string = processor_name.to_string(); + let auth_token = self.auth_token.clone(); - let tasks = vec![]; + let mut tasks = vec![]; for task_index in (0..concurrent_tasks) { let join_handle = tokio::spawn(async move { - match timeout( + let txn_pb = match timeout( Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), receiver.recv(), ) .await { - let txn_pb = Ok(txn_pb) => match txn_pb { - Ok(txn_pb) => { - if let Some(existing_id) = db_chain_id { - if txn_pb.chain_id != existing_id { - error!( - processor_name = processor_name, - stream_address = stream_address, - chain_id = txn_pb.chain_id, - existing_id = existing_id, - "[Parser] Stream somehow changed chain id!", - ); - panic!("[Parser] Stream somehow changed chain id!"); - } - } else { - db_chain_id = Some( - self.check_or_update_chain_id(txn_pb.chain_id as i64) - .await - .unwrap(), - ); - } - let current_fetched_version = - txn_pb.transactions.as_slice().first().unwrap().version; - if last_fetched_version + 1 != current_fetched_version as i64 { - error!( - batch_start_version = batch_start_version, - last_fetched_version = last_fetched_version, - current_fetched_version = current_fetched_version, - "[Parser] Received batch with gap from GRPC stream" - ); - panic!("[Parser] Received batch with gap from GRPC stream"); - } - last_fetched_version = - txn_pb.transactions.as_slice().last().unwrap().version as i64; - txn_pb - } + Ok(txn_pb) => match txn_pb { + Ok(txn_pb) => txn_pb, // This happens when the channel is closed. We should panic. Err(_e) => { error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser] Channel closed; stream ended." - ); + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = self.indexer_grpc_data_service_address.as_str(), + "[Parser] Channel closed; stream ended." + ); panic!("[Parser] Channel closed"); } }, Err(_) => { error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = stream_address, - "[Parser] Consumer thread timed out waiting for transactions", - ); + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser] Consumer thread timed out waiting for transactions", + ); panic!("[Parser] Consumer thread timed out waiting for transactions"); } }; - }); - } - let size_in_bytes = transactions_batches - .iter() - .fold(0.0, |acc, txn_batch| acc + txn_batch.size_in_bytes as f64); - let batch_start_txn_timestamp = transactions_batches - .first() - .unwrap() - .transactions - .as_slice() - .first() - .unwrap() - .timestamp - .clone(); - let batch_end_txn_timestamp = transactions_batches - .last() - .unwrap() - .transactions - .as_slice() - .last() - .unwrap() - .timestamp - .clone(); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start_version, - end_version = last_fetched_version, - num_of_transactions = last_fetched_version - batch_start_version as i64 + 1, - size_in_bytes, - duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), - tps = (last_fetched_version as f64 - batch_start_version as f64) - / txn_channel_fetch_latency.elapsed().as_secs_f64(), - bytes_per_sec = size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), - "[Parser] Successfully fetched transaction batches from channel." - ); + let size_in_bytes = txn_pb.size_in_bytes as f64; + let first_txn = txn_pb.transactions.as_slice().first().unwrap(); + let last_txn = txn_pb.transactions.as_slice().last().unwrap(); + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = batch_start_version, + end_version = first_txn.version, + num_of_transactions = last_txn.version - batch_start_version as i64 + 1, + size_in_bytes, + duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), + tps = (last_txn.version as f64 - batch_start_version as f64) + / txn_channel_fetch_latency.elapsed().as_secs_f64(), + bytes_per_sec = + size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), + "[Parser][T#{}] Successfully fetched transaction batches from channel.", + task_index + ); + + if let Some(existing_id) = db_chain_id { + if txn_pb.chain_id != existing_id { + error!( + processor_name = processor_name, + stream_address = stream_address, + chain_id = txn_pb.chain_id, + existing_id = existing_id, + "[Parser][T#{}] Stream somehow changed chain id!", + task_index + ); + panic!( + "[Parser][T#{}] Stream somehow changed chain id!", + task_index + ); + } + } else { + db_chain_id = Some( + self.check_or_update_chain_id(txn_pb.chain_id as i64) + .await + .unwrap(), + ); + } - // Process the transactions in parallel - let mut tasks = vec![]; - for transactions_pb in transactions_batches { let processor_clone = processor.clone(); - let auth_token = self.auth_token.clone(); - let task = tokio::spawn(async move { - do_processor( - transactions_pb, - processor_clone, - db_chain_id, - processor_name, - &auth_token, - enable_verbose_logging, - ) - .await - }); - tasks.push(task); - } - let processing_time = std::time::Instant::now(); - let task_count = tasks.len(); - let batches = match futures::future::try_join_all(tasks).await { - Ok(res) => res, - Err(err) => panic!("[Parser] Error processing transaction batches: {:?}", err), - }; - - // Update states depending on results of the batch processing - let mut processed_versions = vec![]; - for res in batches { - let processed: ProcessingResult = match res { + let processing_time = std::time::Instant::now(); + + let res = do_processor( + txn_pb, + processor_clone, + db_chain_id, + processor_name, + &auth_token, + enable_verbose_logging, + ) + .await; + + let processed = match res { Ok(versions) => { PROCESSOR_SUCCESSES_COUNT .with_label_values(&[processor_name]) @@ -366,11 +320,11 @@ impl Worker { } Err(e) => { error!( - processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.to_string(), - error = ?e, - "[Parser] Error processing transactions" - ); + processor_name = processor_name, + stream_address = self.indexer_grpc_data_service_address.to_string(), + error = ?e, + "[Parser] Error processing transactions" + ); PROCESSOR_ERRORS_COUNT .with_label_values(&[processor_name]) .inc(); @@ -380,115 +334,139 @@ impl Worker { ); } }; - processed_versions.push(processed); - } - // Make sure there are no gaps and advance states - processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version)); - let mut prev_start = None; - let mut prev_end = None; - let mut max_processing_duration_in_secs: f64 = 0.0; - let mut max_db_insertion_duration_in_secs: f64 = 0.0; - let processed_versions_sorted = processed_versions.clone(); - for processing_result in processed_versions { - let start = processing_result.start_version; - let end = processing_result.end_version; - max_processing_duration_in_secs = max_processing_duration_in_secs - .max(processing_result.processing_duration_in_secs); + let mut max_processing_duration_in_secs: f64 = 0.0; + let mut max_db_insertion_duration_in_secs: f64 = 0.0; + max_processing_duration_in_secs = + max_processing_duration_in_secs.max(processed.processing_duration_in_secs); max_db_insertion_duration_in_secs = max_db_insertion_duration_in_secs - .max(processing_result.db_insertion_duration_in_secs); - if prev_start.is_none() { - prev_start = Some(start); - prev_end = Some(end); - } else { - if prev_end.unwrap() + 1 != start { - error!( - processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.to_string(), - processed_versions = processed_versions_sorted - .iter() - .map(|result| format!( - "{}-{}", - result.start_version, result.end_version - )) - .collect::>() - .join(", "), - "[Parser] Gaps in processing stream" - ); - panic!(); - } - prev_start = Some(start); - prev_end = Some(end); + .max(processed.db_insertion_duration_in_secs); + + processor + .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) + .await + .unwrap(); + }); + tasks.push(join_handle); + } + + // Process the transactions in parallel + + let processing_time = std::time::Instant::now(); + let task_count = tasks.len(); + + // This will be forever, until all the indexer dies + // This will be forever, until all the indexer dies + // This will be forever, until all the indexer dies + futures::future::try_join_all(tasks).await.expect("[Processor] Processor tasks have died"); + + // Update states depending on results of the batch processing + let mut processed_versions = vec![]; + + // TODO: MAKE THIS GAP HANDLING CODE WORK FOR OUR NEW ASYNC WORLD! + // Make sure there are no gaps and advance states + processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version)); + let mut prev_start = None; + let mut prev_end = None; + let mut max_processing_duration_in_secs: f64 = 0.0; + let mut max_db_insertion_duration_in_secs: f64 = 0.0; + let processed_versions_sorted = processed_versions.clone(); + for processing_result in processed_versions { + let start = processing_result.start_version; + let end = processing_result.end_version; + max_processing_duration_in_secs = + max_processing_duration_in_secs.max(processing_result.processing_duration_in_secs); + max_db_insertion_duration_in_secs = max_db_insertion_duration_in_secs + .max(processing_result.db_insertion_duration_in_secs); + + if prev_start.is_none() { + prev_start = Some(start); + prev_end = Some(end); + } else { + if prev_end.unwrap() + 1 != start { + error!( + processor_name = processor_name, + stream_address = self.indexer_grpc_data_service_address.to_string(), + processed_versions = processed_versions_sorted + .iter() + .map(|result| format!( + "{}-{}", + result.start_version, result.end_version + )) + .collect::>() + .join(", "), + "[Parser] Gaps in processing stream" + ); + panic!(); } + prev_start = Some(start); + prev_end = Some(end); } - let batch_start = processed_versions_sorted.first().unwrap().start_version; - let batch_end = processed_versions_sorted.last().unwrap().end_version; - batch_start_version = batch_end + 1; + } - processor - .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) - .await - .unwrap(); + processor + .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) + .await + .unwrap(); - ma.tick_now(batch_end - batch_start + 1); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start, - end_version = batch_end, - start_txn_timestamp_iso = batch_start_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - end_txn_timestamp_iso = batch_end_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - num_of_transactions = batch_end - batch_start + 1, - task_count, - size_in_bytes, - duration_in_secs = processing_time.elapsed().as_secs_f64(), - tps = (ma.avg() * 1000.0) as u64, - bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedMultipleBatches.get_step(), - "{}", + ma.tick_now(batch_end - batch_start + 1); + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = batch_start, + end_version = batch_end, + start_txn_timestamp_iso = batch_start_txn_timestamp + .clone() + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(), + end_txn_timestamp_iso = batch_end_txn_timestamp + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(), + num_of_transactions = batch_end - batch_start + 1, + task_count, + size_in_bytes, + duration_in_secs = processing_time.elapsed().as_secs_f64(), + tps = (ma.avg() * 1000.0) as u64, + bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), + step = ProcessorStep::ProcessedMultipleBatches.get_step(), + "{}", + ProcessorStep::ProcessedMultipleBatches.get_label(), + ); + LATEST_PROCESSED_VERSION + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set(batch_end as i64); + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set( + batch_start_txn_timestamp + .map(|t| timestamp_to_unixtime(&t)) + .unwrap_or_default(), ); - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set(batch_end as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set( - batch_start_txn_timestamp - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(), - ); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(size_in_bytes as u64); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(batch_end - batch_start + 1); - MULTI_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_time.elapsed().as_secs_f64()); - } + PROCESSED_BYTES_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(size_in_bytes as u64); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(batch_end - batch_start + 1); + MULTI_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(processing_time.elapsed().as_secs_f64()); } async fn run_migrations(&self) { From 42747b1503648b90b93b3e59cc486b9c11b855d9 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 22:35:10 -0800 Subject: [PATCH 04/28] finish fix --- rust/processor/src/grpc_stream.rs | 86 ++++++-- rust/processor/src/worker.rs | 325 ++++++++++++++++-------------- 2 files changed, 244 insertions(+), 167 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 210ee7f9..0d0344ea 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -71,11 +71,11 @@ pub async fn get_stream( let channel = tonic::transport::Channel::from_shared( indexer_grpc_data_service_address.to_string(), ) - .expect( - "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", - ) - .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) - .keep_alive_timeout(indexer_grpc_http2_ping_timeout); + .expect( + "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) + .keep_alive_timeout(indexer_grpc_http2_ping_timeout); // If the scheme is https, add a TLS config. let channel = if indexer_grpc_data_service_address.scheme() == "https" { @@ -112,7 +112,7 @@ pub async fn get_stream( "[Parser] Error connecting to GRPC client" ); panic!("[Parser] Error connecting to GRPC client"); - } + }, }; let count = ending_version.map(|v| (v as i64 - starting_version as i64 + 1) as u64); info!( @@ -131,8 +131,66 @@ pub async fn get_stream( .expect("[Parser] Failed to get grpc response. Is the server running?") } -pub async fn get_chain_id(){ +pub async fn get_chain_id( + indexer_grpc_data_service_address: Url, + indexer_grpc_http2_ping_interval: Duration, + indexer_grpc_http2_ping_timeout: Duration, + auth_token: String, + processor_name: String, +) -> u64 { + info!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + "[Parser] Connecting to GRPC stream to get chain id", + ); + let response = get_stream( + indexer_grpc_data_service_address.clone(), + indexer_grpc_http2_ping_interval, + indexer_grpc_http2_ping_timeout, + 1, + Some(2), + auth_token.clone(), + processor_name.to_string(), + ) + .await; + let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { + Some(connection_id) => connection_id.to_str().unwrap().to_string(), + None => "".to_string(), + }; + let mut resp_stream = response.into_inner(); + info!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + "[Parser] Successfully connected to GRPC stream to get chain id", + ); + match resp_stream.next().await { + Some(Ok(r)) => r.chain_id.expect("[Parser] Chain Id doesn't exist."), + Some(Err(rpc_error)) => { + error!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + error = ?rpc_error, + "[Parser] Error receiving datastream response for chain id" + ); + panic!("[Parser] Error receiving datastream response for chain id"); + }, + None => { + error!( + processor_name = processor_name, + service_type = crate::worker::PROCESSOR_SERVICE_TYPE, + stream_address = indexer_grpc_data_service_address.to_string(), + connection_id, + "[Parser] Stream ended before getting response fo for chain id" + ); + panic!("[Parser] Stream ended before getting response fo for chain id"); + }, + } } /// Gets a batch of transactions from the stream. Batch size is set in the grpc server. @@ -173,7 +231,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let mut connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -294,7 +352,7 @@ pub async fn create_fetcher_loop( txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); match txn_sender.send(txn_pb).await { - Ok(()) => {} + Ok(()) => {}, Err(e) => { error!( processor_name = processor_name, @@ -305,7 +363,7 @@ pub async fn create_fetcher_loop( "[Parser] Error sending GRPC response to channel." ); panic!("[Parser] Error sending GRPC response to channel.") - } + }, } info!( processor_name = processor_name, @@ -326,7 +384,7 @@ pub async fn create_fetcher_loop( .set((buffer_size - txn_sender.capacity()) as i64); grpc_channel_recv_latency = std::time::Instant::now(); true - } + }, Some(Err(rpc_error)) => { tracing::warn!( processor_name = processor_name, @@ -339,7 +397,7 @@ pub async fn create_fetcher_loop( "[Parser] Error receiving datastream response." ); false - } + }, None => { tracing::warn!( processor_name = processor_name, @@ -351,7 +409,7 @@ pub async fn create_fetcher_loop( "[Parser] Stream ended." ); false - } + }, }; // Check if we're at the end of the stream let is_end = if let Some(ending_version) = request_ending_version { @@ -431,7 +489,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index d8e29208..8f96595c 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -32,7 +32,7 @@ use aptos_moving_average::MovingAverage; use aptos_protos::transaction::v1::Transaction; use diesel::Connection; use std::{sync::Arc, time::Duration}; -use tokio::time::timeout; +use tokio::{sync::Mutex, time::timeout}; use tracing::{error, info}; use url::Url; @@ -162,12 +162,26 @@ impl Worker { let concurrent_tasks = self.number_concurrent_processing_tasks; // Build the processor based on the config. - let processor = build_processor(&self.processor_config, self.db_pool.clone()); - let processor = Arc::new(processor); + let processor = Arc::new(build_processor( + &self.processor_config, + self.db_pool.clone(), + )); + + // get the chain id + let chain_id = crate::grpc_stream::get_chain_id( + self.indexer_grpc_data_service_address.clone(), + self.grpc_http2_config.grpc_http2_ping_interval_in_secs(), + self.grpc_http2_config.grpc_http2_ping_timeout_in_secs(), + self.auth_token.clone(), + processor_name.to_string(), + ) + .await; + self.check_or_update_chain_id(chain_id as i64) + .await + .unwrap(); // This is the moving average that we use to calculate TPS - let mut ma = MovingAverage::new(10); - let mut batch_start_version = starting_version; + let ma = Arc::new(Mutex::new(MovingAverage::new(10))); let ending_version = self.ending_version; let indexer_grpc_data_service_address = self.indexer_grpc_data_service_address.clone(); @@ -178,7 +192,7 @@ impl Worker { // Create a transaction fetcher thread that will continuously fetch transactions from the GRPC stream // and write into a channel // The each item will be (chain_id, batch of transactions) - let (tx, mut receiver) = kanal::bounded_async::(BUFFER_SIZE); + let (tx, receiver) = kanal::bounded_async::(BUFFER_SIZE); let request_ending_version = self.ending_version; let auth_token = self.auth_token.clone(); tokio::spawn(async move { @@ -186,7 +200,7 @@ impl Worker { processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, end_version = ending_version, - start_version = batch_start_version, + start_version = starting_version, "[Parser] Starting fetcher thread" ); crate::grpc_stream::create_fetcher_loop( @@ -198,10 +212,10 @@ impl Worker { request_ending_version, auth_token, processor_name.to_string(), - batch_start_version, + starting_version, BUFFER_SIZE, ) - .await + .await }); // This is the consumer side of the channel. These are the major states: @@ -210,7 +224,7 @@ impl Worker { // 3. We have received either an empty batch or a batch with a gap. We should panic. // 4. We have not received anything in X seconds, we should panic. // 5. If it's the wrong chain, panic. - let mut db_chain_id = None; + info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, @@ -219,18 +233,20 @@ impl Worker { ); let txn_channel_fetch_latency = std::time::Instant::now(); - let stream_address = self.indexer_grpc_data_service_address.to_string(); - let processor_name_string = processor_name.to_string(); - let auth_token = self.auth_token.clone(); - let mut tasks = vec![]; - for task_index in (0..concurrent_tasks) { + for task_index in 0..concurrent_tasks { + let auth_token = self.auth_token.clone(); + let stream_address = self.indexer_grpc_data_service_address.to_string(); + let processor_clone = processor.clone(); + let ma_clone = ma.clone(); + let receiver_clone = receiver.clone(); + let join_handle = tokio::spawn(async move { let txn_pb = match timeout( Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), - receiver.recv(), + receiver_clone.recv(), ) - .await + .await { Ok(txn_pb) => match txn_pb { Ok(txn_pb) => txn_pb, @@ -239,35 +255,45 @@ impl Worker { error!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, - stream_address = self.indexer_grpc_data_service_address.as_str(), - "[Parser] Channel closed; stream ended." + stream_address = stream_address, + "[Parser][T#{}] Channel closed; stream ended.", + task_index ); - panic!("[Parser] Channel closed"); - } + panic!("[Parser][T#{}] Channel closed", task_index); + }, }, Err(_) => { error!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, stream_address = stream_address, - "[Parser] Consumer thread timed out waiting for transactions", + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index ); - panic!("[Parser] Consumer thread timed out waiting for transactions"); - } + panic!( + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index + ); + }, }; let size_in_bytes = txn_pb.size_in_bytes as f64; let first_txn = txn_pb.transactions.as_slice().first().unwrap(); + let first_txn_version = first_txn.version; + let first_txn_timestamp = first_txn.timestamp.clone(); let last_txn = txn_pb.transactions.as_slice().last().unwrap(); + let last_txn_version = last_txn.version; + let last_txn_timestamp = last_txn.timestamp.clone(); + info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start_version, - end_version = first_txn.version, - num_of_transactions = last_txn.version - batch_start_version as i64 + 1, + start_version = first_txn_version, + end_version = last_txn_version, + num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1, size_in_bytes, duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), - tps = (last_txn.version as f64 - batch_start_version as f64) + tps = (last_txn_version as f64 - first_txn_version as f64) / txn_channel_fetch_latency.elapsed().as_secs_f64(), bytes_per_sec = size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), @@ -275,95 +301,150 @@ impl Worker { task_index ); - if let Some(existing_id) = db_chain_id { - if txn_pb.chain_id != existing_id { - error!( - processor_name = processor_name, - stream_address = stream_address, - chain_id = txn_pb.chain_id, - existing_id = existing_id, - "[Parser][T#{}] Stream somehow changed chain id!", - task_index - ); - panic!( - "[Parser][T#{}] Stream somehow changed chain id!", - task_index - ); - } - } else { - db_chain_id = Some( - self.check_or_update_chain_id(txn_pb.chain_id as i64) - .await - .unwrap(), + // Ensure chain_id has not changed + if txn_pb.chain_id != chain_id { + error!( + processor_name = processor_name, + stream_address = stream_address, + chain_id = txn_pb.chain_id, + existing_id = chain_id, + "[Parser][T#{}] Stream somehow changed chain id!", + task_index + ); + panic!( + "[Parser][T#{}] Stream somehow changed chain id!", + task_index ); } - let processor_clone = processor.clone(); let processing_time = std::time::Instant::now(); let res = do_processor( txn_pb, - processor_clone, - db_chain_id, + processor_clone.clone(), + chain_id, processor_name, &auth_token, enable_verbose_logging, ) - .await; + .await; - let processed = match res { + let _processed = match res { Ok(versions) => { PROCESSOR_SUCCESSES_COUNT .with_label_values(&[processor_name]) .inc(); versions - } + }, Err(e) => { error!( - processor_name = processor_name, - stream_address = self.indexer_grpc_data_service_address.to_string(), - error = ?e, - "[Parser] Error processing transactions" - ); + processor_name = processor_name, + stream_address = stream_address, + error = ?e, + "[Parser][T#{}] Error processing transactions", task_index + ); PROCESSOR_ERRORS_COUNT .with_label_values(&[processor_name]) .inc(); panic!( - "[Parser] Error processing '{:}' transactions: {:?}", - processor_name, e + "[Parser][T#{}] Error processing '{:}' transactions: {:?}", + task_index, processor_name, e ); - } + }, }; - let mut max_processing_duration_in_secs: f64 = 0.0; - let mut max_db_insertion_duration_in_secs: f64 = 0.0; + /* + let mut max_processing_duration_in_secs: f64 = processed.processing_duration_in_secs; + let mut max_db_insertion_duration_in_secs: f64 = processed.db_insertion_duration_in_secs; max_processing_duration_in_secs = max_processing_duration_in_secs.max(processed.processing_duration_in_secs); - max_db_insertion_duration_in_secs = max_db_insertion_duration_in_secs - .max(processed.db_insertion_duration_in_secs); + max_db_insertion_duration_in_secs = + max_db_insertion_duration_in_secs.max(processed.db_insertion_duration_in_secs); + */ - processor - .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) + processor_clone + .update_last_processed_version(last_txn_version, last_txn_timestamp.clone()) .await .unwrap(); + + let mut ma_locked = ma_clone.lock().await; + ma_locked.tick_now(last_txn_version - first_txn_version + 1); + let tps = (ma_locked.avg() * 1000.0) as u64; + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = first_txn_version, + end_version = last_txn_version, + start_txn_timestamp_iso = first_txn_timestamp + .clone() + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(), + end_txn_timestamp_iso = last_txn_timestamp + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(), + num_of_transactions = last_txn_version - first_txn_version + 1, + task_count = concurrent_tasks, + size_in_bytes, + duration_in_secs = processing_time.elapsed().as_secs_f64(), + tps = tps, + bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), + step = ProcessorStep::ProcessedMultipleBatches.get_step(), + "{}", + ProcessorStep::ProcessedMultipleBatches.get_label(), + ); + LATEST_PROCESSED_VERSION + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set(last_txn_version as i64); + // TODO: do an atomic thing, or ideally move to an async metrics collector! + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set( + first_txn_timestamp + .map(|t| timestamp_to_unixtime(&t)) + .unwrap_or_default(), + ); + PROCESSED_BYTES_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(size_in_bytes as u64); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(last_txn_version - first_txn_version + 1); + MULTI_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(processing_time.elapsed().as_secs_f64()); }); + tasks.push(join_handle); } - // Process the transactions in parallel - - let processing_time = std::time::Instant::now(); - let task_count = tasks.len(); - // This will be forever, until all the indexer dies // This will be forever, until all the indexer dies // This will be forever, until all the indexer dies - futures::future::try_join_all(tasks).await.expect("[Processor] Processor tasks have died"); + futures::future::try_join_all(tasks) + .await + .expect("[Processor] Processor tasks have died"); // Update states depending on results of the batch processing - let mut processed_versions = vec![]; + // let mut processed_versions = vec![]; // TODO: MAKE THIS GAP HANDLING CODE WORK FOR OUR NEW ASYNC WORLD! + /* // Make sure there are no gaps and advance states processed_versions.sort_by(|a, b| a.start_version.cmp(&b.start_version)); let mut prev_start = None; @@ -403,70 +484,7 @@ impl Worker { prev_end = Some(end); } } - - processor - .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) - .await - .unwrap(); - - ma.tick_now(batch_end - batch_start + 1); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = batch_start, - end_version = batch_end, - start_txn_timestamp_iso = batch_start_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - end_txn_timestamp_iso = batch_end_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - num_of_transactions = batch_end - batch_start + 1, - task_count, - size_in_bytes, - duration_in_secs = processing_time.elapsed().as_secs_f64(), - tps = (ma.avg() * 1000.0) as u64, - bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedMultipleBatches.get_step(), - "{}", - ProcessorStep::ProcessedMultipleBatches.get_label(), - ); - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set(batch_end as i64); - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set( - batch_start_txn_timestamp - .map(|t| timestamp_to_unixtime(&t)) - .unwrap_or_default(), - ); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(size_in_bytes as u64); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(batch_end - batch_start + 1); - MULTI_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_time.elapsed().as_secs_f64()); + */ } async fn run_migrations(&self) { @@ -510,7 +528,7 @@ impl Worker { "[Parser] Chain id matches! Continue to index...", ); Ok(chain_id as u64) - } + }, None => { info!( processor_name = processor_name, @@ -526,10 +544,10 @@ impl Worker { .on_conflict_do_nothing(), None, ) - .await - .context("[Parser] Error updating chain_id!") - .map(|_| grpc_chain_id as u64) - } + .await + .context("[Parser] Error updating chain_id!") + .map(|_| grpc_chain_id as u64) + }, } } } @@ -537,7 +555,7 @@ impl Worker { pub async fn do_processor( transactions_pb: TransactionsPBResponse, processor: Arc, - db_chain_id: Option, + db_chain_id: u64, processor_name: &str, auth_token: &str, enable_verbose_logging: bool, @@ -580,8 +598,9 @@ pub async fn do_processor( transactions_pb.transactions, start_version, end_version, - db_chain_id, - ) // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton) + // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton) + Some(db_chain_id), + ) .await; if let Some(ref t) = txn_time { PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS @@ -705,27 +724,27 @@ pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor match config { ProcessorConfig::AccountTransactionsProcessor => { Processor::from(AccountTransactionsProcessor::new(db_pool)) - } + }, ProcessorConfig::AnsProcessor(config) => { Processor::from(AnsProcessor::new(db_pool, config.clone())) - } + }, ProcessorConfig::CoinProcessor => Processor::from(CoinProcessor::new(db_pool)), ProcessorConfig::DefaultProcessor => Processor::from(DefaultProcessor::new(db_pool)), ProcessorConfig::EventsProcessor => Processor::from(EventsProcessor::new(db_pool)), ProcessorConfig::FungibleAssetProcessor => { Processor::from(FungibleAssetProcessor::new(db_pool)) - } + }, ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) - } + }, ProcessorConfig::ObjectsProcessor => Processor::from(ObjectsProcessor::new(db_pool)), ProcessorConfig::StakeProcessor => Processor::from(StakeProcessor::new(db_pool)), ProcessorConfig::TokenProcessor(config) => { Processor::from(TokenProcessor::new(db_pool, config.clone())) - } + }, ProcessorConfig::TokenV2Processor => Processor::from(TokenV2Processor::new(db_pool)), ProcessorConfig::UserTransactionProcessor => { Processor::from(UserTransactionProcessor::new(db_pool)) - } + }, } } From fb040ea36e5fb2d0939e0c868765bcb4cf0da66c Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 22:38:27 -0800 Subject: [PATCH 05/28] try 20 --- rust/processor/src/utils/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index f41e38d6..958e36be 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -45,7 +45,7 @@ pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2; /// we may need to chunk an array of items based on how many columns are in the table. /// This function returns boundaries of chunks in the form of (start_index, end_index) pub fn get_chunks(num_items_to_insert: usize, _column_count: usize) -> Vec<(usize, usize)> { - let max_item_size = 100; // MAX_DIESEL_PARAM_SIZE as usize / column_count; + let max_item_size = 20; // MAX_DIESEL_PARAM_SIZE as usize / column_count; let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size)); let mut chunks = vec![chunk]; while chunk.1 != num_items_to_insert { From 52b40fbe20ea22334085a2c4bd63708e8917421e Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 22:41:02 -0800 Subject: [PATCH 06/28] smallfix --- rust/processor/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 8f96595c..3aafc9eb 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -598,7 +598,7 @@ pub async fn do_processor( transactions_pb.transactions, start_version, end_version, - // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initiallized (e.g. so they can have a chain_id field set on new() funciton) + // TODO: Change how we fetch chain_id, ideally can be accessed by processors when they are initialized (e.g. so they can have a chain_id field set on new() funciton) Some(db_chain_id), ) .await; From a22346613be1013816c9c3cf81878c36e0c09495 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 22:45:13 -0800 Subject: [PATCH 07/28] lint --- rust/processor/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 47e02a34..1899d2a6 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -17,7 +17,6 @@ ahash = { workspace = true } anyhow = { workspace = true } aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } -kanal = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } bcs = { workspace = true } @@ -36,6 +35,7 @@ gcloud-sdk = { workspace = true } google-cloud-googleapis = { workspace = true } google-cloud-pubsub = { workspace = true } hex = { workspace = true } +kanal = { workspace = true } once_cell = { workspace = true } prometheus = { workspace = true } prost = { workspace = true } From b34f5b8bd52a70f09873022b92005d1d6381ab4a Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 22:56:51 -0800 Subject: [PATCH 08/28] loop --- rust/processor/src/worker.rs | 328 ++++++++++++++++++----------------- 1 file changed, 165 insertions(+), 163 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 3aafc9eb..018cce47 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -242,192 +242,194 @@ impl Worker { let receiver_clone = receiver.clone(); let join_handle = tokio::spawn(async move { - let txn_pb = match timeout( - Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), - receiver_clone.recv(), - ) - .await - { - Ok(txn_pb) => match txn_pb { - Ok(txn_pb) => txn_pb, - // This happens when the channel is closed. We should panic. - Err(_e) => { + loop { + let txn_pb = match timeout( + Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), + receiver_clone.recv(), + ) + .await + { + Ok(txn_pb) => match txn_pb { + Ok(txn_pb) => txn_pb, + // This happens when the channel is closed. We should panic. + Err(_e) => { + error!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + stream_address = stream_address, + "[Parser][T#{}] Channel closed; stream ended.", + task_index + ); + panic!("[Parser][T#{}] Channel closed", task_index); + }, + }, + Err(_) => { error!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, stream_address = stream_address, - "[Parser][T#{}] Channel closed; stream ended.", + "[Parser][T#{}] Consumer thread timed out waiting for transactions", + task_index + ); + panic!( + "[Parser][T#{}] Consumer thread timed out waiting for transactions", task_index ); - panic!("[Parser][T#{}] Channel closed", task_index); }, - }, - Err(_) => { - error!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - stream_address = stream_address, - "[Parser][T#{}] Consumer thread timed out waiting for transactions", - task_index - ); - panic!( - "[Parser][T#{}] Consumer thread timed out waiting for transactions", - task_index - ); - }, - }; + }; - let size_in_bytes = txn_pb.size_in_bytes as f64; - let first_txn = txn_pb.transactions.as_slice().first().unwrap(); - let first_txn_version = first_txn.version; - let first_txn_timestamp = first_txn.timestamp.clone(); - let last_txn = txn_pb.transactions.as_slice().last().unwrap(); - let last_txn_version = last_txn.version; - let last_txn_timestamp = last_txn.timestamp.clone(); + let size_in_bytes = txn_pb.size_in_bytes as f64; + let first_txn = txn_pb.transactions.as_slice().first().unwrap(); + let first_txn_version = first_txn.version; + let first_txn_timestamp = first_txn.timestamp.clone(); + let last_txn = txn_pb.transactions.as_slice().last().unwrap(); + let last_txn_version = last_txn.version; + let last_txn_timestamp = last_txn.timestamp.clone(); - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = first_txn_version, - end_version = last_txn_version, - num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1, - size_in_bytes, - duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), - tps = (last_txn_version as f64 - first_txn_version as f64) - / txn_channel_fetch_latency.elapsed().as_secs_f64(), - bytes_per_sec = - size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), - "[Parser][T#{}] Successfully fetched transaction batches from channel.", - task_index - ); - - // Ensure chain_id has not changed - if txn_pb.chain_id != chain_id { - error!( + info!( processor_name = processor_name, - stream_address = stream_address, - chain_id = txn_pb.chain_id, - existing_id = chain_id, - "[Parser][T#{}] Stream somehow changed chain id!", + service_type = PROCESSOR_SERVICE_TYPE, + start_version = first_txn_version, + end_version = last_txn_version, + num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1, + size_in_bytes, + duration_in_secs = txn_channel_fetch_latency.elapsed().as_secs_f64(), + tps = (last_txn_version as f64 - first_txn_version as f64) + / txn_channel_fetch_latency.elapsed().as_secs_f64(), + bytes_per_sec = + size_in_bytes / txn_channel_fetch_latency.elapsed().as_secs_f64(), + "[Parser][T#{}] Successfully fetched transaction batches from channel.", task_index ); - panic!( - "[Parser][T#{}] Stream somehow changed chain id!", - task_index - ); - } - let processing_time = std::time::Instant::now(); - - let res = do_processor( - txn_pb, - processor_clone.clone(), - chain_id, - processor_name, - &auth_token, - enable_verbose_logging, - ) - .await; - - let _processed = match res { - Ok(versions) => { - PROCESSOR_SUCCESSES_COUNT - .with_label_values(&[processor_name]) - .inc(); - versions - }, - Err(e) => { + // Ensure chain_id has not changed + if txn_pb.chain_id != chain_id { error!( processor_name = processor_name, stream_address = stream_address, - error = ?e, - "[Parser][T#{}] Error processing transactions", task_index + chain_id = txn_pb.chain_id, + existing_id = chain_id, + "[Parser][T#{}] Stream somehow changed chain id!", + task_index ); - PROCESSOR_ERRORS_COUNT - .with_label_values(&[processor_name]) - .inc(); panic!( - "[Parser][T#{}] Error processing '{:}' transactions: {:?}", - task_index, processor_name, e + "[Parser][T#{}] Stream somehow changed chain id!", + task_index ); - }, - }; - - /* - let mut max_processing_duration_in_secs: f64 = processed.processing_duration_in_secs; - let mut max_db_insertion_duration_in_secs: f64 = processed.db_insertion_duration_in_secs; - max_processing_duration_in_secs = - max_processing_duration_in_secs.max(processed.processing_duration_in_secs); - max_db_insertion_duration_in_secs = - max_db_insertion_duration_in_secs.max(processed.db_insertion_duration_in_secs); - */ - - processor_clone - .update_last_processed_version(last_txn_version, last_txn_timestamp.clone()) - .await - .unwrap(); + } - let mut ma_locked = ma_clone.lock().await; - ma_locked.tick_now(last_txn_version - first_txn_version + 1); - let tps = (ma_locked.avg() * 1000.0) as u64; - info!( - processor_name = processor_name, - service_type = PROCESSOR_SERVICE_TYPE, - start_version = first_txn_version, - end_version = last_txn_version, - start_txn_timestamp_iso = first_txn_timestamp - .clone() - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - end_txn_timestamp_iso = last_txn_timestamp - .map(|t| timestamp_to_iso(&t)) - .unwrap_or_default(), - num_of_transactions = last_txn_version - first_txn_version + 1, - task_count = concurrent_tasks, - size_in_bytes, - duration_in_secs = processing_time.elapsed().as_secs_f64(), - tps = tps, - bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), - step = ProcessorStep::ProcessedMultipleBatches.get_step(), - "{}", - ProcessorStep::ProcessedMultipleBatches.get_label(), - ); - LATEST_PROCESSED_VERSION - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set(last_txn_version as i64); - // TODO: do an atomic thing, or ideally move to an async metrics collector! - TRANSACTION_UNIX_TIMESTAMP - .with_label_values(&[ + let processing_time = std::time::Instant::now(); + + let res = do_processor( + txn_pb, + processor_clone.clone(), + chain_id, processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .set( - first_txn_timestamp - .map(|t| timestamp_to_unixtime(&t)) + &auth_token, + enable_verbose_logging, + ) + .await; + + let _processed = match res { + Ok(versions) => { + PROCESSOR_SUCCESSES_COUNT + .with_label_values(&[processor_name]) + .inc(); + versions + }, + Err(e) => { + error!( + processor_name = processor_name, + stream_address = stream_address, + error = ?e, + "[Parser][T#{}] Error processing transactions", task_index + ); + PROCESSOR_ERRORS_COUNT + .with_label_values(&[processor_name]) + .inc(); + panic!( + "[Parser][T#{}] Error processing '{:}' transactions: {:?}", + task_index, processor_name, e + ); + }, + }; + + /* + let mut max_processing_duration_in_secs: f64 = processed.processing_duration_in_secs; + let mut max_db_insertion_duration_in_secs: f64 = processed.db_insertion_duration_in_secs; + max_processing_duration_in_secs = + max_processing_duration_in_secs.max(processed.processing_duration_in_secs); + max_db_insertion_duration_in_secs = + max_db_insertion_duration_in_secs.max(processed.db_insertion_duration_in_secs); + */ + + processor_clone + .update_last_processed_version(last_txn_version, last_txn_timestamp.clone()) + .await + .unwrap(); + + let mut ma_locked = ma_clone.lock().await; + ma_locked.tick_now(last_txn_version - first_txn_version + 1); + let tps = (ma_locked.avg() * 1000.0) as u64; + info!( + processor_name = processor_name, + service_type = PROCESSOR_SERVICE_TYPE, + start_version = first_txn_version, + end_version = last_txn_version, + start_txn_timestamp_iso = first_txn_timestamp + .clone() + .map(|t| timestamp_to_iso(&t)) .unwrap_or_default(), - ); - PROCESSED_BYTES_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), - ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(size_in_bytes as u64); - NUM_TRANSACTIONS_PROCESSED_COUNT - .with_label_values(&[ - processor_name, - ProcessorStep::ProcessedMultipleBatches.get_step(), + end_txn_timestamp_iso = last_txn_timestamp + .map(|t| timestamp_to_iso(&t)) + .unwrap_or_default(), + num_of_transactions = last_txn_version - first_txn_version + 1, + task_count = concurrent_tasks, + size_in_bytes, + duration_in_secs = processing_time.elapsed().as_secs_f64(), + tps = tps, + bytes_per_sec = size_in_bytes / processing_time.elapsed().as_secs_f64(), + step = ProcessorStep::ProcessedMultipleBatches.get_step(), + "{}", ProcessorStep::ProcessedMultipleBatches.get_label(), - ]) - .inc_by(last_txn_version - first_txn_version + 1); - MULTI_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name]) - .set(processing_time.elapsed().as_secs_f64()); + ); + LATEST_PROCESSED_VERSION + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set(last_txn_version as i64); + // TODO: do an atomic thing, or ideally move to an async metrics collector! + TRANSACTION_UNIX_TIMESTAMP + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .set( + first_txn_timestamp + .map(|t| timestamp_to_unixtime(&t)) + .unwrap_or_default(), + ); + PROCESSED_BYTES_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(size_in_bytes as u64); + NUM_TRANSACTIONS_PROCESSED_COUNT + .with_label_values(&[ + processor_name, + ProcessorStep::ProcessedMultipleBatches.get_step(), + ProcessorStep::ProcessedMultipleBatches.get_label(), + ]) + .inc_by(last_txn_version - first_txn_version + 1); + MULTI_BATCH_PROCESSING_TIME_IN_SECS + .with_label_values(&[processor_name]) + .set(processing_time.elapsed().as_secs_f64()); + } }); tasks.push(join_handle); From 58233bbd1dfedddfd6b5cdb4aba37575182a702a Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 23:21:02 -0800 Subject: [PATCH 09/28] custom runtime --- rust/Cargo.lock | 1 + rust/processor/Cargo.toml | 1 + rust/processor/src/main.rs | 24 +++++++++++++++++++----- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 38dae1c4..e905ba11 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1931,6 +1931,7 @@ dependencies = [ "hex", "kanal", "native-tls", + "num_cpus", "once_cell", "postgres-native-tls", "prometheus", diff --git a/rust/processor/Cargo.toml b/rust/processor/Cargo.toml index 1899d2a6..1ab6b511 100644 --- a/rust/processor/Cargo.toml +++ b/rust/processor/Cargo.toml @@ -56,4 +56,5 @@ url = { workspace = true } # Postgres SSL native-tls = "0.2.11" +num_cpus = "1.16.0" postgres-native-tls = "0.5.0" diff --git a/rust/processor/src/main.rs b/rust/processor/src/main.rs index dc1f4659..ed8de400 100644 --- a/rust/processor/src/main.rs +++ b/rust/processor/src/main.rs @@ -6,9 +6,23 @@ use clap::Parser; use processor::IndexerGrpcProcessorConfig; use server_framework::ServerArgs; -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<()> { - let args = ServerArgs::parse(); - args.run::(tokio::runtime::Handle::current()) - .await +const RUNTIME_WORKER_MULTIPLIER: usize = 2; + +fn main() -> Result<()> { + let worker_threads = (num_cpus::get() * RUNTIME_WORKER_MULTIPLIER).min(16); + println!( + "Starting processor tokio runtime with worker_threads: {}", + worker_threads + ); + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads((num_cpus::get() * RUNTIME_WORKER_MULTIPLIER).min(16)) + .build() + .unwrap() + .block_on(async { + let args = ServerArgs::parse(); + args.run::(tokio::runtime::Handle::current()) + .await + }) } From 45d3d879e73e78f0aa0098aa77931c6657113ec9 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 23:29:19 -0800 Subject: [PATCH 10/28] bigger queues --- rust/processor/src/grpc_stream.rs | 10 +++++++--- rust/processor/src/worker.rs | 6 +----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 0d0344ea..a115938e 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -26,6 +26,10 @@ const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization"; const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; /// GRPC connection id const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; +/// We will try to reconnect to GRPC 5 times in case upstream connection is being updated +pub const RECONNECTION_MAX_RETRIES: u64 = 65; +/// 80MB +pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 80; pub fn grpc_request_builder( starting_version: u64, @@ -99,8 +103,8 @@ pub async fn get_stream( Ok(client) => client .accept_compressed(tonic::codec::CompressionEncoding::Gzip) .send_compressed(tonic::codec::CompressionEncoding::Gzip) - .max_decoding_message_size(crate::worker::MAX_RESPONSE_SIZE) - .max_encoding_message_size(crate::worker::MAX_RESPONSE_SIZE), + .max_decoding_message_size(MAX_RESPONSE_SIZE) + .max_encoding_message_size(MAX_RESPONSE_SIZE), Err(e) => { error!( processor_name = processor_name, @@ -461,7 +465,7 @@ pub async fn create_fetcher_loop( // TODO: Turn this into exponential backoff tokio::time::sleep(std::time::Duration::from_millis(100)).await; - if reconnection_retries >= crate::worker::RECONNECTION_MAX_RETRIES { + if reconnection_retries >= RECONNECTION_MAX_RETRIES { error!( processor_name = processor_name, service_type = crate::worker::PROCESSOR_SERVICE_TYPE, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 018cce47..8d835378 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -39,11 +39,7 @@ use url::Url; // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. -pub const BUFFER_SIZE: usize = 100; -// 40MB -pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 40; -// We will try to reconnect to GRPC 5 times in case upstream connection is being updated -pub const RECONNECTION_MAX_RETRIES: u64 = 65; +pub const BUFFER_SIZE: usize = 300; // Consumer thread will wait X seconds before panicking if it doesn't receive any data pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; From 7af45ce1d3f3bd04cd2e9550ddecfb782a7cac44 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 22 Jan 2024 23:30:25 -0800 Subject: [PATCH 11/28] bigger message size just incase --- rust/processor/src/grpc_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index a115938e..2ee9972b 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -28,8 +28,8 @@ const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name"; const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id"; /// We will try to reconnect to GRPC 5 times in case upstream connection is being updated pub const RECONNECTION_MAX_RETRIES: u64 = 65; -/// 80MB -pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 80; +/// 256MB +pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256; pub fn grpc_request_builder( starting_version: u64, From 4d134242fbf9fc17b0ee39c18971955d8eceb2b1 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 00:00:36 -0800 Subject: [PATCH 12/28] log cpu num and thread count --- rust/processor/src/main.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/rust/processor/src/main.rs b/rust/processor/src/main.rs index ed8de400..279a2d19 100644 --- a/rust/processor/src/main.rs +++ b/rust/processor/src/main.rs @@ -5,14 +5,17 @@ use anyhow::Result; use clap::Parser; use processor::IndexerGrpcProcessorConfig; use server_framework::ServerArgs; +use tracing::info; const RUNTIME_WORKER_MULTIPLIER: usize = 2; fn main() -> Result<()> { - let worker_threads = (num_cpus::get() * RUNTIME_WORKER_MULTIPLIER).min(16); - println!( - "Starting processor tokio runtime with worker_threads: {}", - worker_threads + let num_cpus = num_cpus::get(); + let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).min(16); + info!( + num_cpus = num_cpus, + num_worker_threads = worker_threads, + "[Processor] Starting processor tokio runtime", ); tokio::runtime::Builder::new_multi_thread() From 9af57883f5136fb7a2fad7c69867f36b9f4a1e1c Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 10:37:26 -0800 Subject: [PATCH 13/28] db pool to 650 --- rust/processor/src/utils/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 958e36be..30f0e7fb 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -130,7 +130,7 @@ pub async fn new_db_pool(database_url: &str) -> Result { }; let pool = Pool::builder() // TODO: MAKE THIS CONFIGURABLE! - .max_size(800) + .max_size(650) .build(config) .await?; Ok(Arc::new(pool)) From dd0d3497a117433b5812079cd557ae6a6d9a0fb2 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 12:41:41 -0800 Subject: [PATCH 14/28] ramp up to 30 d --- rust/processor/src/utils/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 30f0e7fb..0ced20a0 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -45,7 +45,7 @@ pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2; /// we may need to chunk an array of items based on how many columns are in the table. /// This function returns boundaries of chunks in the form of (start_index, end_index) pub fn get_chunks(num_items_to_insert: usize, _column_count: usize) -> Vec<(usize, usize)> { - let max_item_size = 20; // MAX_DIESEL_PARAM_SIZE as usize / column_count; + let max_item_size = 30; // MAX_DIESEL_PARAM_SIZE as usize / column_count; let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size)); let mut chunks = vec![chunk]; while chunk.1 != num_items_to_insert { From 5e035f1b471675a0d6cd437c81ea91d898134922 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 14:29:53 -0800 Subject: [PATCH 15/28] add metric for pb fetch wait times --- rust/processor/src/utils/counters.rs | 10 ++++++++++ rust/processor/src/worker.rs | 21 ++++++++++++++------- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/rust/processor/src/utils/counters.rs b/rust/processor/src/utils/counters.rs index 06e38494..84e73c0e 100644 --- a/rust/processor/src/utils/counters.rs +++ b/rust/processor/src/utils/counters.rs @@ -145,6 +145,16 @@ pub static PROCESSED_BYTES_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// The amount of time that a thread spent waiting for a protobuf bundle of transactions +pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy = Lazy::new(|| { + register_gauge_vec!( + "indexer_processor_pb_channel_fetch_wait_time_secs", + "Count of bytes processed", + &["processor_name", "thread_number"] + ) + .unwrap() +}); + /// Count of transactions processed. pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 8d835378..d4d7d68d 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -17,9 +17,10 @@ use crate::{ utils::{ counters::{ ProcessorStep, LATEST_PROCESSED_VERSION, MULTI_BATCH_PROCESSING_TIME_IN_SECS, - NUM_TRANSACTIONS_PROCESSED_COUNT, PROCESSED_BYTES_COUNT, - PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS, PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, - PROCESSOR_ERRORS_COUNT, PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT, + NUM_TRANSACTIONS_PROCESSED_COUNT, PB_CHANNEL_FETCH_WAIT_TIME_SECS, + PROCESSED_BYTES_COUNT, PROCESSOR_DATA_PROCESSED_LATENCY_IN_SECS, + PROCESSOR_DATA_RECEIVED_LATENCY_IN_SECS, PROCESSOR_ERRORS_COUNT, + PROCESSOR_INVOCATIONS_COUNT, PROCESSOR_SUCCESSES_COUNT, SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS, SINGLE_BATCH_PARSING_TIME_IN_SECS, SINGLE_BATCH_PROCESSING_TIME_IN_SECS, TRANSACTION_UNIX_TIMESTAMP, }, @@ -239,12 +240,18 @@ impl Worker { let join_handle = tokio::spawn(async move { loop { - let txn_pb = match timeout( + let pb_channel_fetch_time = std::time::Instant::now(); + let txn_pb_res = timeout( Duration::from_secs(CONSUMER_THREAD_TIMEOUT_IN_SECS), receiver_clone.recv(), ) - .await - { + .await; + // Track how much time this thread spent waiting for a pb bundle + PB_CHANNEL_FETCH_WAIT_TIME_SECS + .with_label_values(&[processor_name, &task_index.to_string()]) + .set(pb_channel_fetch_time.elapsed().as_secs_f64()); + + let txn_pb = match txn_pb_res { Ok(txn_pb) => match txn_pb { Ok(txn_pb) => txn_pb, // This happens when the channel is closed. We should panic. @@ -259,7 +266,7 @@ impl Worker { panic!("[Parser][T#{}] Channel closed", task_index); }, }, - Err(_) => { + Err(_e) => { error!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, From b316cb5d6708ad5a3644d62d0053d1a092479f95 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 14:32:37 -0800 Subject: [PATCH 16/28] db up --- rust/processor/src/utils/database.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 0ced20a0..3bce97ba 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -45,7 +45,7 @@ pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2; /// we may need to chunk an array of items based on how many columns are in the table. /// This function returns boundaries of chunks in the form of (start_index, end_index) pub fn get_chunks(num_items_to_insert: usize, _column_count: usize) -> Vec<(usize, usize)> { - let max_item_size = 30; // MAX_DIESEL_PARAM_SIZE as usize / column_count; + let max_item_size = 40; // MAX_DIESEL_PARAM_SIZE as usize / column_count; let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size)); let mut chunks = vec![chunk]; while chunk.1 != num_items_to_insert { @@ -130,7 +130,7 @@ pub async fn new_db_pool(database_url: &str) -> Result { }; let pool = Pool::builder() // TODO: MAKE THIS CONFIGURABLE! - .max_size(650) + .max_size(800) .build(config) .await?; Ok(Arc::new(pool)) From a1bad1f1dcc4162f11fea42bc264e691206d9803 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 15:23:34 -0800 Subject: [PATCH 17/28] cpus --- rust/processor/src/main.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/rust/processor/src/main.rs b/rust/processor/src/main.rs index 279a2d19..8fdfbd4f 100644 --- a/rust/processor/src/main.rs +++ b/rust/processor/src/main.rs @@ -11,16 +11,15 @@ const RUNTIME_WORKER_MULTIPLIER: usize = 2; fn main() -> Result<()> { let num_cpus = num_cpus::get(); - let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).min(16); - info!( - num_cpus = num_cpus, - num_worker_threads = worker_threads, - "[Processor] Starting processor tokio runtime", + let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16); + println!( + "[Processor] Starting processor tokio runtime: num_cpus={}, worker_threads={}", + num_cpus, worker_threads ); tokio::runtime::Builder::new_multi_thread() .enable_all() - .worker_threads((num_cpus::get() * RUNTIME_WORKER_MULTIPLIER).min(16)) + .worker_threads(worker_threads) .build() .unwrap() .block_on(async { From d7d491aa1a0fdd9aed0b2b61dc74e3adfa7d1e83 Mon Sep 17 00:00:00 2001 From: CapCap Date: Tue, 23 Jan 2024 15:29:12 -0800 Subject: [PATCH 18/28] lint --- rust/processor/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/processor/src/main.rs b/rust/processor/src/main.rs index 8fdfbd4f..62e4de53 100644 --- a/rust/processor/src/main.rs +++ b/rust/processor/src/main.rs @@ -5,7 +5,6 @@ use anyhow::Result; use clap::Parser; use processor::IndexerGrpcProcessorConfig; use server_framework::ServerArgs; -use tracing::info; const RUNTIME_WORKER_MULTIPLIER: usize = 2; From 2855174fedb4e120be680ca450792346c77b62f8 Mon Sep 17 00:00:00 2001 From: CapCap Date: Wed, 24 Jan 2024 01:47:50 -0800 Subject: [PATCH 19/28] multiple fetchers --- rust/processor/src/worker.rs | 48 ++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index d4d7d68d..91dda526 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -40,7 +40,7 @@ use url::Url; // this is how large the fetch queue should be. Each bucket should have a max of 80MB or so, so a batch // of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision // machines accordingly. -pub const BUFFER_SIZE: usize = 300; +pub const BUFFER_SIZE: usize = 100; // Consumer thread will wait X seconds before panicking if it doesn't receive any data pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5; pub const PROCESSOR_SERVICE_TYPE: &str = "processor"; @@ -200,19 +200,51 @@ impl Worker { start_version = starting_version, "[Parser] Starting fetcher thread" ); - crate::grpc_stream::create_fetcher_loop( - tx, - indexer_grpc_data_service_address, + + let chunk_size = 1_000_000; + + let loop1 = crate::grpc_stream::create_fetcher_loop( + tx.clone(), + indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, starting_version, - request_ending_version, - auth_token, + request_ending_version.or(Some(starting_version + chunk_size)), + auth_token.clone(), processor_name.to_string(), starting_version, BUFFER_SIZE, - ) - .await + ); + + let loop2 = crate::grpc_stream::create_fetcher_loop( + tx.clone(), + indexer_grpc_data_service_address.clone(), + indexer_grpc_http2_ping_interval, + indexer_grpc_http2_ping_timeout, + starting_version + chunk_size, + Some(starting_version + chunk_size + chunk_size), + auth_token.clone(), + processor_name.to_string(), + starting_version, + BUFFER_SIZE, + ); + + let loop3 = crate::grpc_stream::create_fetcher_loop( + tx.clone(), + indexer_grpc_data_service_address.clone(), + indexer_grpc_http2_ping_interval, + indexer_grpc_http2_ping_timeout, + starting_version + chunk_size, + Some(starting_version + chunk_size + chunk_size), + auth_token.clone(), + processor_name.to_string(), + starting_version, + BUFFER_SIZE, + ); + + // join all 3 tasks + let _ = futures::future::join_all(vec![loop1, loop2, loop3]).await; + unreachable!("At least one PB fetcher tasks should run forever"); }); // This is the consumer side of the channel. These are the major states: From e4eb23d0dc6348289802c6c2f046e04aa7129bcf Mon Sep 17 00:00:00 2001 From: CapCap Date: Wed, 24 Jan 2024 02:23:09 -0800 Subject: [PATCH 20/28] try 2 --- rust/processor/src/grpc_stream.rs | 35 ++++++++++++++++--------------- rust/processor/src/worker.rs | 12 +++++------ 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 2ee9972b..619dbaa8 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -75,11 +75,11 @@ pub async fn get_stream( let channel = tonic::transport::Channel::from_shared( indexer_grpc_data_service_address.to_string(), ) - .expect( - "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", - ) - .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) - .keep_alive_timeout(indexer_grpc_http2_ping_timeout); + .expect( + "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) + .keep_alive_timeout(indexer_grpc_http2_ping_timeout); // If the scheme is https, add a TLS config. let channel = if indexer_grpc_data_service_address.scheme() == "https" { @@ -116,7 +116,7 @@ pub async fn get_stream( "[Parser] Error connecting to GRPC client" ); panic!("[Parser] Error connecting to GRPC client"); - }, + } }; let count = ending_version.map(|v| (v as i64 - starting_version as i64 + 1) as u64); info!( @@ -157,7 +157,7 @@ pub async fn get_chain_id( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -183,7 +183,7 @@ pub async fn get_chain_id( "[Parser] Error receiving datastream response for chain id" ); panic!("[Parser] Error receiving datastream response for chain id"); - }, + } None => { error!( processor_name = processor_name, @@ -193,7 +193,7 @@ pub async fn get_chain_id( "[Parser] Stream ended before getting response fo for chain id" ); panic!("[Parser] Stream ended before getting response fo for chain id"); - }, + } } } @@ -235,7 +235,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let mut connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -295,6 +295,7 @@ pub async fn create_fetcher_loop( ); let current_fetched_version = start_version; + // TODO: FIX THIS NONSENSICAL ERROR if last_fetched_version + 1 != current_fetched_version as i64 { error!( batch_start_version = batch_start_version, @@ -302,7 +303,7 @@ pub async fn create_fetcher_loop( current_fetched_version = current_fetched_version, "[Parser] Received batch with gap from GRPC stream" ); - panic!("[Parser] Received batch with gap from GRPC stream"); + // panic!("[Parser] Received batch with gap from GRPC stream"); } last_fetched_version = end_version as i64; batch_start_version = (last_fetched_version + 1) as u64; @@ -356,7 +357,7 @@ pub async fn create_fetcher_loop( txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); match txn_sender.send(txn_pb).await { - Ok(()) => {}, + Ok(()) => {} Err(e) => { error!( processor_name = processor_name, @@ -367,7 +368,7 @@ pub async fn create_fetcher_loop( "[Parser] Error sending GRPC response to channel." ); panic!("[Parser] Error sending GRPC response to channel.") - }, + } } info!( processor_name = processor_name, @@ -388,7 +389,7 @@ pub async fn create_fetcher_loop( .set((buffer_size - txn_sender.capacity()) as i64); grpc_channel_recv_latency = std::time::Instant::now(); true - }, + } Some(Err(rpc_error)) => { tracing::warn!( processor_name = processor_name, @@ -401,7 +402,7 @@ pub async fn create_fetcher_loop( "[Parser] Error receiving datastream response." ); false - }, + } None => { tracing::warn!( processor_name = processor_name, @@ -413,7 +414,7 @@ pub async fn create_fetcher_loop( "[Parser] Stream ended." ); false - }, + } }; // Check if we're at the end of the stream let is_end = if let Some(ending_version) = request_ending_version { @@ -493,7 +494,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 91dda526..8af27e53 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -209,14 +209,14 @@ impl Worker { indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, starting_version, - request_ending_version.or(Some(starting_version + chunk_size)), + request_ending_version.or(Some(starting_version + chunk_size + chunk_size)), auth_token.clone(), processor_name.to_string(), starting_version, BUFFER_SIZE, ); - let loop2 = crate::grpc_stream::create_fetcher_loop( + /*let loop2 = crate::grpc_stream::create_fetcher_loop( tx.clone(), indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, @@ -227,15 +227,15 @@ impl Worker { processor_name.to_string(), starting_version, BUFFER_SIZE, - ); + );*/ let loop3 = crate::grpc_stream::create_fetcher_loop( tx.clone(), indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, - starting_version + chunk_size, - Some(starting_version + chunk_size + chunk_size), + starting_version + chunk_size + chunk_size, + None, auth_token.clone(), processor_name.to_string(), starting_version, @@ -243,7 +243,7 @@ impl Worker { ); // join all 3 tasks - let _ = futures::future::join_all(vec![loop1, loop2, loop3]).await; + let _ = futures::future::join_all(vec![loop1, loop3]).await; unreachable!("At least one PB fetcher tasks should run forever"); }); From a73d30ba46d8ed2354f1002493ae1df0cbbc9bbc Mon Sep 17 00:00:00 2001 From: CapCap Date: Wed, 24 Jan 2024 02:26:41 -0800 Subject: [PATCH 21/28] try 2 --- rust/processor/src/grpc_stream.rs | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/rust/processor/src/grpc_stream.rs b/rust/processor/src/grpc_stream.rs index 619dbaa8..3c451f71 100644 --- a/rust/processor/src/grpc_stream.rs +++ b/rust/processor/src/grpc_stream.rs @@ -75,11 +75,11 @@ pub async fn get_stream( let channel = tonic::transport::Channel::from_shared( indexer_grpc_data_service_address.to_string(), ) - .expect( - "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", - ) - .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) - .keep_alive_timeout(indexer_grpc_http2_ping_timeout); + .expect( + "[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid", + ) + .http2_keep_alive_interval(indexer_grpc_http2_ping_interval) + .keep_alive_timeout(indexer_grpc_http2_ping_timeout); // If the scheme is https, add a TLS config. let channel = if indexer_grpc_data_service_address.scheme() == "https" { @@ -116,7 +116,7 @@ pub async fn get_stream( "[Parser] Error connecting to GRPC client" ); panic!("[Parser] Error connecting to GRPC client"); - } + }, }; let count = ending_version.map(|v| (v as i64 - starting_version as i64 + 1) as u64); info!( @@ -157,7 +157,7 @@ pub async fn get_chain_id( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -183,7 +183,7 @@ pub async fn get_chain_id( "[Parser] Error receiving datastream response for chain id" ); panic!("[Parser] Error receiving datastream response for chain id"); - } + }, None => { error!( processor_name = processor_name, @@ -193,7 +193,7 @@ pub async fn get_chain_id( "[Parser] Stream ended before getting response fo for chain id" ); panic!("[Parser] Stream ended before getting response fo for chain id"); - } + }, } } @@ -235,7 +235,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; let mut connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), @@ -357,7 +357,7 @@ pub async fn create_fetcher_loop( txn_pb.size_in_bytes as f64 / txn_channel_send_latency.elapsed().as_secs_f64(); match txn_sender.send(txn_pb).await { - Ok(()) => {} + Ok(()) => {}, Err(e) => { error!( processor_name = processor_name, @@ -368,7 +368,7 @@ pub async fn create_fetcher_loop( "[Parser] Error sending GRPC response to channel." ); panic!("[Parser] Error sending GRPC response to channel.") - } + }, } info!( processor_name = processor_name, @@ -389,7 +389,7 @@ pub async fn create_fetcher_loop( .set((buffer_size - txn_sender.capacity()) as i64); grpc_channel_recv_latency = std::time::Instant::now(); true - } + }, Some(Err(rpc_error)) => { tracing::warn!( processor_name = processor_name, @@ -402,7 +402,7 @@ pub async fn create_fetcher_loop( "[Parser] Error receiving datastream response." ); false - } + }, None => { tracing::warn!( processor_name = processor_name, @@ -414,7 +414,7 @@ pub async fn create_fetcher_loop( "[Parser] Stream ended." ); false - } + }, }; // Check if we're at the end of the stream let is_end = if let Some(ending_version) = request_ending_version { @@ -494,7 +494,7 @@ pub async fn create_fetcher_loop( auth_token.clone(), processor_name.to_string(), ) - .await; + .await; connection_id = match response.metadata().get(GRPC_CONNECTION_ID) { Some(connection_id) => connection_id.to_str().unwrap().to_string(), None => "".to_string(), From c6d296edd12d51855d6108a075495aa3e09f692a Mon Sep 17 00:00:00 2001 From: CapCap Date: Wed, 24 Jan 2024 02:37:26 -0800 Subject: [PATCH 22/28] try 3 --- rust/processor/src/worker.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 8af27e53..b47bb1ce 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -209,14 +209,14 @@ impl Worker { indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, starting_version, - request_ending_version.or(Some(starting_version + chunk_size + chunk_size)), + request_ending_version.or(Some(starting_version + chunk_size)), auth_token.clone(), processor_name.to_string(), starting_version, BUFFER_SIZE, ); - /*let loop2 = crate::grpc_stream::create_fetcher_loop( + let loop2 = crate::grpc_stream::create_fetcher_loop( tx.clone(), indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, @@ -227,7 +227,7 @@ impl Worker { processor_name.to_string(), starting_version, BUFFER_SIZE, - );*/ + ); let loop3 = crate::grpc_stream::create_fetcher_loop( tx.clone(), @@ -243,7 +243,7 @@ impl Worker { ); // join all 3 tasks - let _ = futures::future::join_all(vec![loop1, loop3]).await; + let _ = futures::future::join_all(vec![loop1, loop2, loop3]).await; unreachable!("At least one PB fetcher tasks should run forever"); }); From faea356e1b1e857af2508c8894cc171c5a2dcdbc Mon Sep 17 00:00:00 2001 From: jillxuu Date: Wed, 24 Jan 2024 10:28:00 -0800 Subject: [PATCH 23/28] fix migration issue on non transaction index migration --- .../migrations/2023-11-09-234725_delegator_balances_3/up.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql b/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql index e822f960..ac6ea59d 100644 --- a/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql +++ b/rust/processor/migrations/2023-11-09-234725_delegator_balances_3/up.sql @@ -1,2 +1,2 @@ -- Your SQL goes here -CREATE INDEX cdb_insat_index ON current_delegator_balances (inserted_at); +CREATE INDEX IF NOT EXISTS cdb_insat_index ON current_delegator_balances (inserted_at); From c4d1b7e7949fa8d7c1c9128f66dd8665a1bbaead Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 29 Jan 2024 13:43:21 -0800 Subject: [PATCH 24/28] TEMPORARY TRY DISABLE DB --- rust/processor/src/utils/database.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rust/processor/src/utils/database.rs b/rust/processor/src/utils/database.rs index 3bce97ba..0c0f1a5c 100644 --- a/rust/processor/src/utils/database.rs +++ b/rust/processor/src/utils/database.rs @@ -205,15 +205,17 @@ where } pub async fn execute_in_chunks( - conn: PgDbPool, - build_query: fn(Vec) -> (U, Option<&'static str>), - items_to_insert: &[T], - chunk_size: usize, + _conn: PgDbPool, + _build_query: fn(Vec) -> (U, Option<&'static str>), + _items_to_insert: &[T], + _chunk_size: usize, ) -> Result<(), diesel::result::Error> where U: QueryFragment + diesel::query_builder::QueryId + Send, T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone, { + // TEMPORARILY SKIP ACTUALLY WRITING, SO WE CAN TEST THE FETCHING SPEED + /* let chunks = get_chunks(items_to_insert.len(), chunk_size); let futures = chunks.into_iter().map(|(start_ind, end_ind)| { @@ -230,10 +232,11 @@ where }); for res in futures_util::future::join_all(futures).await { res?; - } + }*/ Ok(()) } +#[allow(dead_code)] async fn execute_or_retry_cleaned( conn: PgDbPool, build_query: fn(Vec) -> (U, Option<&'static str>), From 750cb7094c5b627c99c2f4a500a519767c89448c Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 29 Jan 2024 17:46:39 -0800 Subject: [PATCH 25/28] dont update processor status either --- rust/processor/src/processors/mod.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index 6242d7f5..76536584 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -34,18 +34,12 @@ use self::{ token_v2_processor::TokenV2Processor, user_transaction_processor::UserTransactionProcessor, }; -use crate::{ - models::processor_status::ProcessorStatus, - schema::processor_status, - utils::{ - counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT}, - database::{execute_with_better_error, PgDbPool, PgPoolConnection}, - util::parse_timestamp, - }, +use crate::utils::{ + counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT}, + database::{PgDbPool, PgPoolConnection}, }; use aptos_protos::transaction::v1::Transaction as ProtoTransaction; use async_trait::async_trait; -use diesel::{pg::upsert::excluded, prelude::*}; use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, sync::Arc}; @@ -115,9 +109,10 @@ pub trait ProcessorTrait: Send + Sync + Debug { /// versions are successful because any gap would cause the processor to panic async fn update_last_processed_version( &self, - version: u64, - last_transaction_timestamp: Option, + _version: u64, + _last_transaction_timestamp: Option, ) -> anyhow::Result<()> { + /* let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64)); let status = ProcessorStatus { processor: self.name().to_string(), @@ -139,7 +134,7 @@ pub trait ProcessorTrait: Send + Sync + Debug { )), Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), ) - .await?; + .await?;*/ Ok(()) } } From 9978c2a383b69b5bceba9b3dae82de17c3998896 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 29 Jan 2024 20:27:10 -0800 Subject: [PATCH 26/28] only single fetcher loop --- rust/processor/src/worker.rs | 39 ++++-------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index b47bb1ce..68a3b665 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -201,50 +201,19 @@ impl Worker { "[Parser] Starting fetcher thread" ); - let chunk_size = 1_000_000; - - let loop1 = crate::grpc_stream::create_fetcher_loop( - tx.clone(), - indexer_grpc_data_service_address.clone(), - indexer_grpc_http2_ping_interval, - indexer_grpc_http2_ping_timeout, - starting_version, - request_ending_version.or(Some(starting_version + chunk_size)), - auth_token.clone(), - processor_name.to_string(), - starting_version, - BUFFER_SIZE, - ); - - let loop2 = crate::grpc_stream::create_fetcher_loop( + crate::grpc_stream::create_fetcher_loop( tx.clone(), indexer_grpc_data_service_address.clone(), indexer_grpc_http2_ping_interval, indexer_grpc_http2_ping_timeout, - starting_version + chunk_size, - Some(starting_version + chunk_size + chunk_size), - auth_token.clone(), - processor_name.to_string(), starting_version, - BUFFER_SIZE, - ); - - let loop3 = crate::grpc_stream::create_fetcher_loop( - tx.clone(), - indexer_grpc_data_service_address.clone(), - indexer_grpc_http2_ping_interval, - indexer_grpc_http2_ping_timeout, - starting_version + chunk_size + chunk_size, - None, + request_ending_version, auth_token.clone(), processor_name.to_string(), starting_version, BUFFER_SIZE, - ); - - // join all 3 tasks - let _ = futures::future::join_all(vec![loop1, loop2, loop3]).await; - unreachable!("At least one PB fetcher tasks should run forever"); + ) + .await }); // This is the consumer side of the channel. These are the major states: From dfa617d1fd02ef5d4599cb956cc1ac549092c33e Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 29 Jan 2024 20:43:28 -0800 Subject: [PATCH 27/28] try spawn_blocking --- rust/processor/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 68a3b665..da52f4d5 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -239,7 +239,7 @@ impl Worker { let ma_clone = ma.clone(); let receiver_clone = receiver.clone(); - let join_handle = tokio::spawn(async move { + let join_handle = tokio::task::spawn_blocking(async move { loop { let pb_channel_fetch_time = std::time::Instant::now(); let txn_pb_res = timeout( From 17e6d1c894b4a0346da0ccf8b6806a99564b22a8 Mon Sep 17 00:00:00 2001 From: CapCap Date: Mon, 29 Jan 2024 20:54:40 -0800 Subject: [PATCH 28/28] no spanw --- rust/processor/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index da52f4d5..68a3b665 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -239,7 +239,7 @@ impl Worker { let ma_clone = ma.clone(); let receiver_clone = receiver.clone(); - let join_handle = tokio::task::spawn_blocking(async move { + let join_handle = tokio::spawn(async move { loop { let pb_channel_fetch_time = std::time::Instant::now(); let txn_pb_res = timeout(