From 7ee793eabeff72bb78aae6f0d3ff84f738023c37 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Thu, 19 Sep 2024 20:27:23 +0530 Subject: [PATCH] opt(torii): fetch block timestamp in parallel commit-id:923dbba6 --- crates/torii/core/src/engine.rs | 55 +++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 75c0103abc..3964cc7075 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,4 +1,4 @@ -use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; @@ -104,7 +104,7 @@ pub struct ParallelizedEvent { pub struct Engine { world: Arc>, db: Sql, - provider: Box

, + provider: Arc

, processors: Arc>, config: EngineConfig, shutdown_tx: Sender<()>, @@ -134,7 +134,7 @@ impl Engine

{ Self { world: Arc::new(world), db, - provider: Box::new(provider), + provider: Arc::new(provider), processors: Arc::new(processors), config, shutdown_tx, @@ -303,6 +303,8 @@ impl Engine

{ // Flatten events pages and events according to the pending block cursor // to array of (block_number, transaction_hash) let mut transactions = LinkedHashMap::new(); + + let mut block_set = HashSet::new(); for event in events { let block_number = match event.block_number { Some(block_number) => block_number, @@ -329,12 +331,7 @@ impl Engine

{ } }; - // Keep track of last block number and fetch block timestamp - if let btree_map::Entry::Vacant(v) = blocks.entry(block_number) { - debug!("Fetching block timestamp for block number: {}", block_number); - let block_timestamp = self.get_block_timestamp(block_number).await?; - v.insert(block_timestamp); - } + block_set.insert(block_number); transactions .entry((block_number, event.transaction_hash)) @@ -342,6 +339,25 @@ impl Engine

{ .push(event); } + let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); + let mut set: JoinSet> = JoinSet::new(); + + for block_number in block_set { + let semaphore = semaphore.clone(); + let provider = self.provider.clone(); + set.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + debug!("Fetching block timestamp for block number: {}", block_number); + let block_timestamp = get_block_timestamp(&provider, block_number).await?; + Ok((block_number, block_timestamp)) + }); + } + + while let Some(result) = set.join_next().await { + let (block_number, block_timestamp) = result??; + blocks.insert(block_number, block_timestamp); + } + debug!("Transactions: {}", &transactions.len()); debug!("Blocks: {}", &blocks.len()); @@ -535,13 +551,6 @@ impl Engine

{ Ok(()) } - async fn get_block_timestamp(&self, block_number: u64) -> Result { - match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { - MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), - MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), - } - } - async fn process_transaction_with_events( &mut self, transaction_hash: Felt, @@ -761,6 +770,10 @@ where let mut continuation_token = None; loop { + debug!( + "Fetching events page with continuation token: {:?}, for contract: {:?}", + continuation_token, events_filter.address + ); let events_page = provider .get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size) .await?; @@ -775,3 +788,13 @@ where Ok((events_filter.address, events_pages)) } + +async fn get_block_timestamp

(provider: &P, block_number: u64) -> Result +where + P: Provider + Sync, +{ + match provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? { + MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp), + MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp), + } +}