Skip to content

Commit

Permalink
opt(torii-core): move off queryqueue for executing tx
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 20, 2024
1 parent 8b7d6b2 commit 01ce338
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::VecDeque;

use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use sqlx::{FromRow, Pool, Sqlite};
Expand Down Expand Up @@ -52,28 +53,29 @@ pub enum QueryType {
Other,
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
}
pub struct TxExecutor {
pool: Pool<Sqlite>,
rx: Receiver<QueryMessage>,
}

pub fn enqueue<S: Into<String>>(
&mut self,
statement: S,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.queue.push_back((statement.into(), arguments, query_type));
}
pub struct QueryMessage {
statement: String,
arguments: Vec<Argument>,
query_type: QueryType,
}

pub fn push_publish(&mut self, value: BrokerMessage) {
self.publish_queue.push_back(value);
impl TxExecutor {
pub fn new(pool: Pool<Sqlite>) -> (Self, Sender<QueryMessage>) {
let (tx, rx) = channel(100); // Adjust buffer size as needed
(TxExecutor { pool, rx }, tx)
}

pub async fn execute_all(&mut self) -> Result<()> {
pub async fn run(&mut self) -> Result<()> {
let mut tx = self.pool.begin().await?;
let mut publish_queue = Vec::new();

while let Some((statement, arguments, query_type)) = self.queue.pop_front() {
while let Some(msg) = self.rx.recv().await {
let QueryMessage { statement, arguments, query_type } = msg;
let mut query = sqlx::query(&statement);

for arg in &arguments {
Expand All @@ -95,7 +97,7 @@ impl QueryQueue {
entity_updated.updated_model = Some(entity);
entity_updated.deleted = false;
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
publish_queue.push(broker_message);
}
QueryType::DeleteEntity(entity) => {
let delete_model = query.execute(&mut *tx).await.with_context(|| {
Expand Down Expand Up @@ -134,7 +136,7 @@ impl QueryQueue {
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
publish_queue.push(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
Expand All @@ -146,7 +148,7 @@ impl QueryQueue {

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
for message in publish_queue {
send_broker_message(message);
}

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod engine;
pub mod error;
pub mod model;
pub mod processors;
pub mod query_queue;
pub mod executor;
pub mod simple_broker;
pub mod sql;
pub mod types;
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use starknet_crypto::poseidon_hash_many;
use tracing::{debug, warn};

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::executor::{Argument, BrokerMessage, DeleteEntityQuery, QueryQueue, QueryType};
use crate::types::{
Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered,
};
Expand Down

0 comments on commit 01ce338

Please sign in to comment.