Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii): fetch block timestamp in parallel #2449

Open
wants to merge 1 commit into
base: spr/main/3bfe5955
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -104,7 +104,7 @@
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
db: Sql,
provider: Box<P>,
provider: Arc<P>,
processors: Arc<Processors<P>>,
config: EngineConfig,
shutdown_tx: Sender<()>,
Expand Down Expand Up @@ -134,7 +134,7 @@
Self {
world: Arc::new(world),
db,
provider: Box::new(provider),
provider: Arc::new(provider),
processors: Arc::new(processors),
config,
shutdown_tx,
Expand Down Expand Up @@ -303,6 +303,8 @@
// Flatten events pages and events according to the pending block cursor
// to array of (block_number, transaction_hash)
let mut transactions = LinkedHashMap::new();

let mut block_set = HashSet::new();
for event in events {
let block_number = match event.block_number {
Some(block_number) => block_number,
Expand All @@ -329,19 +331,33 @@
}
};

// Keep track of last block number and fetch block timestamp
if let btree_map::Entry::Vacant(v) = blocks.entry(block_number) {
debug!("Fetching block timestamp for block number: {}", block_number);
let block_timestamp = self.get_block_timestamp(block_number).await?;
v.insert(block_timestamp);
}
block_set.insert(block_number);

transactions
.entry((block_number, event.transaction_hash))
.or_insert(vec![])
.push(event);
}

let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));
let mut set: JoinSet<Result<(u64, u64), anyhow::Error>> = JoinSet::new();

for block_number in block_set {
let semaphore = semaphore.clone();
let provider = self.provider.clone();
set.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
debug!("Fetching block timestamp for block number: {}", block_number);
let block_timestamp = get_block_timestamp(&provider, block_number).await?;
Ok((block_number, block_timestamp))
});
}

while let Some(result) = set.join_next().await {
let (block_number, block_timestamp) = result??;
blocks.insert(block_number, block_timestamp);
}

debug!("Transactions: {}", &transactions.len());
debug!("Blocks: {}", &blocks.len());

Expand Down Expand Up @@ -535,13 +551,6 @@
Ok(())
}

async fn get_block_timestamp(&self, block_number: u64) -> Result<u64> {
match self.provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? {
MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp),
MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp),
}
}

async fn process_transaction_with_events(
&mut self,
transaction_hash: Felt,
Expand Down Expand Up @@ -761,6 +770,10 @@
let mut continuation_token = None;

loop {
debug!(
"Fetching events page with continuation token: {:?}, for contract: {:?}",

Check warning on line 774 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L774

Added line #L774 was not covered by tests
continuation_token, events_filter.address
);
let events_page = provider
.get_events(events_filter.clone(), continuation_token.clone(), events_chunk_size)
.await?;
Expand All @@ -775,3 +788,13 @@

Ok((events_filter.address, events_pages))
}

async fn get_block_timestamp<P>(provider: &P, block_number: u64) -> Result<u64>
where
P: Provider + Sync,
{
match provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await? {
MaybePendingBlockWithTxHashes::Block(block) => Ok(block.timestamp),
MaybePendingBlockWithTxHashes::PendingBlock(block) => Ok(block.timestamp),

Check warning on line 798 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L798

Added line #L798 was not covered by tests
}
}
Loading