Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-core): bitflags for indexing #2450

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 19 additions & 2 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, Processors};
use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors};
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::generate_event_processors_map;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
Expand Down Expand Up @@ -132,6 +132,14 @@
/// Max concurrent tasks
#[arg(long, default_value = "100")]
max_concurrent_tasks: usize,

/// Whether or not to index world transactions
#[arg(long, action = ArgAction::Set, default_value_t = false)]
index_transactions: bool,

Check warning on line 138 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L137-L138

Added lines #L137 - L138 were not covered by tests

/// Whether or not to index raw events
#[arg(long, action = ArgAction::Set, default_value_t = true)]
index_raw_events: bool,

Check warning on line 142 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L141-L142

Added lines #L141 - L142 were not covered by tests
}

#[tokio::main]
Expand Down Expand Up @@ -195,7 +203,15 @@

let (block_tx, block_rx) = tokio::sync::mpsc::channel(100);

let mut engine = Engine::new(
let mut flags = IndexingFlags::empty();
if args.index_transactions {
flags.insert(IndexingFlags::TRANSACTIONS);
}
if args.index_raw_events {
flags.insert(IndexingFlags::RAW_EVENTS);
}

let mut engine: Engine<Arc<JsonRpcClient<HttpTransport>>> = Engine::new(

Check warning on line 214 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L206-L214

Added lines #L206 - L214 were not covered by tests
world,
db.clone(),
provider.clone(),
Expand All @@ -206,6 +222,7 @@
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
polling_interval: Duration::from_millis(args.polling_interval),
flags,

Check warning on line 225 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L225

Added line #L225 was not covered by tests
},
shutdown_tx.clone(),
Some(block_tx),
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ tokio = { version = "1.32.0", features = [ "sync" ], default-features = true }
tokio-stream = "0.1.11"
tokio-util = "0.7.7"
tracing.workspace = true
clap.workspace = true
bitflags = "2.6.0"

[dev-dependencies]
camino.workspace = true
Expand Down
119 changes: 70 additions & 49 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
use std::time::Duration;

use anyhow::Result;
use bitflags::bitflags;
use dojo_world::contracts::world::WorldContractReader;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo, TransactionWithReceipt,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, Transaction,
TransactionReceipt, TransactionReceiptWithBlockInfo, TransactionWithReceipt,
};
use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
Expand Down Expand Up @@ -46,13 +47,22 @@
pub(crate) const LOG_TARGET: &str = "torii_core::engine";
pub const QUERY_QUEUE_BATCH_SIZE: usize = 1000;

bitflags! {
#[derive(Debug, Clone)]
pub struct IndexingFlags: u32 {
const TRANSACTIONS = 0b00000001;
const RAW_EVENTS = 0b00000010;
}
}

#[derive(Debug)]
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
pub flags: IndexingFlags,
}

impl Default for EngineConfig {
Expand All @@ -63,6 +73,7 @@
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
flags: IndexingFlags::empty(),
}
}
}
Expand Down Expand Up @@ -447,13 +458,18 @@
for ((block_number, transaction_hash), events) in data.transactions {
debug!("Processing transaction hash: {:#x}", transaction_hash);
// Process transaction
// let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?;
let transaction = if self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Some(self.provider.get_transaction_by_hash(transaction_hash).await?)

Check warning on line 462 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L462

Added line #L462 was not covered by tests
} else {
None
};

self.process_transaction_with_events(
transaction_hash,
events.as_slice(),
block_number,
data.blocks[&block_number],
transaction,
)
.await?;

Expand Down Expand Up @@ -537,6 +553,7 @@
events: &[EmittedEvent],
block_number: u64,
block_timestamp: u64,
transaction: Option<Transaction>,
) -> Result<()> {
for (event_idx, event) in events.iter().enumerate() {
let event_id =
Expand All @@ -553,24 +570,25 @@
block_timestamp,
&event_id,
&event,
// transaction_hash,
transaction_hash,
)
.await?;
}

// Commented out this transaction processor because it requires an RPC call for each
// transaction which is slowing down the sync process by alot.
// Self::process_transaction(
// self,
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?;
if let Some(ref transaction) = transaction {
Self::process_transaction(
self,
block_number,
block_timestamp,
transaction_hash,
transaction,
)
.await?;

Check warning on line 586 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L579-L586

Added lines #L579 - L586 were not covered by tests
}

Ok(())
}

// Process a transaction and events from its receipt.
// Returns whether the transaction has a world event.
async fn process_transaction_with_receipt(
Expand Down Expand Up @@ -603,21 +621,21 @@
block_timestamp,
&event_id,
event,
// *transaction_hash,
*transaction_hash,

Check warning on line 624 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L624

Added line #L624 was not covered by tests
)
.await?;
}

// if world_event {
// Self::process_transaction(
// self,
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?;
// }
if world_event && self.config.flags.contains(IndexingFlags::TRANSACTIONS) {
Self::process_transaction(
self,
block_number,
block_timestamp,
*transaction_hash,
&transaction_with_receipt.transaction,
)
.await?;
}

Check warning on line 638 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L629-L638

Added lines #L629 - L638 were not covered by tests
}

Ok(world_event)
Expand All @@ -634,38 +652,41 @@
Ok(())
}

// async fn process_transaction(
// &mut self,
// block_number: u64,
// block_timestamp: u64,
// transaction_hash: Felt,
// transaction: &Transaction,
// ) -> Result<()> {
// for processor in &self.processors.transaction {
// processor
// .process(
// &mut self.db,
// self.provider.as_ref(),
// block_number,
// block_timestamp,
// transaction_hash,
// transaction,
// )
// .await?
// }

// Ok(())
// }
async fn process_transaction(
&mut self,
block_number: u64,
block_timestamp: u64,
transaction_hash: Felt,
transaction: &Transaction,
) -> Result<()> {
for processor in &self.processors.transaction {
processor
.process(
&mut self.db,
self.provider.as_ref(),
block_number,
block_timestamp,
transaction_hash,
transaction,
)
.await?

Check warning on line 672 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L655-L672

Added lines #L655 - L672 were not covered by tests
}

Ok(())
}

Check warning on line 676 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L675-L676

Added lines #L675 - L676 were not covered by tests

async fn process_event(
&mut self,
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
// transaction_hash: Felt,
transaction_hash: Felt,
) -> Result<()> {
// self.db.store_event(event_id, event, transaction_hash, block_timestamp);
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);

Check warning on line 687 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L687

Added line #L687 was not covered by tests
}

let event_key = event.keys[0];

let Some(processor) = self.processors.event.get(&event_key) else {
Expand Down
Loading