From d53a2e41a8eee70dfa476b6a7ea121348548d645 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Fri, 13 Sep 2024 19:17:00 +0530 Subject: [PATCH] feat(torii): index whitelisted erc20/erc721 commit-id:5231a946 --- Cargo.lock | 1 + bin/torii/src/main.rs | 85 +++- bin/torii/torii.toml | 8 + crates/torii/core/Cargo.toml | 1 + crates/torii/core/src/engine.rs | 182 +++++---- crates/torii/core/src/lib.rs | 1 - .../src/processors/erc20_legacy_transfer.rs | 58 +++ .../core/src/processors/erc20_transfer.rs | 58 +++ .../core/src/processors/erc721_transfer.rs | 66 +++ crates/torii/core/src/processors/mod.rs | 15 +- .../core/src/processors/store_set_record.rs | 3 +- crates/torii/core/src/sql/erc.rs | 382 ++++++++++++++++++ crates/torii/core/src/{sql.rs => sql/mod.rs} | 42 +- .../torii/core/src/{ => sql}/query_queue.rs | 0 .../core/src/{sql_test.rs => sql/test.rs} | 0 crates/torii/core/src/sql/utils.rs | 18 + crates/torii/core/src/types.rs | 50 +++ crates/torii/libp2p/src/server/mod.rs | 2 +- .../migrations/20240913104418_add_erc.sql | 35 ++ scripts/deploy_erc20_katana.sh | 3 + scripts/send_erc20_transfer.sh | 12 + 21 files changed, 928 insertions(+), 94 deletions(-) create mode 100644 bin/torii/torii.toml create mode 100644 crates/torii/core/src/processors/erc20_legacy_transfer.rs create mode 100644 crates/torii/core/src/processors/erc20_transfer.rs create mode 100644 crates/torii/core/src/processors/erc721_transfer.rs create mode 100644 crates/torii/core/src/sql/erc.rs rename crates/torii/core/src/{sql.rs => sql/mod.rs} (97%) rename crates/torii/core/src/{ => sql}/query_queue.rs (100%) rename crates/torii/core/src/{sql_test.rs => sql/test.rs} (100%) create mode 100644 crates/torii/core/src/sql/utils.rs create mode 100644 crates/torii/migrations/20240913104418_add_erc.sql create mode 100755 scripts/deploy_erc20_katana.sh create mode 100755 scripts/send_erc20_transfer.sh diff --git a/Cargo.lock b/Cargo.lock index 5e38a33324..8f22b50ad3 100644 --- 
a/Cargo.lock +++ b/Cargo.lock @@ -14757,6 +14757,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "toml 0.8.15", "tracing", ] diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 374cce2829..7285d72281 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -12,6 +12,7 @@ use std::cmp; use std::net::SocketAddr; +use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -29,6 +30,9 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; +use torii_core::processors::erc20_transfer::Erc20TransferProcessor; +use torii_core::processors::erc721_transfer::Erc721TransferProcessor; use torii_core::processors::event_message::EventMessageProcessor; use torii_core::processors::generate_event_processors_map; use torii_core::processors::metadata_update::MetadataUpdateProcessor; @@ -40,7 +44,7 @@ use torii_core::processors::store_update_member::StoreUpdateMemberProcessor; use torii_core::processors::store_update_record::StoreUpdateRecordProcessor; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; -use torii_core::types::Model; +use torii_core::types::{ErcContract, ErcType, Model, ToriiConfig}; use torii_server::proxy::Proxy; use tracing::{error, info}; use tracing_subscriber::{fmt, EnvFilter}; @@ -130,11 +134,38 @@ struct Args { /// Max concurrent tasks #[arg(long, default_value = "100")] max_concurrent_tasks: usize, + + /// ERC contract addresses to index + #[arg(long, value_parser = parse_erc_contracts)] + #[arg(conflicts_with = "config")] + erc_contracts: Option>, + + /// Configuration file + #[arg(long)] + config: Option, } #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); + + let mut start_block = args.start_block; + + let mut config = if let Some(path) = args.config { + 
ToriiConfig::load_from_path(&path)? + } else { + ToriiConfig::default() + }; + + if let Some(erc_contracts) = args.erc_contracts { + config.erc_contracts = erc_contracts; + } + + for address in &config.erc_contracts { + if address.start_block < start_block { + start_block = address.start_block; + } + } let filter_layer = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("info,hyper_reverse_proxy=off")); @@ -179,17 +210,26 @@ async fn main() -> anyhow::Result<()> { // Get world address let world = WorldContractReader::new(args.world_address, provider.clone()); - let db = Sql::new(pool.clone(), args.world_address).await?; + let erc_contracts = config + .erc_contracts + .iter() + .map(|contract| (contract.contract_address, contract.clone())) + .collect(); + + let db = Sql::new(pool.clone(), args.world_address, &erc_contracts).await?; let processors = Processors { event: generate_event_processors_map(vec![ - Arc::new(RegisterModelProcessor), - Arc::new(StoreSetRecordProcessor), - Arc::new(MetadataUpdateProcessor), - Arc::new(StoreDelRecordProcessor), - Arc::new(EventMessageProcessor), - Arc::new(StoreUpdateRecordProcessor), - Arc::new(StoreUpdateMemberProcessor), + Box::new(RegisterModelProcessor), + Box::new(StoreSetRecordProcessor), + Box::new(MetadataUpdateProcessor), + Box::new(StoreDelRecordProcessor), + Box::new(EventMessageProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), + Box::new(Erc20LegacyTransferProcessor), + Box::new(Erc20TransferProcessor), + Box::new(Erc721TransferProcessor), ])?, transaction: vec![Box::new(StoreTransactionProcessor)], ..Processors::default() @@ -211,6 +251,7 @@ async fn main() -> anyhow::Result<()> { }, shutdown_tx.clone(), Some(block_tx), + erc_contracts, ); let shutdown_rx = shutdown_tx.subscribe(); @@ -307,3 +348,29 @@ async fn spawn_rebuilding_graphql_server( } } } + +// Parses clap cli argument which is expected to be in the format: +// - erc_type:address:start_block +// 
- address:start_block (erc_type defaults to ERC20) +fn parse_erc_contracts(s: &str) -> anyhow::Result> { + let parts: Vec<&str> = s.split(',').collect(); + let mut contracts = Vec::new(); + for part in parts { + match part.split(':').collect::>().as_slice() { + [r#type, address, start_block] => { + let contract_address = Felt::from_str(address).unwrap(); + let start_block = start_block.parse::()?; + let r#type = r#type.parse::()?; + contracts.push(ErcContract { contract_address, start_block, r#type }); + } + [address, start_block] => { + let contract_address = Felt::from_str(address)?; + let start_block = start_block.parse::()?; + let r#type = ErcType::default(); + contracts.push(ErcContract { contract_address, start_block, r#type }); + } + _ => return Err(anyhow::anyhow!("Invalid ERC contract format")), + } + } + Ok(contracts) +} diff --git a/bin/torii/torii.toml b/bin/torii/torii.toml new file mode 100644 index 0000000000..45305c0301 --- /dev/null +++ b/bin/torii/torii.toml @@ -0,0 +1,8 @@ +# Example configuration file for Torii +# erc_contracts = [ +# { contract_address = "0x1234567890abcdef1234567890abcdef12345678", start_block = 0, type = "ERC20" }, +# { contract_address = "0xabcdef1234567890abcdef1234567890abcdef12", start_block = 1, type = "ERC721" }, +# ] +# erc_contracts = [ +# { type = "ERC20", contract_address = "0x07fc13cc1f43f0b0519f84df8bf13bea4d9fd5ce2d748c3baf27bf90a565f60a", start_block = 0 }, +# ] \ No newline at end of file diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index a22ccfcc9c..a0d8df3647 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -38,6 +38,7 @@ thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" +toml.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 2330091a4d..ffad2595c8 100644 
--- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{btree_map, BTreeMap, HashMap}; use std::fmt::Debug; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -6,11 +6,12 @@ use std::time::Duration; use anyhow::Result; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ - BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, - MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt, - TransactionReceiptWithBlockInfo, TransactionWithReceipt, + BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, Felt, + MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, + ReceiptBlock, TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt, }; use starknet::providers::Provider; use tokio::sync::broadcast::Sender; @@ -23,12 +24,13 @@ use tracing::{debug, error, info, trace, warn}; use crate::processors::event_message::EventMessageProcessor; use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::sql::Sql; +use crate::types::ErcContract; #[allow(missing_debug_implementations)] pub struct Processors { pub block: Vec>>, pub transaction: Vec>>, - pub event: HashMap>>, + pub event: HashMap>>>, pub catch_all_event: Box>, } @@ -106,6 +108,8 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + // ERC tokens to index + tokens: HashMap, tasks: HashMap>, } @@ -123,6 +127,7 @@ impl Engine

{ config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, + tokens: HashMap, ) -> Self { Self { world: Arc::new(world), @@ -132,6 +137,7 @@ impl Engine

{ config, shutdown_tx, block_tx, + tokens, tasks: HashMap::new(), } } @@ -228,30 +234,41 @@ impl Engine

{ last_pending_block_world_tx: Option, ) -> Result { // Process all blocks from current to latest. - let get_events = |token: Option| { - self.provider.get_events( - EventFilter { - from_block: Some(BlockId::Number(from)), - to_block: Some(BlockId::Number(to)), - address: Some(self.world.address), - keys: None, - }, - token, - self.config.events_chunk_size, - ) + let world_events_filter = EventFilter { + from_block: Some(BlockId::Number(from)), + to_block: Some(BlockId::Number(to)), + address: Some(self.world.address), + keys: None, }; - // handle next events pages - let mut events_pages = vec![get_events(None).await?]; + let mut fetch_all_events_tasks = vec![]; + let world_events_pages = + get_all_events(&self.provider, world_events_filter, self.config.events_chunk_size); + + fetch_all_events_tasks.push(world_events_pages); + + for token in self.tokens.iter() { + let events_filter = EventFilter { + from_block: Some(BlockId::Number(from)), + to_block: Some(BlockId::Number(to)), + address: Some(*token.0), + keys: None, + }; + let token_events_pages = + get_all_events(&self.provider, events_filter, self.config.events_chunk_size); + + fetch_all_events_tasks.push(token_events_pages); + } + + let task_result = join_all(fetch_all_events_tasks).await; - while let Some(token) = &events_pages.last().unwrap().continuation_token { - debug!(target: LOG_TARGET, "Fetching events page with continuation token: {}", &token); - events_pages.push(get_events(Some(token.clone())).await?); + let mut events_pages = vec![]; + for result in task_result { + events_pages.extend(result?); } debug!(target: LOG_TARGET, "Total events pages fetched: {}", &events_pages.len()); // Transactions & blocks to process - let mut last_block = 0_u64; let mut blocks = BTreeMap::new(); // Flatten events pages and events according to the pending block cursor @@ -287,11 +304,9 @@ impl Engine

{ }; // Keep track of last block number and fetch block timestamp - if block_number > last_block { + if let btree_map::Entry::Vacant(v) = blocks.entry(block_number) { let block_timestamp = self.get_block_timestamp(block_number).await?; - blocks.insert(block_number, block_timestamp); - - last_block = block_number; + v.insert(block_timestamp); } // Then we skip all transactions until we reach the last pending processed @@ -500,14 +515,16 @@ impl Engine

{ let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { - if let Some(processor) = processors.event.get(&event.keys[0]) { - debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); - - if let Err(e) = processor - .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); + if let Some(event_processors) = processors.event.get(&event.keys[0]) { + for processor in event_processors.iter() { + debug!(target: LOG_TARGET, event_name = processor.event_key(), task_id = %task_id, "Processing parallelized event."); + + if let Err(e) = processor + .process(&world, &mut local_db, block_number, block_timestamp, &event_id, &event) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, task_id = %task_id, "Processing parallelized event."); + } } } } @@ -547,15 +564,7 @@ impl Engine

{ keys: event.keys.clone(), data: event.data.clone(), }; - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - &event, - // transaction_hash, - ) - .await?; + Self::process_event(self, block_number, block_timestamp, &event_id, &event).await?; } // Commented out this transaction processor because it requires an RPC call for each @@ -589,7 +598,9 @@ impl Engine

{ let mut world_event = false; if let Some(events) = events { for (event_idx, event) in events.iter().enumerate() { - if event.from_address != self.world.address { + if event.from_address != self.world.address + && !self.tokens.contains_key(&event.from_address) + { continue; } @@ -597,15 +608,7 @@ impl Engine

{ let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - event, - // *transaction_hash, - ) - .await?; + Self::process_event(self, block_number, block_timestamp, &event_id, event).await?; } // if world_event { @@ -663,12 +666,10 @@ impl Engine

{ block_timestamp: u64, event_id: &str, event: &Event, - // transaction_hash: Felt, ) -> Result<()> { - // self.db.store_event(event_id, event, transaction_hash, block_timestamp); let event_key = event.keys[0]; - let Some(processor) = self.processors.event.get(&event_key) else { + let Some(processors) = self.processors.event.get(&event_key) else { // if we dont have a processor for this event, we try the catch all processor if self.processors.catch_all_event.validate(event) { if let Err(e) = self @@ -703,14 +704,19 @@ impl Engine

{ return Ok(()); }; - let task_identifier = match processor.event_key().as_str() { - "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { - let mut hasher = DefaultHasher::new(); - event.data[0].hash(&mut hasher); - event.data[1].hash(&mut hasher); - hasher.finish() + // For now we only have 1 processor for store* events + let task_identifier = if processors.len() == 1 { + match processors[0].event_key().as_str() { + "StoreSetRecord" | "StoreUpdateRecord" | "StoreUpdateMember" | "StoreDelRecord" => { + let mut hasher = DefaultHasher::new(); + event.data[0].hash(&mut hasher); + event.data[1].hash(&mut hasher); + hasher.finish() + } + _ => 0, } - _ => 0, + } else { + 0 }; // if we have a task identifier, we queue the event to be parallelized @@ -723,14 +729,54 @@ impl Engine

{ }); } else { // if we dont have a task identifier, we process the event immediately - if let Err(e) = processor - .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + for processor in processors.iter() { + if !processor.validate(event) { + continue; + } + + if let Err(e) = processor + .process( + &self.world, + &mut self.db, + block_number, + block_timestamp, + event_id, + event, + ) + .await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + } } } Ok(()) } } + +async fn get_all_events

( + provider: &P, + events_filter: EventFilter, + events_chunk_size: u64, +) -> Result> +where + P: Provider + Sync, +{ + let mut events_pages = Vec::new(); + let mut continuation_token = None; + + loop { + let events_page = provider + .get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size) + .await?; + + continuation_token = events_page.continuation_token.clone(); + events_pages.push(events_page); + + if continuation_token.is_none() { + break; + } + } + + Ok(events_pages) +} diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index df6e8b3adc..c415bec0f8 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -3,7 +3,6 @@ pub mod engine; pub mod error; pub mod model; pub mod processors; -pub mod query_queue; pub mod simple_broker; pub mod sql; pub mod types; diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs new file mode 100644 index 0000000000..41852cd89e --- /dev/null +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -0,0 +1,58 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_legacy_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20LegacyTransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20LegacyTransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(Transfer)] + // data: [from, to, value.0, value.1] + if event.keys.len() == 1 && event.data.len() == 4 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.data[0]; + let to = event.data[1]; + + let value = U256Cainome::cairo_deserialize(&event.data, 2)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs new file mode 100644 index 0000000000..a98e288780 --- /dev/null +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -0,0 +1,58 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc20_transfer"; + +#[derive(Default, Debug)] +pub struct Erc20TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc20TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(Transfer), from, to] + // data: [value.0, value.1] + if event.keys.len() == 3 && event.data.len() == 2 { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let value = U256Cainome::cairo_deserialize(&event.data, 0)?; + let value = U256::from_words(value.low, value.high); + + db.handle_erc20_transfer(token_address, from, to, value, world.provider(), block_timestamp) + .await?; + info!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs new file mode 100644 index 0000000000..665de6424a --- /dev/null +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -0,0 +1,66 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use tracing::info; + +use super::EventProcessor; +use crate::sql::Sql; + +pub(crate) const LOG_TARGET: &str = "torii_core::processors::erc721_transfer"; + +#[derive(Default, Debug)] +pub struct Erc721TransferProcessor; + +#[async_trait] +impl

EventProcessor

for Erc721TransferProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "Transfer".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // ref: https://github.com/OpenZeppelin/cairo-contracts/blob/eabfa029b7b681d9e83bf171f723081b07891016/packages/token/src/erc721/erc721.cairo#L44-L53 + // key: [hash(Transfer), from, to, token_id.low, token_id.high] + // data: [] + if event.keys.len() == 5 && event.data.is_empty() { + return true; + } + + false + } + + async fn process( + &self, + world: &WorldContractReader

, + db: &mut Sql, + _block_number: u64, + block_timestamp: u64, + _event_id: &str, + event: &Event, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[1]; + let to = event.keys[2]; + + let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + world.provider(), + block_timestamp, + ) + .await?; + info!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); + + Ok(()) + } +} diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index c6a8f13af5..f4bfddffd3 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use anyhow::{Error, Result}; use async_trait::async_trait; @@ -10,6 +9,12 @@ use starknet::providers::Provider; use crate::sql::Sql; +// pub mod erc20_legacy_transfer; +// pub mod erc20_transfer; +// pub mod erc721_transfer; +pub mod erc20_legacy_transfer; +pub mod erc20_transfer; +pub mod erc721_transfer; pub mod event_message; pub mod metadata_update; pub mod register_model; @@ -74,15 +79,17 @@ pub trait TransactionProcessor: Send + Sync { ) -> Result<(), Error>; } +type EventProcessors

= Vec>>; + /// Given a list of event processors, generate a map of event keys to the event processor pub fn generate_event_processors_map( - event_processor: Vec>>, -) -> Result>>> { + event_processor: EventProcessors

, +) -> Result>> { let mut event_processors = HashMap::new(); for processor in event_processor { let key = get_selector_from_name(processor.event_key().as_str())?; - event_processors.insert(key, processor); + event_processors.entry(key).or_insert(vec![]).push(processor); } Ok(event_processors) diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index c5f70a2a54..38e8e67415 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -8,7 +8,8 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX, NUM_KEYS_INDEX}; -use crate::sql::{felts_sql_string, Sql}; +use crate::sql::utils::felts_sql_string; +use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_set_record"; diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs new file mode 100644 index 0000000000..0aa3a879c7 --- /dev/null +++ b/crates/torii/core/src/sql/erc.rs @@ -0,0 +1,382 @@ +use anyhow::Result; +use cainome::cairo_serde::ByteArray; +use cainome::cairo_serde::CairoSerde; +use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; +use starknet::{core::types::Felt, providers::Provider}; +use std::ops::{Add, Sub}; + +use super::query_queue::{Argument, QueryQueue, QueryType}; +use super::utils::{sql_string_to_u256, u256_to_sql_string}; +use crate::utils::utc_dt_string_from_timestamp; + +use super::Sql; + +impl Sql { + pub async fn handle_erc20_transfer( + &mut self, + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + // unique token identifier in DB + let token_id = format!("{:#x}", contract_address); + + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") + 
.bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + register_erc20_token_metadata( + contract_address, + &mut self.query_queue, + &token_id, + provider, + ) + .await?; + } + + register_erc_transfer_event( + contract_address, + from, + to, + amount, + &token_id, + block_timestamp, + &mut self.query_queue, + ); + + // Update balances in erc20_balance table + { + // NOTE: formatting here should match the format we use for Argument type in QueryQueue + // TODO: abstract this so they cannot mismatch + + // Since balance are stored as TEXT in db, we cannot directly use INSERT OR UPDATE + // statements. + // Fetch balances for both `from` and `to` addresses, update them and write back to db + let query = sqlx::query_as::<_, (String, String)>( + "SELECT account_address, balance FROM balances WHERE contract_address = ? AND \ + account_address IN (?, ?)", + ) + .bind(format!("{:#x}", contract_address)) + .bind(format!("{:#x}", from)) + .bind(format!("{:#x}", to)); + + // (address, balance) + let balances: Vec<(String, String)> = query.fetch_all(&self.pool).await?; + // (address, balance) is primary key in DB, and we are fetching for 2 addresses so there + // should be at most 2 rows returned + assert!(balances.len() <= 2); + + let from_balance = balances + .iter() + .find(|(address, _)| address == &format!("{:#x}", from)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| format!("{:#64x}", crypto_bigint::U256::ZERO)); + + let to_balance = balances + .iter() + .find(|(address, _)| address == &format!("{:#x}", to)) + .map(|(_, balance)| balance.clone()) + .unwrap_or_else(|| format!("{:#64x}", crypto_bigint::U256::ZERO)); + + let from_balance = sql_string_to_u256(&from_balance); + let to_balance = sql_string_to_u256(&to_balance); + + let new_from_balance = + if from != Felt::ZERO { from_balance.sub(amount) } else { from_balance }; + let new_to_balance = if to != Felt::ZERO { to_balance.add(amount) } else { to_balance }; + + let 
update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (id) + DO UPDATE SET balance = excluded.balance"; + + if from != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{:#x}", from, contract_address)), + Argument::String(u256_to_sql_string(&new_from_balance)), + Argument::FieldElement(from), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{:#x}", to, contract_address)), + Argument::String(u256_to_sql_string(&new_to_balance)), + Argument::FieldElement(to), + Argument::FieldElement(contract_address), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } + + pub async fn handle_erc721_transfer( + &mut self, + contract_address: Felt, + from: Felt, + to: Felt, + token_id: U256, + provider: &P, + block_timestamp: u64, + ) -> Result<()> { + let token_id = format!("{:#x}:{}", contract_address, u256_to_sql_string(&token_id)); + let token_exists: bool = + sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM tokens WHERE id = ?)") + .bind(token_id.clone()) + .fetch_one(&self.pool) + .await?; + + if !token_exists { + register_erc721_token_metadata( + contract_address, + &mut self.query_queue, + &token_id, + provider, + ) + .await?; + } + + register_erc_transfer_event( + contract_address, + from, + to, + U256::from(1u8), + &token_id, + block_timestamp, + &mut self.query_queue, + ); + + // Update balances in erc721_balances table + { + let update_query = " + INSERT INTO balances (id, balance, account_address, contract_address, token_id) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT (account_address, contract_address, token_id) + DO UPDATE SET balance = excluded.balance"; + + if from != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{}", from, &token_id)), + Argument::FieldElement(from), + Argument::FieldElement(contract_address), + Argument::String(u256_to_sql_string(&U256::from(0u8))), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + + if to != Felt::ZERO { + self.query_queue.enqueue( + update_query, + vec![ + Argument::String(format!("{:#x}:{}", to, &token_id)), + Argument::FieldElement(to), + Argument::FieldElement(contract_address), + Argument::String(u256_to_sql_string(&U256::from(1u8))), + Argument::String(token_id.clone()), + ], + QueryType::Other, + ); + } + } + self.query_queue.execute_all().await?; + + Ok(()) + } +} + +async fn register_erc20_token_metadata( + contract_address: Felt, + queue: &mut QueryQueue, + token_id: &str, + provider: &P, +) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc20 token) + // len > 1 => return value ByteArray (i.e. 
new erc20 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("decimals").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); + + // Insert the token into the tokens table + queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ + ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) +} + +async fn register_erc721_token_metadata( + contract_address: Felt, + queue: &mut QueryQueue, + token_id: &str, + provider: &P, +) -> Result<()> { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. 
new erc721 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let decimals = 0; + + // Insert the token into the tokens table + queue.enqueue( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ + ?, ?, ?)", + vec![ + Argument::String(token_id.to_string()), + Argument::FieldElement(contract_address), + Argument::String(name), + Argument::String(symbol), + Argument::Int(decimals.into()), + ], + QueryType::Other, + ); + + Ok(()) +} + +fn register_erc_transfer_event( + contract_address: Felt, + from: Felt, + to: Felt, + amount: U256, + token_id: &str, + block_timestamp: u64, + queue: &mut QueryQueue, +) { + let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, \ + to_address, amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, \ + ?)"; + + queue.enqueue( + insert_query, + vec![ + Argument::FieldElement(contract_address), + Argument::FieldElement(from), + Argument::FieldElement(to), + Argument::String(u256_to_sql_string(&amount)), + Argument::String(token_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ], + QueryType::Other, + ); +} diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql/mod.rs similarity index 97% rename from crates/torii/core/src/sql.rs rename to crates/torii/core/src/sql/mod.rs index ccca4f4c7d..5d1efa20e1 
100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::str::FromStr; use std::sync::Arc; @@ -14,13 +15,15 @@ use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; +use utils::felts_sql_string; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use crate::types::{ - Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, + ErcContract, Event as EventEmitted, EventMessage as EventMessageUpdated, + Model as ModelRegistered, }; use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; +use query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; type IsEventMessage = bool; type IsStoreUpdate = bool; @@ -28,9 +31,12 @@ type IsStoreUpdate = bool; pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; pub const FELT_DELIMITER: &str = "/"; +pub mod erc; +pub mod query_queue; #[cfg(test)] -#[path = "sql_test.rs"] +#[path = "test.rs"] mod test; +pub mod utils; #[derive(Debug)] pub struct Sql { @@ -52,7 +58,11 @@ impl Clone for Sql { } impl Sql { - pub async fn new(pool: Pool, world_address: Felt) -> Result { + pub async fn new( + pool: Pool, + world_address: Felt, + erc_contracts: &HashMap, + ) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); query_queue.enqueue( @@ -66,6 +76,19 @@ impl Sql { QueryType::Other, ); + for contract in erc_contracts.values() { + query_queue.enqueue( + "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, ?, \ + ?)", + vec![ + Argument::FieldElement(contract.contract_address), + Argument::FieldElement(contract.contract_address), + Argument::String(contract.r#type.to_string()), + ], + QueryType::Other, + ); + } + 
query_queue.execute_all().await?; Ok(Self { @@ -716,7 +739,11 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { return; @@ -1151,8 +1178,3 @@ impl Sql { Ok(()) } } - -pub fn felts_sql_string(felts: &[Felt]) -> String { - felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) - + FELT_DELIMITER -} diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/sql/query_queue.rs similarity index 100% rename from crates/torii/core/src/query_queue.rs rename to crates/torii/core/src/sql/query_queue.rs diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql/test.rs similarity index 100% rename from crates/torii/core/src/sql_test.rs rename to crates/torii/core/src/sql/test.rs diff --git a/crates/torii/core/src/sql/utils.rs b/crates/torii/core/src/sql/utils.rs new file mode 100644 index 0000000000..d9cbf1fab9 --- /dev/null +++ b/crates/torii/core/src/sql/utils.rs @@ -0,0 +1,18 @@ +use starknet::core::types::U256; +use starknet_crypto::Felt; + +use super::FELT_DELIMITER; + +pub fn felts_sql_string(felts: &[Felt]) -> String { + felts.iter().map(|k| format!("{:#x}", k)).collect::>().join(FELT_DELIMITER) + + FELT_DELIMITER +} + +pub(crate) fn u256_to_sql_string(u256: &U256) -> String { + format!("{:#064x}", u256) +} + +pub(crate) fn sql_string_to_u256(sql_string: &str) -> U256 { + let sql_string = sql_string.strip_prefix("0x").unwrap_or(sql_string); + U256::from(crypto_bigint::U256::from_be_hex(sql_string)) +} diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index de75fca94a..1618b03a44 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -1,4 +1,5 @@ use core::fmt; +use std::{path::PathBuf, str::FromStr}; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; @@ -84,3 +85,52 @@ pub struct Event { pub executed_at: 
DateTime, pub created_at: DateTime, } + +#[derive(Default, Deserialize, Debug, Clone)] +pub struct ToriiConfig { + /// ERC contract addresses to index + pub erc_contracts: Vec, +} + +impl ToriiConfig { + pub fn load_from_path(path: &PathBuf) -> Result { + let config = std::fs::read_to_string(path)?; + let config: Self = toml::from_str(&config)?; + Ok(config) + } +} + +#[derive(Default, Deserialize, Debug, Clone)] +pub struct ErcContract { + pub contract_address: Felt, + pub start_block: u64, + pub r#type: ErcType, +} + +#[derive(Default, Deserialize, Debug, Clone)] +pub enum ErcType { + #[default] + ERC20, + ERC721, +} + +impl FromStr for ErcType { + type Err = anyhow::Error; + + fn from_str(input: &str) -> Result { + // Input is lowercased first, so only all-lowercase patterns can ever match. + match input.to_lowercase().as_str() { + "erc20" => Ok(ErcType::ERC20), + "erc721" => Ok(ErcType::ERC721), + _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), + } + } +} + +impl std::fmt::Display for ErcType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ErcType::ERC20 => write!(f, "ERC20"), + ErcType::ERC721 => write!(f, "ERC721"), + } + } +} diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index 9bc1e25ce3..f18989e6ca 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -25,7 +25,7 @@ use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; -use torii_core::sql::{felts_sql_string, Sql}; +use torii_core::sql::{utils::felts_sql_string, Sql}; use tracing::{info, warn}; use webrtc::tokio::Certificate; diff --git a/crates/torii/migrations/20240913104418_add_erc.sql b/crates/torii/migrations/20240913104418_add_erc.sql new file mode 100644 index 0000000000..aca9f4d817 --- /dev/null +++ b/crates/torii/migrations/20240913104418_add_erc.sql @@ -0,0
+1,35 @@ +CREATE TABLE balances ( + -- account_address:contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + balance TEXT NOT NULL, + account_address TEXT NOT NULL, + contract_address TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); + +CREATE INDEX balances_account_address ON balances (account_address); +CREATE INDEX balances_contract_address ON balances (contract_address); + +CREATE TABLE tokens ( + -- contract_address:token_id + id TEXT NOT NULL PRIMARY KEY, + contract_address TEXT NOT NULL, + name TEXT NOT NULL, + symbol TEXT NOT NULL, + decimals INTEGER NOT NULL, + FOREIGN KEY (contract_address) REFERENCES contracts(id) +); + +CREATE TABLE erc_transfers ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + contract_address TEXT NOT NULL, + from_address TEXT NOT NULL, + to_address TEXT NOT NULL, + amount TEXT NOT NULL, + -- contract_address:token_id + token_id TEXT NOT NULL, + executed_at DATETIME NOT NULL, + FOREIGN KEY (token_id) REFERENCES tokens(id) +); diff --git a/scripts/deploy_erc20_katana.sh b/scripts/deploy_erc20_katana.sh new file mode 100755 index 0000000000..3ad8d87937 --- /dev/null +++ b/scripts/deploy_erc20_katana.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +starkli deploy --account ../account.json --keystore ../signer.json --keystore-password "" 0x02a8846878b6ad1f54f6ba46f5f40e11cee755c677f130b2c4b60566c9003f1f 0x626c6f62 0x424c42 0x8 u256:10000000000 0xb3ff441a68610b30fd5e2abbf3a1548eb6ba6f3559f2862bf2dc757e5828ca --rpc http://localhost:5050 diff --git a/scripts/send_erc20_transfer.sh b/scripts/send_erc20_transfer.sh new file mode 100755 index 0000000000..b321d2fa19 --- /dev/null +++ b/scripts/send_erc20_transfer.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Error: Contract address argument is required." 
+ echo "Usage: $0 " + exit 1 +fi + +contract_address=$1 +rpc="http://localhost:5050" + +starkli invoke $contract_address transfer 0x1234 u256:1 --account ../account.json --keystore ../signer.json --keystore-password "" --rpc $rpc