diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs index 589035ca4e..5dfad77113 100644 --- a/crates/torii/core/src/query_queue.rs +++ b/crates/torii/core/src/query_queue.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use anyhow::{Context, Result}; -use dojo_types::schema::Ty; +use dojo_types::schema::{Struct, Ty}; use sqlx::{FromRow, Pool, Sqlite}; use starknet::core::types::Felt; @@ -42,7 +42,7 @@ pub struct DeleteEntityQuery { pub entity_id: String, pub event_id: String, pub block_timestamp: String, - pub entity: Ty, + pub ty: Ty, } #[derive(Debug, Clone)] @@ -115,7 +115,8 @@ impl QueryQueue { .fetch_one(&mut *tx) .await?; let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = Some(entity.entity); + entity_updated.updated_model = + Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); let count = sqlx::query_scalar::<_, i64>( "SELECT count(*) FROM entity_model WHERE entity_id = ?", diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index ccca4f4c7d..249a3c4fef 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -361,7 +361,7 @@ impl Sql { entity_id: entity_id.clone(), event_id: event_id.to_string(), block_timestamp: utc_dt_string_from_timestamp(block_timestamp), - entity: entity.clone(), + ty: entity.clone(), }), ); diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 3b497c6def..4dd245d03a 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -106,17 +106,11 @@ impl DojoWorld { Arc::clone(&state_diff_manager), )); - tokio::task::spawn(subscriptions::entity::Service::new( - pool.clone(), - Arc::clone(&entity_manager), - Arc::clone(&model_cache), - )); + tokio::task::spawn(subscriptions::entity::Service::new(Arc::clone(&entity_manager))); - tokio::task::spawn(subscriptions::event_message::Service::new( - pool.clone(), - Arc::clone(&event_message_manager), - Arc::clone(&model_cache), - )); + tokio::task::spawn(subscriptions::event_message::Service::new(Arc::clone( + &event_message_manager, + ))); tokio::task::spawn(subscriptions::event::Service::new(Arc::clone(&event_manager))); diff --git a/crates/torii/grpc/src/server/subscriptions/entity.rs b/crates/torii/grpc/src/server/subscriptions/entity.rs index 7c03d096a7..d7b03ae26e 100644 --- a/crates/torii/grpc/src/server/subscriptions/entity.rs +++ b/crates/torii/grpc/src/server/subscriptions/entity.rs @@ -8,13 +8,10 @@ use std::task::{Context, Poll}; use futures::Stream; use futures_util::StreamExt; use rand::Rng; -use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; -use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; -use torii_core::model::build_sql_query; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; use torii_core::types::Entity; @@ -22,7 +19,6 @@ use tracing::{error, trace}; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::server::map_row_to_entity; use crate::types::{EntityKeysClause, PatternMatching}; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity"; @@ -81,32 +77,16 @@ impl EntityManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, } impl Service { - pub fn new( - pool: Pool, - subs_manager: Arc, - model_cache: Arc, - ) -> Self { - Self { - pool, - subs_manager, - model_cache, - simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } } - async fn publish_updates( - subs: Arc, - cache: Arc, - pool: Pool, - entity: &Entity, - ) -> Result<(), Error> { + async fn publish_updates(subs: Arc, entity: &Entity) -> Result<(), Error> { let mut closed_stream = Vec::new(); let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?; let keys = entity @@ -204,41 +184,13 @@ impl Service { continue; } - let models_query = r#" - SELECT group_concat(entity_model.model_id) as model_ids - FROM entities - JOIN entity_model ON entities.id = entity_model.entity_id - WHERE entities.id = ? - GROUP BY entities.id - "#; - let (model_ids,): (String,) = - sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?; - let model_ids: Vec = model_ids - .split(',') - .map(Felt::from_str) - .collect::>() - .map_err(ParseError::FromStr)?; - let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); - - let (entity_query, arrays_queries, _) = build_sql_query( - &schemas, - "entities", - "entity_id", - Some("entities.id = ?"), - Some("entities.id = ?"), - None, - None, - )?; - - let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?; - let mut arrays_rows = HashMap::new(); - for (name, query) in arrays_queries { - let row = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?; - arrays_rows.insert(name, row); - } - + // This should NEVER be None + let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { - entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?), + entity: Some(proto::types::Entity { + hashed_keys: hashed.to_bytes_be().to_vec(), + models: vec![model.into()], + }), subscription_id: *idx, }; @@ -264,10 +216,8 @@ impl Future for Service { while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { + if let Err(e) = Service::publish_updates(subs, &entity).await { error!(target = LOG_TARGET, error = %e, "Publishing entity update."); } }); diff --git a/crates/torii/grpc/src/server/subscriptions/event_message.rs b/crates/torii/grpc/src/server/subscriptions/event_message.rs index fabd5510aa..9bac36fb84 100644 --- a/crates/torii/grpc/src/server/subscriptions/event_message.rs +++ b/crates/torii/grpc/src/server/subscriptions/event_message.rs @@ -8,13 +8,10 @@ use std::task::{Context, Poll}; use futures::Stream; use futures_util::StreamExt; use rand::Rng; -use sqlx::{Pool, Sqlite}; use starknet::core::types::Felt; use tokio::sync::mpsc::{channel, Receiver}; use tokio::sync::RwLock; -use torii_core::cache::ModelCache; use torii_core::error::{Error, ParseError}; -use torii_core::model::build_sql_query; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::FELT_DELIMITER; use torii_core::types::EventMessage; @@ -23,7 +20,6 @@ use tracing::{error, trace}; use super::entity::EntitiesSubscriber; use crate::proto; use crate::proto::world::SubscribeEntityResponse; -use crate::server::map_row_to_entity; use crate::types::{EntityKeysClause, PatternMatching}; pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message"; @@ -75,30 +71,17 @@ impl EventMessageManager { #[must_use = "Service does nothing unless polled"] #[allow(missing_debug_implementations)] pub struct Service { - pool: Pool, subs_manager: Arc, - model_cache: Arc, simple_broker: Pin + Send>>, } impl Service { - pub fn new( - pool: Pool, - subs_manager: Arc, - model_cache: Arc, - ) -> Self { - Self { - pool, - subs_manager, - model_cache, - simple_broker: Box::pin(SimpleBroker::::subscribe()), - } + pub fn new(subs_manager: Arc) -> Self { + Self { subs_manager, simple_broker: Box::pin(SimpleBroker::::subscribe()) } } async fn publish_updates( subs: Arc, - cache: Arc, - pool: Pool, entity: &EventMessage, ) -> Result<(), Error> { let mut closed_stream = Vec::new(); @@ -182,42 +165,13 @@ impl Service { continue; } - // publish all updates if ids is empty or only ids that are subscribed to - let models_query = r#" - SELECT group_concat(event_model.model_id) as model_ids - FROM event_messages - JOIN event_model ON event_messages.id = event_model.entity_id - WHERE event_messages.id = ? - GROUP BY event_messages.id - "#; - let (model_ids,): (String,) = - sqlx::query_as(models_query).bind(&entity.id).fetch_one(&pool).await?; - let model_ids: Vec = model_ids - .split(',') - .map(Felt::from_str) - .collect::>() - .map_err(ParseError::FromStr)?; - let schemas = cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect(); - - let (entity_query, arrays_queries, _) = build_sql_query( - &schemas, - "event_messages", - "event_message_id", - Some("event_messages.id = ?"), - Some("event_messages.id = ?"), - None, - None, - )?; - - let row = sqlx::query(&entity_query).bind(&entity.id).fetch_one(&pool).await?; - let mut arrays_rows = HashMap::new(); - for (name, query) in arrays_queries { - let rows = sqlx::query(&query).bind(&entity.id).fetch_all(&pool).await?; - arrays_rows.insert(name, rows); - } - + // This should NEVER be None + let model = entity.updated_model.as_ref().unwrap().as_struct().unwrap().clone(); let resp = proto::world::SubscribeEntityResponse { - entity: Some(map_row_to_entity(&row, &arrays_rows, schemas.clone())?), + entity: Some(proto::types::Entity { + hashed_keys: hashed.to_bytes_be().to_vec(), + models: vec![model.into()], + }), subscription_id: *idx, }; @@ -243,10 +197,8 @@ impl Future for Service { while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { let subs = Arc::clone(&pin.subs_manager); - let cache = Arc::clone(&pin.model_cache); - let pool = pin.pool.clone(); tokio::spawn(async move { - if let Err(e) = Service::publish_updates(subs, cache, pool, &entity).await { + if let Err(e) = Service::publish_updates(subs, &entity).await { error!(target = LOG_TARGET, error = %e, "Publishing entity update."); } });