From f145f000957ebda4e972b48e8e5660ac8e7d8ff9 Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:32:33 -0800 Subject: [PATCH 1/2] Remove panic --- .../src/models/user_transactions_models/signatures.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/processor/src/models/user_transactions_models/signatures.rs b/rust/processor/src/models/user_transactions_models/signatures.rs index 8f696523..691a0538 100644 --- a/rust/processor/src/models/user_transactions_models/signatures.rs +++ b/rust/processor/src/models/user_transactions_models/signatures.rs @@ -402,8 +402,15 @@ impl Signature { Ok(AnySignatureTypeEnumPb::Secp256k1Ecdsa) => { String::from("multi_key_secp256k1_ecdsa_signature") }, - _ => { - panic!("Unspecified signature type or un-recognized type is not supported") + wildcard => { + tracing::warn!( + "Unspecified signature type or un-recognized type is not supported: {:?}", + wildcard + ); + PROCESSOR_DATA_GAP_COUNT + .with_label_values(&["unspecified_signature_type"]) + .inc(); + "".to_string() }, }; signatures.push(Self { From 102508a1297e7524723003c60c52062298bf844a Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:45:46 -0800 Subject: [PATCH 2/2] Fix monitoring processor --- rust/processor/src/processors/mod.rs | 4 +++ .../src/processors/monitoring_processor.rs | 36 ++++++------------- rust/processor/src/worker.rs | 10 +++--- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index 268ed496..d0abf641 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -13,6 +13,7 @@ pub mod coin_processor; pub mod default_processor; pub mod events_processor; pub mod fungible_asset_processor; +pub mod monitoring_processor; pub mod nft_metadata_processor; pub mod objects_processor; pub mod stake_processor; @@ -27,6 +28,7 @@ use self::{ default_processor::DefaultProcessor, events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, + monitoring_processor::MonitoringProcessor, nft_metadata_processor::{NftMetadataProcessor, NftMetadataProcessorConfig}, objects_processor::ObjectsProcessor, stake_processor::StakeProcessor, @@ -177,6 +179,7 @@ pub enum ProcessorConfig { DefaultProcessor, EventsProcessor, FungibleAssetProcessor, + MonitoringProcessor, NftMetadataProcessor(NftMetadataProcessorConfig), ObjectsProcessor, StakeProcessor, @@ -217,6 +220,7 @@ pub enum Processor { DefaultProcessor, EventsProcessor, FungibleAssetProcessor, + MonitoringProcessor, NftMetadataProcessor, ObjectsProcessor, StakeProcessor, diff --git a/rust/processor/src/processors/monitoring_processor.rs b/rust/processor/src/processors/monitoring_processor.rs index 3ec20345..0895374e 100644 --- a/rust/processor/src/processors/monitoring_processor.rs +++ b/rust/processor/src/processors/monitoring_processor.rs @@ -2,29 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ProcessingResult, ProcessorName, ProcessorTrait}; -use crate::{ - models::default_models::{ - block_metadata_transactions::BlockMetadataTransactionModel, - move_modules::MoveModule, - move_resources::MoveResource, - move_tables::{CurrentTableItem, TableItem, TableMetadata}, - transactions::TransactionModel, - v2_objects::{CurrentObject, Object}, - write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, - }, - schema, - utils::database::{ - clean_data_for_db, execute_with_better_error, get_chunks, MyDbConnection, PgDbPool, - PgPoolConnection, - }, -}; -use anyhow::bail; -use aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; +use crate::utils::database::PgDbPool; +use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; -use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods}; -use field_count::FieldCount; -use std::{collections::HashMap, fmt::Debug}; -use tracing::error; +use std::fmt::Debug; pub struct MonitoringProcessor { connection_pool: PgDbPool, @@ -41,7 +22,7 @@ impl Debug for MonitoringProcessor { let state = &self.connection_pool.state(); write!( f, - "DefaultTransactionProcessor {{ connections: {:?} idle_connections: {:?} }}", + "MonitoringProcessor {{ connections: {:?} idle_connections: {:?} }}", state.connections, state.idle_connections ) } @@ -55,12 +36,17 @@ impl ProcessorTrait for MonitoringProcessor { async fn process_transactions( &self, - transactions: Vec, + _transactions: Vec, start_version: u64, end_version: u64, _: Option, ) -> anyhow::Result { - Ok((end_version, transactions.len() as u64)) + Ok(ProcessingResult { + start_version, + end_version, + processing_duration_in_secs: 0.0, + db_insertion_duration_in_secs: 0.0, + }) } fn connection_pool(&self) -> &PgDbPool { diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 7374d9b1..5807ee08 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -8,10 +8,11 @@ use crate::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, coin_processor::CoinProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, - nft_metadata_processor::NftMetadataProcessor, objects_processor::ObjectsProcessor, - stake_processor::StakeProcessor, token_processor::TokenProcessor, - token_v2_processor::TokenV2Processor, user_transaction_processor::UserTransactionProcessor, - ProcessingResult, Processor, ProcessorConfig, ProcessorTrait, + monitoring_processor::MonitoringProcessor, nft_metadata_processor::NftMetadataProcessor, + objects_processor::ObjectsProcessor, stake_processor::StakeProcessor, + token_processor::TokenProcessor, token_v2_processor::TokenV2Processor, + user_transaction_processor::UserTransactionProcessor, ProcessingResult, Processor, + ProcessorConfig, ProcessorTrait, }, schema::ledger_infos, utils::{ @@ -769,6 +770,7 @@ pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor ProcessorConfig::FungibleAssetProcessor => { Processor::from(FungibleAssetProcessor::new(db_pool)) }, + ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)), ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) },