diff --git a/rust/processor/migrations/2023-12-16-233224_add_objects_model/down.sql b/rust/processor/migrations/2023-12-16-233224_add_objects_model/down.sql new file mode 100644 index 00000000..d70123ae --- /dev/null +++ b/rust/processor/migrations/2023-12-16-233224_add_objects_model/down.sql @@ -0,0 +1,5 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE objects DROP COLUMN IF EXISTS is_token; +ALTER TABLE objects DROP COLUMN IF EXISTS is_fungible_asset; +ALTER TABLE current_objects DROP COLUMN IF EXISTS is_token; +ALTER TABLE current_objects DROP COLUMN IF EXISTS is_fungible_asset; \ No newline at end of file diff --git a/rust/processor/migrations/2023-12-16-233224_add_objects_model/up.sql b/rust/processor/migrations/2023-12-16-233224_add_objects_model/up.sql new file mode 100644 index 00000000..65fd8fb2 --- /dev/null +++ b/rust/processor/migrations/2023-12-16-233224_add_objects_model/up.sql @@ -0,0 +1,14 @@ +-- Your SQL goes here +ALTER TABLE objects +ADD COLUMN IF NOT EXISTS is_token BOOLEAN; +ALTER TABLE objects +ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN; +ALTER TABLE current_objects +ADD COLUMN IF NOT EXISTS is_token BOOLEAN; +ALTER TABLE current_objects +ADD COLUMN IF NOT EXISTS is_fungible_asset BOOLEAN; +ALTER TABLE block_metadata_transactions DROP CONSTRAINT IF EXISTS fk_versions; +ALTER TABLE move_modules DROP CONSTRAINT IF EXISTS fk_transaction_versions; +ALTER TABLE move_resources DROP CONSTRAINT IF EXISTS fk_transaction_versions; +ALTER TABLE table_items DROP CONSTRAINT IF EXISTS fk_transaction_versions; +ALTER TABLE write_set_changes DROP CONSTRAINT IF EXISTS fk_transaction_versions; \ No newline at end of file diff --git a/rust/processor/src/models/account_transaction_models/account_transactions.rs b/rust/processor/src/models/account_transaction_models/account_transactions.rs index 7bd10d9d..28c1d985 100644 --- a/rust/processor/src/models/account_transaction_models/account_transactions.rs +++ 
b/rust/processor/src/models/account_transaction_models/account_transactions.rs @@ -7,7 +7,7 @@ use crate::{ models::{ - token_v2_models::v2_token_utils::ObjectWithMetadata, + object_models::v2_object_utils::ObjectWithMetadata, user_transactions_models::user_transactions::UserTransaction, }, schema::account_transactions, diff --git a/rust/processor/src/models/default_models/mod.rs b/rust/processor/src/models/default_models/mod.rs index ccc9024e..22f3722d 100644 --- a/rust/processor/src/models/default_models/mod.rs +++ b/rust/processor/src/models/default_models/mod.rs @@ -6,5 +6,4 @@ pub mod move_modules; pub mod move_resources; pub mod move_tables; pub mod transactions; -pub mod v2_objects; pub mod write_set_changes; diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs index ae1cda50..ee308034 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_activities.rs @@ -6,9 +6,7 @@ #![allow(clippy::unused_unit)] use super::{ - v2_fungible_asset_utils::{ - FeeStatement, FungibleAssetAggregatedDataMapping, FungibleAssetEvent, - }, + v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent}, v2_fungible_metadata::FungibleAssetMetadataModel, }; use crate::{ @@ -17,6 +15,7 @@ use crate::{ coin_activities::CoinActivity, coin_utils::{CoinEvent, CoinInfoType, EventGuidResource}, }, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_v2_models::v2_token_utils::TokenStandard, }, schema::fungible_asset_activities, @@ -70,7 +69,7 @@ impl FungibleAssetActivity { txn_timestamp: chrono::NaiveDateTime, event_index: i64, entry_function_id_str: &Option, - fungible_asset_metadata: &FungibleAssetAggregatedDataMapping, + fungible_asset_metadata: &ObjectAggregatedDataMapping, conn: &mut PgPoolConnection<'_>, ) -> anyhow::Result> { let event_type 
= event.type_str.clone(); diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs index f44b7882..30b1b89a 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_balances.rs @@ -6,13 +6,13 @@ #![allow(clippy::unused_unit)] use super::{ - v2_fungible_asset_activities::EventToCoinType, - v2_fungible_asset_utils::{FungibleAssetAggregatedDataMapping, FungibleAssetStore}, + v2_fungible_asset_activities::EventToCoinType, v2_fungible_asset_utils::FungibleAssetStore, v2_fungible_metadata::FungibleAssetMetadataModel, }; use crate::{ models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_v2_models::v2_token_utils::TokenStandard, }, schema::{current_fungible_asset_balances, fungible_asset_balances}, @@ -70,7 +70,7 @@ impl FungibleAssetBalance { write_set_change_index: i64, txn_version: i64, txn_timestamp: chrono::NaiveDateTime, - fungible_asset_metadata: &FungibleAssetAggregatedDataMapping, + fungible_asset_metadata: &ObjectAggregatedDataMapping, conn: &mut PgPoolConnection<'_>, ) -> anyhow::Result> { if let Some(inner) = &FungibleAssetStore::from_write_resource(write_resource, txn_version)? 
diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs index 22560408..0cc4d340 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_asset_utils.rs @@ -6,10 +6,8 @@ use crate::{ models::{ - coin_models::coin_utils::COIN_ADDR, - default_models::{move_resources::MoveResource, v2_objects::CurrentObjectPK}, - token_models::token_utils::URI_LENGTH, - token_v2_models::v2_token_utils::{ObjectWithMetadata, ResourceReference, TokenV2}, + coin_models::coin_utils::COIN_ADDR, default_models::move_resources::MoveResource, + token_models::token_utils::URI_LENGTH, token_v2_models::v2_token_utils::ResourceReference, }, utils::util::{deserialize_from_string, truncate_str}, }; @@ -17,14 +15,10 @@ use anyhow::{Context, Result}; use aptos_protos::transaction::v1::WriteResource; use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; const FUNGIBLE_ASSET_LENGTH: usize = 32; const FUNGIBLE_ASSET_SYMBOL: usize = 10; -/// Tracks all fungible asset related data in a hashmap for quick access (keyed on address of the object core) -pub type FungibleAssetAggregatedDataMapping = HashMap; - #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FeeStatement { #[serde(deserialize_with = "deserialize_from_string")] @@ -49,15 +43,6 @@ impl FeeStatement { } } -/// This contains objects used by fungible assets -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FungibleAssetAggregatedData { - pub object: ObjectWithMetadata, - pub fungible_asset_metadata: Option, - pub fungible_asset_store: Option, - pub token: Option, -} - /* Section on fungible assets resources */ #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FungibleAssetMetadata { diff --git a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs 
b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs index b3661f36..e68d3b25 100644 --- a/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs +++ b/rust/processor/src/models/fungible_asset_models/v2_fungible_metadata.rs @@ -5,10 +5,11 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_fungible_asset_utils::{FungibleAssetAggregatedDataMapping, FungibleAssetMetadata}; +use super::v2_fungible_asset_utils::FungibleAssetMetadata; use crate::{ models::{ coin_models::coin_utils::{CoinInfoType, CoinResource}, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, token_v2_models::v2_token_utils::TokenStandard, }, @@ -58,7 +59,7 @@ impl FungibleAssetMetadataModel { write_resource: &WriteResource, txn_version: i64, txn_timestamp: chrono::NaiveDateTime, - fungible_asset_metadata: &FungibleAssetAggregatedDataMapping, + fungible_asset_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { if let Some(inner) = &FungibleAssetMetadata::from_write_resource(write_resource, txn_version)? 
@@ -138,7 +139,7 @@ impl FungibleAssetMetadataModel { pub async fn is_address_fungible_asset( conn: &mut PgPoolConnection<'_>, address: &str, - fungible_asset_metadata: &FungibleAssetAggregatedDataMapping, + fungible_asset_metadata: &ObjectAggregatedDataMapping, ) -> bool { if let Some(metadata) = fungible_asset_metadata.get(address) { metadata.token.is_none() diff --git a/rust/processor/src/models/mod.rs b/rust/processor/src/models/mod.rs index ad0cbfbb..0db2cc05 100644 --- a/rust/processor/src/models/mod.rs +++ b/rust/processor/src/models/mod.rs @@ -8,6 +8,7 @@ pub mod default_models; pub mod events_models; pub mod fungible_asset_models; pub mod ledger_info; +pub mod object_models; pub mod processor_status; pub mod property_map; pub mod stake_models; diff --git a/rust/processor/src/models/object_models/mod.rs b/rust/processor/src/models/object_models/mod.rs new file mode 100644 index 00000000..63812d5c --- /dev/null +++ b/rust/processor/src/models/object_models/mod.rs @@ -0,0 +1,5 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod v2_object_utils; +pub mod v2_objects; diff --git a/rust/processor/src/models/object_models/v2_object_utils.rs b/rust/processor/src/models/object_models/v2_object_utils.rs new file mode 100644 index 00000000..4b040622 --- /dev/null +++ b/rust/processor/src/models/object_models/v2_object_utils.rs @@ -0,0 +1,100 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +// This is required because a diesel macro makes clippy sad +#![allow(clippy::extra_unused_lifetimes)] +#![allow(clippy::unused_unit)] + +use crate::{ + models::{ + default_models::move_resources::MoveResource, + fungible_asset_models::v2_fungible_asset_utils::{ + FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply, + }, + token_v2_models::v2_token_utils::{ + AptosCollection, FixedSupply, PropertyMapModel, TokenV2, TransferEvent, + UnlimitedSupply, V2TokenResource, + }, + }, + 
utils::util::{deserialize_from_string, standardize_address}, +}; +use aptos_protos::transaction::v1::WriteResource; +use bigdecimal::BigDecimal; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +// PK of current_objects, i.e. object_address +pub type CurrentObjectPK = String; + +/// Tracks all object related metadata in a hashmap for quick access (keyed on address of the object) +pub type ObjectAggregatedDataMapping = HashMap; + +/// Index of the event so that we can write its inverse to the db as primary key (to avoid collisions) +pub type EventIndex = i64; + +/// This contains metadata for the object. This only includes fungible asset and token v2 metadata for now. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ObjectAggregatedData { + pub object: ObjectWithMetadata, + pub transfer_event: Option<(EventIndex, TransferEvent)>, + // Fungible asset structs + pub fungible_asset_metadata: Option, + pub fungible_asset_supply: Option, + pub fungible_asset_store: Option, + // Token v2 structs + pub aptos_collection: Option, + pub fixed_supply: Option, + pub property_map: Option, + pub token: Option, + pub unlimited_supply: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ObjectCore { + pub allow_ungated_transfer: bool, + #[serde(deserialize_with = "deserialize_from_string")] + pub guid_creation_num: BigDecimal, + owner: String, +} + +impl ObjectCore { + pub fn get_owner_address(&self) -> String { + standardize_address(&self.owner) + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ObjectWithMetadata { + pub object_core: ObjectCore, + pub state_key_hash: String, +} + +impl ObjectWithMetadata { + pub fn from_write_resource( + write_resource: &WriteResource, + txn_version: i64, + ) -> anyhow::Result> { + let type_str = MoveResource::get_outer_type_from_resource(write_resource); + if !V2TokenResource::is_resource_supported(type_str.as_str()) { + return Ok(None); + } + if let
V2TokenResource::ObjectCore(inner) = V2TokenResource::from_resource( + &type_str, + &serde_json::from_str(write_resource.data.as_str()).unwrap(), + txn_version, + )? { + Ok(Some(Self { + object_core: inner, + state_key_hash: standardize_address( + hex::encode(write_resource.state_key_hash.as_slice()).as_str(), + ), + })) + } else { + Ok(None) + } + } + + pub fn get_state_key_hash(&self) -> String { + standardize_address(&self.state_key_hash) + } +} diff --git a/rust/processor/src/models/default_models/v2_objects.rs b/rust/processor/src/models/object_models/v2_objects.rs similarity index 79% rename from rust/processor/src/models/default_models/v2_objects.rs rename to rust/processor/src/models/object_models/v2_objects.rs index 9ea017ed..9bb7b201 100644 --- a/rust/processor/src/models/default_models/v2_objects.rs +++ b/rust/processor/src/models/object_models/v2_objects.rs @@ -5,14 +5,14 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::move_resources::MoveResource; +use super::v2_object_utils::{CurrentObjectPK, ObjectAggregatedDataMapping}; use crate::{ models::{ + default_models::move_resources::MoveResource, token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, - token_v2_models::v2_token_utils::ObjectWithMetadata, }, schema::{current_objects, objects}, - utils::database::PgPoolConnection, + utils::{database::PgPoolConnection, util::standardize_address}, }; use aptos_protos::transaction::v1::{DeleteResource, WriteResource}; use bigdecimal::BigDecimal; @@ -22,9 +22,6 @@ use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -// PK of current_objects, i.e. 
object_address -pub type CurrentObjectPK = String; - #[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(transaction_version, write_set_change_index))] #[diesel(table_name = objects)] @@ -36,6 +33,8 @@ pub struct Object { pub state_key_hash: String, pub guid_creation_num: BigDecimal, pub allow_ungated_transfer: bool, + pub is_token: Option, + pub is_fungible_asset: Option, pub is_deleted: bool, } @@ -49,6 +48,8 @@ pub struct CurrentObject { pub allow_ungated_transfer: bool, pub last_guid_creation_num: BigDecimal, pub last_transaction_version: i64, + pub is_token: Option, + pub is_fungible_asset: Option, pub is_deleted: bool, } @@ -64,6 +65,8 @@ pub struct CurrentObjectQuery { pub last_transaction_version: i64, pub is_deleted: bool, pub inserted_at: chrono::NaiveDateTime, + pub is_token: Option, + pub is_fungible_asset: Option, } impl Object { @@ -71,33 +74,39 @@ impl Object { write_resource: &WriteResource, txn_version: i64, write_set_change_index: i64, + object_metadata_mapping: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { - if let Some(inner) = ObjectWithMetadata::from_write_resource(write_resource, txn_version)? 
{ - let resource = MoveResource::from_write_resource( - write_resource, - 0, // Placeholder, this isn't used anyway - txn_version, - 0, // Placeholder, this isn't used anyway - ); - let object_core = &inner.object_core; + let address = standardize_address(&write_resource.address.to_string()); + if let Some(object_aggregated_metadata) = object_metadata_mapping.get(&address) { + // do something + let object_with_metadata = object_aggregated_metadata.object.clone(); + let object_core = object_with_metadata.object_core; Ok(Some(( Self { transaction_version: txn_version, write_set_change_index, - object_address: resource.address.clone(), + object_address: address.clone(), owner_address: object_core.get_owner_address(), - state_key_hash: resource.state_key_hash.clone(), + state_key_hash: object_with_metadata.state_key_hash.clone(), guid_creation_num: object_core.guid_creation_num.clone(), allow_ungated_transfer: object_core.allow_ungated_transfer, + is_token: Some(object_aggregated_metadata.token.is_some()), + is_fungible_asset: Some( + object_aggregated_metadata.fungible_asset_store.is_some(), + ), is_deleted: false, }, CurrentObject { - object_address: resource.address, + object_address: address, owner_address: object_core.get_owner_address(), - state_key_hash: resource.state_key_hash, + state_key_hash: object_with_metadata.state_key_hash, allow_ungated_transfer: object_core.allow_ungated_transfer, last_guid_creation_num: object_core.guid_creation_num.clone(), last_transaction_version: txn_version, + is_token: Some(object_aggregated_metadata.token.is_some()), + is_fungible_asset: Some( + object_aggregated_metadata.fungible_asset_store.is_some(), + ), is_deleted: false, }, ))) @@ -147,6 +156,8 @@ impl Object { state_key_hash: resource.state_key_hash.clone(), guid_creation_num: previous_object.last_guid_creation_num.clone(), allow_ungated_transfer: previous_object.allow_ungated_transfer, + is_token: previous_object.is_token, + is_fungible_asset: 
previous_object.is_fungible_asset, is_deleted: true, }, CurrentObject { @@ -156,6 +167,8 @@ impl Object { last_guid_creation_num: previous_object.last_guid_creation_num.clone(), allow_ungated_transfer: previous_object.allow_ungated_transfer, last_transaction_version: txn_version, + is_token: previous_object.is_token, + is_fungible_asset: previous_object.is_fungible_asset, is_deleted: true, }, ))) @@ -181,6 +194,8 @@ impl Object { allow_ungated_transfer: res.allow_ungated_transfer, last_guid_creation_num: res.last_guid_creation_num, last_transaction_version: res.last_transaction_version, + is_token: res.is_token, + is_fungible_asset: res.is_fungible_asset, is_deleted: res.is_deleted, }) }, diff --git a/rust/processor/src/models/token_v2_models/v2_collections.rs b/rust/processor/src/models/token_v2_models/v2_collections.rs index 34d82789..c229a464 100644 --- a/rust/processor/src/models/token_v2_models/v2_collections.rs +++ b/rust/processor/src/models/token_v2_models/v2_collections.rs @@ -5,10 +5,11 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenResource}; +use super::v2_token_utils::{TokenStandard, V2TokenResource}; use crate::{ models::{ default_models::move_resources::MoveResource, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::{ collection_datas::{CollectionData, QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, token_utils::{CollectionDataIdType, TokenWriteSet}, @@ -82,7 +83,7 @@ impl CollectionV2 { txn_version: i64, write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { let type_str = MoveResource::get_outer_type_from_resource(write_resource); if !V2TokenResource::is_resource_supported(type_str.as_str()) { diff --git a/rust/processor/src/models/token_v2_models/v2_token_activities.rs 
b/rust/processor/src/models/token_v2_models/v2_token_activities.rs index acf1c372..ab5e8aea 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_activities.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_activities.rs @@ -7,11 +7,12 @@ use super::{ v2_token_datas::TokenDataV2, - v2_token_utils::{TokenStandard, TokenV2AggregatedDataMapping, V2TokenEvent}, + v2_token_utils::{TokenStandard, V2TokenEvent}, }; use crate::{ models::{ fungible_asset_models::v2_fungible_asset_utils::FungibleAssetEvent, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::token_utils::{TokenDataIdType, TokenEvent}, }, schema::token_activities_v2, @@ -74,7 +75,7 @@ impl TokenActivityV2 { txn_timestamp: chrono::NaiveDateTime, event_index: i64, entry_function_id_str: &Option, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, conn: &mut PgPoolConnection<'_>, ) -> anyhow::Result> { let event_type = event.type_str.clone(); @@ -143,7 +144,7 @@ impl TokenActivityV2 { txn_timestamp: chrono::NaiveDateTime, event_index: i64, entry_function_id_str: &Option, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { let event_type = event.type_str.clone(); if let Some(token_event) = diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index 4d31f3f3..895cfff5 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -5,10 +5,11 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{TokenStandard, TokenV2, TokenV2AggregatedDataMapping}; +use super::v2_token_utils::{TokenStandard, TokenV2}; use crate::{ models::{ fungible_asset_models::v2_fungible_metadata::FungibleAssetMetadataModel, + 
object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::{ collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, token_utils::TokenWriteSet, @@ -81,7 +82,7 @@ impl TokenDataV2 { txn_version: i64, write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { if let Some(inner) = &TokenV2::from_write_resource(write_resource, txn_version)? { let token_data_id = standardize_address(&write_resource.address.to_string()); @@ -242,14 +243,16 @@ impl TokenDataV2 { pub async fn is_address_fungible_token( conn: &mut PgPoolConnection<'_>, address: &str, - token_v2_metadata: &TokenV2AggregatedDataMapping, + object_aggregated_data_mapping: &ObjectAggregatedDataMapping, ) -> bool { - if let Some(metadata) = token_v2_metadata.get(address) { - metadata.token.is_some() - } else { - // Look up in the db - Self::query_is_address_token(conn, address).await + // 1. If metadata is present, the object is a token iff token struct is also present in the object + if let Some(object_data) = object_aggregated_data_mapping.get(address) { + if object_data.fungible_asset_metadata.is_some() { + return object_data.token.is_some(); + } } + // 2. If metadata is not present, we will do a lookup in the db. + Self::query_is_address_token(conn, address).await } /// Try to see if an address is a token. 
We'll try a few times in case there is a race condition, diff --git a/rust/processor/src/models/token_v2_models/v2_token_metadata.rs b/rust/processor/src/models/token_v2_models/v2_token_metadata.rs index 05f2a0d9..14f1a67a 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_metadata.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_metadata.rs @@ -5,11 +5,12 @@ #![allow(clippy::extra_unused_lifetimes)] #![allow(clippy::unused_unit)] -use super::v2_token_utils::{TokenV2AggregatedDataMapping, TOKEN_V2_ADDR}; +use super::v2_token_utils::TOKEN_V2_ADDR; use crate::{ models::{ coin_models::coin_utils::COIN_ADDR, default_models::move_resources::MoveResource, + object_models::v2_object_utils::ObjectAggregatedDataMapping, token_models::token_utils::{NAME_LENGTH, TOKEN_ADDR}, }, schema::current_token_v2_metadata, @@ -40,7 +41,7 @@ impl CurrentTokenV2Metadata { pub fn from_write_resource( write_resource: &WriteResource, txn_version: i64, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result> { let object_address = standardize_address(&write_resource.address.to_string()); if let Some(metadata) = token_v2_metadata.get(&object_address) { diff --git a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs index 73ee1107..e5a65804 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_ownerships.rs @@ -7,14 +7,13 @@ use super::{ v2_token_datas::TokenDataV2, - v2_token_utils::{ - ObjectWithMetadata, TokenStandard, TokenV2AggregatedDataMapping, TokenV2Burned, - }, + v2_token_utils::{TokenStandard, TokenV2Burned}, }; use crate::{ models::{ default_models::move_resources::MoveResource, fungible_asset_models::v2_fungible_asset_utils::V2FungibleAssetResource, + object_models::v2_object_utils::{ObjectAggregatedDataMapping, ObjectWithMetadata}, 
token_models::{ collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, token_utils::TokenWriteSet, @@ -113,7 +112,7 @@ impl TokenOwnershipV2 { /// For nfts it's the same resources that we parse tokendatas from so we leverage the work done in there to get ownership data pub fn get_nft_v2_from_token_data( token_data: &TokenDataV2, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, ) -> anyhow::Result< Option<( Self, @@ -361,7 +360,7 @@ impl TokenOwnershipV2 { txn_version: i64, write_set_change_index: i64, txn_timestamp: chrono::NaiveDateTime, - token_v2_metadata: &TokenV2AggregatedDataMapping, + token_v2_metadata: &ObjectAggregatedDataMapping, conn: &mut PgPoolConnection<'_>, ) -> anyhow::Result> { let type_str = MoveResource::get_outer_type_from_resource(write_resource); diff --git a/rust/processor/src/models/token_v2_models/v2_token_utils.rs b/rust/processor/src/models/token_v2_models/v2_token_utils.rs index ac733420..35785a0b 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_utils.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_utils.rs @@ -7,10 +7,8 @@ use crate::{ models::{ coin_models::coin_utils::COIN_ADDR, - default_models::{move_resources::MoveResource, v2_objects::CurrentObjectPK}, - fungible_asset_models::v2_fungible_asset_utils::{ - FungibleAssetMetadata, FungibleAssetStore, FungibleAssetSupply, - }, + default_models::move_resources::MoveResource, + object_models::v2_object_utils::{CurrentObjectPK, ObjectCore}, token_models::token_utils::{NAME_LENGTH, URI_LENGTH}, }, utils::util::{ @@ -23,34 +21,15 @@ use aptos_protos::transaction::v1::{Event, WriteResource}; use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; use std::{ - collections::{HashMap, HashSet}, + collections::HashSet, fmt::{self, Formatter}, }; pub const TOKEN_V2_ADDR: &str = "0x0000000000000000000000000000000000000000000000000000000000000004"; -/// Tracks all token related data in a hashmap for quick 
access (keyed on address of the object core) -pub type TokenV2AggregatedDataMapping = HashMap; /// Tracks all token related data in a hashmap for quick access (keyed on address of the object core) pub type TokenV2Burned = HashSet; -/// Index of the event so that we can write its inverse to the db as primary key (to avoid collisiona) -pub type EventIndex = i64; - -/// This contains both metadata for fungible assets and fungible tokens -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct TokenV2AggregatedData { - pub aptos_collection: Option, - pub fixed_supply: Option, - pub fungible_asset_metadata: Option, - pub fungible_asset_supply: Option, - pub fungible_asset_store: Option, - pub object: ObjectWithMetadata, - pub property_map: Option, - pub token: Option, - pub transfer_event: Option<(EventIndex, TransferEvent)>, - pub unlimited_supply: Option, -} /// Tracks which token standard a token / collection is built upon #[derive(Serialize)] @@ -69,56 +48,6 @@ impl fmt::Display for TokenStandard { } } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ObjectCore { - pub allow_ungated_transfer: bool, - #[serde(deserialize_with = "deserialize_from_string")] - pub guid_creation_num: BigDecimal, - owner: String, -} - -impl ObjectCore { - pub fn get_owner_address(&self) -> String { - standardize_address(&self.owner) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ObjectWithMetadata { - pub object_core: ObjectCore, - state_key_hash: String, -} - -impl ObjectWithMetadata { - pub fn from_write_resource( - write_resource: &WriteResource, - txn_version: i64, - ) -> anyhow::Result> { - let type_str = MoveResource::get_outer_type_from_resource(write_resource); - if !V2TokenResource::is_resource_supported(type_str.as_str()) { - return Ok(None); - } - if let V2TokenResource::ObjectCore(inner) = V2TokenResource::from_resource( - &type_str, - &serde_json::from_str(write_resource.data.as_str()).unwrap(), - txn_version, - )? 
{ - Ok(Some(Self { - object_core: inner, - state_key_hash: standardize_address( - hex::encode(write_resource.state_key_hash.as_slice()).as_str(), - ), - })) - } else { - Ok(None) - } - } - - pub fn get_state_key_hash(&self) -> String { - standardize_address(&self.state_key_hash) - } -} - /* Section on Collection / Token */ #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Collection { @@ -412,6 +341,7 @@ impl PropertyMapModel { } } +// TODO: Split out ObjectCore into V2ObjectResource #[derive(Serialize, Deserialize, Debug, Clone)] pub enum V2TokenResource { AptosCollection(AptosCollection), diff --git a/rust/processor/src/processors/default_processor.rs b/rust/processor/src/processors/default_processor.rs index ec748275..f68f3311 100644 --- a/rust/processor/src/processors/default_processor.rs +++ b/rust/processor/src/processors/default_processor.rs @@ -9,7 +9,6 @@ use crate::{ move_resources::MoveResource, move_tables::{CurrentTableItem, TableItem, TableMetadata}, transactions::TransactionModel, - v2_objects::{CurrentObject, Object}, write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel}, }, schema, @@ -19,7 +18,7 @@ use crate::{ }, }; use anyhow::bail; -use aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; +use aptos_protos::transaction::v1::Transaction; use async_trait::async_trait; use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods}; use field_count::FieldCount; @@ -59,7 +58,6 @@ async fn insert_to_db_impl( &[CurrentTableItem], &[TableMetadata], ), - (objects, current_objects): (&[Object], &[CurrentObject]), ) -> Result<(), diesel::result::Error> { insert_transactions(conn, txns).await?; insert_block_metadata_transactions(conn, block_metadata_transactions).await?; @@ -69,8 +67,6 @@ async fn insert_to_db_impl( insert_table_items(conn, table_items).await?; insert_current_table_items(conn, current_table_items).await?; insert_table_metadata(conn, table_metadata).await?; - insert_objects(conn, objects).await?; - 
insert_current_objects(conn, current_objects).await?; Ok(()) } @@ -89,7 +85,6 @@ async fn insert_to_db( Vec, Vec, ), - (objects, current_objects): (Vec, Vec), ) -> Result<(), diesel::result::Error> { tracing::trace!( name = name, @@ -113,7 +108,6 @@ async fn insert_to_db( ¤t_table_items, &table_metadata, ), - (&objects, ¤t_objects), )) }) .await @@ -133,8 +127,6 @@ async fn insert_to_db( let table_items = clean_data_for_db(table_items, true); let current_table_items = clean_data_for_db(current_table_items, true); let table_metadata = clean_data_for_db(table_metadata, true); - let objects = clean_data_for_db(objects, true); - let current_objects = clean_data_for_db(current_objects, true); insert_to_db_impl( pg_conn, &txns, @@ -147,7 +139,6 @@ async fn insert_to_db( ¤t_table_items, &table_metadata, ), - (&objects, ¤t_objects), ) .await }) @@ -331,54 +322,6 @@ async fn insert_table_metadata( Ok(()) } -async fn insert_objects( - conn: &mut MyDbConnection, - items_to_insert: &[Object], -) -> Result<(), diesel::result::Error> { - use schema::objects::dsl::*; - let chunks = get_chunks(items_to_insert.len(), Object::field_count()); - for (start_ind, end_ind) in chunks { - execute_with_better_error( - conn, - diesel::insert_into(schema::objects::table) - .values(&items_to_insert[start_ind..end_ind]) - .on_conflict((transaction_version, write_set_change_index)) - .do_nothing(), - None, - ) - .await?; - } - Ok(()) -} - -async fn insert_current_objects( - conn: &mut MyDbConnection, - items_to_insert: &[CurrentObject], -) -> Result<(), diesel::result::Error> { - use schema::current_objects::dsl::*; - let chunks = get_chunks(items_to_insert.len(), CurrentObject::field_count()); - for (start_ind, end_ind) in chunks { - execute_with_better_error( - conn, - diesel::insert_into(schema::current_objects::table) - .values(&items_to_insert[start_ind..end_ind]) - .on_conflict(object_address) - .do_update() - .set(( - owner_address.eq(excluded(owner_address)), - 
state_key_hash.eq(excluded(state_key_hash)), - allow_ungated_transfer.eq(excluded(allow_ungated_transfer)), - last_guid_creation_num.eq(excluded(last_guid_creation_num)), - last_transaction_version.eq(excluded(last_transaction_version)), - is_deleted.eq(excluded(is_deleted)), - inserted_at.eq(excluded(inserted_at)), - )), - Some(" WHERE current_objects.last_transaction_version <= excluded.last_transaction_version "), - ).await?; - } - Ok(()) -} - #[async_trait] impl ProcessorTrait for DefaultProcessor { fn name(&self) -> &'static str { @@ -426,70 +369,15 @@ impl ProcessorTrait for DefaultProcessor { } } - // TODO, merge this loop with above - // Moving object handling here because we need a single object - // map through transactions for lookups - let mut all_objects = vec![]; - let mut all_current_objects = HashMap::new(); - for txn in &transactions { - let txn_version = txn.version as i64; - let changes = &txn - .info - .as_ref() - .unwrap_or_else(|| { - panic!( - "Transaction info doesn't exist! 
Transaction {}", - txn_version - ) - }) - .changes; - for (index, wsc) in changes.iter().enumerate() { - let index: i64 = index as i64; - match wsc.change.as_ref().unwrap() { - Change::WriteResource(inner) => { - if let Some((object, current_object)) = - &Object::from_write_resource(inner, txn_version, index).unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - Change::DeleteResource(inner) => { - // Passing all_current_objects into the function so that we can get the owner of the deleted - // resource if it was handled in the same batch - if let Some((object, current_object)) = Object::from_delete_resource( - inner, - txn_version, - index, - &all_current_objects, - &mut conn, - ) - .await - .unwrap() - { - all_objects.push(object.clone()); - all_current_objects - .insert(object.object_address.clone(), current_object.clone()); - } - }, - _ => {}, - }; - } - } // Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi threaded db writes let mut current_table_items = current_table_items .into_values() .collect::>(); let mut table_metadata = table_metadata.into_values().collect::>(); // Sort by PK - let mut all_current_objects = all_current_objects - .into_values() - .collect::>(); current_table_items .sort_by(|a, b| (&a.table_handle, &a.key_hash).cmp(&(&b.table_handle, &b.key_hash))); table_metadata.sort_by(|a, b| a.handle.cmp(&b.handle)); - all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); let db_insertion_start = std::time::Instant::now(); @@ -509,7 +397,6 @@ impl ProcessorTrait for DefaultProcessor { current_table_items, table_metadata, ), - (all_objects, all_current_objects), ) .await; diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 
89d64acd..87f62459 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -9,13 +9,13 @@ use crate::{ v2_fungible_asset_balances::{ CurrentFungibleAssetBalance, CurrentFungibleAssetMapping, FungibleAssetBalance, }, - v2_fungible_asset_utils::{ - FeeStatement, FungibleAssetAggregatedData, FungibleAssetAggregatedDataMapping, - FungibleAssetMetadata, FungibleAssetStore, - }, + v2_fungible_asset_utils::{FeeStatement, FungibleAssetMetadata, FungibleAssetStore}, v2_fungible_metadata::{FungibleAssetMetadataMapping, FungibleAssetMetadataModel}, }, - token_v2_models::v2_token_utils::{ObjectWithMetadata, TokenV2}, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, + token_v2_models::v2_token_utils::TokenV2, }, schema, utils::{ @@ -330,7 +330,7 @@ async fn parse_v2_coin( let mut fungible_asset_metadata: FungibleAssetMetadataMapping = HashMap::new(); // Get Metadata for fungible assets by object - let mut fungible_asset_object_helper: FungibleAssetAggregatedDataMapping = HashMap::new(); + let mut fungible_asset_object_helper: ObjectAggregatedDataMapping = HashMap::new(); for txn in transactions { let txn_version = txn.version as i64; @@ -373,11 +373,18 @@ async fn parse_v2_coin( { fungible_asset_object_helper.insert( standardize_address(&wr.address.to_string()), - FungibleAssetAggregatedData { + ObjectAggregatedData { object, fungible_asset_metadata: None, fungible_asset_store: None, token: None, + // The following structs are unused in this processor + aptos_collection: None, + fixed_supply: None, + unlimited_supply: None, + property_map: None, + transfer_event: None, + fungible_asset_supply: None, }, ); } diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index c55184ba..94ae8d53 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -14,6 +14,7 @@ pub 
mod default_processor; pub mod events_processor; pub mod fungible_asset_processor; pub mod nft_metadata_processor; +pub mod objects_processor; pub mod stake_processor; pub mod token_processor; pub mod token_v2_processor; @@ -27,6 +28,7 @@ use self::{ events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, nft_metadata_processor::{NftMetadataProcessor, NftMetadataProcessorConfig}, + objects_processor::ObjectsProcessor, stake_processor::StakeProcessor, token_processor::{TokenProcessor, TokenProcessorConfig}, token_v2_processor::TokenV2Processor, @@ -167,6 +169,7 @@ pub enum ProcessorConfig { EventsProcessor, FungibleAssetProcessor, NftMetadataProcessor(NftMetadataProcessorConfig), + ObjectsProcessor, StakeProcessor, TokenProcessor(TokenProcessorConfig), TokenV2Processor, @@ -206,6 +209,7 @@ pub enum Processor { EventsProcessor, FungibleAssetProcessor, NftMetadataProcessor, + ObjectsProcessor, StakeProcessor, TokenProcessor, TokenV2Processor, diff --git a/rust/processor/src/processors/nft_metadata_processor.rs b/rust/processor/src/processors/nft_metadata_processor.rs index 71516bcc..c9c9a2e3 100644 --- a/rust/processor/src/processors/nft_metadata_processor.rs +++ b/rust/processor/src/processors/nft_metadata_processor.rs @@ -4,13 +4,13 @@ use super::{ProcessingResult, ProcessorName, ProcessorTrait}; use crate::{ models::{ + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, token_v2_models::{ v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, v2_token_datas::{CurrentTokenDataV2, CurrentTokenDataV2PK, TokenDataV2}, - v2_token_utils::{ - ObjectWithMetadata, TokenV2AggregatedData, TokenV2AggregatedDataMapping, - }, }, }, utils::{ @@ -202,7 +202,7 @@ async fn parse_v2_token( let txn_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version); let transaction_info = 
txn.info.as_ref().expect("Transaction info doesn't exist!"); - let mut token_v2_metadata_helper: TokenV2AggregatedDataMapping = HashMap::new(); + let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = HashMap::new(); for wsc in transaction_info.changes.iter() { if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { if let Some(object) = @@ -210,7 +210,7 @@ async fn parse_v2_token( { token_v2_metadata_helper.insert( standardize_address(&wr.address.to_string()), - TokenV2AggregatedData { + ObjectAggregatedData { aptos_collection: None, fixed_supply: None, object, diff --git a/rust/processor/src/processors/objects_processor.rs b/rust/processor/src/processors/objects_processor.rs new file mode 100644 index 00000000..8c3e467c --- /dev/null +++ b/rust/processor/src/processors/objects_processor.rs @@ -0,0 +1,320 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use super::{ProcessingResult, ProcessorName, ProcessorTrait}; +use crate::{ + models::{ + fungible_asset_models::v2_fungible_asset_utils::FungibleAssetStore, + object_models::{ + v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, + v2_objects::{CurrentObject, Object}, + }, + token_v2_models::v2_token_utils::TokenV2, + }, + schema, + utils::{ + database::{ + clean_data_for_db, execute_with_better_error, get_chunks, MyDbConnection, PgDbPool, + PgPoolConnection, + }, + util::standardize_address, + }, +}; +use anyhow::bail; +use aptos_protos::transaction::v1::{write_set_change::Change, Transaction}; +use async_trait::async_trait; +use diesel::{pg::upsert::excluded, result::Error, ExpressionMethods}; +use field_count::FieldCount; +use std::{collections::HashMap, fmt::Debug}; +use tracing::error; + +pub struct ObjectsProcessor { + connection_pool: PgDbPool, +} + +impl ObjectsProcessor { + pub fn new(connection_pool: PgDbPool) -> Self { + Self { connection_pool } + } +} + +impl Debug for ObjectsProcessor { + fn fmt(&self, f: 
&mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = &self.connection_pool.state(); + write!( + f, + "ObjectsProcessor {{ connections: {:?} idle_connections: {:?} }}", + state.connections, state.idle_connections + ) + } +} + +async fn insert_to_db_impl( + conn: &mut MyDbConnection, + (objects, current_objects): (&[Object], &[CurrentObject]), +) -> Result<(), diesel::result::Error> { + insert_objects(conn, objects).await?; + insert_current_objects(conn, current_objects).await?; + Ok(()) +} + +async fn insert_to_db( + conn: &mut PgPoolConnection<'_>, + name: &'static str, + start_version: u64, + end_version: u64, + (objects, current_objects): (Vec, Vec), +) -> Result<(), diesel::result::Error> { + tracing::trace!( + name = name, + start_version = start_version, + end_version = end_version, + "Inserting to db", + ); + match conn + .build_transaction() + .read_write() + .run::<_, Error, _>(|pg_conn| { + Box::pin(insert_to_db_impl(pg_conn, (&objects, ¤t_objects))) + }) + .await + { + Ok(_) => Ok(()), + Err(_) => { + conn.build_transaction() + .read_write() + .run::<_, Error, _>(|pg_conn| { + Box::pin(async move { + let objects = clean_data_for_db(objects, true); + let current_objects = clean_data_for_db(current_objects, true); + insert_to_db_impl(pg_conn, (&objects, ¤t_objects)).await + }) + }) + .await + }, + } +} + +async fn insert_objects( + conn: &mut MyDbConnection, + items_to_insert: &[Object], +) -> Result<(), diesel::result::Error> { + use schema::objects::dsl::*; + let chunks = get_chunks(items_to_insert.len(), Object::field_count()); + for (start_ind, end_ind) in chunks { + execute_with_better_error( + conn, + diesel::insert_into(schema::objects::table) + .values(&items_to_insert[start_ind..end_ind]) + .on_conflict((transaction_version, write_set_change_index)) + .do_update() + .set(( + inserted_at.eq(excluded(inserted_at)), + is_token.eq(excluded(is_token)), + is_fungible_asset.eq(excluded(is_fungible_asset)), + )), + None, + ) + .await?; + } + 
Ok(()) +} + +async fn insert_current_objects( + conn: &mut MyDbConnection, + items_to_insert: &[CurrentObject], +) -> Result<(), diesel::result::Error> { + use schema::current_objects::dsl::*; + let chunks = get_chunks(items_to_insert.len(), CurrentObject::field_count()); + for (start_ind, end_ind) in chunks { + execute_with_better_error( + conn, + diesel::insert_into(schema::current_objects::table) + .values(&items_to_insert[start_ind..end_ind]) + .on_conflict(object_address) + .do_update() + .set(( + owner_address.eq(excluded(owner_address)), + state_key_hash.eq(excluded(state_key_hash)), + allow_ungated_transfer.eq(excluded(allow_ungated_transfer)), + last_guid_creation_num.eq(excluded(last_guid_creation_num)), + last_transaction_version.eq(excluded(last_transaction_version)), + is_deleted.eq(excluded(is_deleted)), + inserted_at.eq(excluded(inserted_at)), + is_token.eq(excluded(is_token)), + is_fungible_asset.eq(excluded(is_fungible_asset)), + )), + Some(" WHERE current_objects.last_transaction_version <= excluded.last_transaction_version "), + ).await?; + } + Ok(()) +} + +#[async_trait] +impl ProcessorTrait for ObjectsProcessor { + fn name(&self) -> &'static str { + ProcessorName::ObjectsProcessor.into() + } + + async fn process_transactions( + &self, + transactions: Vec, + start_version: u64, + end_version: u64, + _: Option, + ) -> anyhow::Result { + let processing_start = std::time::Instant::now(); + let mut conn = self.get_conn().await; + + // Moving object handling here because we need a single object + // map through transactions for lookups + let mut all_objects = vec![]; + let mut all_current_objects = HashMap::new(); + let mut object_metadata_helper: ObjectAggregatedDataMapping = HashMap::new(); + + for txn in &transactions { + let txn_version = txn.version as i64; + let changes = &txn + .info + .as_ref() + .unwrap_or_else(|| { + panic!( + "Transaction info doesn't exist! 
Transaction {}", + txn_version + ) + }) + .changes; + + // First pass to get all the object cores + for wsc in changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&wr.address.to_string()); + if let Some(object_with_metadata) = + ObjectWithMetadata::from_write_resource(wr, txn_version).unwrap() + { + // Object core is the first struct that we need to get + object_metadata_helper.insert(address.clone(), ObjectAggregatedData { + object: object_with_metadata, + token: None, + fungible_asset_store: None, + // The following structs are unused in this processor + fungible_asset_metadata: None, + aptos_collection: None, + fixed_supply: None, + unlimited_supply: None, + property_map: None, + transfer_event: None, + fungible_asset_supply: None, + }); + } + } + } + + // Second pass to get all other structs related to the object + for wsc in changes.iter() { + if let Change::WriteResource(wr) = wsc.change.as_ref().unwrap() { + let address = standardize_address(&wr.address.to_string()); + + // Find structs related to object + if let Some(aggregated_data) = object_metadata_helper.get_mut(&address) { + if let Some(token) = TokenV2::from_write_resource(wr, txn_version).unwrap() + { + // Object is a token if it has 0x4::token::Token struct + aggregated_data.token = Some(token); + } + if let Some(fungible_asset_store) = + FungibleAssetStore::from_write_resource(wr, txn_version).unwrap() + { + // Object is a fungible asset if it has a 0x1::fungible_asset::FungibleAssetStore + aggregated_data.fungible_asset_store = Some(fungible_asset_store); + } + } + } + } + + // Third pass to construct the object data + for (index, wsc) in changes.iter().enumerate() { + let index: i64 = index as i64; + match wsc.change.as_ref().unwrap() { + Change::WriteResource(inner) => { + if let Some((object, current_object)) = &Object::from_write_resource( + inner, + txn_version, + index, + &object_metadata_helper, + ) + .unwrap() + { + 
all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + Change::DeleteResource(inner) => { + // Passing all_current_objects into the function so that we can get the owner of the deleted + // resource if it was handled in the same batch + if let Some((object, current_object)) = Object::from_delete_resource( + inner, + txn_version, + index, + &all_current_objects, + &mut conn, + ) + .await + .unwrap() + { + all_objects.push(object.clone()); + all_current_objects + .insert(object.object_address.clone(), current_object.clone()); + } + }, + _ => {}, + }; + } + } + + // Sort by PK + let mut all_current_objects = all_current_objects + .into_values() + .collect::>(); + all_current_objects.sort_by(|a, b| a.object_address.cmp(&b.object_address)); + + let processing_duration_in_secs = processing_start.elapsed().as_secs_f64(); + let db_insertion_start = std::time::Instant::now(); + + let tx_result = insert_to_db( + &mut conn, + self.name(), + start_version, + end_version, + (all_objects, all_current_objects), + ) + .await; + let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64(); + + match tx_result { + Ok(_) => Ok(ProcessingResult { + start_version, + end_version, + processing_duration_in_secs, + db_insertion_duration_in_secs, + }), + Err(e) => { + error!( + start_version = start_version, + end_version = end_version, + processor_name = self.name(), + error = ?e, + "[Parser] Error inserting transactions to db", + ); + bail!(e) + }, + } + } + + fn connection_pool(&self) -> &PgDbPool { + &self.connection_pool + } +} diff --git a/rust/processor/src/processors/token_v2_processor.rs b/rust/processor/src/processors/token_v2_processor.rs index e755a862..eaca3232 100644 --- a/rust/processor/src/processors/token_v2_processor.rs +++ b/rust/processor/src/processors/token_v2_processor.rs @@ -7,6 +7,9 @@ use crate::{ fungible_asset_models::v2_fungible_asset_utils::{ FungibleAssetMetadata, 
FungibleAssetStore, FungibleAssetSupply, }, + object_models::v2_object_utils::{ + ObjectAggregatedData, ObjectAggregatedDataMapping, ObjectWithMetadata, + }, token_models::tokens::{TableHandleToOwner, TableMetadataForToken}, token_v2_models::{ v2_collections::{CollectionV2, CurrentCollectionV2, CurrentCollectionV2PK}, @@ -18,8 +21,7 @@ use crate::{ TokenOwnershipV2, }, v2_token_utils::{ - AptosCollection, BurnEvent, FixedSupply, ObjectWithMetadata, PropertyMapModel, - TokenV2, TokenV2AggregatedData, TokenV2AggregatedDataMapping, TokenV2Burned, + AptosCollection, BurnEvent, FixedSupply, PropertyMapModel, TokenV2, TokenV2Burned, TransferEvent, UnlimitedSupply, }, }, @@ -501,7 +503,7 @@ async fn parse_v2_token( // Get Metadata for token v2 by object // We want to persist this through the entire batch so that even if a token is burned, // we can still get the object core metadata for it - let mut token_v2_metadata_helper: TokenV2AggregatedDataMapping = HashMap::new(); + let mut token_v2_metadata_helper: ObjectAggregatedDataMapping = HashMap::new(); // Basically token properties let mut current_token_v2_metadata: HashMap = HashMap::new(); @@ -531,7 +533,7 @@ async fn parse_v2_token( { token_v2_metadata_helper.insert( standardize_address(&wr.address.to_string()), - TokenV2AggregatedData { + ObjectAggregatedData { aptos_collection: None, fixed_supply: None, object, diff --git a/rust/processor/src/schema.rs b/rust/processor/src/schema.rs index 0f3b30ac..e805b058 100644 --- a/rust/processor/src/schema.rs +++ b/rust/processor/src/schema.rs @@ -449,6 +449,8 @@ diesel::table! { last_transaction_version -> Int8, is_deleted -> Bool, inserted_at -> Timestamp, + is_token -> Nullable, + is_fungible_asset -> Nullable, } } @@ -861,6 +863,8 @@ diesel::table! { allow_ungated_transfer -> Bool, is_deleted -> Bool, inserted_at -> Timestamp, + is_token -> Nullable, + is_fungible_asset -> Nullable, } } @@ -1198,8 +1202,6 @@ diesel::table! 
{ } } -diesel::joinable!(block_metadata_transactions -> transactions (version)); - diesel::allow_tables_to_appear_in_same_query!( account_transactions, ans_lookup, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 02e4cc31..01dfb0d3 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -8,10 +8,10 @@ use crate::{ account_transactions_processor::AccountTransactionsProcessor, ans_processor::AnsProcessor, coin_processor::CoinProcessor, default_processor::DefaultProcessor, events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, - nft_metadata_processor::NftMetadataProcessor, stake_processor::StakeProcessor, - token_processor::TokenProcessor, token_v2_processor::TokenV2Processor, - user_transaction_processor::UserTransactionProcessor, ProcessingResult, Processor, - ProcessorConfig, ProcessorTrait, + nft_metadata_processor::NftMetadataProcessor, objects_processor::ObjectsProcessor, + stake_processor::StakeProcessor, token_processor::TokenProcessor, + token_v2_processor::TokenV2Processor, user_transaction_processor::UserTransactionProcessor, + ProcessingResult, Processor, ProcessorConfig, ProcessorTrait, }, schema::ledger_infos, utils::{ @@ -767,6 +767,7 @@ pub fn build_processor(config: &ProcessorConfig, db_pool: PgDbPool) -> Processor ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) }, + ProcessorConfig::ObjectsProcessor => Processor::from(ObjectsProcessor::new(db_pool)), ProcessorConfig::StakeProcessor => Processor::from(StakeProcessor::new(db_pool)), ProcessorConfig::TokenProcessor(config) => { Processor::from(TokenProcessor::new(db_pool, config.clone()))