From fa9f4bd844681843e262e52fb298ed045f88c110 Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Fri, 31 Jan 2025 18:02:17 -0500 Subject: [PATCH] comments --- .../common/models/event_models/raw_events.rs | 110 +++++++++--------- .../src/processors/events_processor.rs | 4 +- .../parquet_events_processor.rs | 4 +- .../events_processor/events_extractor.rs | 4 +- 4 files changed, 61 insertions(+), 61 deletions(-) diff --git a/rust/processor/src/db/common/models/event_models/raw_events.rs b/rust/processor/src/db/common/models/event_models/raw_events.rs index 21a7d31c..bf72789c 100644 --- a/rust/processor/src/db/common/models/event_models/raw_events.rs +++ b/rust/processor/src/db/common/models/event_models/raw_events.rs @@ -27,61 +27,6 @@ pub struct RawEvent { } impl RawEvent { - pub fn from_transaction(txn: &Transaction, processor_name: &str) -> Vec { - let txn_version = txn.version as i64; - let block_height = txn.block_height as i64; - let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); - let size_info = match txn.size_info.as_ref() { - Some(size_info) => Some(size_info), - None => { - warn!(version = txn.version, "Transaction size info not found"); - None - }, - }; - let txn_data = match txn.txn_data.as_ref() { - Some(data) => data, - None => { - warn!( - transaction_version = txn_version, - "Transaction data doesn't exist" - ); - PROCESSOR_UNKNOWN_TYPE_COUNT - .with_label_values(&[processor_name]) - .inc(); - return vec![]; - }, - }; - let default = vec![]; - let raw_events = match txn_data { - TxnData::BlockMetadata(tx_inner) => &tx_inner.events, - TxnData::Genesis(tx_inner) => &tx_inner.events, - TxnData::User(tx_inner) => &tx_inner.events, - TxnData::Validator(tx_inner) => &tx_inner.events, - _ => &default, - }; - - let event_size_info = size_info.map(|info| info.event_size_info.as_slice()); - - raw_events - .iter() - .enumerate() - .map(|(index, event)| { - // event_size_info will be used for user transactions only, no promises for other transactions. - // If event_size_info is missing due, it defaults to 0. - // No need to backfill as event_size_info is primarily for debugging user transactions. - let size_info = event_size_info.and_then(|infos| infos.get(index)); - Self::from_event( - event, - txn_version, - block_height, - index as i64, - size_info, - Some(block_timestamp), - ) - }) - .collect::>() - } - fn from_event( event: &EventPB, txn_version: i64, @@ -112,3 +57,58 @@ impl RawEvent { } } } + +pub fn parse_events(txn: &Transaction, processor_name: &str) -> Vec { + let txn_version = txn.version as i64; + let block_height = txn.block_height as i64; + let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); + let size_info = match txn.size_info.as_ref() { + Some(size_info) => Some(size_info), + None => { + warn!(version = txn.version, "Transaction size info not found"); + None + }, + }; + let txn_data = match txn.txn_data.as_ref() { + Some(data) => data, + None => { + warn!( + transaction_version = txn_version, + "Transaction data doesn't exist" + ); + PROCESSOR_UNKNOWN_TYPE_COUNT + .with_label_values(&[processor_name]) + .inc(); + return vec![]; + }, + }; + let default = vec![]; + let raw_events = match txn_data { + TxnData::BlockMetadata(tx_inner) => &tx_inner.events, + TxnData::Genesis(tx_inner) => &tx_inner.events, + TxnData::User(tx_inner) => &tx_inner.events, + TxnData::Validator(tx_inner) => &tx_inner.events, + _ => &default, + }; + + let event_size_info = size_info.map(|info| info.event_size_info.as_slice()); + + raw_events + .iter() + .enumerate() + .map(|(index, event)| { + // event_size_info will be used for user transactions only, no promises for other transactions. + // If event_size_info is missing due, it defaults to 0. + // No need to backfill as event_size_info is primarily for debugging user transactions. + let size_info = event_size_info.and_then(|infos| infos.get(index)); + RawEvent::from_event( + event, + txn_version, + block_height, + index as i64, + size_info, + Some(block_timestamp), + ) + }) + .collect::>() +} diff --git a/rust/processor/src/processors/events_processor.rs b/rust/processor/src/processors/events_processor.rs index 397c8e9f..ba48b95a 100644 --- a/rust/processor/src/processors/events_processor.rs +++ b/rust/processor/src/processors/events_processor.rs @@ -4,7 +4,7 @@ use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ db::{ - common::models::event_models::raw_events::RawEvent, + common::models::event_models::raw_events::parse_events, postgres::models::events_models::events::EventPG, }, gap_detectors::ProcessingResult, @@ -155,7 +155,7 @@ impl ProcessorTrait for EventsProcessor { pub fn process_transactions(transactions: Vec) -> Vec { let mut events = vec![]; for txn in &transactions { - let txn_events: Vec = RawEvent::from_transaction(txn, "EventsProcessor") + let txn_events: Vec = parse_events(txn, "EventsProcessor") .into_iter() .map(|e| e.into()) .collect(); diff --git a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs index 6e9d3413..bfdce0a2 100644 --- a/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs +++ b/rust/processor/src/processors/parquet_processors/parquet_events_processor.rs @@ -7,7 +7,7 @@ use crate::{ ParquetProcessingResult, }, db::{ - common::models::event_models::raw_events::RawEvent, + common::models::event_models::raw_events::parse_events, parquet::models::event_models::parquet_events::EventPQ, }, gap_detectors::ProcessingResult, @@ -128,7 +128,7 @@ pub fn process_transactions_parquet( let mut events = vec![]; for txn in &transactions { let txn_version = txn.version as i64; - let txn_events: Vec = RawEvent::from_transaction(txn, "ParquetEventsProcessor") + let txn_events: Vec = parse_events(txn, "ParquetEventsProcessor") .into_iter() .map(|e| e.into()) .collect(); diff --git a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs index 8edea715..e09fbe6c 100644 --- a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs +++ b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs @@ -6,7 +6,7 @@ use aptos_indexer_processor_sdk::{ }; use async_trait::async_trait; use processor::db::{ - common::models::event_models::raw_events::RawEvent, + common::models::event_models::raw_events::parse_events, postgres::models::events_models::events::EventPG, }; use rayon::prelude::*; @@ -28,7 +28,7 @@ impl Processable for EventsExtractor { let events: Vec = item .data .par_iter() - .map(|txn| RawEvent::from_transaction(txn, "EventsProcessor")) + .map(|txn| parse_events(txn, "EventsProcessor")) .flatten() .map(|e| e.into()) .collect();