Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Jan 31, 2025
1 parent 60fd50e commit fa9f4bd
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 61 deletions.
110 changes: 55 additions & 55 deletions rust/processor/src/db/common/models/event_models/raw_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,61 +27,6 @@ pub struct RawEvent {
}

impl RawEvent {
pub fn from_transaction(txn: &Transaction, processor_name: &str) -> Vec<RawEvent> {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => Some(size_info),
None => {
warn!(version = txn.version, "Transaction size info not found");
None
},
};
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
warn!(
transaction_version = txn_version,
"Transaction data doesn't exist"
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&[processor_name])
.inc();
return vec![];
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let event_size_info = size_info.map(|info| info.event_size_info.as_slice());

raw_events
.iter()
.enumerate()
.map(|(index, event)| {
// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
let size_info = event_size_info.and_then(|infos| infos.get(index));
Self::from_event(
event,
txn_version,
block_height,
index as i64,
size_info,
Some(block_timestamp),
)
})
.collect::<Vec<Self>>()
}

fn from_event(
event: &EventPB,
txn_version: i64,
Expand Down Expand Up @@ -112,3 +57,58 @@ impl RawEvent {
}
}
}

pub fn parse_events(txn: &Transaction, processor_name: &str) -> Vec<RawEvent> {
let txn_version = txn.version as i64;
let block_height = txn.block_height as i64;
let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
let size_info = match txn.size_info.as_ref() {
Some(size_info) => Some(size_info),
None => {
warn!(version = txn.version, "Transaction size info not found");
None

Check warning on line 69 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L68-L69

Added lines #L68 - L69 were not covered by tests
},
};
let txn_data = match txn.txn_data.as_ref() {
Some(data) => data,
None => {
warn!(

Check warning on line 75 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L75

Added line #L75 was not covered by tests
transaction_version = txn_version,
"Transaction data doesn't exist"

Check warning on line 77 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L77

Added line #L77 was not covered by tests
);
PROCESSOR_UNKNOWN_TYPE_COUNT
.with_label_values(&[processor_name])
.inc();
return vec![];

Check warning on line 82 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L79-L82

Added lines #L79 - L82 were not covered by tests
},
};
let default = vec![];
let raw_events = match txn_data {
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
TxnData::Genesis(tx_inner) => &tx_inner.events,

Check warning on line 88 in rust/processor/src/db/common/models/event_models/raw_events.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/db/common/models/event_models/raw_events.rs#L88

Added line #L88 was not covered by tests
TxnData::User(tx_inner) => &tx_inner.events,
TxnData::Validator(tx_inner) => &tx_inner.events,
_ => &default,
};

let event_size_info = size_info.map(|info| info.event_size_info.as_slice());

raw_events
.iter()
.enumerate()
.map(|(index, event)| {
// event_size_info will be used for user transactions only, no promises for other transactions.
// If event_size_info is missing due, it defaults to 0.
// No need to backfill as event_size_info is primarily for debugging user transactions.
let size_info = event_size_info.and_then(|infos| infos.get(index));
RawEvent::from_event(
event,
txn_version,
block_height,
index as i64,
size_info,
Some(block_timestamp),
)
})
.collect::<Vec<RawEvent>>()
}
4 changes: 2 additions & 2 deletions rust/processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::{DefaultProcessingResult, ProcessorName, ProcessorTrait};
use crate::{
db::{
common::models::event_models::raw_events::RawEvent,
common::models::event_models::raw_events::parse_events,
postgres::models::events_models::events::EventPG,
},
gap_detectors::ProcessingResult,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl ProcessorTrait for EventsProcessor {
pub fn process_transactions(transactions: Vec<Transaction>) -> Vec<EventPG> {

Check warning on line 155 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L155

Added line #L155 was not covered by tests
let mut events = vec![];
for txn in &transactions {
let txn_events: Vec<EventPG> = RawEvent::from_transaction(txn, "EventsProcessor")
let txn_events: Vec<EventPG> = parse_events(txn, "EventsProcessor")
.into_iter()
.map(|e| e.into())
.collect();

Check warning on line 161 in rust/processor/src/processors/events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/events_processor.rs#L158-L161

Added lines #L158 - L161 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
ParquetProcessingResult,
},
db::{
common::models::event_models::raw_events::RawEvent,
common::models::event_models::raw_events::parse_events,
parquet::models::event_models::parquet_events::EventPQ,
},
gap_detectors::ProcessingResult,
Expand Down Expand Up @@ -128,7 +128,7 @@ pub fn process_transactions_parquet(
let mut events = vec![];
for txn in &transactions {
let txn_version = txn.version as i64;
let txn_events: Vec<EventPQ> = RawEvent::from_transaction(txn, "ParquetEventsProcessor")
let txn_events: Vec<EventPQ> = parse_events(txn, "ParquetEventsProcessor")
.into_iter()
.map(|e| e.into())
.collect();

Check warning on line 134 in rust/processor/src/processors/parquet_processors/parquet_events_processor.rs

View check run for this annotation

Codecov / codecov/patch

rust/processor/src/processors/parquet_processors/parquet_events_processor.rs#L131-L134

Added lines #L131 - L134 were not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use aptos_indexer_processor_sdk::{
};
use async_trait::async_trait;
use processor::db::{
common::models::event_models::raw_events::RawEvent,
common::models::event_models::raw_events::parse_events,
postgres::models::events_models::events::EventPG,
};
use rayon::prelude::*;
Expand All @@ -28,7 +28,7 @@ impl Processable for EventsExtractor {
let events: Vec<EventPG> = item
.data
.par_iter()
.map(|txn| RawEvent::from_transaction(txn, "EventsProcessor"))
.map(|txn| parse_events(txn, "EventsProcessor"))
.flatten()
.map(|e| e.into())
.collect();
Expand Down

0 comments on commit fa9f4bd

Please sign in to comment.