From 01ce3385fe3fb82198ff96b147b849cc729f5f5b Mon Sep 17 00:00:00 2001 From: Nasr Date: Fri, 20 Sep 2024 14:48:18 -0400 Subject: [PATCH] opt(torii-core): move off queryqueue for executing tx --- .../core/src/{query_queue.rs => executor.rs} | 42 ++++++++++--------- crates/torii/core/src/lib.rs | 2 +- crates/torii/core/src/sql.rs | 2 +- 3 files changed, 24 insertions(+), 22 deletions(-) rename crates/torii/core/src/{query_queue.rs => executor.rs} (85%) diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/executor.rs similarity index 85% rename from crates/torii/core/src/query_queue.rs rename to crates/torii/core/src/executor.rs index 589035ca4e..f854b69dd3 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/executor.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; - +use std::sync::Arc; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use anyhow::{Context, Result}; use dojo_types::schema::Ty; use sqlx::{FromRow, Pool, Sqlite}; @@ -52,28 +53,29 @@ pub enum QueryType { Other, } -impl QueryQueue { - pub fn new(pool: Pool) -> Self { - QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() } - } +pub struct TxExecutor { + pool: Pool, + rx: Receiver, +} - pub fn enqueue>( - &mut self, - statement: S, - arguments: Vec, - query_type: QueryType, - ) { - self.queue.push_back((statement.into(), arguments, query_type)); - } +pub struct QueryMessage { + statement: String, + arguments: Vec, + query_type: QueryType, +} - pub fn push_publish(&mut self, value: BrokerMessage) { - self.publish_queue.push_back(value); +impl TxExecutor { + pub fn new(pool: Pool) -> (Self, Sender) { + let (tx, rx) = channel(100); // Adjust buffer size as needed + (TxExecutor { pool, rx }, tx) } - pub async fn execute_all(&mut self) -> Result<()> { + pub async fn run(&mut self) -> Result<()> { let mut tx = self.pool.begin().await?; + let mut publish_queue = Vec::new(); - while let Some((statement, arguments, query_type)) = self.queue.pop_front() { + while let Some(msg) = self.rx.recv().await { + let QueryMessage { statement, arguments, query_type } = msg; let mut query = sqlx::query(&statement); for arg in &arguments { @@ -95,7 +97,7 @@ impl QueryQueue { entity_updated.updated_model = Some(entity); entity_updated.deleted = false; let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.push_publish(broker_message); + publish_queue.push(broker_message); } QueryType::DeleteEntity(entity) => { let delete_model = query.execute(&mut *tx).await.with_context(|| { @@ -134,7 +136,7 @@ impl QueryQueue { } let broker_message = BrokerMessage::EntityUpdated(entity_updated); - self.push_publish(broker_message); + publish_queue.push(broker_message); } QueryType::Other => { query.execute(&mut *tx).await.with_context(|| { @@ -146,7 +148,7 @@ impl QueryQueue { tx.commit().await?; - while let Some(message) = self.publish_queue.pop_front() { + for message in publish_queue { send_broker_message(message); } diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index df6e8b3adc..d47e9bf71f 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -3,7 +3,7 @@ pub mod engine; pub mod error; pub mod model; pub mod processors; -pub mod query_queue; +pub mod executor; pub mod simple_broker; pub mod sql; pub mod types; diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index ccca4f4c7d..2e2574d1cd 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many; use tracing::{debug, warn}; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; +use crate::executor::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType}; use crate::types::{ Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, };